program_renderer: structured-English token streams for HA UI
Some checks are pending
Validate / HACS validation (push) Waiting to run
Validate / Hassfest (push) Waiting to run

Phase A of the HA-side program viewer: a Python rendering library that
turns Program records and ClausalChains into structured-English text,
modeled on PC Access's program editor:

    WHEN OPEN BIG GAR is pressed
      AND IF BIG GARAGE DOOR is secure
    THEN Turn ON BGD

Output is a Token stream rather than a flat string so the HA frontend
can identify object references (REF tokens carry entity_kind + entity_id
for navigation) and overlay live state (REF tokens carry an optional
state string consumers render as "Front Door [SECURE]").

Resolver protocols (NameResolver, StateResolver) decouple the renderer
from any specific state model. Two adapters ship:

* AccountNameResolver — wraps a PcaAccount for offline .pca rendering
* MockStateResolver — wraps a MockState; implements both protocols,
  produces sensible live-state labels (SECURE / NOT READY / BYPASSED
  for zones; OFF / ON / ON 60% for units; Day / Night / Away for
  areas; "72°F" for thermostats)

Both compact-form programs (TIMED / EVENT / YEARLY / REMARK / FREE)
and multi-record clausal chains (WHEN / AT / EVERY + AND / OR / THEN)
render correctly. Each supports a one-line summary form for list
views and a multi-line full form for detail panels.

Decode coverage matches StateEvaluator's:
* Traditional ANDs: ZONE secure/not-ready, CTRL on/off, TIME enabled/
  disabled, SEC area-mode, OTHER (NEVER, LIGHT/DARK, AC_POWER_*, etc.)
* Structured ANDs: Arg1 OP Arg2 with full field-label decoration
  ("FRONT_DOOR.CurrentState == 1", "DOWNSTAIRS.Temperature > 75")
* Event IDs: USER_MACRO_BUTTON, ZONE_STATE_CHANGE, UNIT_STATE_CHANGE,
  AC_POWER_*, PHONE_* — natural-language phrases per category
* Commands: friendly verbs (Turn ON, Bypass, Set level X% to Y, Arm
  Away, etc.) with object-kind-aware reference rendering

Live-fixture smoke test renders all 330 real programs from the
homeowner's .pca with zero errors. Real button-press programs come
out reading like English documentation of what they do.

Full suite: 624 passed, 1 skipped (up from 581, 43 renderer tests).
This commit is contained in:
Ryan Malloy 2026-05-14 02:58:51 -06:00
parent 172aa2974a
commit 0026c5b00a
2 changed files with 1628 additions and 0 deletions

View File

@ -0,0 +1,884 @@
"""Structured-English rendering of HAI Omni panel programs.
The decoded :class:`omni_pca.programs.Program` records produced by
``pca_file`` and the wire upload paths carry every byte but no narrative.
This module turns them into readable sentences modelled on PC Access's
program editor:
WHEN Front Door is opened
AND IF Living Room Motion is secure
AND IF after sunset
OR IF Bedtime Mode is active
THEN Turn ON Hallway Light
AND Show Message "WELCOME HOME"
Output is a sequence of :class:`Token` records rather than a flat string
so that consumers (CLI, HA frontend, anything else) can:
* Identify object references (zones / units / areas / thermostats /
buttons / messages) render each as a clickable link to the entity
page, badge them with live state, etc.
* Style keywords (`WHEN`, `AND IF`, `THEN`) separately from object
names and values.
* Recover plain text trivially via ``"".join(t.text for t in tokens)``.
A :class:`ProgramRenderer` is constructed with a :class:`NameResolver`
and an optional :class:`StateResolver` for the live-state overlay. The
two resolvers are protocols (any object with the right methods works);
the convenience :class:`AccountNameResolver` adapts a :class:`PcaAccount`
and :class:`MockStateResolver` adapts a :class:`MockState` together
those cover the two common consumers (offline ``.pca`` snapshot vs.
running mock panel) without forcing either into a base class.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Iterable, Protocol, runtime_checkable
from .commands import Command
from .programs import (
CondArgType,
CondOP,
Days,
MiscConditional,
Program,
ProgramCond,
ProgramType,
TimeKind,
)
from .program_engine import (
ClausalChain,
EVENT_AC_POWER_OFF,
EVENT_AC_POWER_ON,
EVENT_PHONE_DEAD,
EVENT_PHONE_OFF_HOOK,
EVENT_PHONE_ON_HOOK,
EVENT_PHONE_RINGING,
)
# --------------------------------------------------------------------------
# Token stream
# --------------------------------------------------------------------------
class TokenKind:
"""String constants for :attr:`Token.kind`. Defined as a class of
str constants so consumers can do ``if t.kind == TokenKind.REF``."""
KEYWORD: str = "keyword" # WHEN, AND IF, THEN, OR IF, etc.
OPERATOR: str = "operator" # is, ==, >, after, before, …
REF: str = "ref" # an object reference (zone / unit / …)
VALUE: str = "value" # a literal value (time, number, mode name)
TEXT: str = "text" # plain prose connectors
INDENT: str = "indent" # leading whitespace for the next line
NEWLINE: str = "newline" # end-of-line
@dataclass(frozen=True, slots=True)
class Token:
"""One unit of structured-English output.
``text`` is what the consumer prints. The other fields are
metadata; only ``REF`` tokens use them all.
For ``REF`` tokens:
* ``entity_kind`` is one of ``"zone"`` / ``"unit"`` / ``"area"``
/ ``"thermostat"`` / ``"button"`` / ``"message"``
/ ``"code"`` / ``"timeclock"``
* ``entity_id`` is the 1-based slot the reference resolves to
* ``state`` is the live-state overlay string when a state
resolver was provided (e.g. ``"SECURE"``, ``"ON 60%"``,
``"Off"``); ``None`` when no overlay is available
For non-REF tokens, ``entity_kind`` / ``entity_id`` / ``state`` are ``None``.
"""
kind: str
text: str
entity_kind: str | None = None
entity_id: int | None = None
state: str | None = None
def tokens_to_string(tokens: Iterable[Token]) -> str:
"""Render a token stream to plain text. Useful for logs / dumps."""
pieces: list[str] = []
for t in tokens:
if t.kind == TokenKind.NEWLINE:
pieces.append("\n")
elif t.kind == TokenKind.INDENT:
pieces.append(t.text)
else:
pieces.append(t.text)
if t.kind == TokenKind.REF and t.state is not None:
pieces.append(f" [{t.state}]")
return "".join(pieces)
# --------------------------------------------------------------------------
# Resolver protocols + default implementations
# --------------------------------------------------------------------------
@runtime_checkable
class NameResolver(Protocol):
"""Translate a (kind, 1-based-index) reference into a human name.
Returns the name string when known, or ``None`` when the slot is
undefined / the kind isn't supported. The renderer falls back to
a generated label (``"Zone 5"``, ``"Unit 7"``) when the resolver
returns ``None``.
"""
def name_of(self, kind: str, index: int) -> str | None: ...
@runtime_checkable
class StateResolver(Protocol):
"""Translate a (kind, 1-based-index) reference into a live-state
overlay string. Returns ``None`` when no overlay applies the
renderer omits the bracketed annotation in that case.
"""
def state_of(self, kind: str, index: int) -> str | None: ...
class AccountNameResolver:
"""Resolves names from a :class:`omni_pca.pca_file.PcaAccount`.
Works as both a static-snapshot view (offline ``.pca`` inspection)
and as a fallback for the HA path when only header data is loaded.
"""
def __init__(self, account) -> None:
self._account = account
def name_of(self, kind: str, index: int) -> str | None:
table = {
"zone": getattr(self._account, "zone_names", {}),
"unit": getattr(self._account, "unit_names", {}),
"area": getattr(self._account, "area_names", {}),
"thermostat": getattr(self._account, "thermostat_names", {}),
"button": getattr(self._account, "button_names", {}),
"message": getattr(self._account, "message_names", {}),
"code": getattr(self._account, "code_names", {}),
}.get(kind, {})
return table.get(index)
class MockStateResolver:
"""Resolves both names and live state from a :class:`MockState`.
Implements both :class:`NameResolver` and :class:`StateResolver`
so the same object covers both roles when rendering against a
running mock panel.
"""
def __init__(self, state) -> None:
self._state = state
def name_of(self, kind: str, index: int) -> str | None:
getter = {
"zone": getattr(self._state, "zones", {}).get,
"unit": getattr(self._state, "units", {}).get,
"area": getattr(self._state, "areas", {}).get,
"thermostat": getattr(self._state, "thermostats", {}).get,
"button": getattr(self._state, "buttons", {}).get,
}.get(kind)
if getter is None:
return None
obj = getter(index)
return getattr(obj, "name", None) if obj else None
def state_of(self, kind: str, index: int) -> str | None:
if kind == "zone":
z = self._state.zones.get(index)
if z is None:
return None
if z.is_bypassed:
return "BYPASSED"
return "NOT READY" if z.current_state != 0 else "SECURE"
if kind == "unit":
u = self._state.units.get(index)
if u is None:
return None
if u.state == 0:
return "OFF"
if u.state >= 100:
return f"ON {u.state - 100}%"
return "ON"
if kind == "area":
a = self._state.areas.get(index)
if a is None:
return None
return _SECURITY_MODE_NAMES.get(a.mode, f"mode {a.mode}")
if kind == "thermostat":
t = self._state.thermostats.get(index)
if t is None or t.temperature_raw == 0:
return None
# Linear scale on Omni: temp_raw / 2 - 40 = °F.
return f"{t.temperature_raw // 2 - 40}°F"
return None
_SECURITY_MODE_NAMES: dict[int, str] = {
0: "Off",
1: "Day",
2: "Night",
3: "Away",
4: "Vacation",
5: "Day Instant",
6: "Night Delayed",
}
# --------------------------------------------------------------------------
# Helpers — friendly names for fixed enums
# --------------------------------------------------------------------------
_DAY_BIT_LABELS: tuple[tuple[int, str], ...] = (
(int(Days.MONDAY), "Mon"),
(int(Days.TUESDAY), "Tue"),
(int(Days.WEDNESDAY), "Wed"),
(int(Days.THURSDAY), "Thu"),
(int(Days.FRIDAY), "Fri"),
(int(Days.SATURDAY), "Sat"),
(int(Days.SUNDAY), "Sun"),
)
_ALL_DAYS_MASK: int = sum(b for b, _ in _DAY_BIT_LABELS)
_WEEKDAYS_MASK: int = int(
Days.MONDAY | Days.TUESDAY | Days.WEDNESDAY | Days.THURSDAY | Days.FRIDAY
)
_WEEKEND_MASK: int = int(Days.SATURDAY | Days.SUNDAY)
def format_days(mask: int) -> str:
"""Render a Days bitmask as a friendly schedule string.
Common patterns get short names; anything else is the abbreviated
weekday list (``"Mon, Wed, Fri"``).
"""
if mask == 0:
return "never"
if mask & _ALL_DAYS_MASK == _ALL_DAYS_MASK:
return "every day"
if mask & _ALL_DAYS_MASK == _WEEKDAYS_MASK:
return "weekdays"
if mask & _ALL_DAYS_MASK == _WEEKEND_MASK:
return "weekends"
parts = [label for bit, label in _DAY_BIT_LABELS if mask & bit]
return ", ".join(parts) if parts else "(no days)"
# Command → ("verb", expects_pr2_object_kind) lookup. ``None`` for the
# second element means "no object reference" — the command's parameters
# are the action's payload alone.
_COMMAND_VERBS: dict[int, tuple[str, str | None]] = {
int(Command.UNIT_OFF): ("Turn OFF", "unit"),
int(Command.UNIT_ON): ("Turn ON", "unit"),
int(Command.ALL_OFF): ("Turn ALL OFF", None),
int(Command.ALL_ON): ("Turn ALL ON", None),
int(Command.BYPASS_ZONE): ("Bypass", "zone"),
int(Command.RESTORE_ZONE): ("Restore", "zone"),
int(Command.RESTORE_ALL_ZONES): ("Restore all zones", None),
int(Command.EXECUTE_BUTTON): ("Execute button", "button"),
int(Command.UNIT_LEVEL): ("Set level", "unit"),
int(Command.UNIT_RAMP): ("Ramp", "unit"),
int(Command.DIM_STEP): ("Dim", "unit"),
int(Command.BRIGHT_STEP): ("Brighten", "unit"),
int(Command.SECURITY_OFF): ("Disarm", "area"),
int(Command.SECURITY_DAY): ("Arm Day", "area"),
int(Command.SECURITY_NIGHT): ("Arm Night", "area"),
int(Command.SECURITY_AWAY): ("Arm Away", "area"),
int(Command.SECURITY_VACATION): ("Arm Vacation", "area"),
int(Command.SECURITY_DAY_INSTANT): ("Arm Day Instant", "area"),
int(Command.SECURITY_NIGHT_DELAYED): ("Arm Night Delayed", "area"),
}
# --------------------------------------------------------------------------
# The renderer
# --------------------------------------------------------------------------
@dataclass
class ProgramRenderer:
"""Render :class:`Program` records and clausal chains as token streams.
Parameters
----------
names:
Object-name resolver (zones, units, areas, thermostats, buttons,
messages, codes). Pass an :class:`AccountNameResolver` for
offline ``.pca`` snapshots or a :class:`MockStateResolver` for
the mock-panel case.
state:
Optional live-state resolver. When provided, every ``REF`` token
carries a ``state`` annotation that consumers can render as a
badge (``"Front Door [SECURE]"`` etc.).
"""
names: NameResolver
state: StateResolver | None = None
# ---- public API ------------------------------------------------------
def render_program(self, p: Program) -> list[Token]:
"""Render a single compact-form program (TIMED / EVENT / YEARLY).
Returns the multi-line full form. For a one-line summary, see
:meth:`summarize_program`.
"""
out: list[Token] = []
try:
kind = ProgramType(p.prog_type)
except ValueError:
out.append(Token(TokenKind.TEXT, f"Unknown program type {p.prog_type}"))
return out
if kind == ProgramType.TIMED:
self._emit_timed_header(p, out)
elif kind == ProgramType.EVENT:
self._emit_event_header(p, out)
elif kind == ProgramType.YEARLY:
self._emit_yearly_header(p, out)
elif kind == ProgramType.REMARK:
self._emit_remark(p, out)
return out
elif kind == ProgramType.FREE:
out.append(Token(TokenKind.TEXT, "(empty slot)"))
return out
else:
# Multi-record record on its own — caller should use
# render_chain instead. Be helpful rather than silent.
out.append(Token(
TokenKind.TEXT,
f"(multi-record {kind.name} — render with render_chain)",
))
return out
# Compact-form programs can carry up to two inline AND conditions
# in their cond / cond2 fields. Skip when both are zero.
for slot_idx, field_val in (("cond", p.cond), ("cond2", p.cond2)):
if field_val == 0:
continue
out.append(Token(TokenKind.NEWLINE, ""))
out.append(Token(TokenKind.INDENT, " "))
out.append(Token(TokenKind.KEYWORD, "AND IF"))
out.append(Token(TokenKind.TEXT, " "))
self._emit_traditional_cond(field_val, out)
out.append(Token(TokenKind.NEWLINE, ""))
out.append(Token(TokenKind.KEYWORD, "THEN"))
out.append(Token(TokenKind.TEXT, " "))
self._emit_action(p, out)
return out
def render_chain(self, chain: ClausalChain) -> list[Token]:
"""Render a multi-record clausal chain (WHEN/AT/EVERY + body).
Output mirrors PC Access's structured-English: trigger on the
first line, conditions indented two spaces with ``AND IF`` /
``OR IF`` keywords, actions on their own lines under ``THEN`` /
``AND``.
"""
out: list[Token] = []
head = chain.head
head_kind = head.prog_type
if head_kind == int(ProgramType.WHEN):
self._emit_when_header(head, out)
elif head_kind == int(ProgramType.AT):
self._emit_at_header(head, out)
elif head_kind == int(ProgramType.EVERY):
self._emit_every_header(head, out)
else:
out.append(Token(TokenKind.TEXT, f"(chain head type {head_kind}?)"))
# Conditions: AND IF / OR IF, indented.
for cond in chain.conditions:
out.append(Token(TokenKind.NEWLINE, ""))
out.append(Token(TokenKind.INDENT, " "))
keyword = "OR IF" if cond.prog_type == int(ProgramType.OR) else "AND IF"
out.append(Token(TokenKind.KEYWORD, keyword))
out.append(Token(TokenKind.TEXT, " "))
self._emit_and_record(cond, out)
# Actions: first one prefixed THEN, rest AND.
for i, action in enumerate(chain.actions):
out.append(Token(TokenKind.NEWLINE, ""))
out.append(Token(TokenKind.KEYWORD, "THEN" if i == 0 else "AND"))
out.append(Token(TokenKind.TEXT, " "))
self._emit_action(action, out)
return out
def summarize_program(self, p: Program) -> list[Token]:
"""One-line summary suitable for the list view.
Format: ``<trigger summary> <action summary>``. Conditions
on compact-form programs are elided with ``(+N conds)``.
"""
out: list[Token] = []
try:
kind = ProgramType(p.prog_type)
except ValueError:
out.append(Token(TokenKind.TEXT, f"?type {p.prog_type}"))
return out
if kind == ProgramType.TIMED:
self._emit_timed_summary(p, out)
elif kind == ProgramType.EVENT:
self._emit_event_summary(p, out)
elif kind == ProgramType.YEARLY:
self._emit_yearly_summary(p, out)
elif kind == ProgramType.REMARK:
self._emit_remark(p, out)
return out
elif kind == ProgramType.FREE:
out.append(Token(TokenKind.TEXT, "(empty)"))
return out
else:
out.append(Token(TokenKind.TEXT, kind.name))
return out
# Inline condition count.
cond_count = (1 if p.cond else 0) + (1 if p.cond2 else 0)
if cond_count:
out.append(Token(TokenKind.TEXT, f" (+{cond_count} cond)"))
out.append(Token(TokenKind.TEXT, ""))
self._emit_action(p, out)
return out
def summarize_chain(self, chain: ClausalChain) -> list[Token]:
"""One-line summary of a clausal chain for the list view."""
out: list[Token] = []
head = chain.head
if head.prog_type == int(ProgramType.WHEN):
out.append(Token(TokenKind.KEYWORD, "WHEN"))
out.append(Token(TokenKind.TEXT, " "))
self._emit_event(head.event_id, out)
elif head.prog_type == int(ProgramType.AT):
out.append(Token(TokenKind.KEYWORD, "AT"))
out.append(Token(TokenKind.TEXT, " "))
out.append(Token(TokenKind.VALUE, head.format_time()))
out.append(Token(TokenKind.TEXT, " "))
out.append(Token(TokenKind.VALUE, format_days(head.days)))
elif head.prog_type == int(ProgramType.EVERY):
out.append(Token(TokenKind.KEYWORD, "EVERY"))
out.append(Token(TokenKind.TEXT, " "))
out.append(Token(TokenKind.VALUE, _format_interval(head.every_interval)))
if chain.conditions:
out.append(Token(
TokenKind.TEXT,
f" (+{len(chain.conditions)} cond)",
))
out.append(Token(TokenKind.TEXT, ""))
# Show the first action only on summary; "+N more" if there are more.
if chain.actions:
self._emit_action(chain.actions[0], out)
if len(chain.actions) > 1:
out.append(Token(
TokenKind.TEXT,
f" (+{len(chain.actions) - 1} more)",
))
return out
# ---- emit helpers — triggers / headers -------------------------------
def _emit_timed_header(self, p: Program, out: list[Token]) -> None:
out.append(Token(TokenKind.KEYWORD, "AT"))
out.append(Token(TokenKind.TEXT, " "))
out.append(Token(TokenKind.VALUE, p.format_time()))
out.append(Token(TokenKind.TEXT, " "))
out.append(Token(TokenKind.VALUE, format_days(p.days)))
def _emit_event_header(self, p: Program, out: list[Token]) -> None:
out.append(Token(TokenKind.KEYWORD, "WHEN"))
out.append(Token(TokenKind.TEXT, " "))
self._emit_event(p.event_id, out)
def _emit_yearly_header(self, p: Program, out: list[Token]) -> None:
out.append(Token(TokenKind.KEYWORD, "ON"))
out.append(Token(TokenKind.TEXT, " "))
out.append(Token(
TokenKind.VALUE, f"{p.month:d}/{p.day:d} at {p.hour:02d}:{p.minute:02d}",
))
def _emit_remark(self, p: Program, out: list[Token]) -> None:
rid = p.remark_id if p.remark_id is not None else 0
out.append(Token(TokenKind.KEYWORD, "REMARK"))
out.append(Token(TokenKind.TEXT, f" #{rid}"))
def _emit_when_header(self, p: Program, out: list[Token]) -> None:
out.append(Token(TokenKind.KEYWORD, "WHEN"))
out.append(Token(TokenKind.TEXT, " "))
self._emit_event(p.event_id, out)
def _emit_at_header(self, p: Program, out: list[Token]) -> None:
out.append(Token(TokenKind.KEYWORD, "AT"))
out.append(Token(TokenKind.TEXT, " "))
out.append(Token(TokenKind.VALUE, p.format_time()))
out.append(Token(TokenKind.TEXT, " "))
out.append(Token(TokenKind.VALUE, format_days(p.days)))
def _emit_every_header(self, p: Program, out: list[Token]) -> None:
out.append(Token(TokenKind.KEYWORD, "EVERY"))
out.append(Token(TokenKind.TEXT, " "))
out.append(Token(TokenKind.VALUE, _format_interval(p.every_interval)))
def _emit_timed_summary(self, p: Program, out: list[Token]) -> None:
out.append(Token(TokenKind.VALUE, p.format_time()))
out.append(Token(TokenKind.TEXT, " "))
out.append(Token(TokenKind.VALUE, format_days(p.days)))
def _emit_event_summary(self, p: Program, out: list[Token]) -> None:
out.append(Token(TokenKind.KEYWORD, "WHEN"))
out.append(Token(TokenKind.TEXT, " "))
self._emit_event(p.event_id, out)
def _emit_yearly_summary(self, p: Program, out: list[Token]) -> None:
out.append(Token(
TokenKind.VALUE,
f"{p.month:d}/{p.day:d} @ {p.hour:02d}:{p.minute:02d}",
))
def _emit_event(self, event_id: int, out: list[Token]) -> None:
"""Render an event-ID as natural language.
Mirrors clsText.GetEventCategory (clsText.cs:1585-...) for the
common categories. Unknown event IDs render as ``"event 0xNNNN"``.
"""
if event_id == EVENT_PHONE_DEAD:
out.append(Token(TokenKind.TEXT, "phone line is dead"))
return
if event_id == EVENT_PHONE_RINGING:
out.append(Token(TokenKind.TEXT, "phone is ringing"))
return
if event_id == EVENT_PHONE_OFF_HOOK:
out.append(Token(TokenKind.TEXT, "phone is off hook"))
return
if event_id == EVENT_PHONE_ON_HOOK:
out.append(Token(TokenKind.TEXT, "phone is on hook"))
return
if event_id == EVENT_AC_POWER_OFF:
out.append(Token(TokenKind.TEXT, "AC power lost"))
return
if event_id == EVENT_AC_POWER_ON:
out.append(Token(TokenKind.TEXT, "AC power restored"))
return
# USER_MACRO_BUTTON (high byte == 0)
if (event_id & 0xFF00) == 0x0000:
button = event_id & 0xFF
self._emit_ref("button", button, out)
out.append(Token(TokenKind.TEXT, " is pressed"))
return
# ZONE_STATE_CHANGE (& 0xFC00 == 0x0400)
if (event_id & 0xFC00) == 0x0400:
zone_state = event_id & 0x03FF
zone = (zone_state // 4) + 1
state = zone_state % 4
self._emit_ref("zone", zone, out)
state_label = {
0: "becomes secure",
1: "becomes not ready",
2: "reports trouble",
3: "reports tamper",
}.get(state, f"changes to state {state}")
out.append(Token(TokenKind.TEXT, " " + state_label))
return
# UNIT_STATE_CHANGE (& 0xFC00 == 0x0800)
if (event_id & 0xFC00) == 0x0800:
unit_state = event_id & 0x03FF
unit = (unit_state // 2) + 1
on = unit_state & 1
self._emit_ref("unit", unit, out)
out.append(Token(TokenKind.TEXT, " turns " + ("ON" if on else "OFF")))
return
out.append(Token(TokenKind.TEXT, f"event 0x{event_id:04x}"))
# ---- emit helpers — conditions ---------------------------------------
def _emit_traditional_cond(self, cond: int, out: list[Token]) -> None:
"""Render a compact-form ``cond`` u16 (TIMED/EVENT/YEARLY inline
AND condition).
These use a different bit-layout from AND-record cond fields
see clsText.GetConditionalText (clsText.cs:2224-2274).
"""
family = (cond >> 8) & 0xFC
if family == 0:
misc = cond & 0x0F
self._emit_misc_conditional(misc, out)
return
if family == ProgramCond.ZONE:
zone = cond & 0xFF
not_ready = bool(cond & 0x0200)
self._emit_ref("zone", zone, out)
out.append(Token(TokenKind.TEXT, " is "))
out.append(Token(TokenKind.OPERATOR, "not ready" if not_ready else "secure"))
return
if family == ProgramCond.CTRL:
unit = cond & 0x01FF
on = bool(cond & 0x0200)
self._emit_ref("unit", unit, out)
out.append(Token(TokenKind.TEXT, " is "))
out.append(Token(TokenKind.OPERATOR, "ON" if on else "OFF"))
return
if family == ProgramCond.TIME:
tc = cond & 0xFF
enabled = bool(cond & 0x0200)
out.append(Token(TokenKind.TEXT, "Time clock "))
out.append(Token(TokenKind.VALUE, str(tc)))
out.append(Token(TokenKind.TEXT, " is "))
out.append(Token(TokenKind.OPERATOR,
"enabled" if enabled else "disabled"))
return
# SEC default: high nibble = mode, bits 8-11 = area.
area = (cond >> 8) & 0x0F
mode = (cond >> 12) & 0x07
if area == 0:
area = 1
self._emit_ref("area", area, out)
out.append(Token(TokenKind.TEXT, " is "))
out.append(Token(
TokenKind.VALUE,
_SECURITY_MODE_NAMES.get(mode, f"mode {mode}"),
))
def _emit_and_record(self, c: Program, out: list[Token]) -> None:
"""Render an AND/OR Program record (Traditional or Structured)."""
if c.and_op == CondOP.ARG1_TRADITIONAL:
self._emit_traditional_and(c, out)
else:
self._emit_structured_and(c, out)
def _emit_traditional_and(self, c: Program, out: list[Token]) -> None:
"""AND/OR record carrying a Traditional condition.
Encoding via clsConditionLine.Cond (clsConditionLine.cs:17-33):
``and_family`` is the family+selector byte; ``and_instance`` is
the object index (1-based).
"""
family = c.and_family
instance = c.and_instance
family_major = family & 0xFC
secondary = bool(family & 0x02)
if family_major == 0:
self._emit_misc_conditional(family & 0x0F, out)
return
if family_major == ProgramCond.ZONE:
self._emit_ref("zone", instance, out)
out.append(Token(TokenKind.TEXT, " is "))
out.append(Token(
TokenKind.OPERATOR, "not ready" if secondary else "secure",
))
return
if family_major == ProgramCond.CTRL:
self._emit_ref("unit", instance, out)
out.append(Token(TokenKind.TEXT, " is "))
out.append(Token(
TokenKind.OPERATOR, "ON" if secondary else "OFF",
))
return
if family_major == ProgramCond.TIME:
out.append(Token(TokenKind.TEXT, "Time clock "))
out.append(Token(TokenKind.VALUE, str(instance)))
out.append(Token(TokenKind.TEXT, " is "))
out.append(Token(
TokenKind.OPERATOR,
"enabled" if secondary else "disabled",
))
return
# SEC: high nibble = mode, low = area
area = family & 0x0F
mode = (family >> 4) & 0x07
if area == 0:
area = 1
self._emit_ref("area", area, out)
out.append(Token(TokenKind.TEXT, " is "))
out.append(Token(
TokenKind.VALUE,
_SECURITY_MODE_NAMES.get(mode, f"mode {mode}"),
))
def _emit_misc_conditional(self, misc_code: int, out: list[Token]) -> None:
try:
cat = MiscConditional(misc_code)
except ValueError:
out.append(Token(TokenKind.TEXT, f"misc condition {misc_code}"))
return
labels = {
MiscConditional.NONE: "always",
MiscConditional.NEVER: "never",
MiscConditional.LIGHT: "it is light outside",
MiscConditional.DARK: "it is dark outside",
MiscConditional.PHONE_DEAD: "phone line is dead",
MiscConditional.PHONE_RINGING: "phone is ringing",
MiscConditional.PHONE_OFF_HOOK: "phone is off hook",
MiscConditional.PHONE_ON_HOOK: "phone is on hook",
MiscConditional.AC_POWER_OFF: "AC power is off",
MiscConditional.AC_POWER_ON: "AC power is on",
MiscConditional.BATTERY_LOW: "battery is low",
MiscConditional.BATTERY_OK: "battery is OK",
MiscConditional.ENERGY_COST_LOW: "energy cost is low",
MiscConditional.ENERGY_COST_MID: "energy cost is mid",
MiscConditional.ENERGY_COST_HIGH: "energy cost is high",
MiscConditional.ENERGY_COST_CRITICAL: "energy cost is critical",
}
out.append(Token(TokenKind.TEXT, labels.get(cat, cat.name)))
def _emit_structured_and(self, c: Program, out: list[Token]) -> None:
"""Render an ``Arg1 OP Arg2`` AND/OR record.
For each arg side we render either an object reference + field,
or a literal value. The operator goes in between.
"""
self._emit_structured_arg(
c.and_arg1_argtype, c.and_arg1_ix, c.and_arg1_field, out,
)
out.append(Token(TokenKind.TEXT, " "))
out.append(Token(TokenKind.OPERATOR, _OP_SYMBOLS.get(c.and_op, "?")))
out.append(Token(TokenKind.TEXT, " "))
self._emit_structured_arg(
c.and_arg2_argtype, c.and_arg2_ix, c.and_arg2_field, out,
)
def _emit_structured_arg(
self, argtype: int, ix: int, field_id: int, out: list[Token],
) -> None:
if argtype == CondArgType.CONSTANT:
out.append(Token(TokenKind.VALUE, str(ix)))
return
kind = _ARGTYPE_KIND.get(argtype)
if kind is None:
out.append(Token(TokenKind.TEXT, f"argtype{argtype}#{ix}"))
return
if kind == "timedate":
field_label = _TIMEDATE_FIELD_LABELS.get(field_id, f"field{field_id}")
out.append(Token(TokenKind.TEXT, field_label))
return
# Object reference with field suffix (when known).
self._emit_ref(kind, ix, out)
field_label = _FIELD_LABELS.get((kind, field_id))
if field_label:
out.append(Token(TokenKind.TEXT, "."))
out.append(Token(TokenKind.TEXT, field_label))
# ---- emit helpers — actions ------------------------------------------
def _emit_action(self, p: Program, out: list[Token]) -> None:
"""Render the cmd / par / pr2 triple as a friendly verb.
For unrecognised commands we fall back to the raw enum name,
which keeps the rendering useful even for less-common
Command values we haven't mapped yet.
"""
cmd_byte = p.cmd
try:
cmd = Command(cmd_byte)
except ValueError:
out.append(Token(TokenKind.TEXT, f"command {cmd_byte}"))
return
verb_entry = _COMMAND_VERBS.get(cmd_byte)
verb, ref_kind = verb_entry if verb_entry else (cmd.name.replace("_", " "), None)
out.append(Token(TokenKind.KEYWORD, verb))
if ref_kind is not None:
out.append(Token(TokenKind.TEXT, " "))
self._emit_ref(ref_kind, p.pr2, out)
if cmd == Command.UNIT_LEVEL:
out.append(Token(TokenKind.TEXT, " to "))
out.append(Token(TokenKind.VALUE, f"{p.par}%"))
# ---- emit helpers — refs ---------------------------------------------
def _emit_ref(self, kind: str, index: int, out: list[Token]) -> None:
"""Emit a typed object reference token with name + live state."""
name = self.names.name_of(kind, index)
if not name:
name = f"{kind.capitalize()} {index}"
state = None
if self.state is not None:
state = self.state.state_of(kind, index)
out.append(Token(
TokenKind.REF, name,
entity_kind=kind, entity_id=index, state=state,
))
# --------------------------------------------------------------------------
# Tables — kept at module scope so they're not re-allocated per render
# --------------------------------------------------------------------------
_OP_SYMBOLS: dict[int, str] = {
int(CondOP.ARG1_EQ_ARG2): "==",
int(CondOP.ARG1_NE_ARG2): "!=",
int(CondOP.ARG1_LT_ARG2): "<",
int(CondOP.ARG1_GT_ARG2): ">",
int(CondOP.ARG1_ODD): "is odd",
int(CondOP.ARG1_EVEN): "is even",
int(CondOP.ARG1_MULTIPLE_ARG2): "is multiple of",
int(CondOP.ARG1_IN_ARG2): "in",
int(CondOP.ARG1_NOT_IN_ARG2): "not in",
}
_ARGTYPE_KIND: dict[int, str] = {
int(CondArgType.ZONE): "zone",
int(CondArgType.UNIT): "unit",
int(CondArgType.THERMOSTAT): "thermostat",
int(CondArgType.AREA): "area",
int(CondArgType.TIME_DATE): "timedate",
}
_FIELD_LABELS: dict[tuple[str, int], str] = {
# enuZoneField
("zone", 1): "LoopReading",
("zone", 2): "CurrentState",
("zone", 3): "ArmingState",
("zone", 4): "AlarmState",
# enuUnitField
("unit", 1): "CurrentState",
("unit", 2): "PreviousState",
("unit", 3): "Timer",
("unit", 4): "Level",
# enuThermostatField
("thermostat", 1): "Temperature",
("thermostat", 2): "HeatSetpoint",
("thermostat", 3): "CoolSetpoint",
("thermostat", 4): "SystemMode",
("thermostat", 5): "FanMode",
("thermostat", 6): "HoldMode",
("thermostat", 7): "FreezeAlarm",
("thermostat", 8): "CommError",
("thermostat", 9): "Humidity",
("thermostat", 10): "HumidifySetpoint",
("thermostat", 11): "DehumidifySetpoint",
("thermostat", 12): "OutdoorTemperature",
("thermostat", 13): "SystemStatus",
}
_TIMEDATE_FIELD_LABELS: dict[int, str] = {
1: "Date",
2: "Year",
3: "Month",
4: "Day",
5: "DayOfWeek",
6: "Time",
7: "DST_Flag",
8: "Hour",
9: "Minute",
10: "SunriseSunset",
}
def _format_interval(seconds: int) -> str:
"""Render an EVERY-program interval. Treats the raw value as
seconds matches the live-fixture observation that 5 SECONDS UI
selection stores as 5. Higher values fall through to natural
``"30 min"`` / ``"2 hr"`` shortenings for readability."""
if seconds <= 0:
return "(disabled)"
if seconds < 60:
return f"{seconds} sec"
if seconds < 3600:
return f"{seconds // 60} min"
return f"{seconds // 3600} hr"

View File

@ -0,0 +1,744 @@
"""Tests for the structured-English program renderer.
Coverage strategy:
* Each trigger / condition / action branch gets at least one focused
test asserting the rendered tokens (or plain-text projection).
* End-to-end tests build a Program (or ClausalChain) that mirrors what
PC Access produces and verify the renderer's output reads cleanly.
* Live-state overlay is tested separately via a small fake StateResolver
so we can assert the badges land on the right REF tokens.
"""
from __future__ import annotations
from dataclasses import dataclass
import pytest
from omni_pca.commands import Command
from omni_pca.mock_panel import (
MockAreaState,
MockState,
MockThermostatState,
MockUnitState,
MockZoneState,
)
from omni_pca.program_engine import (
ClausalChain,
EVENT_AC_POWER_OFF,
event_id_unit_state,
event_id_user_macro_button,
event_id_zone_state,
)
from omni_pca.program_renderer import (
AccountNameResolver,
MockStateResolver,
NameResolver,
ProgramRenderer,
StateResolver,
Token,
TokenKind,
_format_interval,
format_days,
tokens_to_string,
)
from omni_pca.programs import (
CondArgType,
CondOP,
Days,
Program,
ProgramType,
)
# ---- Test helpers --------------------------------------------------------
class _StaticNameResolver:
"""Trivial name resolver — explicit name dict, useful in unit tests."""
def __init__(self, names: dict[tuple[str, int], str]) -> None:
self._names = names
def name_of(self, kind: str, index: int) -> str | None:
return self._names.get((kind, index))
class _StaticStateResolver:
"""Trivial state resolver — explicit state dict."""
def __init__(self, states: dict[tuple[str, int], str]) -> None:
self._states = states
def state_of(self, kind: str, index: int) -> str | None:
return self._states.get((kind, index))
def _renderer_with(
names: dict[tuple[str, int], str] | None = None,
states: dict[tuple[str, int], str] | None = None,
) -> ProgramRenderer:
return ProgramRenderer(
names=_StaticNameResolver(names or {}),
state=_StaticStateResolver(states) if states is not None else None,
)
# ---- Format helpers ------------------------------------------------------
def test_format_days_everyday() -> None:
assert format_days(int(
Days.MONDAY | Days.TUESDAY | Days.WEDNESDAY | Days.THURSDAY
| Days.FRIDAY | Days.SATURDAY | Days.SUNDAY
)) == "every day"
def test_format_days_weekdays() -> None:
assert format_days(int(
Days.MONDAY | Days.TUESDAY | Days.WEDNESDAY | Days.THURSDAY | Days.FRIDAY
)) == "weekdays"
def test_format_days_weekend() -> None:
assert format_days(int(Days.SATURDAY | Days.SUNDAY)) == "weekends"
def test_format_days_individual_days() -> None:
assert format_days(int(Days.MONDAY | Days.WEDNESDAY | Days.FRIDAY)) == "Mon, Wed, Fri"
def test_format_days_zero() -> None:
assert format_days(0) == "never"
def test_format_interval_seconds() -> None:
assert _format_interval(5) == "5 sec"
assert _format_interval(45) == "45 sec"
def test_format_interval_minutes_and_hours() -> None:
assert _format_interval(300) == "5 min"
assert _format_interval(7200) == "2 hr"
def test_format_interval_disabled() -> None:
assert _format_interval(0) == "(disabled)"
# ---- Tokens to string ----------------------------------------------------
def test_tokens_to_string_with_state_badge() -> None:
"""REF tokens with `state` set surface as ``name [state]``."""
tokens = [
Token(TokenKind.KEYWORD, "WHEN"),
Token(TokenKind.TEXT, " "),
Token(TokenKind.REF, "Front Door",
entity_kind="zone", entity_id=1, state="SECURE"),
Token(TokenKind.TEXT, " is opened"),
]
assert tokens_to_string(tokens) == "WHEN Front Door [SECURE] is opened"
def test_tokens_to_string_handles_newline_and_indent() -> None:
tokens = [
Token(TokenKind.KEYWORD, "WHEN"),
Token(TokenKind.TEXT, " trigger"),
Token(TokenKind.NEWLINE, ""),
Token(TokenKind.INDENT, " "),
Token(TokenKind.KEYWORD, "AND IF"),
Token(TokenKind.TEXT, " condition"),
]
assert tokens_to_string(tokens) == "WHEN trigger\n AND IF condition"
# ---- Trigger rendering ---------------------------------------------------
def test_render_timed_program() -> None:
"""AT 06:00 weekdays → Turn ON LIVING_LAMP."""
p = Program(
slot=42, prog_type=int(ProgramType.TIMED),
cmd=int(Command.UNIT_ON), pr2=7,
hour=6, minute=0,
days=int(Days.MONDAY | Days.TUESDAY | Days.WEDNESDAY
| Days.THURSDAY | Days.FRIDAY),
)
r = _renderer_with(names={("unit", 7): "LIVING_LAMP"})
text = tokens_to_string(r.render_program(p))
assert text == "AT 06:00 weekdays\nTHEN Turn ON LIVING_LAMP"
def test_render_event_program_zone_state_change() -> None:
"""EVENT triggered by zone 5 not-ready → unit 3 OFF."""
evt = event_id_zone_state(5, 1) # zone 5 becomes not-ready
p = Program(
slot=10, prog_type=int(ProgramType.EVENT),
cmd=int(Command.UNIT_OFF), pr2=3,
month=(evt >> 8) & 0xFF, day=evt & 0xFF,
)
r = _renderer_with(names={
("zone", 5): "FRONT_DOOR",
("unit", 3): "PORCH_LIGHT",
})
assert tokens_to_string(r.render_program(p)) == (
"WHEN FRONT_DOOR becomes not ready\n"
"THEN Turn OFF PORCH_LIGHT"
)
def test_render_yearly_program() -> None:
p = Program(
slot=99, prog_type=int(ProgramType.YEARLY),
cmd=int(Command.UNIT_ON), pr2=12,
month=12, day=25, hour=18, minute=30,
)
r = _renderer_with(names={("unit", 12): "CHRISTMAS_LIGHTS"})
assert tokens_to_string(r.render_program(p)) == (
"ON 12/25 at 18:30\n"
"THEN Turn ON CHRISTMAS_LIGHTS"
)
def test_render_remark_program() -> None:
"""Remark records render as 'REMARK #N'."""
p = Program(
slot=5, prog_type=int(ProgramType.REMARK),
remark_id=42,
)
r = _renderer_with()
assert tokens_to_string(r.render_program(p)) == "REMARK #42"
def test_render_free_slot() -> None:
p = Program(slot=1, prog_type=int(ProgramType.FREE))
r = _renderer_with()
assert tokens_to_string(r.render_program(p)) == "(empty slot)"
# ---- Event-ID decoding ---------------------------------------------------
def test_render_event_button_press() -> None:
"""Button-press events render via the button name."""
evt = event_id_user_macro_button(7)
p = Program(
slot=1, prog_type=int(ProgramType.EVENT),
cmd=int(Command.UNIT_ON), pr2=1,
month=(evt >> 8) & 0xFF, day=evt & 0xFF,
)
r = _renderer_with(names={
("button", 7): "GOOD_NIGHT",
("unit", 1): "BEDROOM_LAMP",
})
assert tokens_to_string(r.render_program(p)).startswith(
"WHEN GOOD_NIGHT is pressed\n"
)
def test_render_event_unit_state_change() -> None:
evt = event_id_unit_state(4, on=True)
p = Program(
slot=1, prog_type=int(ProgramType.EVENT),
cmd=int(Command.UNIT_OFF), pr2=5,
month=(evt >> 8) & 0xFF, day=evt & 0xFF,
)
r = _renderer_with(names={("unit", 4): "ALARM", ("unit", 5): "SIREN"})
assert tokens_to_string(r.render_program(p)) == (
"WHEN ALARM turns ON\n"
"THEN Turn OFF SIREN"
)
def test_render_event_ac_power_lost() -> None:
p = Program(
slot=1, prog_type=int(ProgramType.EVENT),
cmd=int(Command.UNIT_ON), pr2=1,
month=(EVENT_AC_POWER_OFF >> 8) & 0xFF,
day=EVENT_AC_POWER_OFF & 0xFF,
)
r = _renderer_with(names={("unit", 1): "EMERGENCY_LIGHT"})
assert tokens_to_string(r.render_program(p)) == (
"WHEN AC power lost\n"
"THEN Turn ON EMERGENCY_LIGHT"
)
# ---- Inline AND conditions (compact form) -------------------------------
def test_render_timed_with_inline_zone_condition() -> None:
"""TIMED program with an inline AND IF ZONE ... SECURE condition."""
# cond = high byte: 0x04 (ZONE family), low byte: zone 5
cond = (0x04 << 8) | 5
p = Program(
slot=1, prog_type=int(ProgramType.TIMED),
cmd=int(Command.UNIT_ON), pr2=7,
hour=22, minute=30,
days=int(Days.MONDAY),
cond=cond,
)
r = _renderer_with(names={
("zone", 5): "FRONT_DOOR", ("unit", 7): "PORCH_LIGHT",
})
assert tokens_to_string(r.render_program(p)) == (
"AT 22:30 Mon\n"
" AND IF FRONT_DOOR is secure\n"
"THEN Turn ON PORCH_LIGHT"
)
def test_render_timed_with_inline_unit_on_condition() -> None:
"""TIMED + AND IF UNIT ... ON. Compact cond high byte 0x0A = CTRL+ON."""
cond = (0x0A << 8) | 3 # CTRL family + ON, unit 3
p = Program(
slot=1, prog_type=int(ProgramType.TIMED),
cmd=int(Command.UNIT_ON), pr2=7,
hour=6, minute=0,
days=int(Days.MONDAY),
cond=cond,
)
r = _renderer_with(names={
("unit", 3): "OCCUPANCY", ("unit", 7): "KITCHEN_LIGHT",
})
assert tokens_to_string(r.render_program(p)) == (
"AT 06:00 Mon\n"
" AND IF OCCUPANCY is ON\n"
"THEN Turn ON KITCHEN_LIGHT"
)
# ---- Clausal chain rendering --------------------------------------------
def _and_traditional(slot: int, family: int, instance: int = 0) -> Program:
return Program(
slot=slot, prog_type=int(ProgramType.AND),
cond=family & 0xFF, cond2=(instance & 0xFF) << 8,
)
def _or_record(slot: int) -> Program:
"""An empty OR-separator record. PC Access in practice always
bundles a condition into the OR record itself; use ``_or_traditional``
for that case. This helper exists for the rare empty-OR cases."""
return Program(slot=slot, prog_type=int(ProgramType.OR))
def _or_traditional(slot: int, family: int, instance: int = 0) -> Program:
"""OR-alternative record carrying a Traditional condition inline."""
return Program(
slot=slot, prog_type=int(ProgramType.OR),
cond=family & 0xFF, cond2=(instance & 0xFF) << 8,
)
def _then_record(slot: int, cmd: int, pr2: int, par: int = 0) -> Program:
return Program(
slot=slot, prog_type=int(ProgramType.THEN),
cmd=cmd, pr2=pr2, par=par,
)
def test_render_when_chain_simple() -> None:
"""WHEN button N pressed → 1 cond → 1 action."""
evt = event_id_user_macro_button(5)
when = Program(
slot=1, prog_type=int(ProgramType.WHEN),
month=(evt >> 8) & 0xFF, day=evt & 0xFF,
)
and_cond = _and_traditional(2, family=0x04, instance=7) # ZONE 7 secure
then = _then_record(3, int(Command.UNIT_ON), 9)
chain = ClausalChain(head=when, conditions=(and_cond,), actions=(then,))
r = _renderer_with(names={
("button", 5): "GOODNIGHT",
("zone", 7): "BACK_DOOR",
("unit", 9): "HALLWAY",
})
assert tokens_to_string(r.render_chain(chain)) == (
"WHEN GOODNIGHT is pressed\n"
" AND IF BACK_DOOR is secure\n"
"THEN Turn ON HALLWAY"
)
def test_render_when_chain_with_or_branch_and_multiple_actions() -> None:
"""Full clausal program with OR branch and two THEN actions."""
evt = event_id_user_macro_button(5)
when = Program(
slot=1, prog_type=int(ProgramType.WHEN),
month=(evt >> 8) & 0xFF, day=evt & 0xFF,
)
chain = ClausalChain(
head=when,
conditions=(
_and_traditional(2, family=0x04, instance=7), # ZONE 7 secure
_or_traditional(3, family=0x0A, instance=3), # OR IF UNIT 3 ON
),
actions=(
_then_record(4, int(Command.UNIT_ON), 9), # Turn ON HALLWAY
_then_record(5, int(Command.UNIT_OFF), 10), # Turn OFF FOYER
),
)
r = _renderer_with(names={
("button", 5): "GOODNIGHT",
("zone", 7): "BACK_DOOR",
("unit", 3): "MOTION",
("unit", 9): "HALLWAY",
("unit", 10): "FOYER",
})
assert tokens_to_string(r.render_chain(chain)) == (
"WHEN GOODNIGHT is pressed\n"
" AND IF BACK_DOOR is secure\n"
" OR IF MOTION is ON\n"
"THEN Turn ON HALLWAY\n"
"AND Turn OFF FOYER"
)
def test_render_at_chain() -> None:
"""AT-headed clausal chain with structured-English output."""
head = Program(
slot=1, prog_type=int(ProgramType.AT),
hour=7, minute=15, days=int(Days.SATURDAY | Days.SUNDAY),
)
chain = ClausalChain(
head=head, conditions=(),
actions=(_then_record(2, int(Command.UNIT_ON), 12),),
)
r = _renderer_with(names={("unit", 12): "COFFEE_MAKER"})
assert tokens_to_string(r.render_chain(chain)) == (
"AT 07:15 weekends\n"
"THEN Turn ON COFFEE_MAKER"
)
def test_render_every_chain() -> None:
head = Program(
slot=1, prog_type=int(ProgramType.EVERY),
# every_interval = ((cond & 0xFF) << 8) | ((cond2 >> 8) & 0xFF).
# For interval=60: cond=0, cond2=60<<8=0x3C00 → 60 sec = 1 min.
cond=0, cond2=60 << 8,
)
chain = ClausalChain(
head=head, conditions=(),
actions=(_then_record(2, int(Command.UNIT_ON), 1),),
)
r = _renderer_with(names={("unit", 1): "AERATOR"})
assert tokens_to_string(r.render_chain(chain)) == (
"EVERY 1 min\n"
"THEN Turn ON AERATOR"
)
# ---- Structured AND/OR rendering ----------------------------------------
def _and_structured(
slot: int, op: int,
arg1_type: int, arg1_ix: int, arg1_field: int,
arg2_type: int, arg2_ix: int, arg2_field: int = 0,
) -> Program:
return Program(
slot=slot, prog_type=int(ProgramType.AND),
cond=(op << 8) | arg1_type,
cond2=arg1_ix,
cmd=arg1_field,
par=arg2_type,
pr2=arg2_ix,
month=arg2_field,
)
def test_render_structured_zone_current_state_eq_constant() -> None:
"""AND IF Zone(5).CurrentState == 1"""
and_rec = _and_structured(
slot=1, op=int(CondOP.ARG1_EQ_ARG2),
arg1_type=int(CondArgType.ZONE), arg1_ix=5, arg1_field=2,
arg2_type=int(CondArgType.CONSTANT), arg2_ix=1,
)
chain = ClausalChain(
head=Program(slot=0, prog_type=int(ProgramType.WHEN),
month=0, day=1),
conditions=(and_rec,),
actions=(_then_record(2, int(Command.UNIT_ON), 9),),
)
r = _renderer_with(names={
("zone", 5): "FRONT_DOOR",
("button", 1): "BTN_1",
("unit", 9): "HALLWAY",
})
text = tokens_to_string(r.render_chain(chain))
assert "AND IF FRONT_DOOR.CurrentState == 1" in text
def test_render_structured_thermostat_temp_gt_constant() -> None:
"""AND IF Thermostat(1).Temperature > 75"""
and_rec = _and_structured(
slot=1, op=int(CondOP.ARG1_GT_ARG2),
arg1_type=int(CondArgType.THERMOSTAT), arg1_ix=1, arg1_field=1,
arg2_type=int(CondArgType.CONSTANT), arg2_ix=75,
)
chain = ClausalChain(
head=Program(slot=0, prog_type=int(ProgramType.WHEN), month=0, day=1),
conditions=(and_rec,),
actions=(_then_record(2, int(Command.UNIT_ON), 1),),
)
r = _renderer_with(names={
("thermostat", 1): "DOWNSTAIRS",
("button", 1): "BTN_1",
("unit", 1): "AC",
})
text = tokens_to_string(r.render_chain(chain))
assert "AND IF DOWNSTAIRS.Temperature > 75" in text
def test_render_structured_timedate_hour_eq() -> None:
"""AND IF TimeDate.Hour == 22"""
and_rec = _and_structured(
slot=1, op=int(CondOP.ARG1_EQ_ARG2),
arg1_type=int(CondArgType.TIME_DATE), arg1_ix=0, arg1_field=8,
arg2_type=int(CondArgType.CONSTANT), arg2_ix=22,
)
chain = ClausalChain(
head=Program(slot=0, prog_type=int(ProgramType.WHEN), month=0, day=1),
conditions=(and_rec,),
actions=(_then_record(2, int(Command.UNIT_ON), 1),),
)
r = _renderer_with(names={("button", 1): "BTN", ("unit", 1): "LIGHT"})
text = tokens_to_string(r.render_chain(chain))
assert "AND IF Hour == 22" in text
# ---- Action verb rendering ----------------------------------------------
def test_render_action_bypass_zone() -> None:
p = Program(
slot=1, prog_type=int(ProgramType.TIMED),
cmd=int(Command.BYPASS_ZONE), pr2=5,
hour=22, minute=0, days=int(Days.MONDAY),
)
r = _renderer_with(names={("zone", 5): "WINDOW"})
assert "THEN Bypass WINDOW" in tokens_to_string(r.render_program(p))
def test_render_action_unit_level_with_percentage() -> None:
"""UNIT_LEVEL uses ``par`` as the percentage."""
p = Program(
slot=1, prog_type=int(ProgramType.TIMED),
cmd=int(Command.UNIT_LEVEL), pr2=7, par=50,
hour=6, minute=0, days=int(Days.MONDAY),
)
r = _renderer_with(names={("unit", 7): "DIMMER"})
assert "THEN Set level DIMMER to 50%" in tokens_to_string(r.render_program(p))
def test_render_action_security_arm() -> None:
p = Program(
slot=1, prog_type=int(ProgramType.TIMED),
cmd=int(Command.SECURITY_AWAY), pr2=1,
hour=22, minute=0, days=int(Days.MONDAY),
)
r = _renderer_with(names={("area", 1): "MAIN"})
assert "THEN Arm Away MAIN" in tokens_to_string(r.render_program(p))
# ---- Live-state overlay --------------------------------------------------
def test_live_state_overlay_appears_in_string() -> None:
"""When a state resolver is set, REF tokens get bracketed badges."""
p = Program(
slot=1, prog_type=int(ProgramType.TIMED),
cmd=int(Command.UNIT_ON), pr2=7,
hour=6, minute=0, days=int(Days.MONDAY),
)
r = _renderer_with(
names={("unit", 7): "LIVING_LAMP"},
states={("unit", 7): "OFF"},
)
text = tokens_to_string(r.render_program(p))
assert "Turn ON LIVING_LAMP [OFF]" in text
def test_live_state_overlay_tokens_carry_state_field() -> None:
"""REF tokens themselves have .state populated — not just the text."""
p = Program(
slot=1, prog_type=int(ProgramType.TIMED),
cmd=int(Command.UNIT_ON), pr2=7,
hour=6, minute=0, days=int(Days.MONDAY),
)
r = _renderer_with(
names={("unit", 7): "LIVING_LAMP"},
states={("unit", 7): "ON 50%"},
)
refs = [t for t in r.render_program(p) if t.kind == TokenKind.REF]
assert len(refs) == 1
assert refs[0].entity_kind == "unit"
assert refs[0].entity_id == 7
assert refs[0].state == "ON 50%"
def test_live_state_absent_when_resolver_returns_none() -> None:
"""A resolver that doesn't know about an entity omits the badge."""
p = Program(
slot=1, prog_type=int(ProgramType.TIMED),
cmd=int(Command.UNIT_ON), pr2=99,
hour=6, minute=0, days=int(Days.MONDAY),
)
r = _renderer_with(states={("unit", 7): "ON"}) # nothing for unit 99
text = tokens_to_string(r.render_program(p))
assert "[" not in text # no badge anywhere
# ---- MockStateResolver end-to-end ---------------------------------------
def test_mock_state_resolver_zone_badge() -> None:
state = MockState(zones={
5: MockZoneState(name="FRONT_DOOR", current_state=1), # not-ready
})
res = MockStateResolver(state)
assert res.name_of("zone", 5) == "FRONT_DOOR"
assert res.state_of("zone", 5) == "NOT READY"
def test_mock_state_resolver_unit_on_with_dim_level() -> None:
state = MockState(units={3: MockUnitState(name="DIMMER", state=150)})
res = MockStateResolver(state)
assert res.state_of("unit", 3) == "ON 50%"
def test_mock_state_resolver_area_security_mode() -> None:
state = MockState(areas={1: MockAreaState(name="MAIN", mode=3)})
res = MockStateResolver(state)
assert res.state_of("area", 1) == "Away"
def test_mock_state_resolver_thermostat_temperature() -> None:
state = MockState(thermostats={1: MockThermostatState(temperature_raw=170)})
res = MockStateResolver(state)
# raw 170 / 2 - 40 = 45°F (low side of the linear scale)
assert res.state_of("thermostat", 1) == "45°F"
def test_mock_state_resolver_unknown_kind_returns_none() -> None:
res = MockStateResolver(MockState())
assert res.state_of("nonexistent", 1) is None
# ---- AccountNameResolver end-to-end -------------------------------------
def test_account_name_resolver_pulls_from_account() -> None:
@dataclass
class _AcctStub:
zone_names: dict[int, str]
unit_names: dict[int, str]
acct = _AcctStub(
zone_names={1: "FRONT", 2: "BACK"},
unit_names={5: "LAMP"},
)
res = AccountNameResolver(acct)
assert res.name_of("zone", 1) == "FRONT"
assert res.name_of("unit", 5) == "LAMP"
assert res.name_of("zone", 99) is None
assert res.name_of("area", 1) is None # no area_names on stub
# ---- Summary (one-liner) --------------------------------------------------
def test_summarize_timed_program() -> None:
p = Program(
slot=1, prog_type=int(ProgramType.TIMED),
cmd=int(Command.UNIT_ON), pr2=7,
hour=22, minute=30, days=int(Days.MONDAY),
)
r = _renderer_with(names={("unit", 7): "LAMP"})
assert tokens_to_string(r.summarize_program(p)) == (
"22:30 Mon → Turn ON LAMP"
)
def test_summarize_compact_program_with_conditions() -> None:
"""Summary shows count of inline conditions."""
cond = (0x04 << 8) | 5 # AND IF zone 5 secure
p = Program(
slot=1, prog_type=int(ProgramType.TIMED),
cmd=int(Command.UNIT_ON), pr2=7,
hour=22, minute=30, days=int(Days.MONDAY),
cond=cond,
)
r = _renderer_with(names={("unit", 7): "LAMP", ("zone", 5): "DOOR"})
text = tokens_to_string(r.summarize_program(p))
assert "(+1 cond)" in text
# ---- Live-fixture smoke test --------------------------------------------
def test_renderer_handles_every_program_in_live_fixture() -> None:
"""Every defined program in the live .pca fixture renders cleanly.
This is the broadest correctness signal: 330 real homeowner-authored
programs with names, conditions, and actions, all decoded by the
same code path the HA panel will use. Skipped when the gitignored
fixture isn't on disk.
"""
from pathlib import Path
fixture = Path("/home/kdm/home-auto/HAI/pca-re/extracted/Our_House.pca.plain")
if not fixture.is_file():
pytest.skip(f"fixture not available: {fixture}")
from omni_pca.pca_file import KEY_EXPORT, decrypt_pca_bytes, parse_pca_file
acct = parse_pca_file(decrypt_pca_bytes(fixture.read_bytes(), KEY_EXPORT),
key=KEY_EXPORT)
r = ProgramRenderer(names=AccountNameResolver(acct))
defined = [p for p in acct.programs if not p.is_empty()]
assert len(defined) == 330
# Every program produces a non-empty summary + full render. No
# exception should escape — the renderer's job is to be informative
# even for records it doesn't fully understand.
for p in defined:
summary = tokens_to_string(r.summarize_program(p))
full = tokens_to_string(r.render_program(p))
assert summary
assert full
# The first few programs in this fixture are button-press chains
# against the garage doors — confirm the rendering reads the way
# we expect ("WHEN ... is pressed AND IF ... is secure THEN ...").
slot1 = tokens_to_string(r.render_program(acct.programs[0]))
assert slot1.startswith("WHEN ")
assert "is pressed" in slot1
assert "\n AND IF " in slot1
assert "\nTHEN " in slot1
def test_summarize_chain() -> None:
evt = event_id_user_macro_button(5)
chain = ClausalChain(
head=Program(
slot=1, prog_type=int(ProgramType.WHEN),
month=(evt >> 8) & 0xFF, day=evt & 0xFF,
),
conditions=(
_and_traditional(2, family=0x04, instance=7),
_and_traditional(3, family=0x0A, instance=3),
),
actions=(
_then_record(4, int(Command.UNIT_ON), 9),
_then_record(5, int(Command.UNIT_OFF), 10),
),
)
r = _renderer_with(names={
("button", 5): "BTN", ("unit", 9): "L1", ("unit", 10): "L2",
})
text = tokens_to_string(r.summarize_chain(chain))
assert text == "WHEN BTN is pressed (+2 cond) → Turn ON L1 (+1 more)"