programs: typed decoder/encoder for the 14-byte program record

First reverse-engineering pass on the panel's built-in automation
engine. Adds a typed Python Program dataclass that decodes/encodes the
14-byte program record used both on the wire (clsOLMsgProgramData) and
on disk (the 21,000-byte Programs block in a .pca file).

Coverage:
* enums: ProgramType, ProgramCond, Days bitmask
* Program dataclass with from_wire_bytes / from_file_record /
  encode_wire_bytes / encode_file_record (Mon/Day swap for EVENT-typed
  records applied on the file form only -- mirrors clsProgram.Read at
  clsProgram.cs:471, while clsProgram.ToByteArray omits the swap)
* Remark variant (bytes 1-4 = BE u32 RemarkID instead of cond/cond2)
* unknown ProgType / Cmd bytes pass through as raw ints with a
  once-per-process warning
* decode_program_table for the full 1500-slot .pca block
* pca_file.parse_pca_file populates PcaAccount.programs (backward-
  compatible: defaults to ())
* mock_panel.MockState.programs + _reply_program_data so OmniLink2
  UploadProgram (opcode 9) round-trips through the test fixture

Verification (422 passed, 1 skipped — was 400):
* 15 unit tests in test_programs.py: golden bytes for each ProgramType,
  Mon/Day swap proven distinct between wire and file layouts, Remark
  round-trip, 500 random-input wire+file round-trips, unknown-enum
  tolerance
* 4 fixture-gated live-data tests in test_pca_file.py: all 1500 slots
  decode cleanly, 330 non-empty (matches Phase 1 recon distribution
  209 TIMED / 105 EVENT / 16 YEARLY), 21,000-byte byte-for-byte
  round-trip against the live decrypted fixture, YEARLY month/day in
  valid calendar ranges
* 3 wire-echo tests in test_e2e_program_echo.py: client drives
  UploadProgram (opcode 9) through the mock, server replies with
  ProgramData (opcode 10) wrapping [number_hi, number_lo, body];
  full Program round-trips field-by-field, empty slots return zero
  bodies, EVENT bytes are emitted in wire order (no swap)

What this pass deliberately leaves open (documented in the docs page):
* cond / cond2 internal bit split (selector vs operand)
* multi-record clausal encoding (When/At/Every/And/Or/Then)
* RemarkID -> RemarkText lookup table layout
* DPC capability flag location for non-OPII models
* TIMED time-of-day vs sunrise/sunset-relative offset flag

References:
* clsProgram.cs (entire) — field accessors, Read/Write, Evt u16
* enuProgramType.cs / enuProgramCond.cs / enuDays.cs
* Owner's Manual SETUP chapter — user-facing programming-line model
* Installation Manual SETUP MISC — installer-facing setup screen
This commit is contained in:
Ryan Malloy 2026-05-11 19:48:00 -06:00
parent 0e3835d4ff
commit d4c04b3044
7 changed files with 981 additions and 6 deletions

View File

@ -2,9 +2,26 @@
from importlib.metadata import PackageNotFoundError, version
from .programs import (
Days,
Program,
ProgramCond,
ProgramType,
decode_program_table,
iter_defined,
)
try:
__version__ = version("omni-pca")
except PackageNotFoundError:
__version__ = "0.0.0+unknown"
__all__ = ["__version__"]
__all__ = [
"Days",
"Program",
"ProgramCond",
"ProgramType",
"__version__",
"decode_program_table",
"iter_defined",
]

View File

@ -231,6 +231,12 @@ class MockState:
# matched code_index in the area's last_user field on success.
user_codes: dict[int, int] = field(default_factory=dict)
# Program table — slot number (1-based) → raw 14-byte wire body.
# Wire layout (no Mon/Day swap) so a v2 ``ProgramData`` reply is a
# direct copy. Slots not present in this dict respond with 14 zero
# bytes, matching real-panel "unused slot" behavior.
programs: dict[int, bytes] = field(default_factory=dict)
# SystemStatus snapshot. Defaults: time set, battery good, no alarms.
time_set: bool = True
year: int = 26 # 2026
@ -698,8 +704,31 @@ class MockPanel:
return self._reply_extended_status(payload), ()
if opcode == OmniLink2MessageType.AcknowledgeAlerts:
return _build_ack(), ()
if opcode == OmniLink2MessageType.UploadProgram:
return self._reply_program_data(payload), ()
return _build_nak(opcode), ()
def _reply_program_data(self, payload: bytes) -> Message:
"""Single-shot v2 program read.
Request payload: ``[number_hi, number_lo, request_reason]`` (3 bytes
per ``clsOL2MsgUploadProgram``). Reply payload: ``[number_hi,
number_lo] + raw_14_byte_body`` per ``clsOL2MsgProgramData``.
If the slot is missing from ``state.programs`` we serve 14 zero
bytes same as a real panel reporting an empty slot.
"""
if len(payload) < 2:
return _build_nak(OmniLink2MessageType.UploadProgram)
number = (payload[0] << 8) | payload[1]
body = self.state.programs.get(number, b"\x00" * 14)
if len(body) != 14:
return _build_nak(OmniLink2MessageType.UploadProgram)
return encode_v2(
OmniLink2MessageType.ProgramData,
bytes([(number >> 8) & 0xFF, number & 0xFF]) + body,
)
# -------- reply builders (byte-exact per clsOL2Msg*.cs) --------
def _reply_system_information(self) -> Message:

View File

@ -39,10 +39,20 @@ from __future__ import annotations
import io
import os
import struct
import logging
from dataclasses import dataclass, field
from pathlib import Path
from typing import Final
from .programs import (
MAX_PROGRAMS,
PROGRAM_BYTES,
Program,
decode_program_table,
)
_log = logging.getLogger(__name__)
KEY_PC01: Final[int] = 0x14326573 # 338847091 — clsPcaCfg.keyPC01 (PCA01.CFG)
KEY_EXPORT: Final[int] = 0x17569237 # 391549495 — clsPcaCfg.keyExport (.pca import/export)
@ -185,6 +195,13 @@ class PcaAccount:
account_phone: str = field(default="", repr=False)
account_code: str = field(default="", repr=False)
account_remarks: str = field(default="", repr=False)
programs: tuple[Program, ...] = ()
"""Decoded panel automation programs (1500 slots; many usually empty).
Populated only when the .pca body is walked successfully and the
Programs block decodes without error. Empty tuple otherwise. Use
:func:`omni_pca.programs.iter_defined` to filter to in-use slots.
"""
def parse_pca01_cfg(data: bytes, key: int = KEY_PC01) -> PcaConfig:
@ -262,9 +279,11 @@ def _parse_header(r: PcaReader) -> tuple[str, int, int, int, int, int, str, str,
return version_tag, file_version, model, fw_major, fw_minor, fw_rev, name, address, phone, code, remarks
def _walk_to_connection(r: PcaReader, cap: dict[str, int]) -> None:
"""Skip past SetupData, flags, Names, Voices, Programs, EventLog so the
next read lands on the Connection block. Mirrors clsHAC.cs:7995-8044."""
def _walk_to_connection(r: PcaReader, cap: dict[str, int]) -> bytes:
"""Walk SetupData, flags, Names, Voices, Programs, EventLog so the
next read lands on the Connection block. Returns the raw 21,000-byte
Programs blob so the caller can decode it; everything else is still
skipped as before. Mirrors clsHAC.cs:7995-8044."""
r.bytes_(cap["lenSetupData"])
r.bytes_(10) # bool + bool + u16 + u16 + u32
@ -297,8 +316,9 @@ def _walk_to_connection(r: PcaReader, cap: dict[str, int]) -> None:
skip_slots = max(0, max_slots - items_count)
r.bytes_(struct_slots * s_b + skip_slots * k_b)
r.bytes_(cap["max_programs"] * cap["program_bytes"])
programs_blob = r.bytes_(cap["max_programs"] * cap["program_bytes"])
r.bytes_(cap["max_event_log"] * cap["event_bytes"])
return programs_blob
def parse_pca_file(path_or_bytes: str | os.PathLike[str] | bytes, key: int) -> PcaAccount:
@ -349,7 +369,7 @@ def parse_pca_file(path_or_bytes: str | os.PathLike[str] | bytes, key: int) -> P
return account
try:
_walk_to_connection(r, _CAP_OMNI_PRO_II)
programs_blob = _walk_to_connection(r, _CAP_OMNI_PRO_II)
network_address = r.string8_fixed(120)
port_str = r.string8_fixed(5)
try:
@ -358,6 +378,16 @@ def parse_pca_file(path_or_bytes: str | os.PathLike[str] | bytes, key: int) -> P
network_port = 4369
key_hex = r.string8_fixed(32).ljust(32, "0")[:32]
controller_key = bytes.fromhex(key_hex)
# Decode the program table — non-fatal if it fails; Connection
# block has already been read so the network/key fields land
# regardless. A malformed Programs block likely means the body
# walker misaligned for a non-OMNI_PRO_II model, in which case
# leaving programs=() is the honest answer.
try:
programs: tuple[Program, ...] = decode_program_table(programs_blob)
except Exception:
_log.warning("failed to decode Programs block", exc_info=True)
programs = ()
except (EOFError, ValueError):
# Body layout depends on panel model; if the OMNI_PRO_II walker
# misaligns for another model, leave the connection fields unset
@ -367,4 +397,5 @@ def parse_pca_file(path_or_bytes: str | os.PathLike[str] | bytes, key: int) -> P
account.network_address = network_address
account.network_port = network_port
account.controller_key = controller_key
account.programs = programs
return account

399
src/omni_pca/programs.py Normal file
View File

@ -0,0 +1,399 @@
"""Decoder/encoder for the Omni Pro II program record.
A "program" is one line in the panel's built-in automation engine —
the panel-side counterpart to a Home Assistant automation. Each panel
stores up to 1500 programs in a fixed-size table: in a .pca file
they live in a 21,000-byte block (1500 × 14 bytes); on the wire they
are exchanged one at a time via :class:`clsOLMsgProgramData` /
:class:`clsOL2MsgProgramData`.
The Omni Pro II has the ``DoubleProgramConditional`` feature flag
set, so each record is **14 bytes** with two condition slots:
========== ==============================================
Offset Field
========== ==============================================
0 ``prog_type`` (``ProgramType`` enum)
1-2 ``cond`` (BE u16; opaque this pass)
3-4 ``cond2`` (BE u16; opaque this pass)
5 ``cmd`` (``Command`` enum from
:mod:`omni_pca.commands`)
6 ``par`` (byte parameter)
7-8 ``pr2`` (BE u16, usually object#)
9-10 ``month, day`` (or ``day, month`` in the .pca
on-disk layout when ProgType==Event;
see "Mon/Day swap" below)
11 ``days`` (``Days`` bitmask)
12 ``hour`` (0-23)
13 ``minute`` (0-59)
========== ==============================================
When ``prog_type == Remark`` (4), bytes 1-4 hold a 32-bit BE
RemarkID instead of cond/cond2; the lookup table that resolves an
ID back to the user-visible remark text lives elsewhere on disk
and is not implemented yet.
**Mon/Day swap (a quirk worth knowing about):**
There are two byte layouts in the wild for the same Program record.
* **Wire layout** ``clsOLMsgProgramData`` and
``clsProgram.ToByteArray()``: bytes at offsets 9/10 are always
``[month, day]``, regardless of program type. This is what the
panel sends over UDP/TCP.
* **File layout** what ``clsProgram.Read/Write`` writes into a
``.pca`` file: same layout *except* for ``prog_type == Event``,
where bytes 9/10 are swapped to ``[day, month]``.
Our :class:`Program` dataclass normalises both to semantic fields
(``month`` and ``day`` always mean what their names say). Use
:meth:`Program.from_wire_bytes` / :meth:`encode_wire_bytes` for
on-the-wire messages and :meth:`Program.from_file_record` /
:meth:`encode_file_record` for ``.pca`` table slots. The split is
load-bearing for round-trip stability.
What this module deliberately does NOT do (yet):
* Decode the internal bit-split of ``cond`` / ``cond2`` into
selector + operand (zone#, security mode, time clock, etc.).
* Recognise the When/At/Every/And/Or/Then connector ProgTypes that
string multiple records into one user-visible "program line"
none appear in any fixture we have, so the multi-record encoding
is still un-RE'd.
* Resolve RemarkID RemarkText (the lookup table is on a TODO).
References:
clsProgram.cs (entire file) field accessors, Read/Write,
ToByteArray/FromByteArray, Mon/Day swap for Event-typed
programs at lines 471-484 and 506-515.
enuProgramType.cs the program-type enum mirrored below.
enuProgramCond.cs the condition-family enum.
enuDays.cs day-of-week bitmask.
Installation Manual *INSTALLER SETUP SETUP MISC* (Programs)
and Owner's Manual *Programming* chapter for the user-visible
model.
"""
from __future__ import annotations
import warnings
from dataclasses import dataclass, field
from enum import IntEnum, IntFlag
PROGRAM_BYTES = 14
"""On-disk and on-wire size of one program record on the Omni Pro II.
If we ever support models without ``DoubleProgramConditional``, that
value drops to 12 see the docstring up top for the layout
difference.
"""
MAX_PROGRAMS = 1500
"""Number of program slots per panel (Omni Pro II)."""
class ProgramType(IntEnum):
"""Program record discriminator (``enuProgramType``).
The first five values are the actual stored types; values 5..10
are connector tokens used by PC Access's program-line editor to
string multiple records into one user-visible "line". The
multi-record encoding is not yet reverse-engineered.
"""
FREE = 0 # unused slot (all bytes zero)
TIMED = 1 # fires at a specific time of day on selected days
EVENT = 2 # fires when a panel event occurs (zone open, etc.)
YEARLY = 3 # fires on a specific calendar date each year
REMARK = 4 # stores a 32-bit RemarkID + remark-text association
WHEN = 5 # connector (multi-record line, RE-pending)
AT = 6 # connector
EVERY = 7 # connector
AND = 8 # connector
OR = 9 # connector
THEN = 10 # connector
class ProgramCond(IntEnum):
"""Condition family byte (``enuProgramCond``).
The high bits of ``cond`` / ``cond2`` discriminate the family;
the low bits carry the selector / operand. We expose the family
enum here but do **not** decode the bit split that's a future
pass once we can drive PC Access with controlled inputs.
"""
OTHER = 0
ZONE = 4
CTRL = 8
TIME = 12
SEC = 16
class Days(IntFlag):
"""Day-of-week bitmask (``enuDays``).
Note: Sunday is the high bit (0x80), not 0x01 and Monday is
0x02, not 0x01. ``enuDays.None`` (zero) means "no day selected".
"""
NONE = 0
MONDAY = 0x02
TUESDAY = 0x04
WEDNESDAY = 0x08
THURSDAY = 0x10
FRIDAY = 0x20
SATURDAY = 0x40
SUNDAY = 0x80
# Once-per-process warnings — see _warn_unknown.
_warned_unknown: set[tuple[str, int]] = set()
def _warn_unknown(category: str, value: int) -> None:
"""Emit a one-time UserWarning for an unrecognised enum value.
We pass unknown bytes through as raw ints (forward-compatibility
for new ProgType / Cmd values we haven't catalogued yet) but
warn once per ``(category, value)`` pair so users notice.
"""
key = (category, value)
if key in _warned_unknown:
return
_warned_unknown.add(key)
warnings.warn(
f"unknown {category} byte {value:#04x}; passing through as raw int",
stacklevel=3,
)
def _decode_common(body: bytes) -> dict[str, object]:
"""Decode the fields that don't depend on the file-vs-wire layout."""
if len(body) != PROGRAM_BYTES:
raise ValueError(
f"program record must be {PROGRAM_BYTES} bytes, got {len(body)}"
)
prog_type = body[0]
try:
ProgramType(prog_type)
except ValueError:
_warn_unknown("ProgramType", prog_type)
if prog_type == ProgramType.REMARK:
# bytes 1-4 are a single BE u32 RemarkID instead of cond/cond2.
remark_id: int | None = (
(body[1] << 24) | (body[2] << 16) | (body[3] << 8) | body[4]
)
cond = 0
cond2 = 0
else:
remark_id = None
cond = (body[1] << 8) | body[2]
cond2 = (body[3] << 8) | body[4]
cmd = body[5]
par = body[6]
pr2 = (body[7] << 8) | body[8]
days = body[11]
hour = body[12]
minute = body[13]
return {
"prog_type": prog_type,
"cond": cond,
"cond2": cond2,
"cmd": cmd,
"par": par,
"pr2": pr2,
"days": days,
"hour": hour,
"minute": minute,
"remark_id": remark_id,
}
def _encode_common(p: Program) -> bytearray:
"""Encode the layout-independent fields into a fresh 14-byte buffer.
Bytes 9 and 10 (month/day) are left zero the layout-specific
encoder fills them in.
"""
buf = bytearray(PROGRAM_BYTES)
buf[0] = p.prog_type & 0xFF
if p.prog_type == ProgramType.REMARK and p.remark_id is not None:
rid = p.remark_id & 0xFFFFFFFF
buf[1] = (rid >> 24) & 0xFF
buf[2] = (rid >> 16) & 0xFF
buf[3] = (rid >> 8) & 0xFF
buf[4] = rid & 0xFF
else:
buf[1] = (p.cond >> 8) & 0xFF
buf[2] = p.cond & 0xFF
buf[3] = (p.cond2 >> 8) & 0xFF
buf[4] = p.cond2 & 0xFF
buf[5] = p.cmd & 0xFF
buf[6] = p.par & 0xFF
buf[7] = (p.pr2 >> 8) & 0xFF
buf[8] = p.pr2 & 0xFF
# 9, 10 filled by encode_{wire,file}_bytes
buf[11] = p.days & 0xFF
buf[12] = p.hour & 0xFF
buf[13] = p.minute & 0xFF
return buf
@dataclass(frozen=True, slots=True)
class Program:
"""One programming line, decoded into semantic fields.
Field semantics deliberately match the C# accessor names on
``clsProgram`` so cross-referencing the reverse-engineered source
is mechanical.
``slot`` is the table index (1-based to match PC Access's
"program number") when the record came from a .pca file or a
wire ``ProgramData`` reply. ``None`` for hand-built Programs.
``remark_id`` is set only when ``prog_type == REMARK``; ``cond``
and ``cond2`` are then zeroed in the dataclass (the 32-bit ID
lives in those wire bytes instead).
**A note on ``month`` / ``day`` for EVENT programs:** for the
YEARLY and TIMED program types, ``month`` and ``day`` carry their
obvious calendar semantics and the file decoder applies the
Mon/Day byte swap so the field values are always semantically
correct. For EVENT programs (``prog_type == EVENT``), the two
bytes at offsets 9-10 instead encode a 16-bit *event identifier*
(see ``clsProgram.Evt`` at clsProgram.cs:152-163). The fields
still hold the raw byte values what would have been the
"month" byte ends up in ``self.month`` and "day" byte in
``self.day`` but they don't mean calendar month/day. Use the
:attr:`event_id` property to read them as the intended u16.
"""
slot: int | None = None
prog_type: int = 0
cond: int = 0
cond2: int = 0
cmd: int = 0
par: int = 0
pr2: int = 0
month: int = 0
day: int = 0
days: int = 0
hour: int = 0
minute: int = 0
remark_id: int | None = None
# ---- decode ------------------------------------------------------
@classmethod
def from_wire_bytes(cls, body: bytes, *, slot: int | None = None) -> Program:
"""Decode a Program from the on-the-wire 14-byte body.
"Wire" here means the payload that ``clsOLMsgProgramData``
sends after its 2-byte BE ProgramNumber header bytes 9/10
are always ``[month, day]`` regardless of ``prog_type``.
"""
f = _decode_common(body)
return cls(slot=slot, month=body[9], day=body[10], **f) # type: ignore[arg-type]
@classmethod
def from_file_record(cls, body: bytes, *, slot: int | None = None) -> Program:
"""Decode a Program from a ``.pca`` table slot.
Same layout as wire form *except* when ``prog_type == EVENT``,
in which case the on-disk bytes at offsets 9/10 are swapped
to ``[day, month]`` (see clsProgram.Read at clsProgram.cs:471).
We swap back so the resulting Program has ``month`` and
``day`` in semantic positions.
"""
f = _decode_common(body)
if f["prog_type"] == ProgramType.EVENT:
month, day = body[10], body[9]
else:
month, day = body[9], body[10]
return cls(slot=slot, month=month, day=day, **f) # type: ignore[arg-type]
# ---- encode ------------------------------------------------------
def encode_wire_bytes(self) -> bytes:
"""Encode to the on-the-wire 14-byte body (no Mon/Day swap)."""
buf = _encode_common(self)
buf[9] = self.month & 0xFF
buf[10] = self.day & 0xFF
return bytes(buf)
def encode_file_record(self) -> bytes:
"""Encode to the ``.pca`` 14-byte slot layout.
Applies the Mon/Day swap for ``EVENT``-typed programs so the
result round-trips byte-for-byte with what
``clsProgram.Write`` would produce.
"""
buf = _encode_common(self)
if self.prog_type == ProgramType.EVENT:
buf[9] = self.day & 0xFF
buf[10] = self.month & 0xFF
else:
buf[9] = self.month & 0xFF
buf[10] = self.day & 0xFF
return bytes(buf)
# ---- convenience -------------------------------------------------
@property
def event_id(self) -> int:
"""The 16-bit event identifier (only meaningful for EVENT type).
Composed as ``(month << 8) | day`` per ``clsProgram.Evt``. For
non-EVENT program types this is a curiosity at best it will
still be a 16-bit value but the calendar fields it draws from
carry their direct meaning instead.
"""
return ((self.month & 0xFF) << 8) | (self.day & 0xFF)
def is_empty(self) -> bool:
"""True iff the encoded record would be all-zero.
Matches the panel's notion of a "free" slot. PC Access
treats these as available for new programs.
"""
return (
self.prog_type == ProgramType.FREE
and self.cond == 0
and self.cond2 == 0
and self.cmd == 0
and self.par == 0
and self.pr2 == 0
and self.month == 0
and self.day == 0
and self.days == 0
and self.hour == 0
and self.minute == 0
and self.remark_id is None
)
def decode_program_table(blob: bytes) -> tuple[Program, ...]:
"""Decode a 1500-slot ``.pca`` Programs block (``21,000`` bytes).
Each Program's ``slot`` is set to its 1-based table index — same
convention PC Access uses in its program editor.
"""
expected = MAX_PROGRAMS * PROGRAM_BYTES
if len(blob) != expected:
raise ValueError(
f"programs block must be {expected} bytes, got {len(blob)}"
)
out: list[Program] = []
for i in range(MAX_PROGRAMS):
off = i * PROGRAM_BYTES
record = blob[off : off + PROGRAM_BYTES]
out.append(Program.from_file_record(record, slot=i + 1))
return tuple(out)
def iter_defined(programs: tuple[Program, ...]):
"""Yield only non-empty programs (slots actually in use)."""
return (p for p in programs if not p.is_empty())

View File

@ -0,0 +1,136 @@
"""End-to-end wire round-trip: client → MockPanel → program decoded.
Seeds the MockPanel with a known :class:`Program`, drives the v2
``UploadProgram`` opcode through a real TCP socket, and asserts the
decoded round-trip equals the seeded Program. Proves the on-the-wire
framing (2-byte BE ProgramNumber header + 14-byte body wrapped in a
``ProgramData`` reply) lines up with our decoder.
"""
from __future__ import annotations
import struct
import pytest
from omni_pca.connection import OmniConnection
from omni_pca.mock_panel import MockPanel, MockState
from omni_pca.opcodes import OmniLink2MessageType
from omni_pca.programs import Days, Program, ProgramType
CONTROLLER_KEY = bytes.fromhex("000102030405060708090a0b0c0d0e0f")
def _seeded() -> Program:
"""A TIMED program with non-trivial fields in every slot.
Picks values that would fail if any byte got swapped or zeroed.
"""
return Program(
slot=42,
prog_type=int(ProgramType.TIMED),
cond=0x8D09,
cond2=0x9B09,
cmd=0x44,
par=3,
pr2=0x0100,
month=8,
day=12,
days=int(Days.MONDAY | Days.TUESDAY | Days.WEDNESDAY | Days.THURSDAY | Days.FRIDAY),
hour=7,
minute=15,
)
@pytest.mark.asyncio
async def test_v2_upload_program_round_trips_through_mock_panel() -> None:
seeded = _seeded()
panel = MockPanel(
controller_key=CONTROLLER_KEY,
state=MockState(programs={42: seeded.encode_wire_bytes()}),
)
async with (
panel.serve(transport="tcp") as (host, port),
OmniConnection(
host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0
) as conn,
):
# UploadProgram request body: [number_hi, number_lo, request_reason]
payload = struct.pack(">HB", 42, 0)
reply = await conn.request(OmniLink2MessageType.UploadProgram, payload)
assert reply.opcode == int(OmniLink2MessageType.ProgramData)
# Reply payload: [number_hi, number_lo] + 14-byte body
assert len(reply.payload) == 2 + 14
echoed_number = (reply.payload[0] << 8) | reply.payload[1]
assert echoed_number == 42
decoded = Program.from_wire_bytes(reply.payload[2:], slot=42)
# Compare field-by-field — slot was passed through unchanged.
assert decoded.prog_type == seeded.prog_type
assert decoded.cond == seeded.cond
assert decoded.cond2 == seeded.cond2
assert decoded.cmd == seeded.cmd
assert decoded.par == seeded.par
assert decoded.pr2 == seeded.pr2
assert decoded.month == seeded.month
assert decoded.day == seeded.day
assert decoded.days == seeded.days
assert decoded.hour == seeded.hour
assert decoded.minute == seeded.minute
@pytest.mark.asyncio
async def test_v2_upload_program_empty_slot_returns_zero_body() -> None:
"""An unseeded slot should respond with 14 zero bytes (matches real panel)."""
panel = MockPanel(controller_key=CONTROLLER_KEY, state=MockState())
async with (
panel.serve(transport="tcp") as (host, port),
OmniConnection(
host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0
) as conn,
):
payload = struct.pack(">HB", 99, 0)
reply = await conn.request(OmniLink2MessageType.UploadProgram, payload)
assert reply.opcode == int(OmniLink2MessageType.ProgramData)
assert reply.payload == bytes([0, 99]) + b"\x00" * 14
decoded = Program.from_wire_bytes(reply.payload[2:], slot=99)
assert decoded.is_empty()
@pytest.mark.asyncio
async def test_v2_upload_program_event_type_no_swap_on_wire() -> None:
"""EVENT-typed programs must NOT swap Mon/Day on the wire (clsOLMsgProgramData
doesn't apply the file-layout swap)."""
seeded = Program(
slot=7,
prog_type=int(ProgramType.EVENT),
cond=0x0C04,
cmd=int(OmniLink2MessageType.Ack), # arbitrary; just non-zero
month=5, # in WIRE layout: byte 9 = month, byte 10 = day
day=12,
)
panel = MockPanel(
controller_key=CONTROLLER_KEY,
state=MockState(programs={7: seeded.encode_wire_bytes()}),
)
async with (
panel.serve(transport="tcp") as (host, port),
OmniConnection(
host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0
) as conn,
):
payload = struct.pack(">HB", 7, 0)
reply = await conn.request(OmniLink2MessageType.UploadProgram, payload)
body = reply.payload[2:]
# Byte 9 should be 5 (month), byte 10 should be 12 (day) -- the
# exact wire-layout encoding of an EVENT program with month=5,
# day=12. If the mock swapped (treating it as file layout), we'd
# see byte 9 = 12 and byte 10 = 5.
assert body[9] == 5
assert body[10] == 12
# And the decoded values match what we seeded.
decoded = Program.from_wire_bytes(body, slot=7)
assert decoded.month == 5
assert decoded.day == 12

View File

@ -94,6 +94,122 @@ def test_full_pca_parse_against_real_fixture() -> None:
pass
# ---- Programs block extraction against the live decrypted fixture ----
#
# These tests need the plaintext .pca dump at the path below — gitignored.
# If absent, they skip cleanly. If present, they assert the decode against
# the values established in the Phase 1 RE pass (Programs block, slot 22,
# the TIMED/EVENT/YEARLY type-distribution counts).
_FIXTURE = "/home/kdm/home-auto/HAI/pca-re/extracted/Our_House.pca.plain"
def _load_programs_blob_or_skip() -> bytes:
from pathlib import Path
p = Path(_FIXTURE)
if not p.exists():
pytest.skip(f"fixture not available: {_FIXTURE}")
from omni_pca.pca_file import (
_CAP_OMNI_PRO_II,
PcaReader,
_parse_header,
_walk_to_connection,
)
r = PcaReader(p.read_bytes())
_parse_header(r)
return _walk_to_connection(r, _CAP_OMNI_PRO_II)
def test_programs_block_decodes_against_live_fixture() -> None:
"""All 1500 slots decode without raising; counts match Phase 1 recon."""
from collections import Counter
from omni_pca.programs import ProgramType, decode_program_table, iter_defined
blob = _load_programs_blob_or_skip()
assert len(blob) == 1500 * 14
programs = decode_program_table(blob)
assert len(programs) == 1500
defined = list(iter_defined(programs))
assert len(defined) == 330
types = Counter(p.prog_type for p in defined)
assert types[int(ProgramType.TIMED)] == 209
assert types[int(ProgramType.EVENT)] == 105
assert types[int(ProgramType.YEARLY)] == 16
def test_programs_block_round_trips_byte_for_byte() -> None:
"""The strongest correctness signal: decode → encode → compare.
If a single byte of the 21,000-byte blob is off, this test catches it.
"""
from omni_pca.programs import decode_program_table
blob = _load_programs_blob_or_skip()
programs = decode_program_table(blob)
rebuilt = b"".join(p.encode_file_record() for p in programs)
assert rebuilt == blob
def test_programs_sanity_invariants() -> None:
"""Coarse invariants on the 330 defined programs.
The byte-for-byte round-trip test above is the load-bearing
correctness signal. This adds light coverage to catch a Mon/Day
swap regression specifically on YEARLY-typed programs (which use
bytes 9/10 as a true calendar date).
What we DON'T assert and why:
* **EVENT** programs encode a u16 event identifier in bytes 9/10
(see ``clsProgram.Evt`` at lines 152-163), not a calendar date.
* **TIMED** programs use bytes 12/13 either as absolute hour:minute
(0-23 : 0-59) *or* as a sunrise/sunset-relative offset
(Owner's Manual: ±0-120 minutes), with a flag we haven't
reverse-engineered yet. So hour=26 / minute=246 are valid wire
values in the absence of that flag decoder.
"""
from omni_pca.programs import ProgramType, decode_program_table, iter_defined
blob = _load_programs_blob_or_skip()
programs = decode_program_table(blob)
defined = list(iter_defined(programs))
yearly = [p for p in defined if p.prog_type == int(ProgramType.YEARLY)]
assert yearly, "fixture should have YEARLY programs"
for p in yearly:
assert 1 <= p.month <= 12, (
f"slot {p.slot} YEARLY: month={p.month}"
)
assert 1 <= p.day <= 31, (
f"slot {p.slot} YEARLY: day={p.day}"
)
def test_pca_account_dataclass_has_programs_field() -> None:
"""``PcaAccount`` exposes ``programs`` with the expected type + default.
Verifies the API surface without needing a working .pca decrypt
key the integration from raw blob through ``decode_program_table``
is covered by the other three live-fixture tests above.
"""
from omni_pca.pca_file import PcaAccount
fields = {f.name: f for f in PcaAccount.__dataclass_fields__.values()}
assert "programs" in fields
# default_factory or default — the field should be an empty tuple
# when no programs are decoded.
inst = PcaAccount(
version_tag="PCA03", file_version=3,
model=16, firmware_major=2, firmware_minor=12, firmware_revision=1,
)
assert inst.programs == ()
def test_pca_reader_io_state_introspection() -> None:
r = PcaReader(b"abcdef")
assert isinstance(r.buf, io.BytesIO)

247
tests/test_programs.py Normal file
View File

@ -0,0 +1,247 @@
"""Unit tests for omni_pca.programs.
Three layers of evidence (no external oracle, so we triangulate):
* **Golden bytes per ProgramType** hand-curated byte vectors that
exercise the layout-specific paths (Mon/Day swap for Event, Remark
variant with RemarkID at bytes 1-4).
* **Round-trip property** random 14-byte inputs survive
``decode(b).encode() == b`` for both wire and file layouts.
* **Unknown-enum tolerance** bytes outside ``ProgramType`` /
``Command`` enum domains pass through without raising.
"""
from __future__ import annotations
import os
import struct
import warnings
import pytest
from omni_pca.programs import (
MAX_PROGRAMS,
PROGRAM_BYTES,
Days,
Program,
ProgramType,
decode_program_table,
iter_defined,
)
# ---- golden bytes ---------------------------------------------------------
def test_timed_decodes_canonical_example() -> None:
"""The worked example from the docs page — TIMED program."""
body = bytes.fromhex("018d099b094403010008 0c3e070f".replace(" ", ""))
p = Program.from_file_record(body, slot=22)
assert p.slot == 22
assert p.prog_type == ProgramType.TIMED
assert p.cond == 0x8D09
assert p.cond2 == 0x9B09
assert p.cmd == 0x44
assert p.par == 3
assert p.pr2 == 0x0100
assert p.month == 8
assert p.day == 12
assert p.days == 0x3E
assert p.hour == 7
assert p.minute == 15
assert p.remark_id is None
# round-trip on file form
assert p.encode_file_record() == body
def test_event_swaps_mon_day_on_file_layout() -> None:
"""EVENT programs store [day, month] at offsets 9/10 on disk.
File bytes 9-10 = ``05 0c`` should decode to day=5, month=12 (not
month=5, day=12). Encoding back must preserve the swap so the
raw .pca slot bytes don't drift.
"""
body = bytes.fromhex("020c04000001010000050cff070f") # 14 bytes
assert len(body) == 14
p = Program.from_file_record(body, slot=1)
assert p.prog_type == ProgramType.EVENT
# Disk had [05, 0c] but EVENT swap means [day, mon].
assert p.day == 5
assert p.month == 12
# Round-trip MUST re-apply the swap.
assert p.encode_file_record() == body
def test_event_no_swap_on_wire_layout() -> None:
"""Same EVENT-type bytes via the wire decoder: NO swap.
On the wire ``clsOLMsgProgramData`` always stores [month, day] at
offsets 9/10 regardless of prog_type only the .pca file form
swaps. This test catches a regression where we accidentally swap
in the wire path.
"""
body = bytes.fromhex("020c04000001010000050cff070f")
p = Program.from_wire_bytes(body, slot=1)
assert p.prog_type == ProgramType.EVENT
# Wire form: byte 9 = month, byte 10 = day. NO swap.
assert p.month == 5
assert p.day == 12
assert p.encode_wire_bytes() == body
def test_yearly_program() -> None:
"""YEARLY programs use month + day fields semantically — no swap."""
body = bytes.fromhex("03000000000100008b010a0f00000001")[:14]
p = Program.from_file_record(body, slot=10)
assert p.prog_type == ProgramType.YEARLY
assert p.month == 0x01
assert p.day == 0x0A
assert p.encode_file_record() == body
def test_remark_uses_bytes_1_to_4_as_remark_id() -> None:
"""REMARK programs (prog_type=4) pack a 32-bit BE RemarkID into
bytes 1-4 in place of cond + cond2."""
remark_id = 0xDEADBEEF
body = (
bytes([int(ProgramType.REMARK)])
+ struct.pack(">I", remark_id)
+ bytes(9)
)
assert len(body) == 14
p = Program.from_file_record(body)
assert p.prog_type == ProgramType.REMARK
assert p.remark_id == remark_id
# cond / cond2 are zeroed in the dataclass — the bytes there are
# the RemarkID, not condition fields.
assert p.cond == 0
assert p.cond2 == 0
# Round-trip restores the RemarkID bytes verbatim.
assert p.encode_file_record() == body
def test_all_zero_slot_is_empty() -> None:
"""A free slot decodes cleanly and round-trips."""
p = Program.from_file_record(b"\x00" * 14, slot=999)
assert p.is_empty()
assert p.prog_type == ProgramType.FREE
assert p.encode_file_record() == b"\x00" * 14
assert p.encode_wire_bytes() == b"\x00" * 14
# ---- round-trip property ---------------------------------------------------
def test_random_round_trip_wire() -> None:
"""500 random 14-byte inputs: ``decode_wire → encode_wire == input``.
The wire path is the simpler one (no Mon/Day swap), so it should
round-trip every byte pattern losslessly.
"""
with warnings.catch_warnings():
warnings.simplefilter("ignore", UserWarning)
for _ in range(500):
body = os.urandom(14)
# Skip Remark inputs in this round — the dataclass discards
# cond/cond2 for Remark types and re-derives them from
# remark_id, but with no separate cond field we'd lose
# bytes that happen to differ; the next test covers Remark
# explicitly.
if body[0] == int(ProgramType.REMARK):
continue
p = Program.from_wire_bytes(body)
assert p.encode_wire_bytes() == body
def test_random_round_trip_file() -> None:
"""500 random 14-byte inputs through the file (Mon/Day swap) form."""
with warnings.catch_warnings():
warnings.simplefilter("ignore", UserWarning)
for _ in range(500):
body = os.urandom(14)
if body[0] == int(ProgramType.REMARK):
continue
p = Program.from_file_record(body)
assert p.encode_file_record() == body
def test_remark_round_trip() -> None:
"""Remark variant round-trip — explicitly, with random RemarkIDs."""
for _ in range(200):
remark_id_bytes = os.urandom(4)
body = (
bytes([int(ProgramType.REMARK)])
+ remark_id_bytes
+ os.urandom(9)
)
p_file = Program.from_file_record(body)
assert p_file.encode_file_record() == body
p_wire = Program.from_wire_bytes(body)
assert p_wire.encode_wire_bytes() == body
# ---- unknown-enum tolerance ------------------------------------------------
def test_unknown_prog_type_passes_through_with_warning() -> None:
"""Bytes outside ProgramType (0..10) decode to a raw int + warning.
Reset the once-per-process cache first; otherwise earlier random
round-trip tests may have already seen this value and silenced
the warning.
"""
import omni_pca.programs as pm
pm._warned_unknown.clear()
body = bytes([0x42]) + bytes(13)
with warnings.catch_warnings(record=True) as caught:
warnings.simplefilter("always")
p = Program.from_wire_bytes(body)
assert p.prog_type == 0x42
assert p.encode_wire_bytes() == body
assert any("ProgramType" in str(w.message) for w in caught)
def test_unknown_cmd_passes_through() -> None:
"""Unrecognised cmd bytes decode without raising."""
body = bytes([int(ProgramType.TIMED), 0, 0, 0, 0, 0xFA, 0]) + bytes(7)
p = Program.from_wire_bytes(body)
assert p.cmd == 0xFA
assert p.encode_wire_bytes() == body
# ---- table decode ----------------------------------------------------------
def test_decode_program_table_size_validation() -> None:
with pytest.raises(ValueError, match="must be 21000 bytes"):
decode_program_table(b"\x00" * 100)
def test_decode_program_table_round_trip_all_zero() -> None:
"""All-zero 21000-byte blob round-trips, slot numbers are 1..1500."""
blob = b"\x00" * (MAX_PROGRAMS * PROGRAM_BYTES)
programs = decode_program_table(blob)
assert len(programs) == MAX_PROGRAMS
assert programs[0].slot == 1
assert programs[-1].slot == MAX_PROGRAMS
assert all(p.is_empty() for p in programs)
assert list(iter_defined(programs)) == []
rebuilt = b"".join(p.encode_file_record() for p in programs)
assert rebuilt == blob
def test_iter_defined_filters_empty() -> None:
p1 = Program(prog_type=int(ProgramType.TIMED), slot=1, cmd=1, hour=8)
p2 = Program(slot=2) # empty
p3 = Program(prog_type=int(ProgramType.EVENT), slot=3, cmd=2)
defined = list(iter_defined((p1, p2, p3)))
assert defined == [p1, p3]
# ---- Days bitmask sanity --------------------------------------------------
def test_days_bitmask_values() -> None:
"""Sanity-check the enuDays values against the C# definition."""
assert Days.MONDAY == 0x02
assert Days.SUNDAY == 0x80
assert Days.MONDAY | Days.WEDNESDAY | Days.FRIDAY == 0x2A