program_engine: Phase 5 — clausal chains (WHEN/AT/EVERY + AND/OR/THEN)

Final phase of the autonomous program-execution engine. Multi-record
clausal programs (firmware ≥3.0.0) now run end-to-end:

* WHEN-headed chains dispatch through emit_event() — same code path as
  raw EVENT programs, but with optional AND/OR condition guards.
* AT-headed chains schedule like TIMED (absolute or sun-relative).
* EVERY-headed chains fire on a recurring interval (every_interval
  seconds — the unit derivation matches the existing programs.py
  decode).

New types:

* ClausalChain dataclass — (head, conditions, actions). Built once at
  engine construction; engine.chains exposes the list.
* build_chains(programs) walks a slot-ordered Program tuple,
  grouping adjacent multi-record records into chains. Stops at the
  next clausal head, a non-clausal record, or an empty slot. Drops
  chains with no THEN action (they have no effect).
* evaluate_conditions(cs, is_satisfied=fn) — AND-of-OR-groups
  evaluator. Empty conditions tuple is True; OR records start a new
  group; within a group all ANDs must satisfy; overall True iff any
  group satisfies. The detailed semantic decode of each AND/OR record
  (zone-state checks, structured TEMP>70-style ops, …) is deferred
  to a follow-up — for now ``is_satisfied`` is the integration hook
  callers supply.

ProgramEngine.set_condition_evaluator(fn) lets tests / HA plug in a
state-aware evaluator. The default is a stub that passes ANDs and
fails ORs — a usable smoke-test default, deliberately not a real one.

14 new tests covering chain construction (single chain, with conditions,
with multiple THENs, adjacent chains, missing-THEN drop, non-clausal
boundary), the condition evaluator (empty/all-AND/AND-fail/OR-group
separation), and end-to-end execution (WHEN chain dispatch, condition
blocking, custom evaluator, AT chain schedule, EVERY chain interval).

With this the engine implements every program type the panel firmware
exposes — TIMED / EVENT / YEARLY compact-form plus WHEN / AT / EVERY +
AND / OR / THEN clausal. MockPanel + ProgramEngine + .pca decode +
MockState.from_pca composes into a complete "run any panel's programs
autonomously" sandbox.

Full suite: 563 passed, 1 skipped (up from 549, 64 engine tests total).
This commit is contained in:
Ryan Malloy 2026-05-14 01:34:19 -06:00
parent 269d0e897d
commit cc32081caf
2 changed files with 513 additions and 9 deletions

View File

@ -476,6 +476,135 @@ EVENT_AC_POWER_OFF: int = 772
EVENT_AC_POWER_ON: int = 773 EVENT_AC_POWER_ON: int = 773
# --------------------------------------------------------------------------
# Clausal chains (Phase 5)
# --------------------------------------------------------------------------
@dataclass(frozen=True, slots=True)
class ClausalChain:
"""One multi-record clausal program.
On firmware 3.0.0 each clausal program occupies a contiguous run of
program slots: one head record (WHEN / AT / EVERY), zero or more
AND/OR condition records, then one or more THEN action records.
The engine groups the panel's program table into chains by walking
forward from each clausal head until the next head / non-clausal /
empty slot. The fields below carry the typed view of each role:
* ``head`` the trigger record (WHEN: event-driven, AT: time-of-day,
EVERY: recurring interval)
* ``conditions`` zero or more AND/OR records guarding the action
* ``actions`` one or more THEN records firing when conditions pass
"""
head: Program
conditions: tuple[Program, ...]
actions: tuple[Program, ...]
def build_chains(
programs: tuple[Program, ...],
) -> tuple[ClausalChain, ...]:
"""Walk a slot-ordered Program tuple, gathering clausal chains.
Heads (WHEN/AT/EVERY) start a chain; subsequent AND/OR/THEN records
in adjacent slots join it. A chain ends when:
* the next slot is another head (start of next chain)
* the next slot is not a multi-record type (FREE, TIMED, etc.)
* the next slot is empty
* we run out of records
Returns chains in head-slot order. Chains with no THEN records are
dropped (they have no action to fire).
"""
by_slot: dict[int, Program] = {
p.slot: p for p in programs if p.slot is not None and not p.is_empty()
}
if not by_slot:
return ()
heads = sorted(
(s for s, p in by_slot.items() if p.prog_type in (
int(ProgramType.WHEN),
int(ProgramType.AT),
int(ProgramType.EVERY),
)),
)
out: list[ClausalChain] = []
for head_slot in heads:
head = by_slot[head_slot]
conditions: list[Program] = []
actions: list[Program] = []
slot = head_slot + 1
while slot in by_slot:
rec = by_slot[slot]
ptype = rec.prog_type
if ptype in (
int(ProgramType.WHEN),
int(ProgramType.AT),
int(ProgramType.EVERY),
):
break # next chain's head
if ptype == int(ProgramType.AND) or ptype == int(ProgramType.OR):
conditions.append(rec)
elif ptype == int(ProgramType.THEN):
actions.append(rec)
else:
break # ran into a non-clausal record (TIMED, REMARK, ...)
slot += 1
if actions:
out.append(ClausalChain(
head=head,
conditions=tuple(conditions),
actions=tuple(actions),
))
return tuple(out)
def evaluate_conditions(
conditions: tuple[Program, ...],
*,
is_satisfied,
) -> bool:
"""Evaluate an AND/OR condition list against an external predicate.
Phase 5 v1 keeps the evaluator deliberately simple: each AND/OR
record is reduced to a boolean by the caller-supplied
``is_satisfied(condition_program)`` predicate, then combined with
standard short-circuit AND-of-OR-groups semantics:
* Records form left-to-right groups separated by OR boundaries
each OR record *starts a new group*.
* Within a group, every AND record's predicate must be True
(logical AND).
* The overall result is True if *any* group's AND-result is True
(logical OR across groups).
An empty conditions tuple is unconditionally True a "WHEN ...
THEN ..." chain with no AND/OR guard always runs its actions.
The detailed semantic decode of each AND/OR record (zone-state
checks, time-of-day comparisons, structured TEMP > 70-style ops)
is deferred to a follow-up; for now ``is_satisfied`` is the
integration point tests / HA code use to feed in evaluated values.
"""
if not conditions:
return True
# Split into groups separated by OR records.
groups: list[list[Program]] = [[]]
for c in conditions:
if c.prog_type == int(ProgramType.OR):
groups.append([c])
else:
groups[-1].append(c)
# Any group whose ANDs all pass = overall pass.
for group in groups:
if all(is_satisfied(c) for c in group):
return True
return False
# -------------------------------------------------------------------------- # --------------------------------------------------------------------------
# Engine # Engine
# -------------------------------------------------------------------------- # --------------------------------------------------------------------------
@ -534,14 +663,39 @@ class ProgramEngine:
continue continue
self._programs: tuple[Program, ...] = tuple(decoded) self._programs: tuple[Program, ...] = tuple(decoded)
self._classified = classify(self._programs) self._classified = classify(self._programs)
self._chains = build_chains(self._programs)
self._tasks: list[asyncio.Task[None]] = [] self._tasks: list[asyncio.Task[None]] = []
self._running = False self._running = False
# event_id → list of EVENT programs that fire on it. Built lazily # event_id → list of EVENT programs *and* WHEN-headed clausal
# in start() so callers can mutate state.programs between init # chains subscribed to it. Built lazily in start().
# and start (e.g. a test seeding extra programs).
self._event_table: dict[int, list[Program]] = {} self._event_table: dict[int, list[Program]] = {}
self._when_chain_table: dict[int, list[ClausalChain]] = {}
# External hook (defaults to "all conditions pass") for evaluating
# AND/OR records. Tests / HA replace this to model real state.
self._condition_evaluator = self._default_condition_evaluator
self.metrics = _EngineMetrics() self.metrics = _EngineMetrics()
@property
def chains(self) -> tuple[ClausalChain, ...]:
"""All clausal chains decoded from the panel's program table."""
return self._chains
def set_condition_evaluator(self, fn) -> None:
"""Replace the AND/OR condition evaluator.
``fn`` is called with each AND/OR program record and must return
bool. The default returns True for every AND, False for every
OR (a degenerate evaluator that means "all chains' first AND
groups always pass" — useful as a smoke-test default, not for
real automation). Real callers supply a state-aware evaluator.
"""
self._condition_evaluator = fn
@staticmethod
def _default_condition_evaluator(condition: Program) -> bool:
"""Stub evaluator — caller should override via set_condition_evaluator."""
return condition.prog_type == int(ProgramType.AND)
# ---- inspection ------------------------------------------------------- # ---- inspection -------------------------------------------------------
@property @property
@ -591,6 +745,30 @@ class ProgramEngine:
self._event_table.clear() self._event_table.clear()
for program in self._classified.event: for program in self._classified.event:
self._event_table.setdefault(program.event_id, []).append(program) self._event_table.setdefault(program.event_id, []).append(program)
# Phase 5: clausal chains. AT and EVERY chains spawn worker
# tasks; WHEN chains register in a parallel event-dispatch table
# so emit_event() fires both raw EVENT programs and matching
# WHEN chains.
self._when_chain_table.clear()
for chain in self._chains:
if chain.head.prog_type == int(ProgramType.WHEN):
self._when_chain_table.setdefault(
chain.head.event_id, []
).append(chain)
elif chain.head.prog_type == int(ProgramType.AT):
self._tasks.append(
asyncio.create_task(
self._run_at_chain(chain),
name=f"omni-pca-at-chain-{chain.head.slot}",
)
)
elif chain.head.prog_type == int(ProgramType.EVERY):
self._tasks.append(
asyncio.create_task(
self._run_every_chain(chain),
name=f"omni-pca-every-chain-{chain.head.slot}",
)
)
async def _run_timed_program(self, program: Program) -> None: async def _run_timed_program(self, program: Program) -> None:
"""Sleep-until-next-fire loop for one TIMED program. """Sleep-until-next-fire loop for one TIMED program.
@ -647,7 +825,12 @@ class ProgramEngine:
programs = self._event_table.get(event_id, ()) programs = self._event_table.get(event_id, ())
for program in programs: for program in programs:
await self._fire(program) await self._fire(program)
return len(programs) fired = len(programs)
# Plus any WHEN-headed clausal chains subscribed to this event.
for chain in self._when_chain_table.get(event_id, ()):
if await self._fire_chain(chain):
fired += 1
return fired
async def emit_user_macro_button(self, button: int) -> int: async def emit_user_macro_button(self, button: int) -> int:
"""Convenience: fire EVENT programs subscribed to a button press.""" """Convenience: fire EVENT programs subscribed to a button press."""
@ -684,6 +867,82 @@ class ProgramEngine:
) )
self.metrics.errors += 1 self.metrics.errors += 1
async def _fire_chain(self, chain: ClausalChain) -> bool:
"""Evaluate a chain's AND/OR conditions; if they pass, fire every
THEN action. Returns True iff the conditions passed.
Each fired THEN action goes through the same wire-handler path
as TIMED/YEARLY/EVENT programs.
"""
try:
passed = evaluate_conditions(
chain.conditions, is_satisfied=self._condition_evaluator,
)
except Exception:
_log.exception(
"engine: chain %s condition evaluation raised",
chain.head.slot,
)
self.metrics.errors += 1
return False
if not passed:
return False
for action in chain.actions:
await self._fire(action)
return True
async def _run_at_chain(self, chain: ClausalChain) -> None:
"""Sleep-until-next-fire loop for an AT-headed chain.
AT records carry the same TIMED fields (hour/minute/days/
time_kind/time_offset) as compact-form TIMED programs, so we
reuse the same scheduling primitives.
"""
try:
while self._running:
now = self._clock.now()
head = chain.head
if head.time_kind == TimeKind.ABSOLUTE:
next_fire = _next_absolute_fire(now, head)
elif self._location is None:
return
else:
next_fire = _next_sun_relative_fire(now, head, self._location)
if next_fire is None:
return
await self._clock.sleep_until(next_fire)
if not self._running:
return
await self._fire_chain(chain)
except asyncio.CancelledError:
raise
except Exception:
_log.exception("engine: AT chain slot %s crashed", chain.head.slot)
self.metrics.errors += 1
async def _run_every_chain(self, chain: ClausalChain) -> None:
"""Sleep-until-next-fire loop for an EVERY-headed chain.
Interval is in seconds per :meth:`Program.every_interval`. Zero
disables the chain (matches real-panel behaviour for an
unconfigured EVERY record).
"""
interval_sec = chain.head.every_interval
if interval_sec <= 0:
return
delay = timedelta(seconds=interval_sec)
try:
while self._running:
await self._clock.sleep_until(self._clock.now() + delay)
if not self._running:
return
await self._fire_chain(chain)
except asyncio.CancelledError:
raise
except Exception:
_log.exception("engine: EVERY chain slot %s crashed", chain.head.slot)
self.metrics.errors += 1
async def _fire(self, program: Program) -> None: async def _fire(self, program: Program) -> None:
"""Execute one program by feeding its command through the same """Execute one program by feeding its command through the same
wire-handler path the v2 Command opcode uses.""" wire-handler path the v2 Command opcode uses."""

View File

@ -140,21 +140,22 @@ def _yearly(slot: int) -> Program:
) )
def _when(slot: int) -> Program: def _bare_when(slot: int) -> Program:
return Program(slot=slot, prog_type=int(ProgramType.WHEN), cond=0x0001) return Program(slot=slot, prog_type=int(ProgramType.WHEN), cond=0x0001)
def _and(slot: int) -> Program: def _bare_and(slot: int) -> Program:
return Program(slot=slot, prog_type=int(ProgramType.AND)) return Program(slot=slot, prog_type=int(ProgramType.AND))
def _then(slot: int) -> Program: def _bare_then(slot: int) -> Program:
return Program(slot=slot, prog_type=int(ProgramType.THEN), cmd=3) return Program(slot=slot, prog_type=int(ProgramType.THEN), cmd=3)
def test_classify_buckets_each_type() -> None: def test_classify_buckets_each_type() -> None:
bag = ( bag = (
_free(), _timed(2), _event(3), _yearly(4), _when(5), _and(6), _then(7), _free(), _timed(2), _event(3), _yearly(4),
_bare_when(5), _bare_and(6), _bare_then(7),
) )
out = classify(bag) out = classify(bag)
assert [p.slot for p in out.timed] == [2] assert [p.slot for p in out.timed] == [2]
@ -206,7 +207,7 @@ async def test_engine_constructs_against_empty_panel() -> None:
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_engine_classifies_loaded_programs() -> None: async def test_engine_classifies_loaded_programs() -> None:
panel = _panel_with_programs(_timed(1), _event(2), _yearly(3), _when(4)) panel = _panel_with_programs(_timed(1), _event(2), _yearly(3), _bare_when(4))
engine = ProgramEngine(panel, clock=FakeClock( engine = ProgramEngine(panel, clock=FakeClock(
datetime(2026, 5, 14, 0, 0, tzinfo=timezone.utc) datetime(2026, 5, 14, 0, 0, tzinfo=timezone.utc)
)) ))
@ -772,3 +773,247 @@ async def test_engine_fires_sun_relative_program_with_location() -> None:
await asyncio.sleep(0) await asyncio.sleep(0)
assert engine.metrics.timed_fired == 1 assert engine.metrics.timed_fired == 1
assert panel.state.units[5].state == 1 assert panel.state.units[5].state == 1
# ---- Phase 5: Clausal chains --------------------------------------------
from omni_pca.program_engine import ( # noqa: E402
ClausalChain,
build_chains,
evaluate_conditions,
)
def _when(slot: int, event_id: int) -> Program:
return Program(
slot=slot, prog_type=int(ProgramType.WHEN),
month=(event_id >> 8) & 0xFF, day=event_id & 0xFF,
)
def _at(slot: int, hour: int, minute: int, days: int) -> Program:
return Program(
slot=slot, prog_type=int(ProgramType.AT),
hour=hour, minute=minute, days=days,
)
def _every(slot: int, interval_sec: int) -> Program:
# every_interval = ((cond & 0xFF) << 8) | ((cond2 >> 8) & 0xFF)
cond = (interval_sec >> 8) & 0xFF
cond2 = (interval_sec & 0xFF) << 8
return Program(
slot=slot, prog_type=int(ProgramType.EVERY),
cond=cond, cond2=cond2,
)
def _and_cond(slot: int) -> Program:
return Program(slot=slot, prog_type=int(ProgramType.AND))
def _or_cond(slot: int) -> Program:
return Program(slot=slot, prog_type=int(ProgramType.OR))
def _then_action(slot: int, cmd: int, pr2: int) -> Program:
return Program(
slot=slot, prog_type=int(ProgramType.THEN),
cmd=cmd, pr2=pr2,
)
def test_build_chains_simple_when_then() -> None:
"""Minimal chain: WHEN at slot 1, THEN at slot 2."""
chains = build_chains((
_when(1, 0x0405),
_then_action(2, int(Command.UNIT_ON), 7),
))
assert len(chains) == 1
assert chains[0].head.slot == 1
assert chains[0].conditions == ()
assert [a.slot for a in chains[0].actions] == [2]
def test_build_chains_with_and_conditions_and_multiple_actions() -> None:
chains = build_chains((
_when(1, 0x0405),
_and_cond(2),
_and_cond(3),
_then_action(4, int(Command.UNIT_ON), 1),
_then_action(5, int(Command.UNIT_OFF), 2),
))
assert len(chains) == 1
c = chains[0]
assert [x.slot for x in c.conditions] == [2, 3]
assert [a.slot for a in c.actions] == [4, 5]
def test_build_chains_separates_adjacent_chains() -> None:
chains = build_chains((
_when(1, 0x0405),
_then_action(2, 1, 1),
_at(3, 6, 0, int(Days.MONDAY)),
_then_action(4, 1, 2),
))
assert [c.head.slot for c in chains] == [1, 3]
assert chains[0].actions[0].slot == 2
assert chains[1].actions[0].slot == 4
def test_build_chains_drops_chains_without_then() -> None:
"""A WHEN with no THEN has nothing to fire — skip silently."""
chains = build_chains((
_when(1, 0x0405),
_and_cond(2),
# no THEN
))
assert chains == ()
def test_build_chains_stops_at_non_clausal_record() -> None:
"""A TIMED record between chains ends the prior chain."""
timed = Program(
slot=3, prog_type=int(ProgramType.TIMED),
cmd=1, hour=6, minute=0, days=int(Days.MONDAY),
)
chains = build_chains((
_when(1, 0x0405),
_then_action(2, 1, 1),
timed,
_when(4, 0x0410),
_then_action(5, 1, 2),
))
assert [c.head.slot for c in chains] == [1, 4]
def test_evaluate_conditions_empty_is_true() -> None:
assert evaluate_conditions((), is_satisfied=lambda c: False) is True
def test_evaluate_conditions_all_ands() -> None:
cs = (_and_cond(1), _and_cond(2))
assert evaluate_conditions(cs, is_satisfied=lambda c: True) is True
assert evaluate_conditions(cs, is_satisfied=lambda c: False) is False
# One fails → whole group fails.
assert evaluate_conditions(
cs, is_satisfied=lambda c: c.slot == 1,
) is False
def test_evaluate_conditions_or_group_separation() -> None:
"""Two groups via OR: group A (AND only) fails, group B (OR + AND) passes."""
cs = (
_and_cond(1), # group A start
_and_cond(2),
_or_cond(3), # group B start (OR record itself)
_and_cond(4),
)
# Group A: slots 1, 2; Group B: slots 3, 4.
def is_sat(c):
return c.slot in (3, 4) # group B fully passes
assert evaluate_conditions(cs, is_satisfied=is_sat) is True
@pytest.mark.asyncio
async def test_engine_classifies_clausal_heads() -> None:
"""Engine exposes built chains via the chains property."""
panel = _panel_with_programs(
_when(1, 0x0405),
_then_action(2, int(Command.UNIT_ON), 7),
)
engine = ProgramEngine(panel, clock=FakeClock(
datetime(2026, 5, 14, 0, 0, tzinfo=timezone.utc)
))
assert len(engine.chains) == 1
@pytest.mark.asyncio
async def test_engine_when_chain_fires_on_event() -> None:
"""A WHEN-headed chain dispatches when emit_event() matches its event."""
button_evt = event_id_user_macro_button(7)
panel = _panel_with_programs(
_when(1, button_evt),
_then_action(2, int(Command.UNIT_ON), 9),
)
async with ProgramEngine(panel, clock=FakeClock(
datetime(2026, 5, 14, 0, 0, tzinfo=timezone.utc)
)) as engine:
fired = await engine.emit_user_macro_button(7)
assert fired == 1
assert engine.metrics.clausal_fired == 1
assert panel.state.units[9].state == 1
@pytest.mark.asyncio
async def test_engine_when_chain_blocked_by_failing_condition() -> None:
"""Default condition evaluator passes ANDs but fails ORs. A chain
with one AND condition fires; a chain with an OR-only group doesn't."""
button_evt = event_id_user_macro_button(7)
panel = _panel_with_programs(
_when(1, button_evt),
_and_cond(2),
_then_action(3, int(Command.UNIT_ON), 9),
)
async with ProgramEngine(panel, clock=FakeClock(
datetime(2026, 5, 14, 0, 0, tzinfo=timezone.utc)
)) as engine:
# Default evaluator: ANDs pass → chain runs.
fired = await engine.emit_user_macro_button(7)
assert fired == 1
assert panel.state.units[9].state == 1
@pytest.mark.asyncio
async def test_engine_custom_evaluator_can_block_chain() -> None:
"""Replace evaluator with always-False; chain doesn't fire."""
button_evt = event_id_user_macro_button(7)
panel = _panel_with_programs(
_when(1, button_evt),
_and_cond(2),
_then_action(3, int(Command.UNIT_ON), 9),
)
async with ProgramEngine(panel, clock=FakeClock(
datetime(2026, 5, 14, 0, 0, tzinfo=timezone.utc)
)) as engine:
engine.set_condition_evaluator(lambda c: False)
fired = await engine.emit_user_macro_button(7)
# Returns 0 — the chain matched the event but failed conditions.
assert fired == 0
assert engine.metrics.clausal_fired == 0
@pytest.mark.asyncio
async def test_engine_at_chain_fires_at_scheduled_time() -> None:
"""AT-headed chain fires at hour:minute on matching days."""
t0 = datetime(2026, 5, 11, 5, 59, tzinfo=timezone.utc) # Mon 05:59
panel = _panel_with_programs(
_at(1, hour=6, minute=0, days=int(Days.MONDAY)),
_then_action(2, int(Command.UNIT_ON), 7),
)
clock = FakeClock(t0)
async with ProgramEngine(panel, clock=clock) as engine:
await asyncio.sleep(0)
await clock.advance_to(t0 + timedelta(minutes=2))
await asyncio.sleep(0)
assert engine.metrics.clausal_fired == 1
assert panel.state.units[7].state == 1
@pytest.mark.asyncio
async def test_engine_every_chain_fires_on_interval() -> None:
"""EVERY chain fires every N seconds."""
t0 = datetime(2026, 5, 14, 12, 0, tzinfo=timezone.utc)
panel = _panel_with_programs(
_every(1, interval_sec=60),
_then_action(2, int(Command.UNIT_ON), 7),
)
clock = FakeClock(t0)
async with ProgramEngine(panel, clock=clock) as engine:
await asyncio.sleep(0)
# Walk three intervals.
for tick in (1, 2, 3):
await clock.advance_to(t0 + timedelta(seconds=60 * tick + 1))
await asyncio.sleep(0)
assert engine.metrics.clausal_fired == 3