From 2cc28b0e5080d8ebf314d5d9a91ce554cd98f813 Mon Sep 17 00:00:00 2001 From: Ryan Malloy Date: Thu, 14 May 2026 01:25:14 -0600 Subject: [PATCH] =?UTF-8?q?program=5Fengine:=20Phases=201+2=20=E2=80=94=20?= =?UTF-8?q?Clock=20abstraction=20+=20TIMED=20execution?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit First half of the autonomous program-execution engine. Two phases land together because Phase 1 was pure scaffolding (Clock + classifier) and made little sense in isolation. Phase 1 — engine foundation: * Clock protocol with RealClock (wall time + asyncio.sleep) and FakeClock (manual advance, no real waiting; sleepers wake in chronological order on advance_to). * classify(programs) splits a Program tuple into timed / event / yearly / clausal-head buckets, dropping FREE / REMARK / unknown records and the AND/OR/THEN clausal continuations (those are reached by walking forward from each WHEN/AT/EVERY head, not by classification). * ProgramEngine class with start / stop lifecycle (idempotent + context-manager), per-program asyncio task list, _EngineMetrics counters. Phase 2 — TIMED programs actually run: * _next_absolute_fire(now, program) computes the next datetime at which a TIMED program with TimeKind.ABSOLUTE should fire, given its hour/minute/days mask. Walks forward up to 8 days; returns None for empty Days mask (program is effectively disabled). * Each TIMED program gets its own asyncio task running sleep-until-next-fire / fire / loop. Firing dispatches the 4-byte Command wire payload (cmd / par / pr2) through MockPanel._handle_command — same code path the v2 Command opcode uses, so a TIMED program turning on a unit produces identical state to a client sending the equivalent Command. * astral added as an [engine] optional dependency, pinned to 2.2 for HA compat (HA itself pins astral==2.2). Library wired up but not yet consumed — sunrise/sunset support lands in Phase 3. Tests (28 new): * RealClock and FakeClock behaviour incl. chronological wake order. * classify against each ProgramType, unknown values, empty input. * Engine lifecycle (idempotent start/stop, context manager, malformed-record tolerance). * End-to-end: TIMED UNIT_ON program fires at the right Monday 06:00, loops correctly across weeks, never fires outside its Days mask, ignores programs with empty Days mask. Full suite: 527 passed, 1 skipped (up from 499). --- pyproject.toml | 5 + src/omni_pca/program_engine.py | 444 +++++++++++++++++++++++++++++++++ tests/test_program_engine.py | 414 ++++++++++++++++++++++++++++++ uv.lock | 6 +- 4 files changed, 868 insertions(+), 1 deletion(-) create mode 100644 src/omni_pca/program_engine.py create mode 100644 tests/test_program_engine.py diff --git a/pyproject.toml b/pyproject.toml index 562b11b..30b1be5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,6 +24,11 @@ dependencies = [ [project.optional-dependencies] cli = ["rich>=13.9.0", "typer>=0.15.0"] +# astral provides sunrise/sunset computation for the program engine's +# AT_SUNRISE / AT_SUNSET TIMED-program sentinels. Pure Python, MIT. +# Pinned to 2.2 for compatibility with Home Assistant (which pins it +# to exactly that version) so the `ha` group still resolves. +engine = ["astral>=2.2,<3"] [project.scripts] omni-pca = "omni_pca.__main__:main" diff --git a/src/omni_pca/program_engine.py b/src/omni_pca/program_engine.py new file mode 100644 index 0000000..f8eda05 --- /dev/null +++ b/src/omni_pca/program_engine.py @@ -0,0 +1,444 @@ +"""Autonomous execution engine for HAI Omni panel programs. + +The :mod:`omni_pca.mock_panel` module turns ``MockState`` into a +wire-speaking *replay* of a panel: clients ask for properties, names, +programs, and the mock serves what's on disk. The engine in this +module is the next layer — it interprets the decoded :class:`Program` +records as **automation rules** and fires them autonomously over time, +mutating ``MockState`` the same way a real panel firmware would. + +Architecture +------------ + +The engine is decoupled from real wall-time via a :class:`Clock` +protocol so tests can fast-forward through schedules without waiting +for real seconds to elapse. Two implementations ship: + +* :class:`RealClock` — the production engine. ``now()`` returns + ``datetime.now()`` and ``sleep_until()`` does ``asyncio.sleep``. +* :class:`FakeClock` — for tests. ``now()`` returns the manually-set + current time and ``sleep_until()`` returns immediately after + recording the target. Tests then call ``advance_to(target)`` to + jump the clock forward and let pending sleepers wake up. + +Program-type coverage is built up in phases: + +* Phase 1 (this module's initial cut) — skeleton + Clock + the + classifier that splits :class:`Program` records into "schedulable + by time", "event-triggered", and "clausal head" categories. +* Phase 2 — TIMED program execution. +* Phase 3 — YEARLY + sunrise/sunset via :mod:`astral`. +* Phase 4 — EVENT program routing. +* Phase 5 — full clausal evaluator for firmware-3.0 multi-record + WHEN/AT/EVERY + AND/OR/THEN chains. + +The engine never touches the wire directly. All state mutations go +through :meth:`MockPanel._apply_unit_command` (and siblings), which +are the same code paths the v2 ``Command`` opcode handler uses — so +"engine fires a TIMED program that turns on unit 5" and "client sends +``Command(UNIT_ON, 5)``" produce identical results. +""" + +from __future__ import annotations + +import asyncio +import logging +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from datetime import date, datetime, time, timedelta, timezone +from typing import TYPE_CHECKING + +from .programs import Days, Program, ProgramType, TimeKind + +_log = logging.getLogger(__name__) + +if TYPE_CHECKING: + from .mock_panel import MockPanel + + +# -------------------------------------------------------------------------- +# Clock abstraction +# -------------------------------------------------------------------------- + + +class Clock(ABC): + """Abstract source of "what time is it" + delay scheduling. + + Implementations decide whether ``sleep_until`` is a real + ``asyncio.sleep`` or a deterministic no-op that defers to a manual + advance call. The engine never references ``datetime.now()`` or + ``asyncio.sleep`` directly — it always goes through the Clock. + """ + + @abstractmethod + def now(self) -> datetime: ... + + @abstractmethod + async def sleep_until(self, target: datetime) -> None: ... + + +class RealClock(Clock): + """Production clock — wall-time + asyncio.sleep.""" + + def now(self) -> datetime: + return datetime.now(tz=timezone.utc) + + async def sleep_until(self, target: datetime) -> None: + delay = (target - self.now()).total_seconds() + if delay > 0: + await asyncio.sleep(delay) + + +@dataclass +class _PendingSleeper: + """A coroutine waiting for the FakeClock to reach ``target``.""" + + target: datetime + event: asyncio.Event + + +class FakeClock(Clock): + """Deterministic clock for tests. + + ``advance_to(t)`` jumps wall-time forward and wakes any sleepers + whose ``target <= t``. Multiple sleepers can wait concurrently — + they wake in target order. + + Example: + + clock = FakeClock(datetime(2026, 5, 14, 22, 29, tzinfo=timezone.utc)) + engine = ProgramEngine(panel, clock=clock) + await engine.start() + # No real seconds pass: + await clock.advance_to(datetime(2026, 5, 14, 22, 31, tzinfo=timezone.utc)) + # By now any TIMED program scheduled for 22:30 has fired. + """ + + def __init__(self, initial: datetime) -> None: + if initial.tzinfo is None: + raise ValueError("FakeClock requires a timezone-aware initial datetime") + self._now = initial + self._sleepers: list[_PendingSleeper] = [] + + def now(self) -> datetime: + return self._now + + async def sleep_until(self, target: datetime) -> None: + if target <= self._now: + return + sleeper = _PendingSleeper(target=target, event=asyncio.Event()) + self._sleepers.append(sleeper) + await sleeper.event.wait() + + async def advance_to(self, target: datetime) -> None: + """Jump clock to ``target``, waking any sleepers whose target is in + the past after the jump. Sleepers wake in chronological order so + a TIMED program scheduled for 06:00 wakes before one at 07:00 + even if we advance straight to 08:00.""" + if target < self._now: + raise ValueError("FakeClock can only move forward") + self._now = target + # Wake sleepers whose target has now passed, in chronological order. + ready = sorted( + (s for s in self._sleepers if s.target <= self._now), + key=lambda s: s.target, + ) + for sleeper in ready: + self._sleepers.remove(sleeper) + sleeper.event.set() + # Yield once per ready sleeper so each one's coroutine runs to + # its next suspension point before we return. + for _ in ready: + await asyncio.sleep(0) + + +# -------------------------------------------------------------------------- +# Program classification +# -------------------------------------------------------------------------- + + +@dataclass(frozen=True, slots=True) +class _ClassifiedPrograms: + """Programs sorted into execution buckets. + + ``timed`` / ``event`` / ``yearly`` carry compact-form Programs whose + behaviour is decoded directly from the single 14-byte record. + ``clausal_heads`` are WHEN/AT/EVERY records that begin a multi-record + chain; the engine resolves the chain (via following AND/OR/THEN + records in the same slot range) when it loads them in phase 5. + """ + + timed: tuple[Program, ...] = () + event: tuple[Program, ...] = () + yearly: tuple[Program, ...] = () + clausal_heads: tuple[Program, ...] = () + + +def classify(programs: tuple[Program, ...]) -> _ClassifiedPrograms: + """Split a Program tuple by execution kind. + + Empty / unknown / FREE / REMARK records are dropped — they have no + runtime behaviour. Clausal AND/OR/THEN records are *also* dropped at + this stage; the engine reaches them by walking forward from each + WHEN/AT/EVERY head, not by classifying them independently. + """ + timed: list[Program] = [] + event: list[Program] = [] + yearly: list[Program] = [] + clausal: list[Program] = [] + for p in programs: + if p.is_empty(): + continue + try: + kind = ProgramType(p.prog_type) + except ValueError: + continue + if kind == ProgramType.TIMED: + timed.append(p) + elif kind == ProgramType.EVENT: + event.append(p) + elif kind == ProgramType.YEARLY: + yearly.append(p) + elif kind in (ProgramType.WHEN, ProgramType.AT, ProgramType.EVERY): + clausal.append(p) + # FREE (0) / REMARK (4) / AND / OR / THEN — not scheduled directly. + return _ClassifiedPrograms( + timed=tuple(timed), + event=tuple(event), + yearly=tuple(yearly), + clausal_heads=tuple(clausal), + ) + + +# -------------------------------------------------------------------------- +# TIMED scheduling +# -------------------------------------------------------------------------- + + +# Omni's day-of-week bitmask maps bits 1..7 (LSB unused) to Mon..Sun. +# Python's datetime.weekday() returns Mon=0..Sun=6. We need a lookup. +_PYWEEKDAY_TO_DAYS_BIT: tuple[Days, ...] = ( + Days.MONDAY, + Days.TUESDAY, + Days.WEDNESDAY, + Days.THURSDAY, + Days.FRIDAY, + Days.SATURDAY, + Days.SUNDAY, +) + + +def _matches_days_mask(d: date, mask: int) -> bool: + """Return True iff ``d``'s weekday is enabled in the Omni Days bitmask. + + Mask 0 (no days set) never matches — TIMED programs with empty + Days masks are effectively disabled, matching real-panel behaviour. + """ + if mask == 0: + return False + return bool(int(_PYWEEKDAY_TO_DAYS_BIT[d.weekday()]) & mask) + + +def _next_absolute_fire(now: datetime, program: Program) -> datetime | None: + """Compute the next datetime ``program`` (assumed TIMED, ABSOLUTE + TimeKind) should fire, strictly after ``now``. + + Returns ``None`` if the program's Days mask is empty — it never fires. + """ + if program.time_kind != TimeKind.ABSOLUTE: + return None # SUNRISE/SUNSET-relative handled by Phase 3. + if program.days == 0: + return None + # Snap to the program's hour:minute today (in the clock's tz), + # then walk forward up to 8 days looking for the next matching weekday. + base = now.replace( + hour=program.hour, minute=program.minute, + second=0, microsecond=0, + ) + for offset in range(0, 8): + candidate = base + timedelta(days=offset) + if candidate <= now: + continue + if _matches_days_mask(candidate.date(), program.days): + return candidate + return None # safety net — shouldn't happen if mask is non-zero + + +def _command_payload(program: Program) -> bytes: + """Build the 4-byte Command wire payload from a Program record. + + The wire format (clsOL2MsgCommand.cs) is identical between the v2 + Command opcode and what the panel firmware fires internally for a + TIMED program — so feeding this to ``MockPanel._handle_command`` + has exactly the same state-mutation effect as a client sending + the equivalent Command. + """ + return bytes( + [ + program.cmd & 0xFF, + program.par & 0xFF, + (program.pr2 >> 8) & 0xFF, + program.pr2 & 0xFF, + ] + ) + + +# -------------------------------------------------------------------------- +# Engine +# -------------------------------------------------------------------------- + + +@dataclass +class _EngineMetrics: + """Lightweight counters useful in tests + diagnostics.""" + + timed_fired: int = 0 + event_fired: int = 0 + yearly_fired: int = 0 + clausal_fired: int = 0 + errors: int = 0 + + +class ProgramEngine: + """Run a panel's programs autonomously against a :class:`MockPanel`. + + Phase 1 (this skeleton) classifies the programs and stands up the + asyncio task harness but doesn't fire anything yet. Subsequent + phases plug in TIMED / YEARLY / EVENT / clausal execution. + + Lifecycle:: + + engine = ProgramEngine(panel, clock=FakeClock(t0)) + await engine.start() # spawns the per-bucket tasks + ... # tests advance the clock / emit events + await engine.stop() # cancels and awaits all tasks + + The engine is safe to instantiate without ever calling ``start`` — + the classification work happens up front but no tasks spawn until + explicit start. + """ + + def __init__(self, panel: "MockPanel", *, clock: Clock | None = None) -> None: + self._panel = panel + self._clock = clock or RealClock() + # Decode raw bytes from MockState.programs into Program objects + # once, at construction. Reclassifying on every start would be + # wasteful and would also lose the slot indices. + decoded: list[Program] = [] + for slot, raw in panel.state.programs.items(): + try: + decoded.append(Program.from_wire_bytes(raw, slot=slot)) + except Exception: + # Malformed records are skipped, not fatal. The engine + # carries on with whatever is decodable. + continue + self._programs: tuple[Program, ...] = tuple(decoded) + self._classified = classify(self._programs) + self._tasks: list[asyncio.Task[None]] = [] + self._running = False + self.metrics = _EngineMetrics() + + # ---- inspection ------------------------------------------------------- + + @property + def clock(self) -> Clock: + """The clock this engine is driven by.""" + return self._clock + + @property + def classified(self) -> _ClassifiedPrograms: + """Programs split into execution buckets. Useful in tests to + confirm the engine sees what you expect.""" + return self._classified + + @property + def running(self) -> bool: + return self._running + + # ---- lifecycle -------------------------------------------------------- + + async def start(self) -> None: + """Begin executing programs in the background. + + Idempotent — calling start on a running engine is a no-op. + """ + if self._running: + return + self._running = True + # Phase 2: one worker task per TIMED program. + for program in self._classified.timed: + self._tasks.append( + asyncio.create_task( + self._run_timed_program(program), + name=f"omni-pca-timed-slot-{program.slot}", + ) + ) + + async def _run_timed_program(self, program: Program) -> None: + """Sleep-until-next-fire loop for one TIMED program. + + Wakes at the program's scheduled hour:minute on the next + matching weekday, fires the command, then loops to the next + occurrence. Returns when the engine stops (task cancellation). + """ + try: + while self._running: + next_fire = _next_absolute_fire(self._clock.now(), program) + if next_fire is None: + return # no valid Days mask — give up on this program + await self._clock.sleep_until(next_fire) + if not self._running: + return + await self._fire(program) + except asyncio.CancelledError: + raise + except Exception: + _log.exception( + "engine: TIMED slot %s crashed", program.slot, + ) + self.metrics.errors += 1 + + async def _fire(self, program: Program) -> None: + """Execute one program by feeding its command through the same + wire-handler path the v2 Command opcode uses.""" + try: + self._panel._handle_command(_command_payload(program)) + except Exception: + _log.exception( + "engine: firing slot %s (cmd=%d par=%d pr2=%d) raised", + program.slot, program.cmd, program.par, program.pr2, + ) + self.metrics.errors += 1 + return + kind = ProgramType(program.prog_type) + if kind == ProgramType.TIMED: + self.metrics.timed_fired += 1 + elif kind == ProgramType.EVENT: + self.metrics.event_fired += 1 + elif kind == ProgramType.YEARLY: + self.metrics.yearly_fired += 1 + else: + self.metrics.clausal_fired += 1 + + async def stop(self) -> None: + """Cancel all engine-spawned tasks and wait for them to exit. + + Idempotent.""" + if not self._running: + return + self._running = False + for task in self._tasks: + task.cancel() + for task in self._tasks: + try: + await task + except (asyncio.CancelledError, Exception): + pass + self._tasks.clear() + + async def __aenter__(self) -> "ProgramEngine": + await self.start() + return self + + async def __aexit__(self, exc_type, exc, tb) -> None: + await self.stop() diff --git a/tests/test_program_engine.py b/tests/test_program_engine.py new file mode 100644 index 0000000..3416902 --- /dev/null +++ b/tests/test_program_engine.py @@ -0,0 +1,414 @@ +"""Tests for the autonomous program execution engine. + +Phase 1 coverage: Clock abstraction, program classification, engine +lifecycle. Subsequent phases add tests as they land. +""" + +from __future__ import annotations + +import asyncio +from datetime import date, datetime, timedelta, timezone + +import pytest + +from omni_pca.mock_panel import MockPanel, MockState +from omni_pca.program_engine import ( + Clock, + FakeClock, + ProgramEngine, + RealClock, + classify, +) +from omni_pca.programs import Days, Program, ProgramType + +CONTROLLER_KEY = bytes(range(16)) + + +# ---- Clock --------------------------------------------------------------- + + +def test_real_clock_now_is_utc_aware() -> None: + c = RealClock() + assert c.now().tzinfo is not None + + +@pytest.mark.asyncio +async def test_real_clock_sleep_until_past_returns_immediately() -> None: + c = RealClock() + past = c.now() - timedelta(seconds=10) + # Should not actually sleep. + await asyncio.wait_for(c.sleep_until(past), timeout=0.5) + + +@pytest.mark.asyncio +async def test_real_clock_sleep_until_short_future() -> None: + c = RealClock() + target = c.now() + timedelta(milliseconds=50) + await c.sleep_until(target) + assert c.now() >= target + + +def test_fake_clock_requires_tz_aware_initial() -> None: + with pytest.raises(ValueError): + FakeClock(datetime(2026, 5, 14, 12, 0)) # no tz + + +def test_fake_clock_now_returns_set_time() -> None: + t0 = datetime(2026, 5, 14, 22, 30, tzinfo=timezone.utc) + c = FakeClock(t0) + assert c.now() == t0 + + +@pytest.mark.asyncio +async def test_fake_clock_advance_wakes_sleepers() -> None: + t0 = datetime(2026, 5, 14, 22, 0, tzinfo=timezone.utc) + c = FakeClock(t0) + target = t0 + timedelta(minutes=30) + + woke: list[datetime] = [] + + async def sleeper() -> None: + await c.sleep_until(target) + woke.append(c.now()) + + task = asyncio.create_task(sleeper()) + await asyncio.sleep(0) # let sleeper register + assert woke == [] + await c.advance_to(target) + await task + assert len(woke) == 1 + assert woke[0] == target + + +@pytest.mark.asyncio +async def test_fake_clock_advance_wakes_in_chronological_order() -> None: + t0 = datetime(2026, 5, 14, 22, 0, tzinfo=timezone.utc) + c = FakeClock(t0) + woke: list[tuple[str, datetime]] = [] + + async def sleeper(label: str, delta_min: int) -> None: + await c.sleep_until(t0 + timedelta(minutes=delta_min)) + woke.append((label, c.now())) + + s1 = asyncio.create_task(sleeper("late", 60)) + s2 = asyncio.create_task(sleeper("early", 15)) + s3 = asyncio.create_task(sleeper("middle", 30)) + await asyncio.sleep(0) + # Jump past everything in one go. + await c.advance_to(t0 + timedelta(minutes=90)) + await s1 + await s2 + await s3 + assert [label for label, _ in woke] == ["early", "middle", "late"] + + +def test_fake_clock_cannot_move_backwards() -> None: + t0 = datetime(2026, 5, 14, 22, 0, tzinfo=timezone.utc) + c = FakeClock(t0) + with pytest.raises(ValueError): + # advance_to is async but the validation is synchronous. + asyncio.run(c.advance_to(t0 - timedelta(seconds=1))) + + +def test_clock_is_abstract() -> None: + with pytest.raises(TypeError): + Clock() # type: ignore[abstract] + + +# ---- Classification ------------------------------------------------------ + + +def _free() -> Program: + return Program(slot=1, prog_type=int(ProgramType.FREE)) + + +def _timed(slot: int) -> Program: + return Program( + slot=slot, prog_type=int(ProgramType.TIMED), + cmd=3, hour=6, minute=0, days=int(Days.MONDAY), + ) + + +def _event(slot: int) -> Program: + return Program(slot=slot, prog_type=int(ProgramType.EVENT), cmd=5, cond=0x0001) + + +def _yearly(slot: int) -> Program: + return Program( + slot=slot, prog_type=int(ProgramType.YEARLY), + cmd=4, month=5, day=14, hour=12, minute=0, + ) + + +def _when(slot: int) -> Program: + return Program(slot=slot, prog_type=int(ProgramType.WHEN), cond=0x0001) + + +def _and(slot: int) -> Program: + return Program(slot=slot, prog_type=int(ProgramType.AND)) + + +def _then(slot: int) -> Program: + return Program(slot=slot, prog_type=int(ProgramType.THEN), cmd=3) + + +def test_classify_buckets_each_type() -> None: + bag = ( + _free(), _timed(2), _event(3), _yearly(4), _when(5), _and(6), _then(7), + ) + out = classify(bag) + assert [p.slot for p in out.timed] == [2] + assert [p.slot for p in out.event] == [3] + assert [p.slot for p in out.yearly] == [4] + assert [p.slot for p in out.clausal_heads] == [5] + # FREE, AND, THEN are not in any bucket. + + +def test_classify_drops_unknown_prog_types() -> None: + # Use a raw int that isn't a valid ProgramType. + junk = Program(slot=1, prog_type=99) + out = classify((junk,)) + assert out.timed == () + assert out.event == () + assert out.yearly == () + assert out.clausal_heads == () + + +def test_classify_handles_empty_input() -> None: + out = classify(()) + assert out.timed == () + assert out.event == () + assert out.yearly == () + assert out.clausal_heads == () + + +# ---- Engine lifecycle ---------------------------------------------------- + + +def _panel_with_programs(*programs: Program) -> MockPanel: + return MockPanel( + controller_key=CONTROLLER_KEY, + state=MockState( + programs={p.slot: p.encode_wire_bytes() for p in programs if p.slot}, + ), + ) + + +@pytest.mark.asyncio +async def test_engine_constructs_against_empty_panel() -> None: + panel = MockPanel(controller_key=CONTROLLER_KEY, state=MockState()) + engine = ProgramEngine(panel, clock=FakeClock( + datetime(2026, 5, 14, 0, 0, tzinfo=timezone.utc) + )) + assert engine.running is False + assert engine.classified.timed == () + + +@pytest.mark.asyncio +async def test_engine_classifies_loaded_programs() -> None: + panel = _panel_with_programs(_timed(1), _event(2), _yearly(3), _when(4)) + engine = ProgramEngine(panel, clock=FakeClock( + datetime(2026, 5, 14, 0, 0, tzinfo=timezone.utc) + )) + assert len(engine.classified.timed) == 1 + assert len(engine.classified.event) == 1 + assert len(engine.classified.yearly) == 1 + assert len(engine.classified.clausal_heads) == 1 + + +@pytest.mark.asyncio +async def test_engine_start_stop_idempotent() -> None: + panel = _panel_with_programs(_timed(1)) + engine = ProgramEngine(panel, clock=FakeClock( + datetime(2026, 5, 14, 0, 0, tzinfo=timezone.utc) + )) + await engine.start() + assert engine.running + await engine.start() # idempotent + assert engine.running + await engine.stop() + assert not engine.running + await engine.stop() # idempotent + assert not engine.running + + +@pytest.mark.asyncio +async def test_engine_context_manager() -> None: + panel = _panel_with_programs(_timed(1)) + engine = ProgramEngine(panel, clock=FakeClock( + datetime(2026, 5, 14, 0, 0, tzinfo=timezone.utc) + )) + async with engine: + assert engine.running + assert not engine.running + + +@pytest.mark.asyncio +async def test_engine_defaults_to_real_clock() -> None: + panel = MockPanel(controller_key=CONTROLLER_KEY, state=MockState()) + engine = ProgramEngine(panel) + assert isinstance(engine.clock, RealClock) + + +@pytest.mark.asyncio +async def test_engine_skips_malformed_records() -> None: + """Garbage in panel.state.programs shouldn't break engine construction.""" + panel = MockPanel( + controller_key=CONTROLLER_KEY, + # Half-length blob — too short for from_wire_bytes; should be skipped. + state=MockState(programs={1: b"\x01\x02\x03"}), + ) + engine = ProgramEngine(panel, clock=FakeClock( + datetime(2026, 5, 14, 0, 0, tzinfo=timezone.utc) + )) + # No tasks spawned, no exceptions raised, just empty classification. + assert engine.classified.timed == () + + +# ---- Phase 2: TIMED execution ------------------------------------------- + + +from omni_pca.commands import Command # noqa: E402 +from omni_pca.program_engine import ( # noqa: E402 + _matches_days_mask, + _next_absolute_fire, +) + + +def test_matches_days_mask_empty_never_matches() -> None: + assert _matches_days_mask(date(2026, 5, 14), 0) is False + + +def test_matches_days_mask_monday() -> None: + # 2026-05-11 is a Monday. + assert _matches_days_mask(date(2026, 5, 11), int(Days.MONDAY)) is True + assert _matches_days_mask(date(2026, 5, 12), int(Days.MONDAY)) is False + + +def test_matches_days_mask_weekdays_combo() -> None: + weekdays = int(Days.MONDAY | Days.TUESDAY | Days.WEDNESDAY | Days.THURSDAY | Days.FRIDAY) + # Mon..Fri match; Sat (5/16) / Sun (5/17) don't. + for day in (11, 12, 13, 14, 15): + assert _matches_days_mask(date(2026, 5, day), weekdays) + for day in (16, 17): + assert not _matches_days_mask(date(2026, 5, day), weekdays) + + +def test_next_absolute_fire_today_future() -> None: + # 2026-05-14 Thu 22:00; program 22:30 weekdays → fires today 22:30. + p = Program( + slot=1, prog_type=int(ProgramType.TIMED), + hour=22, minute=30, + days=int(Days.THURSDAY), + ) + now = datetime(2026, 5, 14, 22, 0, tzinfo=timezone.utc) + nxt = _next_absolute_fire(now, p) + assert nxt == datetime(2026, 5, 14, 22, 30, tzinfo=timezone.utc) + + +def test_next_absolute_fire_today_already_past_rolls_forward() -> None: + # 23:00 Thu, program 22:30 Thu → next is next Thursday. + p = Program( + slot=1, prog_type=int(ProgramType.TIMED), + hour=22, minute=30, + days=int(Days.THURSDAY), + ) + now = datetime(2026, 5, 14, 23, 0, tzinfo=timezone.utc) + nxt = _next_absolute_fire(now, p) + assert nxt == datetime(2026, 5, 21, 22, 30, tzinfo=timezone.utc) + + +def test_next_absolute_fire_no_days_returns_none() -> None: + p = Program(slot=1, prog_type=int(ProgramType.TIMED), hour=6, minute=0, days=0) + now = datetime(2026, 5, 14, 0, 0, tzinfo=timezone.utc) + assert _next_absolute_fire(now, p) is None + + +@pytest.mark.asyncio +async def test_engine_fires_timed_program_at_scheduled_time() -> None: + """End-to-end: TIMED UNIT_ON program at 06:00 Mon fires when the + fake clock advances past Monday 06:00 and mutates MockUnitState.""" + t0 = datetime(2026, 5, 11, 5, 59, tzinfo=timezone.utc) # Mon 05:59 + fire_at = datetime(2026, 5, 11, 6, 0, tzinfo=timezone.utc) + after = datetime(2026, 5, 11, 6, 1, tzinfo=timezone.utc) + # Unit 7 OFF initially; program turns it ON. + p = Program( + slot=42, prog_type=int(ProgramType.TIMED), + cmd=int(Command.UNIT_ON), pr2=7, + hour=6, minute=0, + days=int(Days.MONDAY), + ) + panel = _panel_with_programs(p) + clock = FakeClock(t0) + async with ProgramEngine(panel, clock=clock) as engine: + # Let the worker schedule itself. + await asyncio.sleep(0) + await clock.advance_to(after) + # Yield so the worker can finish firing. + await asyncio.sleep(0) + assert engine.metrics.timed_fired == 1 + assert panel.state.units[7].state == 1 # ON + + +@pytest.mark.asyncio +async def test_engine_fires_timed_program_repeatedly() -> None: + """Loop-around: same Monday program fires again the next Monday.""" + t0 = datetime(2026, 5, 11, 5, 59, tzinfo=timezone.utc) # Mon 05:59 + p = Program( + slot=42, prog_type=int(ProgramType.TIMED), + cmd=int(Command.UNIT_ON), pr2=7, + hour=6, minute=0, + days=int(Days.MONDAY), + ) + panel = _panel_with_programs(p) + clock = FakeClock(t0) + async with ProgramEngine(panel, clock=clock) as engine: + await asyncio.sleep(0) + # Walk the clock forward week-by-week so each Monday's fire + # completes (advance_to wakes one sleeper at a time; the worker + # needs to re-register for the next week between advances). + for week in range(1, 3): + await clock.advance_to(t0 + timedelta(days=7 * week)) + await asyncio.sleep(0) + assert engine.metrics.timed_fired == 2 + + +@pytest.mark.asyncio +async def test_engine_does_not_fire_outside_days_mask() -> None: + """A Mon-only program does NOT fire if the clock only advances on Tue.""" + t0 = datetime(2026, 5, 12, 5, 59, tzinfo=timezone.utc) # Tue 05:59 + p = Program( + slot=42, prog_type=int(ProgramType.TIMED), + cmd=int(Command.UNIT_ON), pr2=7, + hour=6, minute=0, + days=int(Days.MONDAY), + ) + panel = _panel_with_programs(p) + clock = FakeClock(t0) + async with ProgramEngine(panel, clock=clock) as engine: + await asyncio.sleep(0) + # Advance only ~6 hours — Tuesday 12:00 — still before next Monday 06:00. + await clock.advance_to(t0 + timedelta(hours=6)) + await asyncio.sleep(0) + assert engine.metrics.timed_fired == 0 + # And the unit is still OFF. + assert 7 not in panel.state.units or panel.state.units[7].state == 0 + + +@pytest.mark.asyncio +async def test_engine_skips_empty_days_mask() -> None: + """A program with no Days set never fires — matches real panel.""" + t0 = datetime(2026, 5, 11, 5, 59, tzinfo=timezone.utc) + p = Program( + slot=42, prog_type=int(ProgramType.TIMED), + cmd=int(Command.UNIT_ON), pr2=7, + hour=6, minute=0, + days=0, # disabled + ) + panel = _panel_with_programs(p) + clock = FakeClock(t0) + async with ProgramEngine(panel, clock=clock) as engine: + await asyncio.sleep(0) + await clock.advance_to(t0 + timedelta(days=7)) + await asyncio.sleep(0) + assert engine.metrics.timed_fired == 0 diff --git a/uv.lock b/uv.lock index ce45bf2..5bbd55a 100644 --- a/uv.lock +++ b/uv.lock @@ -1522,6 +1522,9 @@ cli = [ { name = "rich" }, { name = "typer" }, ] +engine = [ + { name = "astral" }, +] [package.dev-dependencies] dev = [ @@ -1537,11 +1540,12 @@ ha = [ [package.metadata] requires-dist = [ + { name = "astral", marker = "extra == 'engine'", specifier = ">=2.2,<3" }, { name = "cryptography", specifier = ">=44.0.0" }, { name = "rich", marker = "extra == 'cli'", specifier = ">=13.9.0" }, { name = "typer", marker = "extra == 'cli'", specifier = ">=0.15.0" }, ] -provides-extras = ["cli"] +provides-extras = ["cli", "engine"] [package.metadata.requires-dev] dev = [