program_engine: real AND/OR condition evaluator
Some checks are pending
Validate / HACS validation (push) Waiting to run
Validate / Hassfest (push) Waiting to run

StateEvaluator decodes AND/OR records against MockState. ProgramEngine
.use_state_evaluator() installs one bound to the engine's panel + clock
+ location. Replaces the stub that always-passes-AND-always-fails-OR.

Traditional (OP=0) decode follows clsConditionLine.Cond synthesis
(clsConditionLine.cs:17-33): disk byte 1 (= and_family) carries the
compact GetConditionalText family byte, disk bytes 3-4 (and_instance
from cond2>>8) carry the object index. Family decoding mirrors
clsText.GetConditionalText (clsText.cs:2224-2274):

  family & 0xFC == 0x00 → OTHER  (low 4 bits = MiscConditional)
  family & 0xFC == 0x04 → ZONE   (bit 0x02 = NOT_READY, else SECURE)
  family & 0xFC == 0x08 → CTRL   (bit 0x02 = ON,         else OFF)
  family & 0xFC == 0x0C → TIME   (no MockState model → False)
  family >= 0x10        → SEC    (high nibble = mode, low = area)

Structured (OP > 0) decode uses Arg1 OP Arg2 with both sides resolved
via _resolve_arg(argtype, ix, field). MockState-backed resolution:

  ZONE        → loop / current_state / arming_state / latched_state
  UNIT        → on/off byte / time_remaining / dim level
  THERMOSTAT  → temp / setpoints / modes / humidity
  AREA        → mode
  TIME_DATE   → (clock-derived) year / month / day / DoW / hour /
                minute / time-of-day-in-minutes

CondOP supported: EQ / NE / LT / GT / ODD / EVEN / MULTIPLE / IN /
NOT_IN. Unknown argtypes or fields raise _UnsupportedCondition
internally — the evaluator swallows it and returns False, keeping the
chain *guarded* rather than firing too eagerly.

LIGHT/DARK MiscConditional uses astral via the engine's PanelLocation
when set. When location is missing, returns False either way (don't
fire if we can't determine).

15 new tests covering each evaluator branch (Traditional ZONE secure/
not-ready/undefined, CTRL on/off/dimmed, SEC mode-match, OTHER NEVER/
DARK; Structured Zone.CurrentState EQ/NE, Thermostat.Temp GT/LT,
TimeDate.Hour/DOW EQ, TimeDate-without-clock) plus end-to-end engine
integration showing use_state_evaluator() gates a real WHEN+AND chain
and the OR-alternative path works against real state.

Full suite: 581 passed, 1 skipped (up from 563, 82 engine tests total).
This commit is contained in:
Ryan Malloy 2026-05-14 02:39:41 -06:00
parent 16655da34c
commit d4c4e530f6
2 changed files with 715 additions and 3 deletions

View File

@ -48,7 +48,16 @@ from dataclasses import dataclass, field
from datetime import date, datetime, time, timedelta, timezone from datetime import date, datetime, time, timedelta, timezone
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from .programs import Days, Program, ProgramType, TimeKind from .programs import (
CondArgType,
CondOP,
Days,
MiscConditional,
Program,
ProgramCond,
ProgramType,
TimeKind,
)
_log = logging.getLogger(__name__) _log = logging.getLogger(__name__)
@ -605,6 +614,357 @@ def evaluate_conditions(
return False return False
# --------------------------------------------------------------------------
# State-aware AND/OR evaluator
# --------------------------------------------------------------------------
class _UnsupportedCondition(Exception):
"""Raised internally when an AND/OR record encodes a check we don't
yet evaluate. The caller (StateEvaluator) treats this as False
"we can't prove this condition", so the chain stays guarded and
logs once so the next semantic-decode pass has a punch list.
"""
class StateEvaluator:
"""Decode AND/OR records against a :class:`MockState` snapshot.
Each AND/OR :class:`Program` record encodes either:
* **Traditional** (``and_op == 0``): a compact bit-packed condition
in the ``cond`` u16 selecting a family (ProgramCond: ZONE / CTRL /
TIME / SEC / OTHER) with an inline operand. Decoded per
``clsText.GetConditionalText`` (clsText.cs:2224-2274).
* **Structured** (``and_op > 0``): a ``Arg1 OP Arg2`` triple where
Arg1 and Arg2 are typed references (Zone / Unit / Thermostat /
TimeDate / Constant / ) plus per-type field selectors. The
operator is one of :class:`CondOP` (EQ / NE / LT / GT / ).
Coverage in this initial cut:
Traditional:
* ZONE family zone secure / not-ready against MockZoneState
* CTRL family unit on / off against MockUnitState
* SEC family area in security-mode against MockAreaState
* OTHER family: ``NEVER`` (always false), ``LIGHT``/``DARK``
approximated via the engine's sun events when location is set,
``AC_POWER_OFF``/``ON`` (no model return False),
``BATTERY_LOW``/``OK`` (no model return False)
* TIME family (time-clock enabled/disabled) no MockState
slot for time-clock toggles, returns False
Structured:
* Zone.CurrentState / ArmingState (== const)
* Unit.CurrentState / Level (== / >= / <= const)
* Thermostat.CurrentTemp / Humidity / setpoints (numeric compare)
* TimeDate.Month / Day / DayOfWeek / Hour / Minute (numeric compare)
Anything else returns ``False`` and logs at DEBUG once per
condition class keeps a chain *guarded* rather than fired-too-
eagerly when we can't yet decode the predicate.
Time-related comparisons need a :class:`Clock`; pass one to honor
Hour/Minute/DayOfWeek predicates correctly. Without a clock those
return False.
"""
def __init__(
self,
state,
*,
clock: Clock | None = None,
location: PanelLocation | None = None,
) -> None:
self._state = state
self._clock = clock
self._location = location
def __call__(self, condition: Program) -> bool:
"""Evaluate one AND/OR record against the bound MockState.
Treated as a plain predicate so callers can pass this instance
directly to ``ProgramEngine.set_condition_evaluator``.
"""
try:
if condition.and_op == CondOP.ARG1_TRADITIONAL:
return self._eval_traditional(condition)
return self._eval_structured(condition)
except _UnsupportedCondition as err:
_log.debug("evaluator: unsupported condition: %s", err)
return False
except Exception:
_log.exception("evaluator: condition evaluation crashed")
return False
# ---- Traditional ------------------------------------------------------
def _eval_traditional(self, c: Program) -> bool:
"""Decode a Traditional AND record.
Per ``clsConditionLine.Cond`` (clsConditionLine.cs:17-33), the
compact-form cond is split across two AND-record slots:
* disk byte 1 (= ``and_family``) carries the compact's high byte
(the family + selector encoding from GetConditionalText)
* disk bytes 3-4 (= ``and_arg1_ix``, ``and_instance`` derived
from ``cond2 >> 8``) carry the compact's low byte (the
object index, shifted into the high half)
Family decoding mirrors clsText.GetConditionalText (clsText.cs:
2224-2274):
family & 0xFC == 0x00 OTHER (low 4 bits = MiscConditional)
family & 0xFC == 0x04 ZONE (bit 0x02 = NOT_READY, else SECURE)
family & 0xFC == 0x08 CTRL (bit 0x02 = ON, else OFF)
family & 0xFC == 0x0C TIME (bit 0x02 = ENABLED, else DIS.)
family >= 0x10 SEC (high nibble = mode, low = area)
"""
family = c.and_family
instance = c.and_instance
family_major = family & 0xFC
secondary = bool(family & 0x02) # selector bit within the family
if family_major == 0x00:
return self._eval_other(family & 0x0F)
if family_major == ProgramCond.ZONE:
return self._eval_traditional_zone(instance, want_not_ready=secondary)
if family_major == ProgramCond.CTRL:
return self._eval_traditional_ctrl(instance, want_on=secondary)
if family_major == ProgramCond.TIME:
# Time-clock enabled / disabled — MockState doesn't model
# the enable bit, so we conservatively report disabled.
return False
# 0x10 and above: SEC family — high nibble = mode, low nibble = area.
return self._eval_traditional_sec(
area=family & 0x0F, mode=(family >> 4) & 0x07,
)
def _eval_traditional_zone(self, zone_num: int, *, want_not_ready: bool) -> bool:
"""SECURE matches ``current_state == 0``; NOT_READY matches any
nonzero current_state (per the panel's display semantics)."""
zone = self._state.zones.get(zone_num)
if zone is None:
# Undefined zone reads as SECURE (matches real-panel behaviour
# when a programmed zone slot doesn't exist).
return not want_not_ready
if want_not_ready:
return zone.current_state != 0
return zone.current_state == 0
def _eval_traditional_ctrl(self, unit_num: int, *, want_on: bool) -> bool:
"""ON matches any nonzero ``state``; OFF matches ``state == 0``.
MockUnitState.state encodes 0=off, 1=on, 100..200=dim level
all nonzero values count as "on" for this predicate, which
matches the panel's binary on/off display.
"""
unit = self._state.units.get(unit_num)
if unit is None:
return not want_on # missing unit reads as OFF
on = unit.state != 0
return on == want_on
def _eval_traditional_sec(self, *, area: int, mode: int) -> bool:
"""Area in security-mode N. ``area == 0`` means "any area in
this mode" per GetConditionalText:2262 — without a multi-area
model we approximate by checking area 1."""
if area == 0:
area = 1
a = self._state.areas.get(area)
if a is None:
return False
return a.mode == mode
def _eval_other(self, misc_code: int) -> bool:
"""OTHER family: low 4 bits = enuMiscConditional."""
try:
cat = MiscConditional(misc_code)
except ValueError:
raise _UnsupportedCondition(f"unknown misc condition {misc_code}")
if cat == MiscConditional.NONE:
return True
if cat == MiscConditional.NEVER:
return False
if cat in (MiscConditional.LIGHT, MiscConditional.DARK):
light = self._is_light_outside()
if light is None:
# Can't determine — be conservative, don't fire either way.
return False
return light if cat == MiscConditional.LIGHT else not light
# PHONE_*, AC_POWER_*, BATTERY_*, ENERGY_COST_* — no MockState
# model for any of these yet. Conservatively False.
return False
def _is_light_outside(self) -> bool | None:
"""Approximate "is the sun up" against the engine's PanelLocation
+ clock. Returns None when clock or location is missing
(caller decides what to do with the indeterminate result)."""
if self._clock is None or self._location is None:
return None
try:
now = self._clock.now()
sunrise, sunset = _sun_events(self._location, now.date())
except Exception:
return None
return sunrise <= now <= sunset
# ---- Structured -------------------------------------------------------
def _eval_structured(self, c: Program) -> bool:
"""Evaluate ``Arg1 OP Arg2`` style conditions.
Resolves Arg1 and Arg2 to numeric values via :meth:`_resolve_arg`
then compares with the operator. Comparison operators are
straightforward integer math; AND/OR/XOR at this layer are
treated as bitwise reductions on the resolved values (matches
the C# operator semantics for those uncommon op codes).
"""
op = c.and_op
arg1 = self._resolve_arg(
c.and_arg1_argtype, c.and_arg1_ix, c.and_arg1_field,
)
arg2 = self._resolve_arg(
c.and_arg2_argtype, c.and_arg2_ix, c.and_arg2_field,
)
if arg1 is None or arg2 is None:
return False
if op == CondOP.ARG1_EQ_ARG2:
return arg1 == arg2
if op == CondOP.ARG1_NE_ARG2:
return arg1 != arg2
if op == CondOP.ARG1_LT_ARG2:
return arg1 < arg2
if op == CondOP.ARG1_GT_ARG2:
return arg1 > arg2
if op == CondOP.ARG1_ODD:
return (arg1 & 1) == 1
if op == CondOP.ARG1_EVEN:
return (arg1 & 1) == 0
# MULTIPLE / IN / NOT_IN are bitfield checks the panel uses for
# day-of-week and area-set tests. arg2 is the bitmask.
if op == CondOP.ARG1_MULTIPLE_ARG2:
return arg2 != 0 and (arg1 % arg2) == 0
if op == CondOP.ARG1_IN_ARG2:
return bool(arg1 & arg2)
if op == CondOP.ARG1_NOT_IN_ARG2:
return not bool(arg1 & arg2)
raise _UnsupportedCondition(f"unknown structured op {op}")
def _resolve_arg(
self, argtype: int, ix: int, field: int,
) -> int | None:
"""Return the numeric value of one Arg side, or None if it can't
be resolved (unknown type, missing object, missing clock).
"""
if argtype == CondArgType.CONSTANT:
return ix
if argtype == CondArgType.ZONE:
return self._resolve_zone_field(ix, field)
if argtype == CondArgType.UNIT:
return self._resolve_unit_field(ix, field)
if argtype == CondArgType.THERMOSTAT:
return self._resolve_thermostat_field(ix, field)
if argtype == CondArgType.AREA:
return self._resolve_area_field(ix, field)
if argtype == CondArgType.TIME_DATE:
return self._resolve_timedate_field(field)
# USER_SETTING, AUXILLARY, AUDIO, ACCESS_CONTROL, MESSAGE,
# SYSTEM — no MockState models for these. Treat as unresolved
# so the comparison returns False.
raise _UnsupportedCondition(f"unsupported argtype {argtype}")
def _resolve_zone_field(self, ix: int, field: int) -> int | None:
zone = self._state.zones.get(ix)
if zone is None:
return None
# enuZoneField: LoopReading=1, CurrentState=2, ArmingState=3, AlarmState=4
if field == 1:
return zone.loop
if field == 2:
return zone.current_state
if field == 3:
return zone.arming_state
if field == 4:
return zone.latched_state
raise _UnsupportedCondition(f"zone field {field}")
def _resolve_unit_field(self, ix: int, field: int) -> int | None:
unit = self._state.units.get(ix)
if unit is None:
return None
# enuUnitField: CurrentState=1, PreviousState=2, Timer=3, Level=4
if field == 1:
# 0 = off, 1 = on, 100..200 = dim. The panel treats anything
# non-zero as "on" at this granularity.
return 1 if unit.state != 0 else 0
if field == 3:
return unit.time_remaining
if field == 4:
# Level: panel returns 0..100% — derive from state byte.
if unit.state >= 100:
return unit.state - 100
return 100 if unit.state == 1 else 0
raise _UnsupportedCondition(f"unit field {field}")
def _resolve_thermostat_field(self, ix: int, field: int) -> int | None:
t = self._state.thermostats.get(ix)
if t is None:
return None
# enuThermostatField map (subset MockState has data for):
# CurrentTemp=1, HeatSetpt=2, CoolSetpt=3, SystemMode=4, FanMode=5,
# HoldMode=6, Humidity=9, HumidifySetpoint=10, DehumidifySetpoint=11,
# OutdoorTemperature=12
return {
1: t.temperature_raw,
2: t.heat_setpoint_raw,
3: t.cool_setpoint_raw,
4: t.system_mode,
5: t.fan_mode,
6: t.hold_mode,
9: t.humidity_raw,
10: t.humidify_setpoint_raw,
11: t.dehumidify_setpoint_raw,
12: t.outdoor_temperature_raw,
}.get(field, None)
def _resolve_area_field(self, ix: int, field: int) -> int | None:
area = self._state.areas.get(ix)
if area is None:
return None
# No enuAreaField source — be permissive: field 1 = mode.
if field == 1:
return area.mode
raise _UnsupportedCondition(f"area field {field}")
def _resolve_timedate_field(self, field: int) -> int | None:
if self._clock is None:
return None
now = self._clock.now()
# enuTimeDateField: Date=1, Year=2, Month=3, Day=4, DayOfWeek=5,
# Time=6, DST_Flag=7, Hour=8, Minute=9, SunriseSunset=10.
if field == 2:
return now.year
if field == 3:
return now.month
if field == 4:
return now.day
if field == 5:
# DayOfWeek: panel uses 1=Mon..7=Sun per clsHAC. Python
# weekday() returns Mon=0..Sun=6 — add 1.
return now.weekday() + 1
if field == 6:
# Time-of-day encoded as (hour * 60 + minute) — minutes since
# midnight. Matches the C# packing in GetComplexConditionText.
return now.hour * 60 + now.minute
if field == 8:
return now.hour
if field == 9:
return now.minute
# Date / DST_Flag / SunriseSunset — not modelled here.
raise _UnsupportedCondition(f"timedate field {field}")
# -------------------------------------------------------------------------- # --------------------------------------------------------------------------
# Engine # Engine
# -------------------------------------------------------------------------- # --------------------------------------------------------------------------
@ -687,10 +1047,27 @@ class ProgramEngine:
bool. The default returns True for every AND, False for every bool. The default returns True for every AND, False for every
OR (a degenerate evaluator that means "all chains' first AND OR (a degenerate evaluator that means "all chains' first AND
groups always pass" — useful as a smoke-test default, not for groups always pass" — useful as a smoke-test default, not for
real automation). Real callers supply a state-aware evaluator. real automation).
For real automation, call :meth:`use_state_evaluator` instead
(or build your own :class:`StateEvaluator` and pass it here).
""" """
self._condition_evaluator = fn self._condition_evaluator = fn
def use_state_evaluator(self) -> "StateEvaluator":
"""Install a :class:`StateEvaluator` bound to this engine's
``MockState``, clock, and location. Returns the new evaluator
so the caller can introspect it.
Equivalent to ``engine.set_condition_evaluator(
StateEvaluator(panel.state, clock=clock, location=loc))``.
"""
evaluator = StateEvaluator(
self._panel.state, clock=self._clock, location=self._location,
)
self._condition_evaluator = evaluator
return evaluator
@staticmethod @staticmethod
def _default_condition_evaluator(condition: Program) -> bool: def _default_condition_evaluator(condition: Program) -> bool:
"""Stub evaluator — caller should override via set_condition_evaluator.""" """Stub evaluator — caller should override via set_condition_evaluator."""

View File

@ -19,7 +19,13 @@ from omni_pca.program_engine import (
RealClock, RealClock,
classify, classify,
) )
from omni_pca.programs import Days, Program, ProgramType from omni_pca.programs import (
CondArgType,
CondOP,
Days,
Program,
ProgramType,
)
CONTROLLER_KEY = bytes(range(16)) CONTROLLER_KEY = bytes(range(16))
@ -1017,3 +1023,332 @@ async def test_engine_every_chain_fires_on_interval() -> None:
await clock.advance_to(t0 + timedelta(seconds=60 * tick + 1)) await clock.advance_to(t0 + timedelta(seconds=60 * tick + 1))
await asyncio.sleep(0) await asyncio.sleep(0)
assert engine.metrics.clausal_fired == 3 assert engine.metrics.clausal_fired == 3
# ---- Phase 6: detailed AND/OR semantics (StateEvaluator) ----------------
from omni_pca.mock_panel import ( # noqa: E402
MockAreaState,
MockThermostatState,
MockUnitState,
MockZoneState,
)
from omni_pca.program_engine import StateEvaluator # noqa: E402
def _and_traditional(
slot: int, family: int, instance: int = 0,
) -> Program:
"""Build a Traditional (OP=0) AND record.
Per clsConditionLine.Cond synthesis (clsConditionLine.cs:17-33),
the family byte lives in disk byte 1 = ``and_family`` (Python's
``cond & 0xFF``), and the object instance lives in disk byte 3
= ``and_instance`` (Python's ``(cond2 >> 8) & 0xFF``).
"""
return Program(
slot=slot, prog_type=int(ProgramType.AND),
cond=family & 0xFF, # byte 1 = family; byte 2 (OP) = 0
cond2=(instance & 0xFF) << 8, # byte 3 = instance; byte 4 = 0
)
def _and_structured(
slot: int,
op: int,
arg1_type: int, arg1_ix: int, arg1_field: int,
arg2_type: int, arg2_ix: int, arg2_field: int = 0,
) -> Program:
"""Build a Structured (OP>0) AND record.
Field layout per programs.py decoders:
cond high byte (>>8 & 0xFF) = op (and_op)
cond low byte (& 0xFF) = arg1_argtype (and_arg1_argtype)
cond2 = arg1_ix (and_arg1_ix)
cmd = arg1_field (and_arg1_field)
par = arg2_argtype (and_arg2_argtype)
pr2 = arg2_ix (and_arg2_ix)
month = arg2_field (and_arg2_field)
"""
return Program(
slot=slot, prog_type=int(ProgramType.AND),
cond=(op << 8) | arg1_type,
cond2=arg1_ix,
cmd=arg1_field,
par=arg2_type,
pr2=arg2_ix,
month=arg2_field,
)
def _state_with(**kwargs) -> MockState:
return MockState(**kwargs)
# ---- Traditional ZONE family -------------------------------------------
def test_state_evaluator_zone_secure_passes_when_state_zero() -> None:
state = _state_with(zones={
7: MockZoneState(name="FRONT DOOR", current_state=0),
})
ev = StateEvaluator(state)
# ZONE family = 0x04 (secure variant); instance = 7
cond = _and_traditional(1, family=0x04, instance=7)
assert ev(cond) is True
def test_state_evaluator_zone_secure_fails_when_tripped() -> None:
state = _state_with(zones={
7: MockZoneState(name="FRONT DOOR", current_state=1), # not-ready
})
ev = StateEvaluator(state)
cond = _and_traditional(1, family=0x04, instance=7)
assert ev(cond) is False
def test_state_evaluator_zone_not_ready_passes_when_tripped() -> None:
state = _state_with(zones={
7: MockZoneState(name="FRONT DOOR", current_state=1),
})
ev = StateEvaluator(state)
# family 0x06 = ZONE + NOT_READY selector bit
cond = _and_traditional(1, family=0x06, instance=7)
assert ev(cond) is True
def test_state_evaluator_zone_undefined_is_secure() -> None:
"""Undefined zone reads as SECURE — matches real-panel behaviour
when a programmed zone slot doesn't exist."""
ev = StateEvaluator(_state_with()) # no zones
secure_check = _and_traditional(1, family=0x04, instance=99)
not_ready_check = _and_traditional(2, family=0x06, instance=99)
assert ev(secure_check) is True
assert ev(not_ready_check) is False
# ---- Traditional CTRL family --------------------------------------------
def test_state_evaluator_unit_on_passes_when_state_one() -> None:
state = _state_with(units={5: MockUnitState(name="LAMP", state=1)})
ev = StateEvaluator(state)
# CTRL family + ON selector = 0x0A
cond = _and_traditional(1, family=0x0A, instance=5)
assert ev(cond) is True
def test_state_evaluator_unit_off_passes_when_state_zero() -> None:
state = _state_with(units={5: MockUnitState(name="LAMP", state=0)})
ev = StateEvaluator(state)
# CTRL family + OFF selector = 0x08
cond = _and_traditional(1, family=0x08, instance=5)
assert ev(cond) is True
def test_state_evaluator_unit_on_for_dimmed_unit() -> None:
"""Dim level 50% → state=150; ON predicate should pass."""
state = _state_with(units={5: MockUnitState(state=150)})
ev = StateEvaluator(state)
cond = _and_traditional(1, family=0x0A, instance=5)
assert ev(cond) is True
# ---- Traditional SEC family ---------------------------------------------
def test_state_evaluator_security_mode_match() -> None:
"""SEC family: family byte = (mode << 4) | area. Area 1 in mode 2."""
state = _state_with(areas={1: MockAreaState(name="MAIN", mode=2)})
ev = StateEvaluator(state)
cond = _and_traditional(1, family=(2 << 4) | 1) # mode 2, area 1 = 0x21
assert ev(cond) is True
# Area in different mode → fails.
cond_wrong = _and_traditional(1, family=(3 << 4) | 1) # mode 3
assert ev(cond_wrong) is False
# ---- Traditional OTHER family -------------------------------------------
def test_state_evaluator_never_is_always_false() -> None:
"""MiscConditional.NEVER (= 1) → always False, regardless of state."""
ev = StateEvaluator(_state_with())
# OTHER family + NEVER misc = 0x01
cond = _and_traditional(1, family=0x01)
assert ev(cond) is False
def test_state_evaluator_dark_without_location_is_false() -> None:
ev = StateEvaluator(_state_with()) # no location
# OTHER family + DARK misc = 0x03
cond = _and_traditional(1, family=0x03)
assert ev(cond) is False
# ---- Structured: Zone fields --------------------------------------------
def test_state_evaluator_structured_zone_current_state_eq() -> None:
"""Zone 5 CurrentState == 1 (not-ready) — structured form."""
state = _state_with(zones={5: MockZoneState(current_state=1)})
ev = StateEvaluator(state)
# EQ, Arg1=ZONE.CurrentState(2), Arg1IX=5, Arg2=CONSTANT, Arg2IX=1
cond = _and_structured(
slot=1, op=int(CondOP.ARG1_EQ_ARG2),
arg1_type=int(CondArgType.ZONE), arg1_ix=5, arg1_field=2,
arg2_type=int(CondArgType.CONSTANT), arg2_ix=1,
)
assert ev(cond) is True
def test_state_evaluator_structured_zone_state_ne() -> None:
state = _state_with(zones={5: MockZoneState(current_state=0)})
ev = StateEvaluator(state)
cond = _and_structured(
slot=1, op=int(CondOP.ARG1_NE_ARG2),
arg1_type=int(CondArgType.ZONE), arg1_ix=5, arg1_field=2,
arg2_type=int(CondArgType.CONSTANT), arg2_ix=0,
)
assert ev(cond) is False # state IS 0
# ---- Structured: Thermostat fields --------------------------------------
def test_state_evaluator_structured_thermostat_temp_gt() -> None:
"""TEMPERATURE > 75 — structured comparison."""
# Thermostat raw temperature 168 ~ 76°F on the Omni linear scale,
# but we compare raw bytes here. Use a raw temp clearly above the
# constant.
state = _state_with(thermostats={
1: MockThermostatState(temperature_raw=80),
})
ev = StateEvaluator(state)
cond = _and_structured(
slot=1, op=int(CondOP.ARG1_GT_ARG2),
arg1_type=int(CondArgType.THERMOSTAT), arg1_ix=1, arg1_field=1,
arg2_type=int(CondArgType.CONSTANT), arg2_ix=75,
)
assert ev(cond) is True
# And < should fail.
cond_lt = _and_structured(
slot=2, op=int(CondOP.ARG1_LT_ARG2),
arg1_type=int(CondArgType.THERMOSTAT), arg1_ix=1, arg1_field=1,
arg2_type=int(CondArgType.CONSTANT), arg2_ix=75,
)
assert ev(cond_lt) is False
# ---- Structured: TimeDate fields ----------------------------------------
def test_state_evaluator_structured_timedate_hour_compare() -> None:
"""Current hour == 22 — uses the clock."""
clock = FakeClock(datetime(2026, 5, 14, 22, 30, tzinfo=timezone.utc))
ev = StateEvaluator(_state_with(), clock=clock)
cond = _and_structured(
slot=1, op=int(CondOP.ARG1_EQ_ARG2),
arg1_type=int(CondArgType.TIME_DATE), arg1_ix=0, arg1_field=8, # Hour
arg2_type=int(CondArgType.CONSTANT), arg2_ix=22,
)
assert ev(cond) is True
def test_state_evaluator_structured_timedate_dayofweek() -> None:
"""DayOfWeek == 4 (Thursday)."""
clock = FakeClock(datetime(2026, 5, 14, 12, 0, tzinfo=timezone.utc)) # Thu
ev = StateEvaluator(_state_with(), clock=clock)
cond = _and_structured(
slot=1, op=int(CondOP.ARG1_EQ_ARG2),
arg1_type=int(CondArgType.TIME_DATE), arg1_ix=0, arg1_field=5, # DOW
arg2_type=int(CondArgType.CONSTANT), arg2_ix=4,
)
assert ev(cond) is True
def test_state_evaluator_structured_timedate_without_clock_is_false() -> None:
"""No clock → TimeDate predicates resolve to None → comparison False."""
ev = StateEvaluator(_state_with()) # no clock
cond = _and_structured(
slot=1, op=int(CondOP.ARG1_EQ_ARG2),
arg1_type=int(CondArgType.TIME_DATE), arg1_ix=0, arg1_field=8,
arg2_type=int(CondArgType.CONSTANT), arg2_ix=22,
)
assert ev(cond) is False
# ---- Engine integration --------------------------------------------------
@pytest.mark.asyncio
async def test_engine_use_state_evaluator_gates_real_conditions() -> None:
"""End-to-end: WHEN + AND IF UNIT 3 ON + THEN UNIT 9 ON.
Chain fires only when unit 3 is on at the time the event arrives."""
button_evt = event_id_user_macro_button(5)
# WHEN button 5; AND IF unit 3 ON; THEN unit 9 ON.
when = _when(1, button_evt)
and_cond = _and_traditional(2, family=0x0A, instance=3) # CTRL + ON, unit 3
then = _then_action(3, int(Command.UNIT_ON), 9)
# Start with unit 3 OFF.
panel = MockPanel(
controller_key=CONTROLLER_KEY,
state=MockState(
programs={
1: when.encode_wire_bytes(),
2: and_cond.encode_wire_bytes(),
3: then.encode_wire_bytes(),
},
units={3: MockUnitState(state=0)},
),
)
async with ProgramEngine(panel, clock=FakeClock(
datetime(2026, 5, 14, 0, 0, tzinfo=timezone.utc)
)) as engine:
engine.use_state_evaluator()
# Unit 3 is OFF — chain blocked.
fired = await engine.emit_user_macro_button(5)
assert fired == 0
assert 9 not in panel.state.units or panel.state.units[9].state == 0
# Turn unit 3 ON, re-emit — chain fires.
panel.state.units[3].state = 1
fired = await engine.emit_user_macro_button(5)
assert fired == 1
assert panel.state.units[9].state == 1
@pytest.mark.asyncio
async def test_engine_state_evaluator_or_alternative() -> None:
"""WHEN + AND IF zone 5 secure + OR + AND IF zone 6 secure + THEN.
Fires if either zone is secure."""
button_evt = event_id_user_macro_button(5)
when = _when(1, button_evt)
and_z5 = _and_traditional(2, family=0x04, instance=5) # ZONE 5 secure
or_break = _or_cond(3) # OR boundary
and_z6 = _and_traditional(4, family=0x04, instance=6) # ZONE 6 secure
then = _then_action(5, int(Command.UNIT_ON), 10)
# Zone 5 tripped, Zone 6 secure → group A fails, group B passes.
panel = MockPanel(
controller_key=CONTROLLER_KEY,
state=MockState(
programs={
p.slot: p.encode_wire_bytes()
for p in (when, and_z5, or_break, and_z6, then)
},
zones={
5: MockZoneState(current_state=1),
6: MockZoneState(current_state=0),
},
),
)
async with ProgramEngine(panel, clock=FakeClock(
datetime(2026, 5, 14, 0, 0, tzinfo=timezone.utc)
)) as engine:
engine.use_state_evaluator()
fired = await engine.emit_user_macro_button(5)
assert fired == 1 # OR-alternative passed
assert panel.state.units[10].state == 1