diff --git a/src/omni_pca/program_engine.py b/src/omni_pca/program_engine.py index b6e2f15..cf67ad1 100644 --- a/src/omni_pca/program_engine.py +++ b/src/omni_pca/program_engine.py @@ -48,7 +48,16 @@ from dataclasses import dataclass, field from datetime import date, datetime, time, timedelta, timezone from typing import TYPE_CHECKING -from .programs import Days, Program, ProgramType, TimeKind +from .programs import ( + CondArgType, + CondOP, + Days, + MiscConditional, + Program, + ProgramCond, + ProgramType, + TimeKind, +) _log = logging.getLogger(__name__) @@ -605,6 +614,357 @@ def evaluate_conditions( return False +# -------------------------------------------------------------------------- +# State-aware AND/OR evaluator +# -------------------------------------------------------------------------- + + +class _UnsupportedCondition(Exception): + """Raised internally when an AND/OR record encodes a check we don't + yet evaluate. The caller (StateEvaluator) treats this as False — + "we can't prove this condition", so the chain stays guarded — and + logs once so the next semantic-decode pass has a punch list. + """ + + +class StateEvaluator: + """Decode AND/OR records against a :class:`MockState` snapshot. + + Each AND/OR :class:`Program` record encodes either: + + * **Traditional** (``and_op == 0``): a compact bit-packed condition + in the ``cond`` u16 selecting a family (ProgramCond: ZONE / CTRL / + TIME / SEC / OTHER) with an inline operand. Decoded per + ``clsText.GetConditionalText`` (clsText.cs:2224-2274). + + * **Structured** (``and_op > 0``): a ``Arg1 OP Arg2`` triple where + Arg1 and Arg2 are typed references (Zone / Unit / Thermostat / + TimeDate / Constant / …) plus per-type field selectors. The + operator is one of :class:`CondOP` (EQ / NE / LT / GT / …). + + Coverage in this initial cut: + + Traditional: + * ZONE family — zone secure / not-ready against MockZoneState + * CTRL family — unit on / off against MockUnitState + * SEC family — area in security-mode against MockAreaState + * OTHER family: ``NEVER`` (always false), ``LIGHT``/``DARK`` + approximated via the engine's sun events when location is set, + ``AC_POWER_OFF``/``ON`` (no model — return False), + ``BATTERY_LOW``/``OK`` (no model — return False) + * TIME family (time-clock enabled/disabled) — no MockState + slot for time-clock toggles, returns False + + Structured: + * Zone.CurrentState / ArmingState (== const) + * Unit.CurrentState / Level (== / >= / <= const) + * Thermostat.CurrentTemp / Humidity / setpoints (numeric compare) + * TimeDate.Month / Day / DayOfWeek / Hour / Minute (numeric compare) + + Anything else returns ``False`` and logs at DEBUG once per + condition class — keeps a chain *guarded* rather than fired-too- + eagerly when we can't yet decode the predicate. + + Time-related comparisons need a :class:`Clock`; pass one to honor + Hour/Minute/DayOfWeek predicates correctly. Without a clock those + return False. + """ + + def __init__( + self, + state, + *, + clock: Clock | None = None, + location: PanelLocation | None = None, + ) -> None: + self._state = state + self._clock = clock + self._location = location + + def __call__(self, condition: Program) -> bool: + """Evaluate one AND/OR record against the bound MockState. + + Treated as a plain predicate so callers can pass this instance + directly to ``ProgramEngine.set_condition_evaluator``. + """ + try: + if condition.and_op == CondOP.ARG1_TRADITIONAL: + return self._eval_traditional(condition) + return self._eval_structured(condition) + except _UnsupportedCondition as err: + _log.debug("evaluator: unsupported condition: %s", err) + return False + except Exception: + _log.exception("evaluator: condition evaluation crashed") + return False + + # ---- Traditional ------------------------------------------------------ + + def _eval_traditional(self, c: Program) -> bool: + """Decode a Traditional AND record. + + Per ``clsConditionLine.Cond`` (clsConditionLine.cs:17-33), the + compact-form cond is split across two AND-record slots: + + * disk byte 1 (= ``and_family``) carries the compact's high byte + (the family + selector encoding from GetConditionalText) + * disk bytes 3-4 (= ``and_arg1_ix``, ``and_instance`` derived + from ``cond2 >> 8``) carry the compact's low byte (the + object index, shifted into the high half) + + Family decoding mirrors clsText.GetConditionalText (clsText.cs: + 2224-2274): + + family & 0xFC == 0x00 → OTHER (low 4 bits = MiscConditional) + family & 0xFC == 0x04 → ZONE (bit 0x02 = NOT_READY, else SECURE) + family & 0xFC == 0x08 → CTRL (bit 0x02 = ON, else OFF) + family & 0xFC == 0x0C → TIME (bit 0x02 = ENABLED, else DIS.) + family >= 0x10 → SEC (high nibble = mode, low = area) + """ + family = c.and_family + instance = c.and_instance + family_major = family & 0xFC + secondary = bool(family & 0x02) # selector bit within the family + if family_major == 0x00: + return self._eval_other(family & 0x0F) + if family_major == ProgramCond.ZONE: + return self._eval_traditional_zone(instance, want_not_ready=secondary) + if family_major == ProgramCond.CTRL: + return self._eval_traditional_ctrl(instance, want_on=secondary) + if family_major == ProgramCond.TIME: + # Time-clock enabled / disabled — MockState doesn't model + # the enable bit, so we conservatively report disabled. + return False + # 0x10 and above: SEC family — high nibble = mode, low nibble = area. + return self._eval_traditional_sec( + area=family & 0x0F, mode=(family >> 4) & 0x07, + ) + + def _eval_traditional_zone(self, zone_num: int, *, want_not_ready: bool) -> bool: + """SECURE matches ``current_state == 0``; NOT_READY matches any + nonzero current_state (per the panel's display semantics).""" + zone = self._state.zones.get(zone_num) + if zone is None: + # Undefined zone reads as SECURE (matches real-panel behaviour + # when a programmed zone slot doesn't exist). + return not want_not_ready + if want_not_ready: + return zone.current_state != 0 + return zone.current_state == 0 + + def _eval_traditional_ctrl(self, unit_num: int, *, want_on: bool) -> bool: + """ON matches any nonzero ``state``; OFF matches ``state == 0``. + + MockUnitState.state encodes 0=off, 1=on, 100..200=dim level — + all nonzero values count as "on" for this predicate, which + matches the panel's binary on/off display. + """ + unit = self._state.units.get(unit_num) + if unit is None: + return not want_on # missing unit reads as OFF + on = unit.state != 0 + return on == want_on + + def _eval_traditional_sec(self, *, area: int, mode: int) -> bool: + """Area in security-mode N. ``area == 0`` means "any area in + this mode" per GetConditionalText:2262 — without a multi-area + model we approximate by checking area 1.""" + if area == 0: + area = 1 + a = self._state.areas.get(area) + if a is None: + return False + return a.mode == mode + + def _eval_other(self, misc_code: int) -> bool: + """OTHER family: low 4 bits = enuMiscConditional.""" + try: + cat = MiscConditional(misc_code) + except ValueError: + raise _UnsupportedCondition(f"unknown misc condition {misc_code}") + if cat == MiscConditional.NONE: + return True + if cat == MiscConditional.NEVER: + return False + if cat in (MiscConditional.LIGHT, MiscConditional.DARK): + light = self._is_light_outside() + if light is None: + # Can't determine — be conservative, don't fire either way. + return False + return light if cat == MiscConditional.LIGHT else not light + # PHONE_*, AC_POWER_*, BATTERY_*, ENERGY_COST_* — no MockState + # model for any of these yet. Conservatively False. + return False + + def _is_light_outside(self) -> bool | None: + """Approximate "is the sun up" against the engine's PanelLocation + + clock. Returns None when clock or location is missing + (caller decides what to do with the indeterminate result).""" + if self._clock is None or self._location is None: + return None + try: + now = self._clock.now() + sunrise, sunset = _sun_events(self._location, now.date()) + except Exception: + return None + return sunrise <= now <= sunset + + # ---- Structured ------------------------------------------------------- + + def _eval_structured(self, c: Program) -> bool: + """Evaluate ``Arg1 OP Arg2`` style conditions. + + Resolves Arg1 and Arg2 to numeric values via :meth:`_resolve_arg` + then compares with the operator. Comparison operators are + straightforward integer math; AND/OR/XOR at this layer are + treated as bitwise reductions on the resolved values (matches + the C# operator semantics for those uncommon op codes). + """ + op = c.and_op + arg1 = self._resolve_arg( + c.and_arg1_argtype, c.and_arg1_ix, c.and_arg1_field, + ) + arg2 = self._resolve_arg( + c.and_arg2_argtype, c.and_arg2_ix, c.and_arg2_field, + ) + if arg1 is None or arg2 is None: + return False + if op == CondOP.ARG1_EQ_ARG2: + return arg1 == arg2 + if op == CondOP.ARG1_NE_ARG2: + return arg1 != arg2 + if op == CondOP.ARG1_LT_ARG2: + return arg1 < arg2 + if op == CondOP.ARG1_GT_ARG2: + return arg1 > arg2 + if op == CondOP.ARG1_ODD: + return (arg1 & 1) == 1 + if op == CondOP.ARG1_EVEN: + return (arg1 & 1) == 0 + # MULTIPLE / IN / NOT_IN are bitfield checks the panel uses for + # day-of-week and area-set tests. arg2 is the bitmask. + if op == CondOP.ARG1_MULTIPLE_ARG2: + return arg2 != 0 and (arg1 % arg2) == 0 + if op == CondOP.ARG1_IN_ARG2: + return bool(arg1 & arg2) + if op == CondOP.ARG1_NOT_IN_ARG2: + return not bool(arg1 & arg2) + raise _UnsupportedCondition(f"unknown structured op {op}") + + def _resolve_arg( + self, argtype: int, ix: int, field: int, + ) -> int | None: + """Return the numeric value of one Arg side, or None if it can't + be resolved (unknown type, missing object, missing clock). + """ + if argtype == CondArgType.CONSTANT: + return ix + if argtype == CondArgType.ZONE: + return self._resolve_zone_field(ix, field) + if argtype == CondArgType.UNIT: + return self._resolve_unit_field(ix, field) + if argtype == CondArgType.THERMOSTAT: + return self._resolve_thermostat_field(ix, field) + if argtype == CondArgType.AREA: + return self._resolve_area_field(ix, field) + if argtype == CondArgType.TIME_DATE: + return self._resolve_timedate_field(field) + # USER_SETTING, AUXILLARY, AUDIO, ACCESS_CONTROL, MESSAGE, + # SYSTEM — no MockState models for these. Treat as unresolved + # so the comparison returns False. + raise _UnsupportedCondition(f"unsupported argtype {argtype}") + + def _resolve_zone_field(self, ix: int, field: int) -> int | None: + zone = self._state.zones.get(ix) + if zone is None: + return None + # enuZoneField: LoopReading=1, CurrentState=2, ArmingState=3, AlarmState=4 + if field == 1: + return zone.loop + if field == 2: + return zone.current_state + if field == 3: + return zone.arming_state + if field == 4: + return zone.latched_state + raise _UnsupportedCondition(f"zone field {field}") + + def _resolve_unit_field(self, ix: int, field: int) -> int | None: + unit = self._state.units.get(ix) + if unit is None: + return None + # enuUnitField: CurrentState=1, PreviousState=2, Timer=3, Level=4 + if field == 1: + # 0 = off, 1 = on, 100..200 = dim. The panel treats anything + # non-zero as "on" at this granularity. + return 1 if unit.state != 0 else 0 + if field == 3: + return unit.time_remaining + if field == 4: + # Level: panel returns 0..100% — derive from state byte. + if unit.state >= 100: + return unit.state - 100 + return 100 if unit.state == 1 else 0 + raise _UnsupportedCondition(f"unit field {field}") + + def _resolve_thermostat_field(self, ix: int, field: int) -> int | None: + t = self._state.thermostats.get(ix) + if t is None: + return None + # enuThermostatField map (subset MockState has data for): + # CurrentTemp=1, HeatSetpt=2, CoolSetpt=3, SystemMode=4, FanMode=5, + # HoldMode=6, Humidity=9, HumidifySetpoint=10, DehumidifySetpoint=11, + # OutdoorTemperature=12 + return { + 1: t.temperature_raw, + 2: t.heat_setpoint_raw, + 3: t.cool_setpoint_raw, + 4: t.system_mode, + 5: t.fan_mode, + 6: t.hold_mode, + 9: t.humidity_raw, + 10: t.humidify_setpoint_raw, + 11: t.dehumidify_setpoint_raw, + 12: t.outdoor_temperature_raw, + }.get(field, None) + + def _resolve_area_field(self, ix: int, field: int) -> int | None: + area = self._state.areas.get(ix) + if area is None: + return None + # No enuAreaField source — be permissive: field 1 = mode. + if field == 1: + return area.mode + raise _UnsupportedCondition(f"area field {field}") + + def _resolve_timedate_field(self, field: int) -> int | None: + if self._clock is None: + return None + now = self._clock.now() + # enuTimeDateField: Date=1, Year=2, Month=3, Day=4, DayOfWeek=5, + # Time=6, DST_Flag=7, Hour=8, Minute=9, SunriseSunset=10. + if field == 2: + return now.year + if field == 3: + return now.month + if field == 4: + return now.day + if field == 5: + # DayOfWeek: panel uses 1=Mon..7=Sun per clsHAC. Python + # weekday() returns Mon=0..Sun=6 — add 1. + return now.weekday() + 1 + if field == 6: + # Time-of-day encoded as (hour * 60 + minute) — minutes since + # midnight. Matches the C# packing in GetComplexConditionText. + return now.hour * 60 + now.minute + if field == 8: + return now.hour + if field == 9: + return now.minute + # Date / DST_Flag / SunriseSunset — not modelled here. + raise _UnsupportedCondition(f"timedate field {field}") + + # -------------------------------------------------------------------------- # Engine # -------------------------------------------------------------------------- @@ -687,10 +1047,27 @@ class ProgramEngine: bool. The default returns True for every AND, False for every OR (a degenerate evaluator that means "all chains' first AND groups always pass" — useful as a smoke-test default, not for - real automation). Real callers supply a state-aware evaluator. + real automation). + + For real automation, call :meth:`use_state_evaluator` instead + (or build your own :class:`StateEvaluator` and pass it here). """ self._condition_evaluator = fn + def use_state_evaluator(self) -> "StateEvaluator": + """Install a :class:`StateEvaluator` bound to this engine's + ``MockState``, clock, and location. Returns the new evaluator + so the caller can introspect it. + + Equivalent to ``engine.set_condition_evaluator( + StateEvaluator(panel.state, clock=clock, location=loc))``. + """ + evaluator = StateEvaluator( + self._panel.state, clock=self._clock, location=self._location, + ) + self._condition_evaluator = evaluator + return evaluator + @staticmethod def _default_condition_evaluator(condition: Program) -> bool: """Stub evaluator — caller should override via set_condition_evaluator.""" diff --git a/tests/test_program_engine.py b/tests/test_program_engine.py index 18fd07a..05137de 100644 --- a/tests/test_program_engine.py +++ b/tests/test_program_engine.py @@ -19,7 +19,13 @@ from omni_pca.program_engine import ( RealClock, classify, ) -from omni_pca.programs import Days, Program, ProgramType +from omni_pca.programs import ( + CondArgType, + CondOP, + Days, + Program, + ProgramType, +) CONTROLLER_KEY = bytes(range(16)) @@ -1017,3 +1023,332 @@ async def test_engine_every_chain_fires_on_interval() -> None: await clock.advance_to(t0 + timedelta(seconds=60 * tick + 1)) await asyncio.sleep(0) assert engine.metrics.clausal_fired == 3 + + +# ---- Phase 6: detailed AND/OR semantics (StateEvaluator) ---------------- + + +from omni_pca.mock_panel import ( # noqa: E402 + MockAreaState, + MockThermostatState, + MockUnitState, + MockZoneState, +) +from omni_pca.program_engine import StateEvaluator # noqa: E402 + + +def _and_traditional( + slot: int, family: int, instance: int = 0, +) -> Program: + """Build a Traditional (OP=0) AND record. + + Per clsConditionLine.Cond synthesis (clsConditionLine.cs:17-33), + the family byte lives in disk byte 1 = ``and_family`` (Python's + ``cond & 0xFF``), and the object instance lives in disk byte 3 + = ``and_instance`` (Python's ``(cond2 >> 8) & 0xFF``). + """ + return Program( + slot=slot, prog_type=int(ProgramType.AND), + cond=family & 0xFF, # byte 1 = family; byte 2 (OP) = 0 + cond2=(instance & 0xFF) << 8, # byte 3 = instance; byte 4 = 0 + ) + + +def _and_structured( + slot: int, + op: int, + arg1_type: int, arg1_ix: int, arg1_field: int, + arg2_type: int, arg2_ix: int, arg2_field: int = 0, +) -> Program: + """Build a Structured (OP>0) AND record. + + Field layout per programs.py decoders: + cond high byte (>>8 & 0xFF) = op (and_op) + cond low byte (& 0xFF) = arg1_argtype (and_arg1_argtype) + cond2 = arg1_ix (and_arg1_ix) + cmd = arg1_field (and_arg1_field) + par = arg2_argtype (and_arg2_argtype) + pr2 = arg2_ix (and_arg2_ix) + month = arg2_field (and_arg2_field) + """ + return Program( + slot=slot, prog_type=int(ProgramType.AND), + cond=(op << 8) | arg1_type, + cond2=arg1_ix, + cmd=arg1_field, + par=arg2_type, + pr2=arg2_ix, + month=arg2_field, + ) + + +def _state_with(**kwargs) -> MockState: + return MockState(**kwargs) + + +# ---- Traditional ZONE family ------------------------------------------- + + +def test_state_evaluator_zone_secure_passes_when_state_zero() -> None: + state = _state_with(zones={ + 7: MockZoneState(name="FRONT DOOR", current_state=0), + }) + ev = StateEvaluator(state) + # ZONE family = 0x04 (secure variant); instance = 7 + cond = _and_traditional(1, family=0x04, instance=7) + assert ev(cond) is True + + +def test_state_evaluator_zone_secure_fails_when_tripped() -> None: + state = _state_with(zones={ + 7: MockZoneState(name="FRONT DOOR", current_state=1), # not-ready + }) + ev = StateEvaluator(state) + cond = _and_traditional(1, family=0x04, instance=7) + assert ev(cond) is False + + +def test_state_evaluator_zone_not_ready_passes_when_tripped() -> None: + state = _state_with(zones={ + 7: MockZoneState(name="FRONT DOOR", current_state=1), + }) + ev = StateEvaluator(state) + # family 0x06 = ZONE + NOT_READY selector bit + cond = _and_traditional(1, family=0x06, instance=7) + assert ev(cond) is True + + +def test_state_evaluator_zone_undefined_is_secure() -> None: + """Undefined zone reads as SECURE — matches real-panel behaviour + when a programmed zone slot doesn't exist.""" + ev = StateEvaluator(_state_with()) # no zones + secure_check = _and_traditional(1, family=0x04, instance=99) + not_ready_check = _and_traditional(2, family=0x06, instance=99) + assert ev(secure_check) is True + assert ev(not_ready_check) is False + + +# ---- Traditional CTRL family -------------------------------------------- + + +def test_state_evaluator_unit_on_passes_when_state_one() -> None: + state = _state_with(units={5: MockUnitState(name="LAMP", state=1)}) + ev = StateEvaluator(state) + # CTRL family + ON selector = 0x0A + cond = _and_traditional(1, family=0x0A, instance=5) + assert ev(cond) is True + + +def test_state_evaluator_unit_off_passes_when_state_zero() -> None: + state = _state_with(units={5: MockUnitState(name="LAMP", state=0)}) + ev = StateEvaluator(state) + # CTRL family + OFF selector = 0x08 + cond = _and_traditional(1, family=0x08, instance=5) + assert ev(cond) is True + + +def test_state_evaluator_unit_on_for_dimmed_unit() -> None: + """Dim level 50% → state=150; ON predicate should pass.""" + state = _state_with(units={5: MockUnitState(state=150)}) + ev = StateEvaluator(state) + cond = _and_traditional(1, family=0x0A, instance=5) + assert ev(cond) is True + + +# ---- Traditional SEC family --------------------------------------------- + + +def test_state_evaluator_security_mode_match() -> None: + """SEC family: family byte = (mode << 4) | area. Area 1 in mode 2.""" + state = _state_with(areas={1: MockAreaState(name="MAIN", mode=2)}) + ev = StateEvaluator(state) + cond = _and_traditional(1, family=(2 << 4) | 1) # mode 2, area 1 = 0x21 + assert ev(cond) is True + # Area in different mode → fails. + cond_wrong = _and_traditional(1, family=(3 << 4) | 1) # mode 3 + assert ev(cond_wrong) is False + + +# ---- Traditional OTHER family ------------------------------------------- + + +def test_state_evaluator_never_is_always_false() -> None: + """MiscConditional.NEVER (= 1) → always False, regardless of state.""" + ev = StateEvaluator(_state_with()) + # OTHER family + NEVER misc = 0x01 + cond = _and_traditional(1, family=0x01) + assert ev(cond) is False + + +def test_state_evaluator_dark_without_location_is_false() -> None: + ev = StateEvaluator(_state_with()) # no location + # OTHER family + DARK misc = 0x03 + cond = _and_traditional(1, family=0x03) + assert ev(cond) is False + + +# ---- Structured: Zone fields -------------------------------------------- + + +def test_state_evaluator_structured_zone_current_state_eq() -> None: + """Zone 5 CurrentState == 1 (not-ready) — structured form.""" + state = _state_with(zones={5: MockZoneState(current_state=1)}) + ev = StateEvaluator(state) + # EQ, Arg1=ZONE.CurrentState(2), Arg1IX=5, Arg2=CONSTANT, Arg2IX=1 + cond = _and_structured( + slot=1, op=int(CondOP.ARG1_EQ_ARG2), + arg1_type=int(CondArgType.ZONE), arg1_ix=5, arg1_field=2, + arg2_type=int(CondArgType.CONSTANT), arg2_ix=1, + ) + assert ev(cond) is True + + +def test_state_evaluator_structured_zone_state_ne() -> None: + state = _state_with(zones={5: MockZoneState(current_state=0)}) + ev = StateEvaluator(state) + cond = _and_structured( + slot=1, op=int(CondOP.ARG1_NE_ARG2), + arg1_type=int(CondArgType.ZONE), arg1_ix=5, arg1_field=2, + arg2_type=int(CondArgType.CONSTANT), arg2_ix=0, + ) + assert ev(cond) is False # state IS 0 + + +# ---- Structured: Thermostat fields -------------------------------------- + + +def test_state_evaluator_structured_thermostat_temp_gt() -> None: + """TEMPERATURE > 75 — structured comparison.""" + # Thermostat raw temperature 168 ~ 76°F on the Omni linear scale, + # but we compare raw bytes here. Use a raw temp clearly above the + # constant. + state = _state_with(thermostats={ + 1: MockThermostatState(temperature_raw=80), + }) + ev = StateEvaluator(state) + cond = _and_structured( + slot=1, op=int(CondOP.ARG1_GT_ARG2), + arg1_type=int(CondArgType.THERMOSTAT), arg1_ix=1, arg1_field=1, + arg2_type=int(CondArgType.CONSTANT), arg2_ix=75, + ) + assert ev(cond) is True + # And < should fail. + cond_lt = _and_structured( + slot=2, op=int(CondOP.ARG1_LT_ARG2), + arg1_type=int(CondArgType.THERMOSTAT), arg1_ix=1, arg1_field=1, + arg2_type=int(CondArgType.CONSTANT), arg2_ix=75, + ) + assert ev(cond_lt) is False + + +# ---- Structured: TimeDate fields ---------------------------------------- + + +def test_state_evaluator_structured_timedate_hour_compare() -> None: + """Current hour == 22 — uses the clock.""" + clock = FakeClock(datetime(2026, 5, 14, 22, 30, tzinfo=timezone.utc)) + ev = StateEvaluator(_state_with(), clock=clock) + cond = _and_structured( + slot=1, op=int(CondOP.ARG1_EQ_ARG2), + arg1_type=int(CondArgType.TIME_DATE), arg1_ix=0, arg1_field=8, # Hour + arg2_type=int(CondArgType.CONSTANT), arg2_ix=22, + ) + assert ev(cond) is True + + +def test_state_evaluator_structured_timedate_dayofweek() -> None: + """DayOfWeek == 4 (Thursday).""" + clock = FakeClock(datetime(2026, 5, 14, 12, 0, tzinfo=timezone.utc)) # Thu + ev = StateEvaluator(_state_with(), clock=clock) + cond = _and_structured( + slot=1, op=int(CondOP.ARG1_EQ_ARG2), + arg1_type=int(CondArgType.TIME_DATE), arg1_ix=0, arg1_field=5, # DOW + arg2_type=int(CondArgType.CONSTANT), arg2_ix=4, + ) + assert ev(cond) is True + + +def test_state_evaluator_structured_timedate_without_clock_is_false() -> None: + """No clock → TimeDate predicates resolve to None → comparison False.""" + ev = StateEvaluator(_state_with()) # no clock + cond = _and_structured( + slot=1, op=int(CondOP.ARG1_EQ_ARG2), + arg1_type=int(CondArgType.TIME_DATE), arg1_ix=0, arg1_field=8, + arg2_type=int(CondArgType.CONSTANT), arg2_ix=22, + ) + assert ev(cond) is False + + +# ---- Engine integration -------------------------------------------------- + + +@pytest.mark.asyncio +async def test_engine_use_state_evaluator_gates_real_conditions() -> None: + """End-to-end: WHEN + AND IF UNIT 3 ON + THEN UNIT 9 ON. + Chain fires only when unit 3 is on at the time the event arrives.""" + button_evt = event_id_user_macro_button(5) + # WHEN button 5; AND IF unit 3 ON; THEN unit 9 ON. + when = _when(1, button_evt) + and_cond = _and_traditional(2, family=0x0A, instance=3) # CTRL + ON, unit 3 + then = _then_action(3, int(Command.UNIT_ON), 9) + + # Start with unit 3 OFF. + panel = MockPanel( + controller_key=CONTROLLER_KEY, + state=MockState( + programs={ + 1: when.encode_wire_bytes(), + 2: and_cond.encode_wire_bytes(), + 3: then.encode_wire_bytes(), + }, + units={3: MockUnitState(state=0)}, + ), + ) + async with ProgramEngine(panel, clock=FakeClock( + datetime(2026, 5, 14, 0, 0, tzinfo=timezone.utc) + )) as engine: + engine.use_state_evaluator() + # Unit 3 is OFF — chain blocked. + fired = await engine.emit_user_macro_button(5) + assert fired == 0 + assert 9 not in panel.state.units or panel.state.units[9].state == 0 + + # Turn unit 3 ON, re-emit — chain fires. + panel.state.units[3].state = 1 + fired = await engine.emit_user_macro_button(5) + assert fired == 1 + assert panel.state.units[9].state == 1 + + +@pytest.mark.asyncio +async def test_engine_state_evaluator_or_alternative() -> None: + """WHEN + AND IF zone 5 secure + OR + AND IF zone 6 secure + THEN. + Fires if either zone is secure.""" + button_evt = event_id_user_macro_button(5) + when = _when(1, button_evt) + and_z5 = _and_traditional(2, family=0x04, instance=5) # ZONE 5 secure + or_break = _or_cond(3) # OR boundary + and_z6 = _and_traditional(4, family=0x04, instance=6) # ZONE 6 secure + then = _then_action(5, int(Command.UNIT_ON), 10) + + # Zone 5 tripped, Zone 6 secure → group A fails, group B passes. + panel = MockPanel( + controller_key=CONTROLLER_KEY, + state=MockState( + programs={ + p.slot: p.encode_wire_bytes() + for p in (when, and_z5, or_break, and_z6, then) + }, + zones={ + 5: MockZoneState(current_state=1), + 6: MockZoneState(current_state=0), + }, + ), + ) + async with ProgramEngine(panel, clock=FakeClock( + datetime(2026, 5, 14, 0, 0, tzinfo=timezone.utc) + )) as engine: + engine.use_state_evaluator() + fired = await engine.emit_user_macro_button(5) + assert fired == 1 # OR-alternative passed + assert panel.state.units[10].state == 1