diff --git a/src/omni_pca/program_engine.py b/src/omni_pca/program_engine.py index 783a39a..bc92ed6 100644 --- a/src/omni_pca/program_engine.py +++ b/src/omni_pca/program_engine.py @@ -418,6 +418,64 @@ def _command_payload(program: Program) -> bytes: ) +# -------------------------------------------------------------------------- +# Event taxonomy +# -------------------------------------------------------------------------- + + +# Event-ID encoding mirrors clsText.GetEventCategory (clsText.cs:1585-...). +# Each event has a 16-bit ID; bit-pattern masks pick out the category, and +# the low-order bits within each category encode the object number / state. +# We expose helper builders rather than a full enuEventType mirror — the +# common cases below cover what TIMED-program authors actually wire up. + + +def event_id_user_macro_button(button: int) -> int: + """Event ID fired when a panel button macro runs. + + Category mask: ``(evt & 0xFF00) == 0x0000``. The low byte holds the + 1-based button number (1..255). + """ + if not 1 <= button <= 255: + raise ValueError(f"button {button} out of range 1..255") + return button & 0xFF + + +def event_id_zone_state(zone: int, state: int) -> int: + """Event ID for a zone-state change. + + Category mask: ``(evt & 0xFC00) == 0x0400`` (high bits 0b000001). + Low 10 bits encode zone × state per clsText: ``(zone - 1) * 4 + state`` + where state is the 2-bit "current_state" code (0=secure, 1=not-ready, + 2=trouble, 3=tamper). Range fits 256 zones × 4 states = 1024 IDs. + """ + if not 1 <= zone <= 256: + raise ValueError(f"zone {zone} out of range 1..256") + if not 0 <= state <= 3: + raise ValueError(f"state {state} out of range 0..3") + return 0x0400 | (((zone - 1) * 4 + state) & 0x03FF) + + +def event_id_unit_state(unit: int, on: bool) -> int: + """Event ID for a unit (light/output) state change. + + Category mask: ``(evt & 0xFC00) == 0x0800``. Low bits encode + ``(unit - 1) * 2 + (1 if on else 0)`` per clsText. + """ + if not 1 <= unit <= 511: + raise ValueError(f"unit {unit} out of range 1..511") + return 0x0800 | (((unit - 1) * 2 + (1 if on else 0)) & 0x03FF) + + +# Hand-rolled fixed-ID events from clsText.cs:1647-... (PHONE/AC_POWER/etc.). +EVENT_PHONE_DEAD: int = 768 +EVENT_PHONE_RINGING: int = 769 +EVENT_PHONE_OFF_HOOK: int = 770 +EVENT_PHONE_ON_HOOK: int = 771 +EVENT_AC_POWER_OFF: int = 772 +EVENT_AC_POWER_ON: int = 773 + + # -------------------------------------------------------------------------- # Engine # -------------------------------------------------------------------------- @@ -478,6 +536,10 @@ class ProgramEngine: self._classified = classify(self._programs) self._tasks: list[asyncio.Task[None]] = [] self._running = False + # event_id → list of EVENT programs that fire on it. Built lazily + # in start() so callers can mutate state.programs between init + # and start (e.g. a test seeding extra programs). + self._event_table: dict[int, list[Program]] = {} self.metrics = _EngineMetrics() # ---- inspection ------------------------------------------------------- @@ -523,6 +585,12 @@ class ProgramEngine: name=f"omni-pca-yearly-slot-{program.slot}", ) ) + # Phase 4: EVENT programs aren't long-running tasks — they just + # register in the event table and the engine dispatches on + # emit_event(). Build the table now so emit is O(1). + self._event_table.clear() + for program in self._classified.event: + self._event_table.setdefault(program.event_id, []).append(program) async def _run_timed_program(self, program: Program) -> None: """Sleep-until-next-fire loop for one TIMED program. @@ -560,6 +628,43 @@ class ProgramEngine: ) self.metrics.errors += 1 + # ---- event dispatch (Phase 4) ---------------------------------------- + + async def emit_event(self, event_id: int) -> int: + """Fire every EVENT program subscribed to ``event_id``. + + Returns the number of programs that fired. Safe to call before + ``start()`` (returns 0 since no event table is built yet) or + after ``stop()`` (programs registered while running aren't + retained — call start again to rebuild). + + The classic use cases are wired up via the convenience helpers + below, but tests and HA code can also call this directly with + any raw ``event_id``. + """ + if not self._running: + return 0 + programs = self._event_table.get(event_id, ()) + for program in programs: + await self._fire(program) + return len(programs) + + async def emit_user_macro_button(self, button: int) -> int: + """Convenience: fire EVENT programs subscribed to a button press.""" + return await self.emit_event(event_id_user_macro_button(button)) + + async def emit_zone_state(self, zone: int, state: int) -> int: + """Convenience: fire EVENT programs subscribed to a zone-state change. + + ``state`` is the 2-bit current_state code: 0=secure, 1=not-ready, + 2=trouble, 3=tamper. Matches MockZoneState.current_state. + """ + return await self.emit_event(event_id_zone_state(zone, state)) + + async def emit_unit_state(self, unit: int, on: bool) -> int: + """Convenience: fire EVENT programs subscribed to a unit on/off.""" + return await self.emit_event(event_id_unit_state(unit, on)) + async def _run_yearly_program(self, program: Program) -> None: """Sleep-until-next-fire loop for one YEARLY program.""" try: diff --git a/tests/test_program_engine.py b/tests/test_program_engine.py index 6e2f5dc..0f53a47 100644 --- a/tests/test_program_engine.py +++ b/tests/test_program_engine.py @@ -601,6 +601,152 @@ async def test_engine_sun_relative_without_location_is_skipped() -> None: assert engine.metrics.timed_fired == 0 +# ---- Phase 4: EVENT programs -------------------------------------------- + + +from omni_pca.program_engine import ( # noqa: E402 + EVENT_AC_POWER_OFF, + event_id_unit_state, + event_id_user_macro_button, + event_id_zone_state, +) + + +def test_event_id_user_macro_button_packs_low_byte() -> None: + assert event_id_user_macro_button(1) == 0x0001 + assert event_id_user_macro_button(255) == 0x00FF + + +def test_event_id_user_macro_button_rejects_out_of_range() -> None: + with pytest.raises(ValueError): + event_id_user_macro_button(0) + with pytest.raises(ValueError): + event_id_user_macro_button(256) + + +def test_event_id_zone_state_encodes_zone_and_state() -> None: + # Zone 1 state 0 (secure) → 0x0400 base. + assert event_id_zone_state(1, 0) == 0x0400 + assert event_id_zone_state(1, 3) == 0x0403 # tamper + # Zone 2 state 0 = 0x0404 (2-1)*4+0 + assert event_id_zone_state(2, 0) == 0x0404 + + +def test_event_id_unit_state_encodes_unit_and_on_off() -> None: + assert event_id_unit_state(1, on=False) == 0x0800 + assert event_id_unit_state(1, on=True) == 0x0801 + assert event_id_unit_state(2, on=True) == 0x0803 + + +@pytest.mark.asyncio +async def test_engine_emit_event_fires_subscribed_program() -> None: + """An EVENT program with event_id matching the emit fires.""" + button_evt = event_id_user_macro_button(5) + # EVENT program stores event_id in (month<<8)|day per programs.event_id. + p = Program( + slot=1, prog_type=int(ProgramType.EVENT), + cmd=int(Command.UNIT_ON), pr2=7, + month=(button_evt >> 8) & 0xFF, day=button_evt & 0xFF, + ) + panel = _panel_with_programs(p) + async with ProgramEngine(panel, clock=FakeClock( + datetime(2026, 5, 14, 0, 0, tzinfo=timezone.utc) + )) as engine: + fired = await engine.emit_user_macro_button(5) + assert fired == 1 + assert engine.metrics.event_fired == 1 + assert panel.state.units[7].state == 1 + + +@pytest.mark.asyncio +async def test_engine_emit_event_no_match_returns_zero() -> None: + """Emitting an event with no subscribed program is a silent no-op.""" + panel = MockPanel(controller_key=CONTROLLER_KEY, state=MockState()) + async with ProgramEngine(panel, clock=FakeClock( + datetime(2026, 5, 14, 0, 0, tzinfo=timezone.utc) + )) as engine: + assert await engine.emit_user_macro_button(99) == 0 + assert engine.metrics.event_fired == 0 + + +@pytest.mark.asyncio +async def test_engine_emit_event_before_start_is_no_op() -> None: + """emit_event on an un-started engine doesn't raise — just returns 0.""" + p = Program( + slot=1, prog_type=int(ProgramType.EVENT), + cmd=1, month=0, day=1, + ) + panel = _panel_with_programs(p) + engine = ProgramEngine(panel, clock=FakeClock( + datetime(2026, 5, 14, 0, 0, tzinfo=timezone.utc) + )) + assert await engine.emit_event(1) == 0 # not started yet + + +@pytest.mark.asyncio +async def test_engine_multiple_programs_on_same_event() -> None: + """Two EVENT programs subscribed to the same event both fire.""" + evt = event_id_user_macro_button(3) + hi = (evt >> 8) & 0xFF + lo = evt & 0xFF + p1 = Program( + slot=1, prog_type=int(ProgramType.EVENT), + cmd=int(Command.UNIT_ON), pr2=5, + month=hi, day=lo, + ) + p2 = Program( + slot=2, prog_type=int(ProgramType.EVENT), + cmd=int(Command.UNIT_ON), pr2=6, + month=hi, day=lo, + ) + panel = _panel_with_programs(p1, p2) + async with ProgramEngine(panel, clock=FakeClock( + datetime(2026, 5, 14, 0, 0, tzinfo=timezone.utc) + )) as engine: + fired = await engine.emit_user_macro_button(3) + assert fired == 2 + assert panel.state.units[5].state == 1 + assert panel.state.units[6].state == 1 + + +@pytest.mark.asyncio +async def test_engine_zone_state_emit_helper() -> None: + """emit_zone_state fires programs matching that exact (zone, state).""" + evt = event_id_zone_state(7, 1) # zone 7 not-ready + p = Program( + slot=10, prog_type=int(ProgramType.EVENT), + cmd=int(Command.UNIT_ON), pr2=12, + month=(evt >> 8) & 0xFF, day=evt & 0xFF, + ) + panel = _panel_with_programs(p) + async with ProgramEngine(panel, clock=FakeClock( + datetime(2026, 5, 14, 0, 0, tzinfo=timezone.utc) + )) as engine: + # Wrong state — no fire. + assert await engine.emit_zone_state(7, 0) == 0 + assert engine.metrics.event_fired == 0 + # Right state — fire. + assert await engine.emit_zone_state(7, 1) == 1 + assert engine.metrics.event_fired == 1 + + +@pytest.mark.asyncio +async def test_engine_fixed_event_constants() -> None: + """The hand-rolled fixed-ID events (PHONE/AC_POWER) dispatch correctly.""" + p = Program( + slot=1, prog_type=int(ProgramType.EVENT), + cmd=int(Command.UNIT_ON), pr2=4, + month=(EVENT_AC_POWER_OFF >> 8) & 0xFF, + day=EVENT_AC_POWER_OFF & 0xFF, + ) + panel = _panel_with_programs(p) + async with ProgramEngine(panel, clock=FakeClock( + datetime(2026, 5, 14, 0, 0, tzinfo=timezone.utc) + )) as engine: + fired = await engine.emit_event(EVENT_AC_POWER_OFF) + assert fired == 1 + + @pytest.mark.asyncio async def test_engine_fires_sun_relative_program_with_location() -> None: """End-to-end: TIMED AT_SUNSET program fires at astral-computed