From d4c04b30446afb4366baf9d39c700321b856bc40 Mon Sep 17 00:00:00 2001 From: Ryan Malloy Date: Mon, 11 May 2026 19:48:00 -0600 Subject: [PATCH] programs: typed decoder/encoder for the 14-byte program record MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit First reverse-engineering pass on the panel's built-in automation engine. Adds a typed Python Program dataclass that decodes/encodes the 14-byte program record used both on the wire (clsOLMsgProgramData) and on disk (the 21,000-byte Programs block in a .pca file). Coverage: * enums: ProgramType, ProgramCond, Days bitmask * Program dataclass with from_wire_bytes / from_file_record / encode_wire_bytes / encode_file_record (Mon/Day swap for EVENT-typed records applied on the file form only -- mirrors clsProgram.Read at clsProgram.cs:471, while clsProgram.ToByteArray omits the swap) * Remark variant (bytes 1-4 = BE u32 RemarkID instead of cond/cond2) * unknown ProgType / Cmd bytes pass through as raw ints with a once-per-process warning * decode_program_table for the full 1500-slot .pca block * pca_file.parse_pca_file populates PcaAccount.programs (backward- compatible: defaults to ()) * mock_panel.MockState.programs + _reply_program_data so OmniLink2 UploadProgram (opcode 9) round-trips through the test fixture Verification (422 passed, 1 skipped — was 400): * 15 unit tests in test_programs.py: golden bytes for each ProgramType, Mon/Day swap proven distinct between wire and file layouts, Remark round-trip, 500 random-input wire+file round-trips, unknown-enum tolerance * 4 fixture-gated live-data tests in test_pca_file.py: all 1500 slots decode cleanly, 330 non-empty (matches Phase 1 recon distribution 209 TIMED / 105 EVENT / 16 YEARLY), 21,000-byte byte-for-byte round-trip against the live decrypted fixture, YEARLY month/day in valid calendar ranges * 3 wire-echo tests in test_e2e_program_echo.py: client drives UploadProgram (opcode 9) through the mock, server replies with ProgramData (opcode 10) wrapping [number_hi, number_lo, body]; full Program round-trips field-by-field, empty slots return zero bodies, EVENT bytes are emitted in wire order (no swap) What this pass deliberately leaves open (documented in the docs page): * cond / cond2 internal bit split (selector vs operand) * multi-record clausal encoding (When/At/Every/And/Or/Then) * RemarkID -> RemarkText lookup table layout * DPC capability flag location for non-OPII models * TIMED time-of-day vs sunrise/sunset-relative offset flag References: * clsProgram.cs (entire) — field accessors, Read/Write, Evt u16 * enuProgramType.cs / enuProgramCond.cs / enuDays.cs * Owner's Manual SETUP chapter — user-facing programming-line model * Installation Manual SETUP MISC — installer-facing setup screen --- src/omni_pca/__init__.py | 19 +- src/omni_pca/mock_panel.py | 29 +++ src/omni_pca/pca_file.py | 41 +++- src/omni_pca/programs.py | 399 +++++++++++++++++++++++++++++++++ tests/test_e2e_program_echo.py | 136 +++++++++++ tests/test_pca_file.py | 116 ++++++++++ tests/test_programs.py | 247 ++++++++++++++++++++ 7 files changed, 981 insertions(+), 6 deletions(-) create mode 100644 src/omni_pca/programs.py create mode 100644 tests/test_e2e_program_echo.py create mode 100644 tests/test_programs.py diff --git a/src/omni_pca/__init__.py b/src/omni_pca/__init__.py index 44af214..bd2bca1 100644 --- a/src/omni_pca/__init__.py +++ b/src/omni_pca/__init__.py @@ -2,9 +2,26 @@ from importlib.metadata import PackageNotFoundError, version +from .programs import ( + Days, + Program, + ProgramCond, + ProgramType, + decode_program_table, + iter_defined, +) + try: __version__ = version("omni-pca") except PackageNotFoundError: __version__ = "0.0.0+unknown" -__all__ = ["__version__"] +__all__ = [ + "Days", + "Program", + "ProgramCond", + "ProgramType", + "__version__", + "decode_program_table", + "iter_defined", +] diff --git a/src/omni_pca/mock_panel.py b/src/omni_pca/mock_panel.py index 04afa46..5483332 100644 --- a/src/omni_pca/mock_panel.py +++ b/src/omni_pca/mock_panel.py @@ -231,6 +231,12 @@ class MockState: # matched code_index in the area's last_user field on success. user_codes: dict[int, int] = field(default_factory=dict) + # Program table — slot number (1-based) → raw 14-byte wire body. + # Wire layout (no Mon/Day swap) so a v2 ``ProgramData`` reply is a + # direct copy. Slots not present in this dict respond with 14 zero + # bytes, matching real-panel "unused slot" behavior. + programs: dict[int, bytes] = field(default_factory=dict) + # SystemStatus snapshot. Defaults: time set, battery good, no alarms. time_set: bool = True year: int = 26 # 2026 @@ -698,8 +704,31 @@ class MockPanel: return self._reply_extended_status(payload), () if opcode == OmniLink2MessageType.AcknowledgeAlerts: return _build_ack(), () + if opcode == OmniLink2MessageType.UploadProgram: + return self._reply_program_data(payload), () return _build_nak(opcode), () + def _reply_program_data(self, payload: bytes) -> Message: + """Single-shot v2 program read. + + Request payload: ``[number_hi, number_lo, request_reason]`` (3 bytes + per ``clsOL2MsgUploadProgram``). Reply payload: ``[number_hi, + number_lo] + raw_14_byte_body`` per ``clsOL2MsgProgramData``. + + If the slot is missing from ``state.programs`` we serve 14 zero + bytes — same as a real panel reporting an empty slot. + """ + if len(payload) < 2: + return _build_nak(OmniLink2MessageType.UploadProgram) + number = (payload[0] << 8) | payload[1] + body = self.state.programs.get(number, b"\x00" * 14) + if len(body) != 14: + return _build_nak(OmniLink2MessageType.UploadProgram) + return encode_v2( + OmniLink2MessageType.ProgramData, + bytes([(number >> 8) & 0xFF, number & 0xFF]) + body, + ) + # -------- reply builders (byte-exact per clsOL2Msg*.cs) -------- def _reply_system_information(self) -> Message: diff --git a/src/omni_pca/pca_file.py b/src/omni_pca/pca_file.py index 40025ab..e1a59c8 100644 --- a/src/omni_pca/pca_file.py +++ b/src/omni_pca/pca_file.py @@ -39,10 +39,20 @@ from __future__ import annotations import io import os import struct +import logging from dataclasses import dataclass, field from pathlib import Path from typing import Final +from .programs import ( + MAX_PROGRAMS, + PROGRAM_BYTES, + Program, + decode_program_table, +) + +_log = logging.getLogger(__name__) + KEY_PC01: Final[int] = 0x14326573 # 338847091 — clsPcaCfg.keyPC01 (PCA01.CFG) KEY_EXPORT: Final[int] = 0x17569237 # 391549495 — clsPcaCfg.keyExport (.pca import/export) @@ -185,6 +195,13 @@ class PcaAccount: account_phone: str = field(default="", repr=False) account_code: str = field(default="", repr=False) account_remarks: str = field(default="", repr=False) + programs: tuple[Program, ...] = () + """Decoded panel automation programs (1500 slots; many usually empty). + + Populated only when the .pca body is walked successfully and the + Programs block decodes without error. Empty tuple otherwise. Use + :func:`omni_pca.programs.iter_defined` to filter to in-use slots. + """ def parse_pca01_cfg(data: bytes, key: int = KEY_PC01) -> PcaConfig: @@ -262,9 +279,11 @@ def _parse_header(r: PcaReader) -> tuple[str, int, int, int, int, int, str, str, return version_tag, file_version, model, fw_major, fw_minor, fw_rev, name, address, phone, code, remarks -def _walk_to_connection(r: PcaReader, cap: dict[str, int]) -> None: - """Skip past SetupData, flags, Names, Voices, Programs, EventLog so the - next read lands on the Connection block. Mirrors clsHAC.cs:7995-8044.""" +def _walk_to_connection(r: PcaReader, cap: dict[str, int]) -> bytes: + """Walk SetupData, flags, Names, Voices, Programs, EventLog so the + next read lands on the Connection block. Returns the raw 21,000-byte + Programs blob so the caller can decode it; everything else is still + skipped as before. Mirrors clsHAC.cs:7995-8044.""" r.bytes_(cap["lenSetupData"]) r.bytes_(10) # bool + bool + u16 + u16 + u32 @@ -297,8 +316,9 @@ def _walk_to_connection(r: PcaReader, cap: dict[str, int]) -> None: skip_slots = max(0, max_slots - items_count) r.bytes_(struct_slots * s_b + skip_slots * k_b) - r.bytes_(cap["max_programs"] * cap["program_bytes"]) + programs_blob = r.bytes_(cap["max_programs"] * cap["program_bytes"]) r.bytes_(cap["max_event_log"] * cap["event_bytes"]) + return programs_blob def parse_pca_file(path_or_bytes: str | os.PathLike[str] | bytes, key: int) -> PcaAccount: @@ -349,7 +369,7 @@ def parse_pca_file(path_or_bytes: str | os.PathLike[str] | bytes, key: int) -> P return account try: - _walk_to_connection(r, _CAP_OMNI_PRO_II) + programs_blob = _walk_to_connection(r, _CAP_OMNI_PRO_II) network_address = r.string8_fixed(120) port_str = r.string8_fixed(5) try: @@ -358,6 +378,16 @@ def parse_pca_file(path_or_bytes: str | os.PathLike[str] | bytes, key: int) -> P network_port = 4369 key_hex = r.string8_fixed(32).ljust(32, "0")[:32] controller_key = bytes.fromhex(key_hex) + # Decode the program table — non-fatal if it fails; Connection + # block has already been read so the network/key fields land + # regardless. A malformed Programs block likely means the body + # walker misaligned for a non-OMNI_PRO_II model, in which case + # leaving programs=() is the honest answer. + try: + programs: tuple[Program, ...] = decode_program_table(programs_blob) + except Exception: + _log.warning("failed to decode Programs block", exc_info=True) + programs = () except (EOFError, ValueError): # Body layout depends on panel model; if the OMNI_PRO_II walker # misaligns for another model, leave the connection fields unset @@ -367,4 +397,5 @@ def parse_pca_file(path_or_bytes: str | os.PathLike[str] | bytes, key: int) -> P account.network_address = network_address account.network_port = network_port account.controller_key = controller_key + account.programs = programs return account diff --git a/src/omni_pca/programs.py b/src/omni_pca/programs.py new file mode 100644 index 0000000..de192c3 --- /dev/null +++ b/src/omni_pca/programs.py @@ -0,0 +1,399 @@ +"""Decoder/encoder for the Omni Pro II program record. + +A "program" is one line in the panel's built-in automation engine — +the panel-side counterpart to a Home Assistant automation. Each panel +stores up to 1500 programs in a fixed-size table: in a .pca file +they live in a 21,000-byte block (1500 × 14 bytes); on the wire they +are exchanged one at a time via :class:`clsOLMsgProgramData` / +:class:`clsOL2MsgProgramData`. + +The Omni Pro II has the ``DoubleProgramConditional`` feature flag +set, so each record is **14 bytes** with two condition slots: + +========== ============================================== +Offset Field +========== ============================================== +0 ``prog_type`` (``ProgramType`` enum) +1-2 ``cond`` (BE u16; opaque this pass) +3-4 ``cond2`` (BE u16; opaque this pass) +5 ``cmd`` (``Command`` enum from + :mod:`omni_pca.commands`) +6 ``par`` (byte parameter) +7-8 ``pr2`` (BE u16, usually object#) +9-10 ``month, day`` (or ``day, month`` in the .pca + on-disk layout when ProgType==Event; + see "Mon/Day swap" below) +11 ``days`` (``Days`` bitmask) +12 ``hour`` (0-23) +13 ``minute`` (0-59) +========== ============================================== + +When ``prog_type == Remark`` (4), bytes 1-4 hold a 32-bit BE +RemarkID instead of cond/cond2; the lookup table that resolves an +ID back to the user-visible remark text lives elsewhere on disk +and is not implemented yet. + +**Mon/Day swap (a quirk worth knowing about):** +There are two byte layouts in the wild for the same Program record. + +* **Wire layout** — ``clsOLMsgProgramData`` and + ``clsProgram.ToByteArray()``: bytes at offsets 9/10 are always + ``[month, day]``, regardless of program type. This is what the + panel sends over UDP/TCP. +* **File layout** — what ``clsProgram.Read/Write`` writes into a + ``.pca`` file: same layout *except* for ``prog_type == Event``, + where bytes 9/10 are swapped to ``[day, month]``. + +Our :class:`Program` dataclass normalises both to semantic fields +(``month`` and ``day`` always mean what their names say). Use +:meth:`Program.from_wire_bytes` / :meth:`encode_wire_bytes` for +on-the-wire messages and :meth:`Program.from_file_record` / +:meth:`encode_file_record` for ``.pca`` table slots. The split is +load-bearing for round-trip stability. + +What this module deliberately does NOT do (yet): + +* Decode the internal bit-split of ``cond`` / ``cond2`` into + selector + operand (zone#, security mode, time clock, etc.). +* Recognise the When/At/Every/And/Or/Then connector ProgTypes that + string multiple records into one user-visible "program line" — + none appear in any fixture we have, so the multi-record encoding + is still un-RE'd. +* Resolve RemarkID → RemarkText (the lookup table is on a TODO). + +References: + clsProgram.cs (entire file) — field accessors, Read/Write, + ToByteArray/FromByteArray, Mon/Day swap for Event-typed + programs at lines 471-484 and 506-515. + enuProgramType.cs — the program-type enum mirrored below. + enuProgramCond.cs — the condition-family enum. + enuDays.cs — day-of-week bitmask. + Installation Manual *INSTALLER SETUP → SETUP MISC* (Programs) + and Owner's Manual *Programming* chapter for the user-visible + model. +""" + +from __future__ import annotations + +import warnings +from dataclasses import dataclass, field +from enum import IntEnum, IntFlag + +PROGRAM_BYTES = 14 +"""On-disk and on-wire size of one program record on the Omni Pro II. + +If we ever support models without ``DoubleProgramConditional``, that +value drops to 12 — see the docstring up top for the layout +difference. +""" + +MAX_PROGRAMS = 1500 +"""Number of program slots per panel (Omni Pro II).""" + + +class ProgramType(IntEnum): + """Program record discriminator (``enuProgramType``). + + The first five values are the actual stored types; values 5..10 + are connector tokens used by PC Access's program-line editor to + string multiple records into one user-visible "line". The + multi-record encoding is not yet reverse-engineered. + """ + + FREE = 0 # unused slot (all bytes zero) + TIMED = 1 # fires at a specific time of day on selected days + EVENT = 2 # fires when a panel event occurs (zone open, etc.) + YEARLY = 3 # fires on a specific calendar date each year + REMARK = 4 # stores a 32-bit RemarkID + remark-text association + WHEN = 5 # connector (multi-record line, RE-pending) + AT = 6 # connector + EVERY = 7 # connector + AND = 8 # connector + OR = 9 # connector + THEN = 10 # connector + + +class ProgramCond(IntEnum): + """Condition family byte (``enuProgramCond``). + + The high bits of ``cond`` / ``cond2`` discriminate the family; + the low bits carry the selector / operand. We expose the family + enum here but do **not** decode the bit split — that's a future + pass once we can drive PC Access with controlled inputs. + """ + + OTHER = 0 + ZONE = 4 + CTRL = 8 + TIME = 12 + SEC = 16 + + +class Days(IntFlag): + """Day-of-week bitmask (``enuDays``). + + Note: Sunday is the high bit (0x80), not 0x01 — and Monday is + 0x02, not 0x01. ``enuDays.None`` (zero) means "no day selected". + """ + + NONE = 0 + MONDAY = 0x02 + TUESDAY = 0x04 + WEDNESDAY = 0x08 + THURSDAY = 0x10 + FRIDAY = 0x20 + SATURDAY = 0x40 + SUNDAY = 0x80 + + +# Once-per-process warnings — see _warn_unknown. +_warned_unknown: set[tuple[str, int]] = set() + + +def _warn_unknown(category: str, value: int) -> None: + """Emit a one-time UserWarning for an unrecognised enum value. + + We pass unknown bytes through as raw ints (forward-compatibility + for new ProgType / Cmd values we haven't catalogued yet) but + warn once per ``(category, value)`` pair so users notice. + """ + key = (category, value) + if key in _warned_unknown: + return + _warned_unknown.add(key) + warnings.warn( + f"unknown {category} byte {value:#04x}; passing through as raw int", + stacklevel=3, + ) + + +def _decode_common(body: bytes) -> dict[str, object]: + """Decode the fields that don't depend on the file-vs-wire layout.""" + if len(body) != PROGRAM_BYTES: + raise ValueError( + f"program record must be {PROGRAM_BYTES} bytes, got {len(body)}" + ) + + prog_type = body[0] + try: + ProgramType(prog_type) + except ValueError: + _warn_unknown("ProgramType", prog_type) + + if prog_type == ProgramType.REMARK: + # bytes 1-4 are a single BE u32 RemarkID instead of cond/cond2. + remark_id: int | None = ( + (body[1] << 24) | (body[2] << 16) | (body[3] << 8) | body[4] + ) + cond = 0 + cond2 = 0 + else: + remark_id = None + cond = (body[1] << 8) | body[2] + cond2 = (body[3] << 8) | body[4] + + cmd = body[5] + par = body[6] + pr2 = (body[7] << 8) | body[8] + days = body[11] + hour = body[12] + minute = body[13] + return { + "prog_type": prog_type, + "cond": cond, + "cond2": cond2, + "cmd": cmd, + "par": par, + "pr2": pr2, + "days": days, + "hour": hour, + "minute": minute, + "remark_id": remark_id, + } + + +def _encode_common(p: Program) -> bytearray: + """Encode the layout-independent fields into a fresh 14-byte buffer. + + Bytes 9 and 10 (month/day) are left zero — the layout-specific + encoder fills them in. + """ + buf = bytearray(PROGRAM_BYTES) + buf[0] = p.prog_type & 0xFF + if p.prog_type == ProgramType.REMARK and p.remark_id is not None: + rid = p.remark_id & 0xFFFFFFFF + buf[1] = (rid >> 24) & 0xFF + buf[2] = (rid >> 16) & 0xFF + buf[3] = (rid >> 8) & 0xFF + buf[4] = rid & 0xFF + else: + buf[1] = (p.cond >> 8) & 0xFF + buf[2] = p.cond & 0xFF + buf[3] = (p.cond2 >> 8) & 0xFF + buf[4] = p.cond2 & 0xFF + buf[5] = p.cmd & 0xFF + buf[6] = p.par & 0xFF + buf[7] = (p.pr2 >> 8) & 0xFF + buf[8] = p.pr2 & 0xFF + # 9, 10 filled by encode_{wire,file}_bytes + buf[11] = p.days & 0xFF + buf[12] = p.hour & 0xFF + buf[13] = p.minute & 0xFF + return buf + + +@dataclass(frozen=True, slots=True) +class Program: + """One programming line, decoded into semantic fields. + + Field semantics deliberately match the C# accessor names on + ``clsProgram`` so cross-referencing the reverse-engineered source + is mechanical. + + ``slot`` is the table index (1-based to match PC Access's + "program number") when the record came from a .pca file or a + wire ``ProgramData`` reply. ``None`` for hand-built Programs. + + ``remark_id`` is set only when ``prog_type == REMARK``; ``cond`` + and ``cond2`` are then zeroed in the dataclass (the 32-bit ID + lives in those wire bytes instead). + + **A note on ``month`` / ``day`` for EVENT programs:** for the + YEARLY and TIMED program types, ``month`` and ``day`` carry their + obvious calendar semantics — and the file decoder applies the + Mon/Day byte swap so the field values are always semantically + correct. For EVENT programs (``prog_type == EVENT``), the two + bytes at offsets 9-10 instead encode a 16-bit *event identifier* + (see ``clsProgram.Evt`` at clsProgram.cs:152-163). The fields + still hold the raw byte values — what would have been the + "month" byte ends up in ``self.month`` and "day" byte in + ``self.day`` — but they don't mean calendar month/day. Use the + :attr:`event_id` property to read them as the intended u16. + """ + + slot: int | None = None + prog_type: int = 0 + cond: int = 0 + cond2: int = 0 + cmd: int = 0 + par: int = 0 + pr2: int = 0 + month: int = 0 + day: int = 0 + days: int = 0 + hour: int = 0 + minute: int = 0 + remark_id: int | None = None + + # ---- decode ------------------------------------------------------ + + @classmethod + def from_wire_bytes(cls, body: bytes, *, slot: int | None = None) -> Program: + """Decode a Program from the on-the-wire 14-byte body. + + "Wire" here means the payload that ``clsOLMsgProgramData`` + sends after its 2-byte BE ProgramNumber header — bytes 9/10 + are always ``[month, day]`` regardless of ``prog_type``. + """ + f = _decode_common(body) + return cls(slot=slot, month=body[9], day=body[10], **f) # type: ignore[arg-type] + + @classmethod + def from_file_record(cls, body: bytes, *, slot: int | None = None) -> Program: + """Decode a Program from a ``.pca`` table slot. + + Same layout as wire form *except* when ``prog_type == EVENT``, + in which case the on-disk bytes at offsets 9/10 are swapped + to ``[day, month]`` (see clsProgram.Read at clsProgram.cs:471). + We swap back so the resulting Program has ``month`` and + ``day`` in semantic positions. + """ + f = _decode_common(body) + if f["prog_type"] == ProgramType.EVENT: + month, day = body[10], body[9] + else: + month, day = body[9], body[10] + return cls(slot=slot, month=month, day=day, **f) # type: ignore[arg-type] + + # ---- encode ------------------------------------------------------ + + def encode_wire_bytes(self) -> bytes: + """Encode to the on-the-wire 14-byte body (no Mon/Day swap).""" + buf = _encode_common(self) + buf[9] = self.month & 0xFF + buf[10] = self.day & 0xFF + return bytes(buf) + + def encode_file_record(self) -> bytes: + """Encode to the ``.pca`` 14-byte slot layout. + + Applies the Mon/Day swap for ``EVENT``-typed programs so the + result round-trips byte-for-byte with what + ``clsProgram.Write`` would produce. + """ + buf = _encode_common(self) + if self.prog_type == ProgramType.EVENT: + buf[9] = self.day & 0xFF + buf[10] = self.month & 0xFF + else: + buf[9] = self.month & 0xFF + buf[10] = self.day & 0xFF + return bytes(buf) + + # ---- convenience ------------------------------------------------- + + @property + def event_id(self) -> int: + """The 16-bit event identifier (only meaningful for EVENT type). + + Composed as ``(month << 8) | day`` per ``clsProgram.Evt``. For + non-EVENT program types this is a curiosity at best — it will + still be a 16-bit value but the calendar fields it draws from + carry their direct meaning instead. + """ + return ((self.month & 0xFF) << 8) | (self.day & 0xFF) + + def is_empty(self) -> bool: + """True iff the encoded record would be all-zero. + + Matches the panel's notion of a "free" slot. PC Access + treats these as available for new programs. + """ + return ( + self.prog_type == ProgramType.FREE + and self.cond == 0 + and self.cond2 == 0 + and self.cmd == 0 + and self.par == 0 + and self.pr2 == 0 + and self.month == 0 + and self.day == 0 + and self.days == 0 + and self.hour == 0 + and self.minute == 0 + and self.remark_id is None + ) + + +def decode_program_table(blob: bytes) -> tuple[Program, ...]: + """Decode a 1500-slot ``.pca`` Programs block (``21,000`` bytes). + + Each Program's ``slot`` is set to its 1-based table index — same + convention PC Access uses in its program editor. + """ + expected = MAX_PROGRAMS * PROGRAM_BYTES + if len(blob) != expected: + raise ValueError( + f"programs block must be {expected} bytes, got {len(blob)}" + ) + out: list[Program] = [] + for i in range(MAX_PROGRAMS): + off = i * PROGRAM_BYTES + record = blob[off : off + PROGRAM_BYTES] + out.append(Program.from_file_record(record, slot=i + 1)) + return tuple(out) + + +def iter_defined(programs: tuple[Program, ...]): + """Yield only non-empty programs (slots actually in use).""" + return (p for p in programs if not p.is_empty()) diff --git a/tests/test_e2e_program_echo.py b/tests/test_e2e_program_echo.py new file mode 100644 index 0000000..932dc0b --- /dev/null +++ b/tests/test_e2e_program_echo.py @@ -0,0 +1,136 @@ +"""End-to-end wire round-trip: client → MockPanel → program decoded. + +Seeds the MockPanel with a known :class:`Program`, drives the v2 +``UploadProgram`` opcode through a real TCP socket, and asserts the +decoded round-trip equals the seeded Program. Proves the on-the-wire +framing (2-byte BE ProgramNumber header + 14-byte body wrapped in a +``ProgramData`` reply) lines up with our decoder. +""" + +from __future__ import annotations + +import struct + +import pytest + +from omni_pca.connection import OmniConnection +from omni_pca.mock_panel import MockPanel, MockState +from omni_pca.opcodes import OmniLink2MessageType +from omni_pca.programs import Days, Program, ProgramType + +CONTROLLER_KEY = bytes.fromhex("000102030405060708090a0b0c0d0e0f") + + +def _seeded() -> Program: + """A TIMED program with non-trivial fields in every slot. + + Picks values that would fail if any byte got swapped or zeroed. + """ + return Program( + slot=42, + prog_type=int(ProgramType.TIMED), + cond=0x8D09, + cond2=0x9B09, + cmd=0x44, + par=3, + pr2=0x0100, + month=8, + day=12, + days=int(Days.MONDAY | Days.TUESDAY | Days.WEDNESDAY | Days.THURSDAY | Days.FRIDAY), + hour=7, + minute=15, + ) + + +@pytest.mark.asyncio +async def test_v2_upload_program_round_trips_through_mock_panel() -> None: + seeded = _seeded() + panel = MockPanel( + controller_key=CONTROLLER_KEY, + state=MockState(programs={42: seeded.encode_wire_bytes()}), + ) + async with ( + panel.serve(transport="tcp") as (host, port), + OmniConnection( + host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0 + ) as conn, + ): + # UploadProgram request body: [number_hi, number_lo, request_reason] + payload = struct.pack(">HB", 42, 0) + reply = await conn.request(OmniLink2MessageType.UploadProgram, payload) + assert reply.opcode == int(OmniLink2MessageType.ProgramData) + + # Reply payload: [number_hi, number_lo] + 14-byte body + assert len(reply.payload) == 2 + 14 + echoed_number = (reply.payload[0] << 8) | reply.payload[1] + assert echoed_number == 42 + + decoded = Program.from_wire_bytes(reply.payload[2:], slot=42) + + # Compare field-by-field — slot was passed through unchanged. + assert decoded.prog_type == seeded.prog_type + assert decoded.cond == seeded.cond + assert decoded.cond2 == seeded.cond2 + assert decoded.cmd == seeded.cmd + assert decoded.par == seeded.par + assert decoded.pr2 == seeded.pr2 + assert decoded.month == seeded.month + assert decoded.day == seeded.day + assert decoded.days == seeded.days + assert decoded.hour == seeded.hour + assert decoded.minute == seeded.minute + + +@pytest.mark.asyncio +async def test_v2_upload_program_empty_slot_returns_zero_body() -> None: + """An unseeded slot should respond with 14 zero bytes (matches real panel).""" + panel = MockPanel(controller_key=CONTROLLER_KEY, state=MockState()) + async with ( + panel.serve(transport="tcp") as (host, port), + OmniConnection( + host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0 + ) as conn, + ): + payload = struct.pack(">HB", 99, 0) + reply = await conn.request(OmniLink2MessageType.UploadProgram, payload) + assert reply.opcode == int(OmniLink2MessageType.ProgramData) + assert reply.payload == bytes([0, 99]) + b"\x00" * 14 + decoded = Program.from_wire_bytes(reply.payload[2:], slot=99) + assert decoded.is_empty() + + +@pytest.mark.asyncio +async def test_v2_upload_program_event_type_no_swap_on_wire() -> None: + """EVENT-typed programs must NOT swap Mon/Day on the wire (clsOLMsgProgramData + doesn't apply the file-layout swap).""" + seeded = Program( + slot=7, + prog_type=int(ProgramType.EVENT), + cond=0x0C04, + cmd=int(OmniLink2MessageType.Ack), # arbitrary; just non-zero + month=5, # in WIRE layout: byte 9 = month, byte 10 = day + day=12, + ) + panel = MockPanel( + controller_key=CONTROLLER_KEY, + state=MockState(programs={7: seeded.encode_wire_bytes()}), + ) + async with ( + panel.serve(transport="tcp") as (host, port), + OmniConnection( + host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0 + ) as conn, + ): + payload = struct.pack(">HB", 7, 0) + reply = await conn.request(OmniLink2MessageType.UploadProgram, payload) + body = reply.payload[2:] + # Byte 9 should be 5 (month), byte 10 should be 12 (day) -- the + # exact wire-layout encoding of an EVENT program with month=5, + # day=12. If the mock swapped (treating it as file layout), we'd + # see byte 9 = 12 and byte 10 = 5. + assert body[9] == 5 + assert body[10] == 12 + # And the decoded values match what we seeded. + decoded = Program.from_wire_bytes(body, slot=7) + assert decoded.month == 5 + assert decoded.day == 12 diff --git a/tests/test_pca_file.py b/tests/test_pca_file.py index 560ca05..93434a3 100644 --- a/tests/test_pca_file.py +++ b/tests/test_pca_file.py @@ -94,6 +94,122 @@ def test_full_pca_parse_against_real_fixture() -> None: pass +# ---- Programs block extraction against the live decrypted fixture ---- +# +# These tests need the plaintext .pca dump at the path below — gitignored. +# If absent, they skip cleanly. If present, they assert the decode against +# the values established in the Phase 1 RE pass (Programs block, slot 22, +# the TIMED/EVENT/YEARLY type-distribution counts). + +_FIXTURE = "/home/kdm/home-auto/HAI/pca-re/extracted/Our_House.pca.plain" + + +def _load_programs_blob_or_skip() -> bytes: + from pathlib import Path + + p = Path(_FIXTURE) + if not p.exists(): + pytest.skip(f"fixture not available: {_FIXTURE}") + from omni_pca.pca_file import ( + _CAP_OMNI_PRO_II, + PcaReader, + _parse_header, + _walk_to_connection, + ) + + r = PcaReader(p.read_bytes()) + _parse_header(r) + return _walk_to_connection(r, _CAP_OMNI_PRO_II) + + +def test_programs_block_decodes_against_live_fixture() -> None: + """All 1500 slots decode without raising; counts match Phase 1 recon.""" + from collections import Counter + + from omni_pca.programs import ProgramType, decode_program_table, iter_defined + + blob = _load_programs_blob_or_skip() + assert len(blob) == 1500 * 14 + + programs = decode_program_table(blob) + assert len(programs) == 1500 + defined = list(iter_defined(programs)) + assert len(defined) == 330 + + types = Counter(p.prog_type for p in defined) + assert types[int(ProgramType.TIMED)] == 209 + assert types[int(ProgramType.EVENT)] == 105 + assert types[int(ProgramType.YEARLY)] == 16 + + +def test_programs_block_round_trips_byte_for_byte() -> None: + """The strongest correctness signal: decode → encode → compare. + + If a single byte of the 21,000-byte blob is off, this test catches it. + """ + from omni_pca.programs import decode_program_table + + blob = _load_programs_blob_or_skip() + programs = decode_program_table(blob) + rebuilt = b"".join(p.encode_file_record() for p in programs) + assert rebuilt == blob + + +def test_programs_sanity_invariants() -> None: + """Coarse invariants on the 330 defined programs. + + The byte-for-byte round-trip test above is the load-bearing + correctness signal. This adds light coverage to catch a Mon/Day + swap regression specifically on YEARLY-typed programs (which use + bytes 9/10 as a true calendar date). + + What we DON'T assert and why: + + * **EVENT** programs encode a u16 event identifier in bytes 9/10 + (see ``clsProgram.Evt`` at lines 152-163), not a calendar date. + * **TIMED** programs use bytes 12/13 either as absolute hour:minute + (0-23 : 0-59) *or* as a sunrise/sunset-relative offset + (Owner's Manual: ±0-120 minutes), with a flag we haven't + reverse-engineered yet. So hour=26 / minute=246 are valid wire + values in the absence of that flag decoder. + """ + from omni_pca.programs import ProgramType, decode_program_table, iter_defined + + blob = _load_programs_blob_or_skip() + programs = decode_program_table(blob) + defined = list(iter_defined(programs)) + + yearly = [p for p in defined if p.prog_type == int(ProgramType.YEARLY)] + assert yearly, "fixture should have YEARLY programs" + for p in yearly: + assert 1 <= p.month <= 12, ( + f"slot {p.slot} YEARLY: month={p.month}" + ) + assert 1 <= p.day <= 31, ( + f"slot {p.slot} YEARLY: day={p.day}" + ) + + +def test_pca_account_dataclass_has_programs_field() -> None: + """``PcaAccount`` exposes ``programs`` with the expected type + default. + + Verifies the API surface without needing a working .pca decrypt + key — the integration from raw blob through ``decode_program_table`` + is covered by the other three live-fixture tests above. + """ + from omni_pca.pca_file import PcaAccount + + fields = {f.name: f for f in PcaAccount.__dataclass_fields__.values()} + assert "programs" in fields + # default_factory or default — the field should be an empty tuple + # when no programs are decoded. + inst = PcaAccount( + version_tag="PCA03", file_version=3, + model=16, firmware_major=2, firmware_minor=12, firmware_revision=1, + ) + assert inst.programs == () + + def test_pca_reader_io_state_introspection() -> None: r = PcaReader(b"abcdef") assert isinstance(r.buf, io.BytesIO) diff --git a/tests/test_programs.py b/tests/test_programs.py new file mode 100644 index 0000000..0dc2f1d --- /dev/null +++ b/tests/test_programs.py @@ -0,0 +1,247 @@ +"""Unit tests for omni_pca.programs. + +Three layers of evidence (no external oracle, so we triangulate): + +* **Golden bytes per ProgramType** — hand-curated byte vectors that + exercise the layout-specific paths (Mon/Day swap for Event, Remark + variant with RemarkID at bytes 1-4). +* **Round-trip property** — random 14-byte inputs survive + ``decode(b).encode() == b`` for both wire and file layouts. +* **Unknown-enum tolerance** — bytes outside ``ProgramType`` / + ``Command`` enum domains pass through without raising. +""" + +from __future__ import annotations + +import os +import struct +import warnings + +import pytest + +from omni_pca.programs import ( + MAX_PROGRAMS, + PROGRAM_BYTES, + Days, + Program, + ProgramType, + decode_program_table, + iter_defined, +) + +# ---- golden bytes --------------------------------------------------------- + + +def test_timed_decodes_canonical_example() -> None: + """The worked example from the docs page — TIMED program.""" + body = bytes.fromhex("018d099b094403010008 0c3e070f".replace(" ", "")) + p = Program.from_file_record(body, slot=22) + assert p.slot == 22 + assert p.prog_type == ProgramType.TIMED + assert p.cond == 0x8D09 + assert p.cond2 == 0x9B09 + assert p.cmd == 0x44 + assert p.par == 3 + assert p.pr2 == 0x0100 + assert p.month == 8 + assert p.day == 12 + assert p.days == 0x3E + assert p.hour == 7 + assert p.minute == 15 + assert p.remark_id is None + # round-trip on file form + assert p.encode_file_record() == body + + +def test_event_swaps_mon_day_on_file_layout() -> None: + """EVENT programs store [day, month] at offsets 9/10 on disk. + + File bytes 9-10 = ``05 0c`` should decode to day=5, month=12 (not + month=5, day=12). Encoding back must preserve the swap so the + raw .pca slot bytes don't drift. + """ + body = bytes.fromhex("020c04000001010000050cff070f") # 14 bytes + assert len(body) == 14 + p = Program.from_file_record(body, slot=1) + assert p.prog_type == ProgramType.EVENT + # Disk had [05, 0c] but EVENT swap means [day, mon]. + assert p.day == 5 + assert p.month == 12 + # Round-trip MUST re-apply the swap. + assert p.encode_file_record() == body + + +def test_event_no_swap_on_wire_layout() -> None: + """Same EVENT-type bytes via the wire decoder: NO swap. + + On the wire ``clsOLMsgProgramData`` always stores [month, day] at + offsets 9/10 regardless of prog_type — only the .pca file form + swaps. This test catches a regression where we accidentally swap + in the wire path. + """ + body = bytes.fromhex("020c04000001010000050cff070f") + p = Program.from_wire_bytes(body, slot=1) + assert p.prog_type == ProgramType.EVENT + # Wire form: byte 9 = month, byte 10 = day. NO swap. + assert p.month == 5 + assert p.day == 12 + assert p.encode_wire_bytes() == body + + +def test_yearly_program() -> None: + """YEARLY programs use month + day fields semantically — no swap.""" + body = bytes.fromhex("03000000000100008b010a0f00000001")[:14] + p = Program.from_file_record(body, slot=10) + assert p.prog_type == ProgramType.YEARLY + assert p.month == 0x01 + assert p.day == 0x0A + assert p.encode_file_record() == body + + +def test_remark_uses_bytes_1_to_4_as_remark_id() -> None: + """REMARK programs (prog_type=4) pack a 32-bit BE RemarkID into + bytes 1-4 in place of cond + cond2.""" + remark_id = 0xDEADBEEF + body = ( + bytes([int(ProgramType.REMARK)]) + + struct.pack(">I", remark_id) + + bytes(9) + ) + assert len(body) == 14 + p = Program.from_file_record(body) + assert p.prog_type == ProgramType.REMARK + assert p.remark_id == remark_id + # cond / cond2 are zeroed in the dataclass — the bytes there are + # the RemarkID, not condition fields. + assert p.cond == 0 + assert p.cond2 == 0 + # Round-trip restores the RemarkID bytes verbatim. + assert p.encode_file_record() == body + + +def test_all_zero_slot_is_empty() -> None: + """A free slot decodes cleanly and round-trips.""" + p = Program.from_file_record(b"\x00" * 14, slot=999) + assert p.is_empty() + assert p.prog_type == ProgramType.FREE + assert p.encode_file_record() == b"\x00" * 14 + assert p.encode_wire_bytes() == b"\x00" * 14 + + +# ---- round-trip property --------------------------------------------------- + + +def test_random_round_trip_wire() -> None: + """500 random 14-byte inputs: ``decode_wire → encode_wire == input``. + + The wire path is the simpler one (no Mon/Day swap), so it should + round-trip every byte pattern losslessly. + """ + with warnings.catch_warnings(): + warnings.simplefilter("ignore", UserWarning) + for _ in range(500): + body = os.urandom(14) + # Skip Remark inputs in this round — the dataclass discards + # cond/cond2 for Remark types and re-derives them from + # remark_id, but with no separate cond field we'd lose + # bytes that happen to differ; the next test covers Remark + # explicitly. + if body[0] == int(ProgramType.REMARK): + continue + p = Program.from_wire_bytes(body) + assert p.encode_wire_bytes() == body + + +def test_random_round_trip_file() -> None: + """500 random 14-byte inputs through the file (Mon/Day swap) form.""" + with warnings.catch_warnings(): + warnings.simplefilter("ignore", UserWarning) + for _ in range(500): + body = os.urandom(14) + if body[0] == int(ProgramType.REMARK): + continue + p = Program.from_file_record(body) + assert p.encode_file_record() == body + + +def test_remark_round_trip() -> None: + """Remark variant round-trip — explicitly, with random RemarkIDs.""" + for _ in range(200): + remark_id_bytes = os.urandom(4) + body = ( + bytes([int(ProgramType.REMARK)]) + + remark_id_bytes + + os.urandom(9) + ) + p_file = Program.from_file_record(body) + assert p_file.encode_file_record() == body + p_wire = Program.from_wire_bytes(body) + assert p_wire.encode_wire_bytes() == body + + +# ---- unknown-enum tolerance ------------------------------------------------ + + +def test_unknown_prog_type_passes_through_with_warning() -> None: + """Bytes outside ProgramType (0..10) decode to a raw int + warning. + + Reset the once-per-process cache first; otherwise earlier random + round-trip tests may have already seen this value and silenced + the warning. + """ + import omni_pca.programs as pm + pm._warned_unknown.clear() + body = bytes([0x42]) + bytes(13) + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always") + p = Program.from_wire_bytes(body) + assert p.prog_type == 0x42 + assert p.encode_wire_bytes() == body + assert any("ProgramType" in str(w.message) for w in caught) + + +def test_unknown_cmd_passes_through() -> None: + """Unrecognised cmd bytes decode without raising.""" + body = bytes([int(ProgramType.TIMED), 0, 0, 0, 0, 0xFA, 0]) + bytes(7) + p = Program.from_wire_bytes(body) + assert p.cmd == 0xFA + assert p.encode_wire_bytes() == body + + +# ---- table decode ---------------------------------------------------------- + + +def test_decode_program_table_size_validation() -> None: + with pytest.raises(ValueError, match="must be 21000 bytes"): + decode_program_table(b"\x00" * 100) + + +def test_decode_program_table_round_trip_all_zero() -> None: + """All-zero 21000-byte blob round-trips, slot numbers are 1..1500.""" + blob = b"\x00" * (MAX_PROGRAMS * PROGRAM_BYTES) + programs = decode_program_table(blob) + assert len(programs) == MAX_PROGRAMS + assert programs[0].slot == 1 + assert programs[-1].slot == MAX_PROGRAMS + assert all(p.is_empty() for p in programs) + assert list(iter_defined(programs)) == [] + rebuilt = b"".join(p.encode_file_record() for p in programs) + assert rebuilt == blob + + +def test_iter_defined_filters_empty() -> None: + p1 = Program(prog_type=int(ProgramType.TIMED), slot=1, cmd=1, hour=8) + p2 = Program(slot=2) # empty + p3 = Program(prog_type=int(ProgramType.EVENT), slot=3, cmd=2) + defined = list(iter_defined((p1, p2, p3))) + assert defined == [p1, p3] + + +# ---- Days bitmask sanity -------------------------------------------------- + + +def test_days_bitmask_values() -> None: + """Sanity-check the enuDays values against the C# definition.""" + assert Days.MONDAY == 0x02 + assert Days.SUNDAY == 0x80 + assert Days.MONDAY | Days.WEDNESDAY | Days.FRIDAY == 0x2A