diff --git a/src/omni_pca/program_engine.py b/src/omni_pca/program_engine.py index bc92ed6..b6e2f15 100644 --- a/src/omni_pca/program_engine.py +++ b/src/omni_pca/program_engine.py @@ -476,6 +476,135 @@ EVENT_AC_POWER_OFF: int = 772 EVENT_AC_POWER_ON: int = 773 +# -------------------------------------------------------------------------- +# Clausal chains (Phase 5) +# -------------------------------------------------------------------------- + + +@dataclass(frozen=True, slots=True) +class ClausalChain: + """One multi-record clausal program. + + On firmware ≥3.0.0 each clausal program occupies a contiguous run of + program slots: one head record (WHEN / AT / EVERY), zero or more + AND/OR condition records, then one or more THEN action records. + + The engine groups the panel's program table into chains by walking + forward from each clausal head until the next head / non-clausal / + empty slot. The fields below carry the typed view of each role: + + * ``head`` — the trigger record (WHEN: event-driven, AT: time-of-day, + EVERY: recurring interval) + * ``conditions`` — zero or more AND/OR records guarding the action + * ``actions`` — one or more THEN records firing when conditions pass + """ + + head: Program + conditions: tuple[Program, ...] + actions: tuple[Program, ...] + + +def build_chains( + programs: tuple[Program, ...], +) -> tuple[ClausalChain, ...]: + """Walk a slot-ordered Program tuple, gathering clausal chains. + + Heads (WHEN/AT/EVERY) start a chain; subsequent AND/OR/THEN records + in adjacent slots join it. A chain ends when: + * the next slot is another head (start of next chain) + * the next slot is not a multi-record type (FREE, TIMED, etc.) + * the next slot is empty + * we run out of records + + Returns chains in head-slot order. Chains with no THEN records are + dropped (they have no action to fire). + """ + by_slot: dict[int, Program] = { + p.slot: p for p in programs if p.slot is not None and not p.is_empty() + } + if not by_slot: + return () + heads = sorted( + (s for s, p in by_slot.items() if p.prog_type in ( + int(ProgramType.WHEN), + int(ProgramType.AT), + int(ProgramType.EVERY), + )), + ) + out: list[ClausalChain] = [] + for head_slot in heads: + head = by_slot[head_slot] + conditions: list[Program] = [] + actions: list[Program] = [] + slot = head_slot + 1 + while slot in by_slot: + rec = by_slot[slot] + ptype = rec.prog_type + if ptype in ( + int(ProgramType.WHEN), + int(ProgramType.AT), + int(ProgramType.EVERY), + ): + break # next chain's head + if ptype == int(ProgramType.AND) or ptype == int(ProgramType.OR): + conditions.append(rec) + elif ptype == int(ProgramType.THEN): + actions.append(rec) + else: + break # ran into a non-clausal record (TIMED, REMARK, ...) + slot += 1 + if actions: + out.append(ClausalChain( + head=head, + conditions=tuple(conditions), + actions=tuple(actions), + )) + return tuple(out) + + +def evaluate_conditions( + conditions: tuple[Program, ...], + *, + is_satisfied, +) -> bool: + """Evaluate an AND/OR condition list against an external predicate. + + Phase 5 v1 keeps the evaluator deliberately simple: each AND/OR + record is reduced to a boolean by the caller-supplied + ``is_satisfied(condition_program)`` predicate, then combined with + standard short-circuit AND-of-OR-groups semantics: + + * Records form left-to-right groups separated by OR boundaries + — each OR record *starts a new group*. + * Within a group, every AND record's predicate must be True + (logical AND). + * The overall result is True if *any* group's AND-result is True + (logical OR across groups). + + An empty conditions tuple is unconditionally True — a "WHEN ... + THEN ..." chain with no AND/OR guard always runs its actions. + + The detailed semantic decode of each AND/OR record (zone-state + checks, time-of-day comparisons, structured TEMP > 70-style ops) + is deferred to a follow-up; for now ``is_satisfied`` is the + integration point tests / HA code use to feed in evaluated values. + """ + if not conditions: + return True + # Split into groups separated by OR records. + groups: list[list[Program]] = [[]] + for c in conditions: + if c.prog_type == int(ProgramType.OR): + groups.append([c]) + else: + groups[-1].append(c) + # Any group whose ANDs all pass = overall pass. + for group in groups: + if all(is_satisfied(c) for c in group): + return True + return False + + # -------------------------------------------------------------------------- # Engine # -------------------------------------------------------------------------- @@ -534,14 +663,39 @@ class ProgramEngine: continue self._programs: tuple[Program, ...] = tuple(decoded) self._classified = classify(self._programs) + self._chains = build_chains(self._programs) self._tasks: list[asyncio.Task[None]] = [] self._running = False - # event_id → list of EVENT programs that fire on it. Built lazily - # in start() so callers can mutate state.programs between init - # and start (e.g. a test seeding extra programs). + # event_id → list of EVENT programs *and* WHEN-headed clausal + # chains subscribed to it. Built lazily in start(). self._event_table: dict[int, list[Program]] = {} + self._when_chain_table: dict[int, list[ClausalChain]] = {} + # External hook (defaults to "all conditions pass") for evaluating + # AND/OR records. Tests / HA replace this to model real state. + self._condition_evaluator = self._default_condition_evaluator self.metrics = _EngineMetrics() + @property + def chains(self) -> tuple[ClausalChain, ...]: + """All clausal chains decoded from the panel's program table.""" + return self._chains + + def set_condition_evaluator(self, fn) -> None: + """Replace the AND/OR condition evaluator. + + ``fn`` is called with each AND/OR program record and must return + bool. The default returns True for every AND, False for every + OR (a degenerate evaluator that means "all chains' first AND + groups always pass" — useful as a smoke-test default, not for + real automation). Real callers supply a state-aware evaluator. + """ + self._condition_evaluator = fn + + @staticmethod + def _default_condition_evaluator(condition: Program) -> bool: + """Stub evaluator — caller should override via set_condition_evaluator.""" + return condition.prog_type == int(ProgramType.AND) + # ---- inspection ------------------------------------------------------- @property @@ -591,6 +745,30 @@ class ProgramEngine: self._event_table.clear() for program in self._classified.event: self._event_table.setdefault(program.event_id, []).append(program) + # Phase 5: clausal chains. AT and EVERY chains spawn worker + # tasks; WHEN chains register in a parallel event-dispatch table + # so emit_event() fires both raw EVENT programs and matching + # WHEN chains. + self._when_chain_table.clear() + for chain in self._chains: + if chain.head.prog_type == int(ProgramType.WHEN): + self._when_chain_table.setdefault( + chain.head.event_id, [] + ).append(chain) + elif chain.head.prog_type == int(ProgramType.AT): + self._tasks.append( + asyncio.create_task( + self._run_at_chain(chain), + name=f"omni-pca-at-chain-{chain.head.slot}", + ) + ) + elif chain.head.prog_type == int(ProgramType.EVERY): + self._tasks.append( + asyncio.create_task( + self._run_every_chain(chain), + name=f"omni-pca-every-chain-{chain.head.slot}", + ) + ) async def _run_timed_program(self, program: Program) -> None: """Sleep-until-next-fire loop for one TIMED program. @@ -647,7 +825,12 @@ class ProgramEngine: programs = self._event_table.get(event_id, ()) for program in programs: await self._fire(program) - return len(programs) + fired = len(programs) + # Plus any WHEN-headed clausal chains subscribed to this event. + for chain in self._when_chain_table.get(event_id, ()): + if await self._fire_chain(chain): + fired += 1 + return fired async def emit_user_macro_button(self, button: int) -> int: """Convenience: fire EVENT programs subscribed to a button press.""" @@ -684,6 +867,82 @@ class ProgramEngine: ) self.metrics.errors += 1 + async def _fire_chain(self, chain: ClausalChain) -> bool: + """Evaluate a chain's AND/OR conditions; if they pass, fire every + THEN action. Returns True iff the conditions passed. + + Each fired THEN action goes through the same wire-handler path + as TIMED/YEARLY/EVENT programs. + """ + try: + passed = evaluate_conditions( + chain.conditions, is_satisfied=self._condition_evaluator, + ) + except Exception: + _log.exception( + "engine: chain %s condition evaluation raised", + chain.head.slot, + ) + self.metrics.errors += 1 + return False + if not passed: + return False + for action in chain.actions: + await self._fire(action) + return True + + async def _run_at_chain(self, chain: ClausalChain) -> None: + """Sleep-until-next-fire loop for an AT-headed chain. + + AT records carry the same TIMED fields (hour/minute/days/ + time_kind/time_offset) as compact-form TIMED programs, so we + reuse the same scheduling primitives. + """ + try: + while self._running: + now = self._clock.now() + head = chain.head + if head.time_kind == TimeKind.ABSOLUTE: + next_fire = _next_absolute_fire(now, head) + elif self._location is None: + return + else: + next_fire = _next_sun_relative_fire(now, head, self._location) + if next_fire is None: + return + await self._clock.sleep_until(next_fire) + if not self._running: + return + await self._fire_chain(chain) + except asyncio.CancelledError: + raise + except Exception: + _log.exception("engine: AT chain slot %s crashed", chain.head.slot) + self.metrics.errors += 1 + + async def _run_every_chain(self, chain: ClausalChain) -> None: + """Sleep-until-next-fire loop for an EVERY-headed chain. + + Interval is in seconds per :meth:`Program.every_interval`. Zero + disables the chain (matches real-panel behaviour for an + unconfigured EVERY record). + """ + interval_sec = chain.head.every_interval + if interval_sec <= 0: + return + delay = timedelta(seconds=interval_sec) + try: + while self._running: + await self._clock.sleep_until(self._clock.now() + delay) + if not self._running: + return + await self._fire_chain(chain) + except asyncio.CancelledError: + raise + except Exception: + _log.exception("engine: EVERY chain slot %s crashed", chain.head.slot) + self.metrics.errors += 1 + async def _fire(self, program: Program) -> None: """Execute one program by feeding its command through the same wire-handler path the v2 Command opcode uses.""" diff --git a/tests/test_program_engine.py b/tests/test_program_engine.py index 0f53a47..18fd07a 100644 --- a/tests/test_program_engine.py +++ b/tests/test_program_engine.py @@ -140,21 +140,22 @@ def _yearly(slot: int) -> Program: ) -def _when(slot: int) -> Program: +def _bare_when(slot: int) -> Program: return Program(slot=slot, prog_type=int(ProgramType.WHEN), cond=0x0001) -def _and(slot: int) -> Program: +def _bare_and(slot: int) -> Program: return Program(slot=slot, prog_type=int(ProgramType.AND)) -def _then(slot: int) -> Program: +def _bare_then(slot: int) -> Program: return Program(slot=slot, prog_type=int(ProgramType.THEN), cmd=3) def test_classify_buckets_each_type() -> None: bag = ( - _free(), _timed(2), _event(3), _yearly(4), _when(5), _and(6), _then(7), + _free(), _timed(2), _event(3), _yearly(4), + _bare_when(5), _bare_and(6), _bare_then(7), ) out = classify(bag) assert [p.slot for p in out.timed] == [2] @@ -206,7 +207,7 @@ async def test_engine_constructs_against_empty_panel() -> None: @pytest.mark.asyncio async def test_engine_classifies_loaded_programs() -> None: - panel = _panel_with_programs(_timed(1), _event(2), _yearly(3), _when(4)) + panel = _panel_with_programs(_timed(1), _event(2), _yearly(3), _bare_when(4)) engine = ProgramEngine(panel, clock=FakeClock( datetime(2026, 5, 14, 0, 0, tzinfo=timezone.utc) )) @@ -772,3 +773,247 @@ async def test_engine_fires_sun_relative_program_with_location() -> None: await asyncio.sleep(0) assert engine.metrics.timed_fired == 1 assert panel.state.units[5].state == 1 + + +# ---- Phase 5: Clausal chains -------------------------------------------- + + +from omni_pca.program_engine import ( # noqa: E402 + ClausalChain, + build_chains, + evaluate_conditions, +) + + +def _when(slot: int, event_id: int) -> Program: + return Program( + slot=slot, prog_type=int(ProgramType.WHEN), + month=(event_id >> 8) & 0xFF, day=event_id & 0xFF, + ) + + +def _at(slot: int, hour: int, minute: int, days: int) -> Program: + return Program( + slot=slot, prog_type=int(ProgramType.AT), + hour=hour, minute=minute, days=days, + ) + + +def _every(slot: int, interval_sec: int) -> Program: + # every_interval = ((cond & 0xFF) << 8) | ((cond2 >> 8) & 0xFF) + cond = (interval_sec >> 8) & 0xFF + cond2 = (interval_sec & 0xFF) << 8 + return Program( + slot=slot, prog_type=int(ProgramType.EVERY), + cond=cond, cond2=cond2, + ) + + +def _and_cond(slot: int) -> Program: + return Program(slot=slot, prog_type=int(ProgramType.AND)) + + +def _or_cond(slot: int) -> Program: + return Program(slot=slot, prog_type=int(ProgramType.OR)) + + +def _then_action(slot: int, cmd: int, pr2: int) -> Program: + return Program( + slot=slot, prog_type=int(ProgramType.THEN), + cmd=cmd, pr2=pr2, + ) + + +def test_build_chains_simple_when_then() -> None: + """Minimal chain: WHEN at slot 1, THEN at slot 2.""" + chains = build_chains(( + _when(1, 0x0405), + _then_action(2, int(Command.UNIT_ON), 7), + )) + assert len(chains) == 1 + assert chains[0].head.slot == 1 + assert chains[0].conditions == () + assert [a.slot for a in chains[0].actions] == [2] + + +def test_build_chains_with_and_conditions_and_multiple_actions() -> None: + chains = build_chains(( + _when(1, 0x0405), + _and_cond(2), + _and_cond(3), + _then_action(4, int(Command.UNIT_ON), 1), + _then_action(5, int(Command.UNIT_OFF), 2), + )) + assert len(chains) == 1 + c = chains[0] + assert [x.slot for x in c.conditions] == [2, 3] + assert [a.slot for a in c.actions] == [4, 5] + + +def test_build_chains_separates_adjacent_chains() -> None: + chains = build_chains(( + _when(1, 0x0405), + _then_action(2, 1, 1), + _at(3, 6, 0, int(Days.MONDAY)), + _then_action(4, 1, 2), + )) + assert [c.head.slot for c in chains] == [1, 3] + assert chains[0].actions[0].slot == 2 + assert chains[1].actions[0].slot == 4 + + +def test_build_chains_drops_chains_without_then() -> None: + """A WHEN with no THEN has nothing to fire — skip silently.""" + chains = build_chains(( + _when(1, 0x0405), + _and_cond(2), + # no THEN + )) + assert chains == () + + +def test_build_chains_stops_at_non_clausal_record() -> None: + """A TIMED record between chains ends the prior chain.""" + timed = Program( + slot=3, prog_type=int(ProgramType.TIMED), + cmd=1, hour=6, minute=0, days=int(Days.MONDAY), + ) + chains = build_chains(( + _when(1, 0x0405), + _then_action(2, 1, 1), + timed, + _when(4, 0x0410), + _then_action(5, 1, 2), + )) + assert [c.head.slot for c in chains] == [1, 4] + + +def test_evaluate_conditions_empty_is_true() -> None: + assert evaluate_conditions((), is_satisfied=lambda c: False) is True + + +def test_evaluate_conditions_all_ands() -> None: + cs = (_and_cond(1), _and_cond(2)) + assert evaluate_conditions(cs, is_satisfied=lambda c: True) is True + assert evaluate_conditions(cs, is_satisfied=lambda c: False) is False + # One fails → whole group fails. + assert evaluate_conditions( + cs, is_satisfied=lambda c: c.slot == 1, + ) is False + + +def test_evaluate_conditions_or_group_separation() -> None: + """Two groups via OR: group A (AND only) fails, group B (OR + AND) passes.""" + cs = ( + _and_cond(1), # group A start + _and_cond(2), + _or_cond(3), # group B start (OR record itself) + _and_cond(4), + ) + # Group A: slots 1, 2; Group B: slots 3, 4. + def is_sat(c): + return c.slot in (3, 4) # group B fully passes + assert evaluate_conditions(cs, is_satisfied=is_sat) is True + + +@pytest.mark.asyncio +async def test_engine_classifies_clausal_heads() -> None: + """Engine exposes built chains via the chains property.""" + panel = _panel_with_programs( + _when(1, 0x0405), + _then_action(2, int(Command.UNIT_ON), 7), + ) + engine = ProgramEngine(panel, clock=FakeClock( + datetime(2026, 5, 14, 0, 0, tzinfo=timezone.utc) + )) + assert len(engine.chains) == 1 + + +@pytest.mark.asyncio +async def test_engine_when_chain_fires_on_event() -> None: + """A WHEN-headed chain dispatches when emit_event() matches its event.""" + button_evt = event_id_user_macro_button(7) + panel = _panel_with_programs( + _when(1, button_evt), + _then_action(2, int(Command.UNIT_ON), 9), + ) + async with ProgramEngine(panel, clock=FakeClock( + datetime(2026, 5, 14, 0, 0, tzinfo=timezone.utc) + )) as engine: + fired = await engine.emit_user_macro_button(7) + assert fired == 1 + assert engine.metrics.clausal_fired == 1 + assert panel.state.units[9].state == 1 + + +@pytest.mark.asyncio +async def test_engine_when_chain_blocked_by_failing_condition() -> None: + """Default condition evaluator passes ANDs but fails ORs. A chain + with one AND condition fires; a chain with an OR-only group doesn't.""" + button_evt = event_id_user_macro_button(7) + panel = _panel_with_programs( + _when(1, button_evt), + _and_cond(2), + _then_action(3, int(Command.UNIT_ON), 9), + ) + async with ProgramEngine(panel, clock=FakeClock( + datetime(2026, 5, 14, 0, 0, tzinfo=timezone.utc) + )) as engine: + # Default evaluator: ANDs pass → chain runs. + fired = await engine.emit_user_macro_button(7) + assert fired == 1 + assert panel.state.units[9].state == 1 + + +@pytest.mark.asyncio +async def test_engine_custom_evaluator_can_block_chain() -> None: + """Replace evaluator with always-False; chain doesn't fire.""" + button_evt = event_id_user_macro_button(7) + panel = _panel_with_programs( + _when(1, button_evt), + _and_cond(2), + _then_action(3, int(Command.UNIT_ON), 9), + ) + async with ProgramEngine(panel, clock=FakeClock( + datetime(2026, 5, 14, 0, 0, tzinfo=timezone.utc) + )) as engine: + engine.set_condition_evaluator(lambda c: False) + fired = await engine.emit_user_macro_button(7) + # Returns 0 — the chain matched the event but failed conditions. + assert fired == 0 + assert engine.metrics.clausal_fired == 0 + + +@pytest.mark.asyncio +async def test_engine_at_chain_fires_at_scheduled_time() -> None: + """AT-headed chain fires at hour:minute on matching days.""" + t0 = datetime(2026, 5, 11, 5, 59, tzinfo=timezone.utc) # Mon 05:59 + panel = _panel_with_programs( + _at(1, hour=6, minute=0, days=int(Days.MONDAY)), + _then_action(2, int(Command.UNIT_ON), 7), + ) + clock = FakeClock(t0) + async with ProgramEngine(panel, clock=clock) as engine: + await asyncio.sleep(0) + await clock.advance_to(t0 + timedelta(minutes=2)) + await asyncio.sleep(0) + assert engine.metrics.clausal_fired == 1 + assert panel.state.units[7].state == 1 + + +@pytest.mark.asyncio +async def test_engine_every_chain_fires_on_interval() -> None: + """EVERY chain fires every N seconds.""" + t0 = datetime(2026, 5, 14, 12, 0, tzinfo=timezone.utc) + panel = _panel_with_programs( + _every(1, interval_sec=60), + _then_action(2, int(Command.UNIT_ON), 7), + ) + clock = FakeClock(t0) + async with ProgramEngine(panel, clock=clock) as engine: + await asyncio.sleep(0) + # Walk three intervals. + for tick in (1, 2, 3): + await clock.advance_to(t0 + timedelta(seconds=60 * tick + 1)) + await asyncio.sleep(0) + assert engine.metrics.clausal_fired == 3