diff --git a/src/omni_pca/program_renderer.py b/src/omni_pca/program_renderer.py new file mode 100644 index 0000000..d795e7d --- /dev/null +++ b/src/omni_pca/program_renderer.py @@ -0,0 +1,884 @@ +"""Structured-English rendering of HAI Omni panel programs. + +The decoded :class:`omni_pca.programs.Program` records produced by +``pca_file`` and the wire upload paths carry every byte but no narrative. +This module turns them into readable sentences modelled on PC Access's +program editor: + + WHEN Front Door is opened + AND IF Living Room Motion is secure + AND IF after sunset + OR IF Bedtime Mode is active + THEN Turn ON Hallway Light + AND Show Message "WELCOME HOME" + +Output is a sequence of :class:`Token` records rather than a flat string +so that consumers (CLI, HA frontend, anything else) can: + +* Identify object references (zones / units / areas / thermostats / + buttons / messages) — render each as a clickable link to the entity + page, badge them with live state, etc. +* Style keywords (`WHEN`, `AND IF`, `THEN`) separately from object + names and values. +* Recover plain text trivially via ``"".join(t.text for t in tokens)``. + +A :class:`ProgramRenderer` is constructed with a :class:`NameResolver` +and an optional :class:`StateResolver` for the live-state overlay. The +two resolvers are protocols (any object with the right methods works); +the convenience :class:`AccountNameResolver` adapts a :class:`PcaAccount` +and :class:`MockStateResolver` adapts a :class:`MockState` — together +those cover the two common consumers (offline ``.pca`` snapshot vs. +running mock panel) without forcing either into a base class. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Iterable, Protocol, runtime_checkable + +from .commands import Command +from .programs import ( + CondArgType, + CondOP, + Days, + MiscConditional, + Program, + ProgramCond, + ProgramType, + TimeKind, +) +from .program_engine import ( + ClausalChain, + EVENT_AC_POWER_OFF, + EVENT_AC_POWER_ON, + EVENT_PHONE_DEAD, + EVENT_PHONE_OFF_HOOK, + EVENT_PHONE_ON_HOOK, + EVENT_PHONE_RINGING, +) + + +# -------------------------------------------------------------------------- +# Token stream +# -------------------------------------------------------------------------- + + +class TokenKind: + """String constants for :attr:`Token.kind`. Defined as a class of + str constants so consumers can do ``if t.kind == TokenKind.REF``.""" + + KEYWORD: str = "keyword" # WHEN, AND IF, THEN, OR IF, etc. + OPERATOR: str = "operator" # is, ==, >, after, before, … + REF: str = "ref" # an object reference (zone / unit / …) + VALUE: str = "value" # a literal value (time, number, mode name) + TEXT: str = "text" # plain prose connectors + INDENT: str = "indent" # leading whitespace for the next line + NEWLINE: str = "newline" # end-of-line + + +@dataclass(frozen=True, slots=True) +class Token: + """One unit of structured-English output. + + ``text`` is what the consumer prints. The other fields are + metadata; only ``REF`` tokens use them all. + + For ``REF`` tokens: + * ``entity_kind`` is one of ``"zone"`` / ``"unit"`` / ``"area"`` + / ``"thermostat"`` / ``"button"`` / ``"message"`` + / ``"code"`` / ``"timeclock"`` + * ``entity_id`` is the 1-based slot the reference resolves to + * ``state`` is the live-state overlay string when a state + resolver was provided (e.g. ``"SECURE"``, ``"ON 60%"``, + ``"Off"``); ``None`` when no overlay is available + + For non-REF tokens, ``entity_kind`` / ``entity_id`` / ``state`` are ``None``. + """ + + kind: str + text: str + entity_kind: str | None = None + entity_id: int | None = None + state: str | None = None + + +def tokens_to_string(tokens: Iterable[Token]) -> str: + """Render a token stream to plain text. Useful for logs / dumps.""" + pieces: list[str] = [] + for t in tokens: + if t.kind == TokenKind.NEWLINE: + pieces.append("\n") + elif t.kind == TokenKind.INDENT: + pieces.append(t.text) + else: + pieces.append(t.text) + if t.kind == TokenKind.REF and t.state is not None: + pieces.append(f" [{t.state}]") + return "".join(pieces) + + +# -------------------------------------------------------------------------- +# Resolver protocols + default implementations +# -------------------------------------------------------------------------- + + +@runtime_checkable +class NameResolver(Protocol): + """Translate a (kind, 1-based-index) reference into a human name. + + Returns the name string when known, or ``None`` when the slot is + undefined / the kind isn't supported. The renderer falls back to + a generated label (``"Zone 5"``, ``"Unit 7"``) when the resolver + returns ``None``. + """ + + def name_of(self, kind: str, index: int) -> str | None: ... + + +@runtime_checkable +class StateResolver(Protocol): + """Translate a (kind, 1-based-index) reference into a live-state + overlay string. Returns ``None`` when no overlay applies — the + renderer omits the bracketed annotation in that case. + """ + + def state_of(self, kind: str, index: int) -> str | None: ... + + +class AccountNameResolver: + """Resolves names from a :class:`omni_pca.pca_file.PcaAccount`. + + Works as both a static-snapshot view (offline ``.pca`` inspection) + and as a fallback for the HA path when only header data is loaded. + """ + + def __init__(self, account) -> None: + self._account = account + + def name_of(self, kind: str, index: int) -> str | None: + table = { + "zone": getattr(self._account, "zone_names", {}), + "unit": getattr(self._account, "unit_names", {}), + "area": getattr(self._account, "area_names", {}), + "thermostat": getattr(self._account, "thermostat_names", {}), + "button": getattr(self._account, "button_names", {}), + "message": getattr(self._account, "message_names", {}), + "code": getattr(self._account, "code_names", {}), + }.get(kind, {}) + return table.get(index) + + +class MockStateResolver: + """Resolves both names and live state from a :class:`MockState`. + + Implements both :class:`NameResolver` and :class:`StateResolver` + so the same object covers both roles when rendering against a + running mock panel. + """ + + def __init__(self, state) -> None: + self._state = state + + def name_of(self, kind: str, index: int) -> str | None: + getter = { + "zone": getattr(self._state, "zones", {}).get, + "unit": getattr(self._state, "units", {}).get, + "area": getattr(self._state, "areas", {}).get, + "thermostat": getattr(self._state, "thermostats", {}).get, + "button": getattr(self._state, "buttons", {}).get, + }.get(kind) + if getter is None: + return None + obj = getter(index) + return getattr(obj, "name", None) if obj else None + + def state_of(self, kind: str, index: int) -> str | None: + if kind == "zone": + z = self._state.zones.get(index) + if z is None: + return None + if z.is_bypassed: + return "BYPASSED" + return "NOT READY" if z.current_state != 0 else "SECURE" + if kind == "unit": + u = self._state.units.get(index) + if u is None: + return None + if u.state == 0: + return "OFF" + if u.state >= 100: + return f"ON {u.state - 100}%" + return "ON" + if kind == "area": + a = self._state.areas.get(index) + if a is None: + return None + return _SECURITY_MODE_NAMES.get(a.mode, f"mode {a.mode}") + if kind == "thermostat": + t = self._state.thermostats.get(index) + if t is None or t.temperature_raw == 0: + return None + # Linear scale on Omni: temp_raw / 2 - 40 = °F. + return f"{t.temperature_raw // 2 - 40}°F" + return None + + +_SECURITY_MODE_NAMES: dict[int, str] = { + 0: "Off", + 1: "Day", + 2: "Night", + 3: "Away", + 4: "Vacation", + 5: "Day Instant", + 6: "Night Delayed", +} + + +# -------------------------------------------------------------------------- +# Helpers — friendly names for fixed enums +# -------------------------------------------------------------------------- + + +_DAY_BIT_LABELS: tuple[tuple[int, str], ...] = ( + (int(Days.MONDAY), "Mon"), + (int(Days.TUESDAY), "Tue"), + (int(Days.WEDNESDAY), "Wed"), + (int(Days.THURSDAY), "Thu"), + (int(Days.FRIDAY), "Fri"), + (int(Days.SATURDAY), "Sat"), + (int(Days.SUNDAY), "Sun"), +) + +_ALL_DAYS_MASK: int = sum(b for b, _ in _DAY_BIT_LABELS) +_WEEKDAYS_MASK: int = int( + Days.MONDAY | Days.TUESDAY | Days.WEDNESDAY | Days.THURSDAY | Days.FRIDAY +) +_WEEKEND_MASK: int = int(Days.SATURDAY | Days.SUNDAY) + + +def format_days(mask: int) -> str: + """Render a Days bitmask as a friendly schedule string. + + Common patterns get short names; anything else is the abbreviated + weekday list (``"Mon, Wed, Fri"``). + """ + if mask == 0: + return "never" + if mask & _ALL_DAYS_MASK == _ALL_DAYS_MASK: + return "every day" + if mask & _ALL_DAYS_MASK == _WEEKDAYS_MASK: + return "weekdays" + if mask & _ALL_DAYS_MASK == _WEEKEND_MASK: + return "weekends" + parts = [label for bit, label in _DAY_BIT_LABELS if mask & bit] + return ", ".join(parts) if parts else "(no days)" + + +# Command → ("verb", expects_pr2_object_kind) lookup. ``None`` for the +# second element means "no object reference" — the command's parameters +# are the action's payload alone. +_COMMAND_VERBS: dict[int, tuple[str, str | None]] = { + int(Command.UNIT_OFF): ("Turn OFF", "unit"), + int(Command.UNIT_ON): ("Turn ON", "unit"), + int(Command.ALL_OFF): ("Turn ALL OFF", None), + int(Command.ALL_ON): ("Turn ALL ON", None), + int(Command.BYPASS_ZONE): ("Bypass", "zone"), + int(Command.RESTORE_ZONE): ("Restore", "zone"), + int(Command.RESTORE_ALL_ZONES): ("Restore all zones", None), + int(Command.EXECUTE_BUTTON): ("Execute button", "button"), + int(Command.UNIT_LEVEL): ("Set level", "unit"), + int(Command.UNIT_RAMP): ("Ramp", "unit"), + int(Command.DIM_STEP): ("Dim", "unit"), + int(Command.BRIGHT_STEP): ("Brighten", "unit"), + int(Command.SECURITY_OFF): ("Disarm", "area"), + int(Command.SECURITY_DAY): ("Arm Day", "area"), + int(Command.SECURITY_NIGHT): ("Arm Night", "area"), + int(Command.SECURITY_AWAY): ("Arm Away", "area"), + int(Command.SECURITY_VACATION): ("Arm Vacation", "area"), + int(Command.SECURITY_DAY_INSTANT): ("Arm Day Instant", "area"), + int(Command.SECURITY_NIGHT_DELAYED): ("Arm Night Delayed", "area"), +} + + +# -------------------------------------------------------------------------- +# The renderer +# -------------------------------------------------------------------------- + + +@dataclass +class ProgramRenderer: + """Render :class:`Program` records and clausal chains as token streams. + + Parameters + ---------- + names: + Object-name resolver (zones, units, areas, thermostats, buttons, + messages, codes). Pass an :class:`AccountNameResolver` for + offline ``.pca`` snapshots or a :class:`MockStateResolver` for + the mock-panel case. + state: + Optional live-state resolver. When provided, every ``REF`` token + carries a ``state`` annotation that consumers can render as a + badge (``"Front Door [SECURE]"`` etc.). + """ + + names: NameResolver + state: StateResolver | None = None + + # ---- public API ------------------------------------------------------ + + def render_program(self, p: Program) -> list[Token]: + """Render a single compact-form program (TIMED / EVENT / YEARLY). + + Returns the multi-line full form. For a one-line summary, see + :meth:`summarize_program`. + """ + out: list[Token] = [] + try: + kind = ProgramType(p.prog_type) + except ValueError: + out.append(Token(TokenKind.TEXT, f"Unknown program type {p.prog_type}")) + return out + if kind == ProgramType.TIMED: + self._emit_timed_header(p, out) + elif kind == ProgramType.EVENT: + self._emit_event_header(p, out) + elif kind == ProgramType.YEARLY: + self._emit_yearly_header(p, out) + elif kind == ProgramType.REMARK: + self._emit_remark(p, out) + return out + elif kind == ProgramType.FREE: + out.append(Token(TokenKind.TEXT, "(empty slot)")) + return out + else: + # Multi-record record on its own — caller should use + # render_chain instead. Be helpful rather than silent. + out.append(Token( + TokenKind.TEXT, + f"(multi-record {kind.name} — render with render_chain)", + )) + return out + # Compact-form programs can carry up to two inline AND conditions + # in their cond / cond2 fields. Skip when both are zero. + for slot_idx, field_val in (("cond", p.cond), ("cond2", p.cond2)): + if field_val == 0: + continue + out.append(Token(TokenKind.NEWLINE, "")) + out.append(Token(TokenKind.INDENT, " ")) + out.append(Token(TokenKind.KEYWORD, "AND IF")) + out.append(Token(TokenKind.TEXT, " ")) + self._emit_traditional_cond(field_val, out) + out.append(Token(TokenKind.NEWLINE, "")) + out.append(Token(TokenKind.KEYWORD, "THEN")) + out.append(Token(TokenKind.TEXT, " ")) + self._emit_action(p, out) + return out + + def render_chain(self, chain: ClausalChain) -> list[Token]: + """Render a multi-record clausal chain (WHEN/AT/EVERY + body). + + Output mirrors PC Access's structured-English: trigger on the + first line, conditions indented two spaces with ``AND IF`` / + ``OR IF`` keywords, actions on their own lines under ``THEN`` / + ``AND``. + """ + out: list[Token] = [] + head = chain.head + head_kind = head.prog_type + if head_kind == int(ProgramType.WHEN): + self._emit_when_header(head, out) + elif head_kind == int(ProgramType.AT): + self._emit_at_header(head, out) + elif head_kind == int(ProgramType.EVERY): + self._emit_every_header(head, out) + else: + out.append(Token(TokenKind.TEXT, f"(chain head type {head_kind}?)")) + # Conditions: AND IF / OR IF, indented. + for cond in chain.conditions: + out.append(Token(TokenKind.NEWLINE, "")) + out.append(Token(TokenKind.INDENT, " ")) + keyword = "OR IF" if cond.prog_type == int(ProgramType.OR) else "AND IF" + out.append(Token(TokenKind.KEYWORD, keyword)) + out.append(Token(TokenKind.TEXT, " ")) + self._emit_and_record(cond, out) + # Actions: first one prefixed THEN, rest AND. + for i, action in enumerate(chain.actions): + out.append(Token(TokenKind.NEWLINE, "")) + out.append(Token(TokenKind.KEYWORD, "THEN" if i == 0 else "AND")) + out.append(Token(TokenKind.TEXT, " ")) + self._emit_action(action, out) + return out + + def summarize_program(self, p: Program) -> list[Token]: + """One-line summary suitable for the list view. + + Format: ````. Conditions + on compact-form programs are elided with ``(+N conds)``. + """ + out: list[Token] = [] + try: + kind = ProgramType(p.prog_type) + except ValueError: + out.append(Token(TokenKind.TEXT, f"?type {p.prog_type}")) + return out + if kind == ProgramType.TIMED: + self._emit_timed_summary(p, out) + elif kind == ProgramType.EVENT: + self._emit_event_summary(p, out) + elif kind == ProgramType.YEARLY: + self._emit_yearly_summary(p, out) + elif kind == ProgramType.REMARK: + self._emit_remark(p, out) + return out + elif kind == ProgramType.FREE: + out.append(Token(TokenKind.TEXT, "(empty)")) + return out + else: + out.append(Token(TokenKind.TEXT, kind.name)) + return out + # Inline condition count. + cond_count = (1 if p.cond else 0) + (1 if p.cond2 else 0) + if cond_count: + out.append(Token(TokenKind.TEXT, f" (+{cond_count} cond)")) + out.append(Token(TokenKind.TEXT, " → ")) + self._emit_action(p, out) + return out + + def summarize_chain(self, chain: ClausalChain) -> list[Token]: + """One-line summary of a clausal chain for the list view.""" + out: list[Token] = [] + head = chain.head + if head.prog_type == int(ProgramType.WHEN): + out.append(Token(TokenKind.KEYWORD, "WHEN")) + out.append(Token(TokenKind.TEXT, " ")) + self._emit_event(head.event_id, out) + elif head.prog_type == int(ProgramType.AT): + out.append(Token(TokenKind.KEYWORD, "AT")) + out.append(Token(TokenKind.TEXT, " ")) + out.append(Token(TokenKind.VALUE, head.format_time())) + out.append(Token(TokenKind.TEXT, " ")) + out.append(Token(TokenKind.VALUE, format_days(head.days))) + elif head.prog_type == int(ProgramType.EVERY): + out.append(Token(TokenKind.KEYWORD, "EVERY")) + out.append(Token(TokenKind.TEXT, " ")) + out.append(Token(TokenKind.VALUE, _format_interval(head.every_interval))) + if chain.conditions: + out.append(Token( + TokenKind.TEXT, + f" (+{len(chain.conditions)} cond)", + )) + out.append(Token(TokenKind.TEXT, " → ")) + # Show the first action only on summary; "+N more" if there are more. + if chain.actions: + self._emit_action(chain.actions[0], out) + if len(chain.actions) > 1: + out.append(Token( + TokenKind.TEXT, + f" (+{len(chain.actions) - 1} more)", + )) + return out + + # ---- emit helpers — triggers / headers ------------------------------- + + def _emit_timed_header(self, p: Program, out: list[Token]) -> None: + out.append(Token(TokenKind.KEYWORD, "AT")) + out.append(Token(TokenKind.TEXT, " ")) + out.append(Token(TokenKind.VALUE, p.format_time())) + out.append(Token(TokenKind.TEXT, " ")) + out.append(Token(TokenKind.VALUE, format_days(p.days))) + + def _emit_event_header(self, p: Program, out: list[Token]) -> None: + out.append(Token(TokenKind.KEYWORD, "WHEN")) + out.append(Token(TokenKind.TEXT, " ")) + self._emit_event(p.event_id, out) + + def _emit_yearly_header(self, p: Program, out: list[Token]) -> None: + out.append(Token(TokenKind.KEYWORD, "ON")) + out.append(Token(TokenKind.TEXT, " ")) + out.append(Token( + TokenKind.VALUE, f"{p.month:d}/{p.day:d} at {p.hour:02d}:{p.minute:02d}", + )) + + def _emit_remark(self, p: Program, out: list[Token]) -> None: + rid = p.remark_id if p.remark_id is not None else 0 + out.append(Token(TokenKind.KEYWORD, "REMARK")) + out.append(Token(TokenKind.TEXT, f" #{rid}")) + + def _emit_when_header(self, p: Program, out: list[Token]) -> None: + out.append(Token(TokenKind.KEYWORD, "WHEN")) + out.append(Token(TokenKind.TEXT, " ")) + self._emit_event(p.event_id, out) + + def _emit_at_header(self, p: Program, out: list[Token]) -> None: + out.append(Token(TokenKind.KEYWORD, "AT")) + out.append(Token(TokenKind.TEXT, " ")) + out.append(Token(TokenKind.VALUE, p.format_time())) + out.append(Token(TokenKind.TEXT, " ")) + out.append(Token(TokenKind.VALUE, format_days(p.days))) + + def _emit_every_header(self, p: Program, out: list[Token]) -> None: + out.append(Token(TokenKind.KEYWORD, "EVERY")) + out.append(Token(TokenKind.TEXT, " ")) + out.append(Token(TokenKind.VALUE, _format_interval(p.every_interval))) + + def _emit_timed_summary(self, p: Program, out: list[Token]) -> None: + out.append(Token(TokenKind.VALUE, p.format_time())) + out.append(Token(TokenKind.TEXT, " ")) + out.append(Token(TokenKind.VALUE, format_days(p.days))) + + def _emit_event_summary(self, p: Program, out: list[Token]) -> None: + out.append(Token(TokenKind.KEYWORD, "WHEN")) + out.append(Token(TokenKind.TEXT, " ")) + self._emit_event(p.event_id, out) + + def _emit_yearly_summary(self, p: Program, out: list[Token]) -> None: + out.append(Token( + TokenKind.VALUE, + f"{p.month:d}/{p.day:d} @ {p.hour:02d}:{p.minute:02d}", + )) + + def _emit_event(self, event_id: int, out: list[Token]) -> None: + """Render an event-ID as natural language. + + Mirrors clsText.GetEventCategory (clsText.cs:1585-...) for the + common categories. Unknown event IDs render as ``"event 0xNNNN"``. + """ + if event_id == EVENT_PHONE_DEAD: + out.append(Token(TokenKind.TEXT, "phone line is dead")) + return + if event_id == EVENT_PHONE_RINGING: + out.append(Token(TokenKind.TEXT, "phone is ringing")) + return + if event_id == EVENT_PHONE_OFF_HOOK: + out.append(Token(TokenKind.TEXT, "phone is off hook")) + return + if event_id == EVENT_PHONE_ON_HOOK: + out.append(Token(TokenKind.TEXT, "phone is on hook")) + return + if event_id == EVENT_AC_POWER_OFF: + out.append(Token(TokenKind.TEXT, "AC power lost")) + return + if event_id == EVENT_AC_POWER_ON: + out.append(Token(TokenKind.TEXT, "AC power restored")) + return + # USER_MACRO_BUTTON (high byte == 0) + if (event_id & 0xFF00) == 0x0000: + button = event_id & 0xFF + self._emit_ref("button", button, out) + out.append(Token(TokenKind.TEXT, " is pressed")) + return + # ZONE_STATE_CHANGE (& 0xFC00 == 0x0400) + if (event_id & 0xFC00) == 0x0400: + zone_state = event_id & 0x03FF + zone = (zone_state // 4) + 1 + state = zone_state % 4 + self._emit_ref("zone", zone, out) + state_label = { + 0: "becomes secure", + 1: "becomes not ready", + 2: "reports trouble", + 3: "reports tamper", + }.get(state, f"changes to state {state}") + out.append(Token(TokenKind.TEXT, " " + state_label)) + return + # UNIT_STATE_CHANGE (& 0xFC00 == 0x0800) + if (event_id & 0xFC00) == 0x0800: + unit_state = event_id & 0x03FF + unit = (unit_state // 2) + 1 + on = unit_state & 1 + self._emit_ref("unit", unit, out) + out.append(Token(TokenKind.TEXT, " turns " + ("ON" if on else "OFF"))) + return + out.append(Token(TokenKind.TEXT, f"event 0x{event_id:04x}")) + + # ---- emit helpers — conditions --------------------------------------- + + def _emit_traditional_cond(self, cond: int, out: list[Token]) -> None: + """Render a compact-form ``cond`` u16 (TIMED/EVENT/YEARLY inline + AND condition). + + These use a different bit-layout from AND-record cond fields — + see clsText.GetConditionalText (clsText.cs:2224-2274). + """ + family = (cond >> 8) & 0xFC + if family == 0: + misc = cond & 0x0F + self._emit_misc_conditional(misc, out) + return + if family == ProgramCond.ZONE: + zone = cond & 0xFF + not_ready = bool(cond & 0x0200) + self._emit_ref("zone", zone, out) + out.append(Token(TokenKind.TEXT, " is ")) + out.append(Token(TokenKind.OPERATOR, "not ready" if not_ready else "secure")) + return + if family == ProgramCond.CTRL: + unit = cond & 0x01FF + on = bool(cond & 0x0200) + self._emit_ref("unit", unit, out) + out.append(Token(TokenKind.TEXT, " is ")) + out.append(Token(TokenKind.OPERATOR, "ON" if on else "OFF")) + return + if family == ProgramCond.TIME: + tc = cond & 0xFF + enabled = bool(cond & 0x0200) + out.append(Token(TokenKind.TEXT, "Time clock ")) + out.append(Token(TokenKind.VALUE, str(tc))) + out.append(Token(TokenKind.TEXT, " is ")) + out.append(Token(TokenKind.OPERATOR, + "enabled" if enabled else "disabled")) + return + # SEC default: high nibble = mode, bits 8-11 = area. + area = (cond >> 8) & 0x0F + mode = (cond >> 12) & 0x07 + if area == 0: + area = 1 + self._emit_ref("area", area, out) + out.append(Token(TokenKind.TEXT, " is ")) + out.append(Token( + TokenKind.VALUE, + _SECURITY_MODE_NAMES.get(mode, f"mode {mode}"), + )) + + def _emit_and_record(self, c: Program, out: list[Token]) -> None: + """Render an AND/OR Program record (Traditional or Structured).""" + if c.and_op == CondOP.ARG1_TRADITIONAL: + self._emit_traditional_and(c, out) + else: + self._emit_structured_and(c, out) + + def _emit_traditional_and(self, c: Program, out: list[Token]) -> None: + """AND/OR record carrying a Traditional condition. + + Encoding via clsConditionLine.Cond (clsConditionLine.cs:17-33): + ``and_family`` is the family+selector byte; ``and_instance`` is + the object index (1-based). + """ + family = c.and_family + instance = c.and_instance + family_major = family & 0xFC + secondary = bool(family & 0x02) + if family_major == 0: + self._emit_misc_conditional(family & 0x0F, out) + return + if family_major == ProgramCond.ZONE: + self._emit_ref("zone", instance, out) + out.append(Token(TokenKind.TEXT, " is ")) + out.append(Token( + TokenKind.OPERATOR, "not ready" if secondary else "secure", + )) + return + if family_major == ProgramCond.CTRL: + self._emit_ref("unit", instance, out) + out.append(Token(TokenKind.TEXT, " is ")) + out.append(Token( + TokenKind.OPERATOR, "ON" if secondary else "OFF", + )) + return + if family_major == ProgramCond.TIME: + out.append(Token(TokenKind.TEXT, "Time clock ")) + out.append(Token(TokenKind.VALUE, str(instance))) + out.append(Token(TokenKind.TEXT, " is ")) + out.append(Token( + TokenKind.OPERATOR, + "enabled" if secondary else "disabled", + )) + return + # SEC: high nibble = mode, low = area + area = family & 0x0F + mode = (family >> 4) & 0x07 + if area == 0: + area = 1 + self._emit_ref("area", area, out) + out.append(Token(TokenKind.TEXT, " is ")) + out.append(Token( + TokenKind.VALUE, + _SECURITY_MODE_NAMES.get(mode, f"mode {mode}"), + )) + + def _emit_misc_conditional(self, misc_code: int, out: list[Token]) -> None: + try: + cat = MiscConditional(misc_code) + except ValueError: + out.append(Token(TokenKind.TEXT, f"misc condition {misc_code}")) + return + labels = { + MiscConditional.NONE: "always", + MiscConditional.NEVER: "never", + MiscConditional.LIGHT: "it is light outside", + MiscConditional.DARK: "it is dark outside", + MiscConditional.PHONE_DEAD: "phone line is dead", + MiscConditional.PHONE_RINGING: "phone is ringing", + MiscConditional.PHONE_OFF_HOOK: "phone is off hook", + MiscConditional.PHONE_ON_HOOK: "phone is on hook", + MiscConditional.AC_POWER_OFF: "AC power is off", + MiscConditional.AC_POWER_ON: "AC power is on", + MiscConditional.BATTERY_LOW: "battery is low", + MiscConditional.BATTERY_OK: "battery is OK", + MiscConditional.ENERGY_COST_LOW: "energy cost is low", + MiscConditional.ENERGY_COST_MID: "energy cost is mid", + MiscConditional.ENERGY_COST_HIGH: "energy cost is high", + MiscConditional.ENERGY_COST_CRITICAL: "energy cost is critical", + } + out.append(Token(TokenKind.TEXT, labels.get(cat, cat.name))) + + def _emit_structured_and(self, c: Program, out: list[Token]) -> None: + """Render an ``Arg1 OP Arg2`` AND/OR record. + + For each arg side we render either an object reference + field, + or a literal value. The operator goes in between. + """ + self._emit_structured_arg( + c.and_arg1_argtype, c.and_arg1_ix, c.and_arg1_field, out, + ) + out.append(Token(TokenKind.TEXT, " ")) + out.append(Token(TokenKind.OPERATOR, _OP_SYMBOLS.get(c.and_op, "?"))) + out.append(Token(TokenKind.TEXT, " ")) + self._emit_structured_arg( + c.and_arg2_argtype, c.and_arg2_ix, c.and_arg2_field, out, + ) + + def _emit_structured_arg( + self, argtype: int, ix: int, field_id: int, out: list[Token], + ) -> None: + if argtype == CondArgType.CONSTANT: + out.append(Token(TokenKind.VALUE, str(ix))) + return + kind = _ARGTYPE_KIND.get(argtype) + if kind is None: + out.append(Token(TokenKind.TEXT, f"argtype{argtype}#{ix}")) + return + if kind == "timedate": + field_label = _TIMEDATE_FIELD_LABELS.get(field_id, f"field{field_id}") + out.append(Token(TokenKind.TEXT, field_label)) + return + # Object reference with field suffix (when known). + self._emit_ref(kind, ix, out) + field_label = _FIELD_LABELS.get((kind, field_id)) + if field_label: + out.append(Token(TokenKind.TEXT, ".")) + out.append(Token(TokenKind.TEXT, field_label)) + + # ---- emit helpers — actions ------------------------------------------ + + def _emit_action(self, p: Program, out: list[Token]) -> None: + """Render the cmd / par / pr2 triple as a friendly verb. + + For unrecognised commands we fall back to the raw enum name, + which keeps the rendering useful even for less-common + Command values we haven't mapped yet. + """ + cmd_byte = p.cmd + try: + cmd = Command(cmd_byte) + except ValueError: + out.append(Token(TokenKind.TEXT, f"command {cmd_byte}")) + return + verb_entry = _COMMAND_VERBS.get(cmd_byte) + verb, ref_kind = verb_entry if verb_entry else (cmd.name.replace("_", " "), None) + out.append(Token(TokenKind.KEYWORD, verb)) + if ref_kind is not None: + out.append(Token(TokenKind.TEXT, " ")) + self._emit_ref(ref_kind, p.pr2, out) + if cmd == Command.UNIT_LEVEL: + out.append(Token(TokenKind.TEXT, " to ")) + out.append(Token(TokenKind.VALUE, f"{p.par}%")) + + # ---- emit helpers — refs --------------------------------------------- + + def _emit_ref(self, kind: str, index: int, out: list[Token]) -> None: + """Emit a typed object reference token with name + live state.""" + name = self.names.name_of(kind, index) + if not name: + name = f"{kind.capitalize()} {index}" + state = None + if self.state is not None: + state = self.state.state_of(kind, index) + out.append(Token( + TokenKind.REF, name, + entity_kind=kind, entity_id=index, state=state, + )) + + +# -------------------------------------------------------------------------- +# Tables — kept at module scope so they're not re-allocated per render +# -------------------------------------------------------------------------- + + +_OP_SYMBOLS: dict[int, str] = { + int(CondOP.ARG1_EQ_ARG2): "==", + int(CondOP.ARG1_NE_ARG2): "!=", + int(CondOP.ARG1_LT_ARG2): "<", + int(CondOP.ARG1_GT_ARG2): ">", + int(CondOP.ARG1_ODD): "is odd", + int(CondOP.ARG1_EVEN): "is even", + int(CondOP.ARG1_MULTIPLE_ARG2): "is multiple of", + int(CondOP.ARG1_IN_ARG2): "in", + int(CondOP.ARG1_NOT_IN_ARG2): "not in", +} + + +_ARGTYPE_KIND: dict[int, str] = { + int(CondArgType.ZONE): "zone", + int(CondArgType.UNIT): "unit", + int(CondArgType.THERMOSTAT): "thermostat", + int(CondArgType.AREA): "area", + int(CondArgType.TIME_DATE): "timedate", +} + + +_FIELD_LABELS: dict[tuple[str, int], str] = { + # enuZoneField + ("zone", 1): "LoopReading", + ("zone", 2): "CurrentState", + ("zone", 3): "ArmingState", + ("zone", 4): "AlarmState", + # enuUnitField + ("unit", 1): "CurrentState", + ("unit", 2): "PreviousState", + ("unit", 3): "Timer", + ("unit", 4): "Level", + # enuThermostatField + ("thermostat", 1): "Temperature", + ("thermostat", 2): "HeatSetpoint", + ("thermostat", 3): "CoolSetpoint", + ("thermostat", 4): "SystemMode", + ("thermostat", 5): "FanMode", + ("thermostat", 6): "HoldMode", + ("thermostat", 7): "FreezeAlarm", + ("thermostat", 8): "CommError", + ("thermostat", 9): "Humidity", + ("thermostat", 10): "HumidifySetpoint", + ("thermostat", 11): "DehumidifySetpoint", + ("thermostat", 12): "OutdoorTemperature", + ("thermostat", 13): "SystemStatus", +} + + +_TIMEDATE_FIELD_LABELS: dict[int, str] = { + 1: "Date", + 2: "Year", + 3: "Month", + 4: "Day", + 5: "DayOfWeek", + 6: "Time", + 7: "DST_Flag", + 8: "Hour", + 9: "Minute", + 10: "SunriseSunset", +} + + +def _format_interval(seconds: int) -> str: + """Render an EVERY-program interval. Treats the raw value as + seconds — matches the live-fixture observation that 5 SECONDS UI + selection stores as 5. Higher values fall through to natural + ``"30 min"`` / ``"2 hr"`` shortenings for readability.""" + if seconds <= 0: + return "(disabled)" + if seconds < 60: + return f"{seconds} sec" + if seconds < 3600: + return f"{seconds // 60} min" + return f"{seconds // 3600} hr" diff --git a/tests/test_program_renderer.py b/tests/test_program_renderer.py new file mode 100644 index 0000000..6be3550 --- /dev/null +++ b/tests/test_program_renderer.py @@ -0,0 +1,744 @@ +"""Tests for the structured-English program renderer. + +Coverage strategy: +* Each trigger / condition / action branch gets at least one focused + test asserting the rendered tokens (or plain-text projection). +* End-to-end tests build a Program (or ClausalChain) that mirrors what + PC Access produces and verify the renderer's output reads cleanly. +* Live-state overlay is tested separately via a small fake StateResolver + so we can assert the badges land on the right REF tokens. +""" + +from __future__ import annotations + +from dataclasses import dataclass + +import pytest + +from omni_pca.commands import Command +from omni_pca.mock_panel import ( + MockAreaState, + MockState, + MockThermostatState, + MockUnitState, + MockZoneState, +) +from omni_pca.program_engine import ( + ClausalChain, + EVENT_AC_POWER_OFF, + event_id_unit_state, + event_id_user_macro_button, + event_id_zone_state, +) +from omni_pca.program_renderer import ( + AccountNameResolver, + MockStateResolver, + NameResolver, + ProgramRenderer, + StateResolver, + Token, + TokenKind, + _format_interval, + format_days, + tokens_to_string, +) +from omni_pca.programs import ( + CondArgType, + CondOP, + Days, + Program, + ProgramType, +) + + +# ---- Test helpers -------------------------------------------------------- + + +class _StaticNameResolver: + """Trivial name resolver — explicit name dict, useful in unit tests.""" + + def __init__(self, names: dict[tuple[str, int], str]) -> None: + self._names = names + + def name_of(self, kind: str, index: int) -> str | None: + return self._names.get((kind, index)) + + +class _StaticStateResolver: + """Trivial state resolver — explicit state dict.""" + + def __init__(self, states: dict[tuple[str, int], str]) -> None: + self._states = states + + def state_of(self, kind: str, index: int) -> str | None: + return self._states.get((kind, index)) + + +def _renderer_with( + names: dict[tuple[str, int], str] | None = None, + states: dict[tuple[str, int], str] | None = None, +) -> ProgramRenderer: + return ProgramRenderer( + names=_StaticNameResolver(names or {}), + state=_StaticStateResolver(states) if states is not None else None, + ) + + +# ---- Format helpers ------------------------------------------------------ + + +def test_format_days_everyday() -> None: + assert format_days(int( + Days.MONDAY | Days.TUESDAY | Days.WEDNESDAY | Days.THURSDAY + | Days.FRIDAY | Days.SATURDAY | Days.SUNDAY + )) == "every day" + + +def test_format_days_weekdays() -> None: + assert format_days(int( + Days.MONDAY | Days.TUESDAY | Days.WEDNESDAY | Days.THURSDAY | Days.FRIDAY + )) == "weekdays" + + +def test_format_days_weekend() -> None: + assert format_days(int(Days.SATURDAY | Days.SUNDAY)) == "weekends" + + +def test_format_days_individual_days() -> None: + assert format_days(int(Days.MONDAY | Days.WEDNESDAY | Days.FRIDAY)) == "Mon, Wed, Fri" + + +def test_format_days_zero() -> None: + assert format_days(0) == "never" + + +def test_format_interval_seconds() -> None: + assert _format_interval(5) == "5 sec" + assert _format_interval(45) == "45 sec" + + +def test_format_interval_minutes_and_hours() -> None: + assert _format_interval(300) == "5 min" + assert _format_interval(7200) == "2 hr" + + +def test_format_interval_disabled() -> None: + assert _format_interval(0) == "(disabled)" + + +# ---- Tokens to string ---------------------------------------------------- + + +def test_tokens_to_string_with_state_badge() -> None: + """REF tokens with `state` set surface as ``name [state]``.""" + tokens = [ + Token(TokenKind.KEYWORD, "WHEN"), + Token(TokenKind.TEXT, " "), + Token(TokenKind.REF, "Front Door", + entity_kind="zone", entity_id=1, state="SECURE"), + Token(TokenKind.TEXT, " is opened"), + ] + assert tokens_to_string(tokens) == "WHEN Front Door [SECURE] is opened" + + +def test_tokens_to_string_handles_newline_and_indent() -> None: + tokens = [ + Token(TokenKind.KEYWORD, "WHEN"), + Token(TokenKind.TEXT, " trigger"), + Token(TokenKind.NEWLINE, ""), + Token(TokenKind.INDENT, " "), + Token(TokenKind.KEYWORD, "AND IF"), + Token(TokenKind.TEXT, " condition"), + ] + assert tokens_to_string(tokens) == "WHEN trigger\n AND IF condition" + + +# ---- Trigger rendering --------------------------------------------------- + + +def test_render_timed_program() -> None: + """AT 06:00 weekdays → Turn ON LIVING_LAMP.""" + p = Program( + slot=42, prog_type=int(ProgramType.TIMED), + cmd=int(Command.UNIT_ON), pr2=7, + hour=6, minute=0, + days=int(Days.MONDAY | Days.TUESDAY | Days.WEDNESDAY + | Days.THURSDAY | Days.FRIDAY), + ) + r = _renderer_with(names={("unit", 7): "LIVING_LAMP"}) + text = tokens_to_string(r.render_program(p)) + assert text == "AT 06:00 weekdays\nTHEN Turn ON LIVING_LAMP" + + +def test_render_event_program_zone_state_change() -> None: + """EVENT triggered by zone 5 not-ready → unit 3 OFF.""" + evt = event_id_zone_state(5, 1) # zone 5 becomes not-ready + p = Program( + slot=10, prog_type=int(ProgramType.EVENT), + cmd=int(Command.UNIT_OFF), pr2=3, + month=(evt >> 8) & 0xFF, day=evt & 0xFF, + ) + r = _renderer_with(names={ + ("zone", 5): "FRONT_DOOR", + ("unit", 3): "PORCH_LIGHT", + }) + assert tokens_to_string(r.render_program(p)) == ( + "WHEN FRONT_DOOR becomes not ready\n" + "THEN Turn OFF PORCH_LIGHT" + ) + + +def test_render_yearly_program() -> None: + p = Program( + slot=99, prog_type=int(ProgramType.YEARLY), + cmd=int(Command.UNIT_ON), pr2=12, + month=12, day=25, hour=18, minute=30, + ) + r = _renderer_with(names={("unit", 12): "CHRISTMAS_LIGHTS"}) + assert tokens_to_string(r.render_program(p)) == ( + "ON 12/25 at 18:30\n" + "THEN Turn ON CHRISTMAS_LIGHTS" + ) + + +def test_render_remark_program() -> None: + """Remark records render as 'REMARK #N'.""" + p = Program( + slot=5, prog_type=int(ProgramType.REMARK), + remark_id=42, + ) + r = _renderer_with() + assert tokens_to_string(r.render_program(p)) == "REMARK #42" + + +def test_render_free_slot() -> None: + p = Program(slot=1, prog_type=int(ProgramType.FREE)) + r = _renderer_with() + assert tokens_to_string(r.render_program(p)) == "(empty slot)" + + +# ---- Event-ID decoding --------------------------------------------------- + + +def test_render_event_button_press() -> None: + """Button-press events render via the button name.""" + evt = event_id_user_macro_button(7) + p = Program( + slot=1, prog_type=int(ProgramType.EVENT), + cmd=int(Command.UNIT_ON), pr2=1, + month=(evt >> 8) & 0xFF, day=evt & 0xFF, + ) + r = _renderer_with(names={ + ("button", 7): "GOOD_NIGHT", + ("unit", 1): "BEDROOM_LAMP", + }) + assert tokens_to_string(r.render_program(p)).startswith( + "WHEN GOOD_NIGHT is pressed\n" + ) + + +def test_render_event_unit_state_change() -> None: + evt = event_id_unit_state(4, on=True) + p = Program( + slot=1, prog_type=int(ProgramType.EVENT), + cmd=int(Command.UNIT_OFF), pr2=5, + month=(evt >> 8) & 0xFF, day=evt & 0xFF, + ) + r = _renderer_with(names={("unit", 4): "ALARM", ("unit", 5): "SIREN"}) + assert tokens_to_string(r.render_program(p)) == ( + "WHEN ALARM turns ON\n" + "THEN Turn OFF SIREN" + ) + + +def test_render_event_ac_power_lost() -> None: + p = Program( + slot=1, prog_type=int(ProgramType.EVENT), + cmd=int(Command.UNIT_ON), pr2=1, + month=(EVENT_AC_POWER_OFF >> 8) & 0xFF, + day=EVENT_AC_POWER_OFF & 0xFF, + ) + r = _renderer_with(names={("unit", 1): "EMERGENCY_LIGHT"}) + assert tokens_to_string(r.render_program(p)) == ( + "WHEN AC power lost\n" + "THEN Turn ON EMERGENCY_LIGHT" + ) + + +# ---- Inline AND conditions (compact form) ------------------------------- + + +def test_render_timed_with_inline_zone_condition() -> None: + """TIMED program with an inline AND IF ZONE ... SECURE condition.""" + # cond = high byte: 0x04 (ZONE family), low byte: zone 5 + cond = (0x04 << 8) | 5 + p = Program( + slot=1, prog_type=int(ProgramType.TIMED), + cmd=int(Command.UNIT_ON), pr2=7, + hour=22, minute=30, + days=int(Days.MONDAY), + cond=cond, + ) + r = _renderer_with(names={ + ("zone", 5): "FRONT_DOOR", ("unit", 7): "PORCH_LIGHT", + }) + assert tokens_to_string(r.render_program(p)) == ( + "AT 22:30 Mon\n" + " AND IF FRONT_DOOR is secure\n" + "THEN Turn ON PORCH_LIGHT" + ) + + +def test_render_timed_with_inline_unit_on_condition() -> None: + """TIMED + AND IF UNIT ... ON. Compact cond high byte 0x0A = CTRL+ON.""" + cond = (0x0A << 8) | 3 # CTRL family + ON, unit 3 + p = Program( + slot=1, prog_type=int(ProgramType.TIMED), + cmd=int(Command.UNIT_ON), pr2=7, + hour=6, minute=0, + days=int(Days.MONDAY), + cond=cond, + ) + r = _renderer_with(names={ + ("unit", 3): "OCCUPANCY", ("unit", 7): "KITCHEN_LIGHT", + }) + assert tokens_to_string(r.render_program(p)) == ( + "AT 06:00 Mon\n" + " AND IF OCCUPANCY is ON\n" + "THEN Turn ON KITCHEN_LIGHT" + ) + + +# ---- Clausal chain rendering -------------------------------------------- + + +def _and_traditional(slot: int, family: int, instance: int = 0) -> Program: + return Program( + slot=slot, prog_type=int(ProgramType.AND), + cond=family & 0xFF, cond2=(instance & 0xFF) << 8, + ) + + +def _or_record(slot: int) -> Program: + """An empty OR-separator record. PC Access in practice always + bundles a condition into the OR record itself; use ``_or_traditional`` + for that case. This helper exists for the rare empty-OR cases.""" + return Program(slot=slot, prog_type=int(ProgramType.OR)) + + +def _or_traditional(slot: int, family: int, instance: int = 0) -> Program: + """OR-alternative record carrying a Traditional condition inline.""" + return Program( + slot=slot, prog_type=int(ProgramType.OR), + cond=family & 0xFF, cond2=(instance & 0xFF) << 8, + ) + + +def _then_record(slot: int, cmd: int, pr2: int, par: int = 0) -> Program: + return Program( + slot=slot, prog_type=int(ProgramType.THEN), + cmd=cmd, pr2=pr2, par=par, + ) + + +def test_render_when_chain_simple() -> None: + """WHEN button N pressed → 1 cond → 1 action.""" + evt = event_id_user_macro_button(5) + when = Program( + slot=1, prog_type=int(ProgramType.WHEN), + month=(evt >> 8) & 0xFF, day=evt & 0xFF, + ) + and_cond = _and_traditional(2, family=0x04, instance=7) # ZONE 7 secure + then = _then_record(3, int(Command.UNIT_ON), 9) + chain = ClausalChain(head=when, conditions=(and_cond,), actions=(then,)) + r = _renderer_with(names={ + ("button", 5): "GOODNIGHT", + ("zone", 7): "BACK_DOOR", + ("unit", 9): "HALLWAY", + }) + assert tokens_to_string(r.render_chain(chain)) == ( + "WHEN GOODNIGHT is pressed\n" + " AND IF BACK_DOOR is secure\n" + "THEN Turn ON HALLWAY" + ) + + +def test_render_when_chain_with_or_branch_and_multiple_actions() -> None: + """Full clausal program with OR branch and two THEN actions.""" + evt = event_id_user_macro_button(5) + when = Program( + slot=1, prog_type=int(ProgramType.WHEN), + month=(evt >> 8) & 0xFF, day=evt & 0xFF, + ) + chain = ClausalChain( + head=when, + conditions=( + _and_traditional(2, family=0x04, instance=7), # ZONE 7 secure + _or_traditional(3, family=0x0A, instance=3), # OR IF UNIT 3 ON + ), + actions=( + _then_record(4, int(Command.UNIT_ON), 9), # Turn ON HALLWAY + _then_record(5, int(Command.UNIT_OFF), 10), # Turn OFF FOYER + ), + ) + r = _renderer_with(names={ + ("button", 5): "GOODNIGHT", + ("zone", 7): "BACK_DOOR", + ("unit", 3): "MOTION", + ("unit", 9): "HALLWAY", + ("unit", 10): "FOYER", + }) + assert tokens_to_string(r.render_chain(chain)) == ( + "WHEN GOODNIGHT is pressed\n" + " AND IF BACK_DOOR is secure\n" + " OR IF MOTION is ON\n" + "THEN Turn ON HALLWAY\n" + "AND Turn OFF FOYER" + ) + + +def test_render_at_chain() -> None: + """AT-headed clausal chain with structured-English output.""" + head = Program( + slot=1, prog_type=int(ProgramType.AT), + hour=7, minute=15, days=int(Days.SATURDAY | Days.SUNDAY), + ) + chain = ClausalChain( + head=head, conditions=(), + actions=(_then_record(2, int(Command.UNIT_ON), 12),), + ) + r = _renderer_with(names={("unit", 12): "COFFEE_MAKER"}) + assert tokens_to_string(r.render_chain(chain)) == ( + "AT 07:15 weekends\n" + "THEN Turn ON COFFEE_MAKER" + ) + + +def test_render_every_chain() -> None: + head = Program( + slot=1, prog_type=int(ProgramType.EVERY), + # every_interval = ((cond & 0xFF) << 8) | ((cond2 >> 8) & 0xFF). + # For interval=60: cond=0, cond2=60<<8=0x3C00 → 60 sec = 1 min. + cond=0, cond2=60 << 8, + ) + chain = ClausalChain( + head=head, conditions=(), + actions=(_then_record(2, int(Command.UNIT_ON), 1),), + ) + r = _renderer_with(names={("unit", 1): "AERATOR"}) + assert tokens_to_string(r.render_chain(chain)) == ( + "EVERY 1 min\n" + "THEN Turn ON AERATOR" + ) + + +# ---- Structured AND/OR rendering ---------------------------------------- + + +def _and_structured( + slot: int, op: int, + arg1_type: int, arg1_ix: int, arg1_field: int, + arg2_type: int, arg2_ix: int, arg2_field: int = 0, +) -> Program: + return Program( + slot=slot, prog_type=int(ProgramType.AND), + cond=(op << 8) | arg1_type, + cond2=arg1_ix, + cmd=arg1_field, + par=arg2_type, + pr2=arg2_ix, + month=arg2_field, + ) + + +def test_render_structured_zone_current_state_eq_constant() -> None: + """AND IF Zone(5).CurrentState == 1""" + and_rec = _and_structured( + slot=1, op=int(CondOP.ARG1_EQ_ARG2), + arg1_type=int(CondArgType.ZONE), arg1_ix=5, arg1_field=2, + arg2_type=int(CondArgType.CONSTANT), arg2_ix=1, + ) + chain = ClausalChain( + head=Program(slot=0, prog_type=int(ProgramType.WHEN), + month=0, day=1), + conditions=(and_rec,), + actions=(_then_record(2, int(Command.UNIT_ON), 9),), + ) + r = _renderer_with(names={ + ("zone", 5): "FRONT_DOOR", + ("button", 1): "BTN_1", + ("unit", 9): "HALLWAY", + }) + text = tokens_to_string(r.render_chain(chain)) + assert "AND IF FRONT_DOOR.CurrentState == 1" in text + + +def test_render_structured_thermostat_temp_gt_constant() -> None: + """AND IF Thermostat(1).Temperature > 75""" + and_rec = _and_structured( + slot=1, op=int(CondOP.ARG1_GT_ARG2), + arg1_type=int(CondArgType.THERMOSTAT), arg1_ix=1, arg1_field=1, + arg2_type=int(CondArgType.CONSTANT), arg2_ix=75, + ) + chain = ClausalChain( + head=Program(slot=0, prog_type=int(ProgramType.WHEN), month=0, day=1), + conditions=(and_rec,), + actions=(_then_record(2, int(Command.UNIT_ON), 1),), + ) + r = _renderer_with(names={ + ("thermostat", 1): "DOWNSTAIRS", + ("button", 1): "BTN_1", + ("unit", 1): "AC", + }) + text = tokens_to_string(r.render_chain(chain)) + assert "AND IF DOWNSTAIRS.Temperature > 75" in text + + +def test_render_structured_timedate_hour_eq() -> None: + """AND IF TimeDate.Hour == 22""" + and_rec = _and_structured( + slot=1, op=int(CondOP.ARG1_EQ_ARG2), + arg1_type=int(CondArgType.TIME_DATE), arg1_ix=0, arg1_field=8, + arg2_type=int(CondArgType.CONSTANT), arg2_ix=22, + ) + chain = ClausalChain( + head=Program(slot=0, prog_type=int(ProgramType.WHEN), month=0, day=1), + conditions=(and_rec,), + actions=(_then_record(2, int(Command.UNIT_ON), 1),), + ) + r = _renderer_with(names={("button", 1): "BTN", ("unit", 1): "LIGHT"}) + text = tokens_to_string(r.render_chain(chain)) + assert "AND IF Hour == 22" in text + + +# ---- Action verb rendering ---------------------------------------------- + + +def test_render_action_bypass_zone() -> None: + p = Program( + slot=1, prog_type=int(ProgramType.TIMED), + cmd=int(Command.BYPASS_ZONE), pr2=5, + hour=22, minute=0, days=int(Days.MONDAY), + ) + r = _renderer_with(names={("zone", 5): "WINDOW"}) + assert "THEN Bypass WINDOW" in tokens_to_string(r.render_program(p)) + + +def test_render_action_unit_level_with_percentage() -> None: + """UNIT_LEVEL uses ``par`` as the percentage.""" + p = Program( + slot=1, prog_type=int(ProgramType.TIMED), + cmd=int(Command.UNIT_LEVEL), pr2=7, par=50, + hour=6, minute=0, days=int(Days.MONDAY), + ) + r = _renderer_with(names={("unit", 7): "DIMMER"}) + assert "THEN Set level DIMMER to 50%" in tokens_to_string(r.render_program(p)) + + +def test_render_action_security_arm() -> None: + p = Program( + slot=1, prog_type=int(ProgramType.TIMED), + cmd=int(Command.SECURITY_AWAY), pr2=1, + hour=22, minute=0, days=int(Days.MONDAY), + ) + r = _renderer_with(names={("area", 1): "MAIN"}) + assert "THEN Arm Away MAIN" in tokens_to_string(r.render_program(p)) + + +# ---- Live-state overlay -------------------------------------------------- + + +def test_live_state_overlay_appears_in_string() -> None: + """When a state resolver is set, REF tokens get bracketed badges.""" + p = Program( + slot=1, prog_type=int(ProgramType.TIMED), + cmd=int(Command.UNIT_ON), pr2=7, + hour=6, minute=0, days=int(Days.MONDAY), + ) + r = _renderer_with( + names={("unit", 7): "LIVING_LAMP"}, + states={("unit", 7): "OFF"}, + ) + text = tokens_to_string(r.render_program(p)) + assert "Turn ON LIVING_LAMP [OFF]" in text + + +def test_live_state_overlay_tokens_carry_state_field() -> None: + """REF tokens themselves have .state populated — not just the text.""" + p = Program( + slot=1, prog_type=int(ProgramType.TIMED), + cmd=int(Command.UNIT_ON), pr2=7, + hour=6, minute=0, days=int(Days.MONDAY), + ) + r = _renderer_with( + names={("unit", 7): "LIVING_LAMP"}, + states={("unit", 7): "ON 50%"}, + ) + refs = [t for t in r.render_program(p) if t.kind == TokenKind.REF] + assert len(refs) == 1 + assert refs[0].entity_kind == "unit" + assert refs[0].entity_id == 7 + assert refs[0].state == "ON 50%" + + +def test_live_state_absent_when_resolver_returns_none() -> None: + """A resolver that doesn't know about an entity omits the badge.""" + p = Program( + slot=1, prog_type=int(ProgramType.TIMED), + cmd=int(Command.UNIT_ON), pr2=99, + hour=6, minute=0, days=int(Days.MONDAY), + ) + r = _renderer_with(states={("unit", 7): "ON"}) # nothing for unit 99 + text = tokens_to_string(r.render_program(p)) + assert "[" not in text # no badge anywhere + + +# ---- MockStateResolver end-to-end --------------------------------------- + + +def test_mock_state_resolver_zone_badge() -> None: + state = MockState(zones={ + 5: MockZoneState(name="FRONT_DOOR", current_state=1), # not-ready + }) + res = MockStateResolver(state) + assert res.name_of("zone", 5) == "FRONT_DOOR" + assert res.state_of("zone", 5) == "NOT READY" + + +def test_mock_state_resolver_unit_on_with_dim_level() -> None: + state = MockState(units={3: MockUnitState(name="DIMMER", state=150)}) + res = MockStateResolver(state) + assert res.state_of("unit", 3) == "ON 50%" + + +def test_mock_state_resolver_area_security_mode() -> None: + state = MockState(areas={1: MockAreaState(name="MAIN", mode=3)}) + res = MockStateResolver(state) + assert res.state_of("area", 1) == "Away" + + +def test_mock_state_resolver_thermostat_temperature() -> None: + state = MockState(thermostats={1: MockThermostatState(temperature_raw=170)}) + res = MockStateResolver(state) + # raw 170 / 2 - 40 = 45°F (low side of the linear scale) + assert res.state_of("thermostat", 1) == "45°F" + + +def test_mock_state_resolver_unknown_kind_returns_none() -> None: + res = MockStateResolver(MockState()) + assert res.state_of("nonexistent", 1) is None + + +# ---- AccountNameResolver end-to-end ------------------------------------- + + +def test_account_name_resolver_pulls_from_account() -> None: + @dataclass + class _AcctStub: + zone_names: dict[int, str] + unit_names: dict[int, str] + + acct = _AcctStub( + zone_names={1: "FRONT", 2: "BACK"}, + unit_names={5: "LAMP"}, + ) + res = AccountNameResolver(acct) + assert res.name_of("zone", 1) == "FRONT" + assert res.name_of("unit", 5) == "LAMP" + assert res.name_of("zone", 99) is None + assert res.name_of("area", 1) is None # no area_names on stub + + +# ---- Summary (one-liner) -------------------------------------------------- + + +def test_summarize_timed_program() -> None: + p = Program( + slot=1, prog_type=int(ProgramType.TIMED), + cmd=int(Command.UNIT_ON), pr2=7, + hour=22, minute=30, days=int(Days.MONDAY), + ) + r = _renderer_with(names={("unit", 7): "LAMP"}) + assert tokens_to_string(r.summarize_program(p)) == ( + "22:30 Mon → Turn ON LAMP" + ) + + +def test_summarize_compact_program_with_conditions() -> None: + """Summary shows count of inline conditions.""" + cond = (0x04 << 8) | 5 # AND IF zone 5 secure + p = Program( + slot=1, prog_type=int(ProgramType.TIMED), + cmd=int(Command.UNIT_ON), pr2=7, + hour=22, minute=30, days=int(Days.MONDAY), + cond=cond, + ) + r = _renderer_with(names={("unit", 7): "LAMP", ("zone", 5): "DOOR"}) + text = tokens_to_string(r.summarize_program(p)) + assert "(+1 cond)" in text + + +# ---- Live-fixture smoke test -------------------------------------------- + + +def test_renderer_handles_every_program_in_live_fixture() -> None: + """Every defined program in the live .pca fixture renders cleanly. + + This is the broadest correctness signal: 330 real homeowner-authored + programs with names, conditions, and actions, all decoded by the + same code path the HA panel will use. Skipped when the gitignored + fixture isn't on disk. + """ + from pathlib import Path + + fixture = Path("/home/kdm/home-auto/HAI/pca-re/extracted/Our_House.pca.plain") + if not fixture.is_file(): + pytest.skip(f"fixture not available: {fixture}") + from omni_pca.pca_file import KEY_EXPORT, decrypt_pca_bytes, parse_pca_file + + acct = parse_pca_file(decrypt_pca_bytes(fixture.read_bytes(), KEY_EXPORT), + key=KEY_EXPORT) + r = ProgramRenderer(names=AccountNameResolver(acct)) + defined = [p for p in acct.programs if not p.is_empty()] + assert len(defined) == 330 + + # Every program produces a non-empty summary + full render. No + # exception should escape — the renderer's job is to be informative + # even for records it doesn't fully understand. + for p in defined: + summary = tokens_to_string(r.summarize_program(p)) + full = tokens_to_string(r.render_program(p)) + assert summary + assert full + + # The first few programs in this fixture are button-press chains + # against the garage doors — confirm the rendering reads the way + # we expect ("WHEN ... is pressed AND IF ... is secure THEN ..."). + slot1 = tokens_to_string(r.render_program(acct.programs[0])) + assert slot1.startswith("WHEN ") + assert "is pressed" in slot1 + assert "\n AND IF " in slot1 + assert "\nTHEN " in slot1 + + +def test_summarize_chain() -> None: + evt = event_id_user_macro_button(5) + chain = ClausalChain( + head=Program( + slot=1, prog_type=int(ProgramType.WHEN), + month=(evt >> 8) & 0xFF, day=evt & 0xFF, + ), + conditions=( + _and_traditional(2, family=0x04, instance=7), + _and_traditional(3, family=0x0A, instance=3), + ), + actions=( + _then_record(4, int(Command.UNIT_ON), 9), + _then_record(5, int(Command.UNIT_OFF), 10), + ), + ) + r = _renderer_with(names={ + ("button", 5): "BTN", ("unit", 9): "L1", ("unit", 10): "L2", + }) + text = tokens_to_string(r.summarize_chain(chain)) + assert text == "WHEN BTN is pressed (+2 cond) → Turn ON L1 (+1 more)"