Compare commits

...

5 Commits

Author SHA1 Message Date
390f3a9dc0 mock_panel: MockState.from_pca builds state from a real .pca file
Convenience constructor that runs parse_pca_file and seeds:
* model_byte + firmware_major/minor/revision from the .pca header,
  so SystemInformation replies match the panel the file came from
* programs dict from every non-empty Program record in the 1500-slot
  table, encoded back to wire bytes for direct UploadProgram /
  UploadPrograms service

Per-object name/state (zones/units/areas/thermostats) isn't in the
pca_file extraction yet — those default to empty unless the caller
overrides. Easy to extend later when pca_file grows zone/unit name
parsing.

Net effect: anyone can now point a MockPanel at any .pca file and
get a hermetic replay of that install's programs over both v1 and
v2 wire dialects:

    state = MockState.from_pca("My_House.pca", key=KEY_EXPORT)
    panel = MockPanel(controller_key=k, state=state)

New e2e test materialises the live fixture, builds the mock from it,
streams all 330 programs back through OmniClient.iter_programs, and
asserts the slot indexes match.
2026-05-12 20:25:02 -06:00
e57fbc41e3 HA: optional .pca file as alternate source for panel programs
Adds CONF_PCA_PATH + CONF_PCA_KEY config-flow fields. When set, the
coordinator parses programs from the .pca file at that path instead
of streaming them over the wire on every entry refresh. Useful for:

* deployments where wire enumeration is slow (1500-slot iteration)
* offline snapshots when the panel is unreachable
* deterministic test setups against a known fixture

The config-flow validates the file is readable and decrypts cleanly,
surfacing pca_not_found / pca_decode_failed errors via the strings/
en.json translations.

The .pca path is checked first in _discover_programs; if absent the
wire path runs as before. So existing deployments are unaffected.

Tests cover the success path (live fixture, 330 programs) and the
two validation failures (missing file, garbage bytes).
2026-05-12 19:15:32 -06:00
b412dc0f37 HA: discover programs over the wire + diagnostic sensor
Coordinator's _discover_programs is no longer a placeholder. It now
drives client.iter_programs() (v2 path) or the v1 adapter's forward
to OmniClientV1.iter_programs (v1 path), populating OmniData.programs
with decoded Program records keyed by slot. Errors are logged and
swallowed so partial enumeration doesn't break entry setup —
programs are non-critical telemetry.

OmniData.programs is now dict[int, Program] rather than the
ProgramProperties dict that was an empty placeholder. The
ProgramProperties dataclass remains in models.py for the Properties
opcode reply path; only the coordinator's value type changed.

New OmniProgramsSensor on the sensor platform: a single diagnostic
entity per panel whose state is the count of defined programs and
whose 'programs' attribute lists each program's slot, type name,
and schedule fields. Easy to consume from automations and the
developer-states UI.

Mock fixture seeds three programs (TIMED+TIMED+EVENT at slots 12 /
42 / 99). New integration test verifies the sensor enumerates them
in slot-ascending order with the expected per-record fields.

Full suite: 494 passed, 1 skipped (fixture-gated).
2026-05-12 19:10:32 -06:00
4ad20c9350 clients: iter_programs() for both v1 and v2 wire dialects
v2 path adds an iterator over UploadProgram with request_reason=1
("next defined after slot"), mirroring the C# ReadConfig loop at
clsHAC.cs:4985 (seed call) and 5331 (per-reply re-issue). The mock
panel now honours reason=1: walks state.programs for the next
slot strictly greater than the requested one, returns EOD when none.

v1 path wraps OmniConnectionV1.iter_streaming(UploadPrograms) and
decodes each ProgramData reply into a Program. The panel already
streams in slot-ascending order from the previous commit, so the
client just decodes-and-yields.

Both methods return AsyncIterator[Program] for HA-side consumption.
Tests cover populated and empty states for both dialects, plus the
raw v2 reason=1 semantics on a single request.
2026-05-12 19:07:42 -06:00
933d326dd3 mock_panel: v1 UploadPrograms streaming + program-echo tests
MockPanel only handled the v2 (single-slot, request/reply)
UploadProgram path. v1 panels use a streaming variant:
client sends UploadPrograms (bare), panel emits one ProgramData
per defined slot, ack-walked by the client, terminated by EOD.

Wire layout is byte-identical to v2 — only the envelope opcode
and stream pattern differ (clsHAC.OL1ReadConfig at clsHAC.cs:4403,
4538-4540, 4642-4651). The mock now mirrors the UploadNames
streaming pattern with its own cursor.

Tests cover both the populated-state stream-then-EOD case and
the empty-state immediate-EOD case, alongside the existing v2
single-slot round-trip tests.
2026-05-12 18:21:05 -06:00
15 changed files with 846 additions and 41 deletions

View File

@ -16,6 +16,8 @@ from homeassistant.exceptions import ConfigEntryNotReady
from .const import (
CONF_CONTROLLER_KEY,
CONF_PCA_KEY,
CONF_PCA_PATH,
CONF_TRANSPORT,
DEFAULT_TRANSPORT,
DOMAIN,
@ -57,6 +59,8 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
return False
transport: str = entry.data.get(CONF_TRANSPORT, DEFAULT_TRANSPORT)
pca_path: str = entry.data.get(CONF_PCA_PATH, "") or ""
pca_key: int = entry.data.get(CONF_PCA_KEY, 0)
coordinator = OmniDataUpdateCoordinator(
hass,
entry,
@ -64,6 +68,8 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
port=port,
controller_key=controller_key,
transport=transport,
pca_path=pca_path or None,
pca_key=pca_key,
)
try:

View File

@ -20,6 +20,8 @@ from omni_pca.connection import (
from .const import (
CONF_CONTROLLER_KEY,
CONF_PCA_KEY,
CONF_PCA_PATH,
CONF_TRANSPORT,
CONTROLLER_KEY_HEX_LEN,
DEFAULT_PORT,
@ -70,6 +72,15 @@ _USER_SCHEMA = vol.Schema(
vol.Required(CONF_TRANSPORT, default=DEFAULT_TRANSPORT): vol.In(
[TRANSPORT_TCP, TRANSPORT_UDP]
),
# Optional: load panel programs from a saved .pca file on the HA
# filesystem (e.g. /config/pca/My_House.pca) instead of streaming
# them from the panel on every restart. Useful when wire
# enumeration is slow or unreliable. CONF_PCA_KEY is the
# per-install key from PCA01.CFG (a uint32, 0 for plain-text).
vol.Optional(CONF_PCA_PATH, default=""): str,
vol.Optional(CONF_PCA_KEY, default=0): vol.All(
vol.Coerce(int), vol.Range(min=0, max=0xFFFFFFFF)
),
}
)
@ -90,6 +101,8 @@ class OmniConfigFlow(ConfigFlow, domain=DOMAIN):
host: str = user_input[CONF_HOST].strip()
port: int = user_input[CONF_PORT]
transport: str = user_input.get(CONF_TRANSPORT, DEFAULT_TRANSPORT)
pca_path: str = (user_input.get(CONF_PCA_PATH) or "").strip()
pca_key: int = user_input.get(CONF_PCA_KEY, 0)
unique_id = f"{host}:{port}"
await self.async_set_unique_id(unique_id)
@ -101,19 +114,24 @@ class OmniConfigFlow(ConfigFlow, domain=DOMAIN):
LOGGER.debug("controller key rejected: %s", err)
errors[CONF_CONTROLLER_KEY] = "invalid_key"
else:
title, error = await self._probe(host, port, key, transport)
if error is not None:
errors["base"] = error
else:
return self.async_create_entry(
title=title or f"Omni Panel ({host})",
data={
CONF_HOST: host,
CONF_PORT: port,
CONF_CONTROLLER_KEY: key.hex(),
CONF_TRANSPORT: transport,
},
)
if pca_path and (pca_err := await self._validate_pca(pca_path, pca_key)):
errors[CONF_PCA_PATH] = pca_err
if not errors:
title, error = await self._probe(host, port, key, transport)
if error is not None:
errors["base"] = error
else:
return self.async_create_entry(
title=title or f"Omni Panel ({host})",
data={
CONF_HOST: host,
CONF_PORT: port,
CONF_CONTROLLER_KEY: key.hex(),
CONF_TRANSPORT: transport,
CONF_PCA_PATH: pca_path,
CONF_PCA_KEY: pca_key,
},
)
return self.async_show_form(
step_id="user",
@ -121,6 +139,33 @@ class OmniConfigFlow(ConfigFlow, domain=DOMAIN):
errors=errors,
)
async def _validate_pca(self, path: str, key: int) -> str | None:
"""Validate the user-supplied .pca path is readable and decrypts.
Returns ``None`` on success or an error code string on failure
(matches the {code: message} keys in strings.json).
"""
from pathlib import Path
from omni_pca.pca_file import parse_pca_file
p = Path(path)
if not p.is_file():
return "pca_not_found"
try:
data = await self.hass.async_add_executor_job(p.read_bytes)
acct = await self.hass.async_add_executor_job(
lambda: parse_pca_file(data, key=key)
)
except Exception as err:
LOGGER.debug("pca file rejected: %s", err)
return "pca_decode_failed"
# Sanity: programs block decoded cleanly. Empty is allowed
# (legitimate brand-new install with no programs).
if not isinstance(acct.programs, tuple):
return "pca_decode_failed"
return None
async def async_step_reauth(
self, entry_data: Mapping[str, Any]
) -> ConfigFlowResult:

View File

@ -14,6 +14,13 @@ DEFAULT_TIMEOUT: Final = 5.0
CONF_CONTROLLER_KEY: Final = "controller_key"
CONF_TRANSPORT: Final = "transport"
# Optional: when set, load panel programs from a .pca file at this path
# instead of enumerating them over the wire on every entry refresh. The
# .pca file is decrypted with CONF_PCA_KEY (the per-install key from
# PCA01.CFG, or 0 for a plain-text dump). Both must be set together.
CONF_PCA_PATH: Final = "pca_path"
CONF_PCA_KEY: Final = "pca_key"
TRANSPORT_TCP: Final = "tcp"
TRANSPORT_UDP: Final = "udp"
DEFAULT_TRANSPORT: Final = TRANSPORT_TCP

View File

@ -62,7 +62,6 @@ from omni_pca.models import (
AreaStatus,
ButtonProperties,
ObjectType,
ProgramProperties,
SystemInformation,
SystemStatus,
ThermostatProperties,
@ -73,6 +72,7 @@ from omni_pca.models import (
ZoneStatus,
)
from omni_pca.opcodes import OmniLink2MessageType
from omni_pca.programs import Program
from .const import (
DOMAIN,
@ -108,7 +108,7 @@ class OmniData:
areas: dict[int, AreaProperties] = field(default_factory=dict)
thermostats: dict[int, ThermostatProperties] = field(default_factory=dict)
buttons: dict[int, ButtonProperties] = field(default_factory=dict)
programs: dict[int, ProgramProperties] = field(default_factory=dict)
programs: dict[int, Program] = field(default_factory=dict)
zone_status: dict[int, ZoneStatus] = field(default_factory=dict)
unit_status: dict[int, UnitStatus] = field(default_factory=dict)
@ -138,6 +138,8 @@ class OmniDataUpdateCoordinator(DataUpdateCoordinator[OmniData]):
port: int,
controller_key: bytes,
transport: str = "tcp",
pca_path: str | None = None,
pca_key: int = 0,
) -> None:
super().__init__(
hass,
@ -150,6 +152,8 @@ class OmniDataUpdateCoordinator(DataUpdateCoordinator[OmniData]):
self._port = port
self._controller_key = controller_key
self._transport = transport
self._pca_path = pca_path
self._pca_key = pca_key
self._client: OmniClient | None = None
self._discovery_done = False
self._discovered: OmniData | None = None
@ -382,15 +386,70 @@ class OmniDataUpdateCoordinator(DataUpdateCoordinator[OmniData]):
async def _discover_programs(
self, client: OmniClient
) -> dict[int, ProgramProperties]:
# Programs aren't reachable via the Properties opcode (the C# side
# uses a separate request/reply pair), so we just return an empty
# dict. We keep the field on OmniData so Phase B can plug in real
# discovery the moment the library exposes it. AMBIGUITY: the spec
# asks for "named programs" — there's no on-the-wire path for that
# in v1.0 of omni_pca, so an empty mapping is the honest answer.
_ = client, ProgramProperties
return {}
) -> dict[int, Program]:
"""Enumerate defined panel programs.
Two sources, in order of preference:
1. ``CONF_PCA_PATH`` is configured parse the .pca file and
extract the programs block. Avoids streaming 1500 records on
every entry refresh and works against an offline snapshot.
2. Otherwise enumerate over the wire:
* v2 (TCP): ``client.iter_programs()`` drives UploadProgram
with request_reason=1 ("next defined after slot").
* v1 (UDP): adapter forwards to OmniClientV1.iter_programs(),
a bare UploadPrograms stream ack-walked to EOD.
Both paths yield :class:`omni_pca.programs.Program` and skip
empty slots. Errors are logged and swallowed programs are
non-critical discovery, so a partial list beats blocking setup.
"""
if self._pca_path:
return await self._discover_programs_from_pca()
out: dict[int, Program] = {}
try:
async for prog in client.iter_programs():
if prog.slot is not None:
out[prog.slot] = prog
except (OmniConnectionError, RequestTimeoutError):
raise
except Exception:
LOGGER.debug(
"program enumeration interrupted (kept %d)", len(out), exc_info=True
)
return out
async def _discover_programs_from_pca(self) -> dict[int, Program]:
"""Parse the configured .pca file and pull out its programs block.
Runs the disk I/O on the executor since :mod:`pca_file` does
sync reads. Any failure (missing file, bad key, malformed block)
is logged and downgraded to an empty dict the rest of
discovery still works.
"""
from pathlib import Path
from omni_pca.pca_file import parse_pca_file
try:
data = await self.hass.async_add_executor_job(
Path(self._pca_path).read_bytes
)
acct = await self.hass.async_add_executor_job(
lambda: parse_pca_file(data, key=self._pca_key)
)
except Exception:
LOGGER.warning(
"failed to load programs from %s — falling back to empty list",
self._pca_path, exc_info=True,
)
return {}
out: dict[int, Program] = {}
for prog in acct.programs:
if prog.slot is None or prog.is_empty():
continue
out[prog.slot] = prog
return out
async def _walk_properties(
self,

View File

@ -69,6 +69,7 @@ async def async_setup_entry(
entities.append(OmniSystemModelSensor(coordinator))
entities.append(OmniLastEventSensor(coordinator))
entities.append(OmniProgramsSensor(coordinator))
async_add_entities(entities)
@ -261,3 +262,55 @@ class OmniLastEventSensor(
if hasattr(ev, key):
result[key] = getattr(ev, key)
return result
class OmniProgramsSensor(
CoordinatorEntity[OmniDataUpdateCoordinator], SensorEntity
):
"""Diagnostic sensor exposing the panel's automation programs.
State value is the count of defined programs. The ``programs``
attribute carries a list of per-program summaries a stable,
JSON-serializable view automations and template sensors can read.
"""
_attr_has_entity_name = True
_attr_entity_category = EntityCategory.DIAGNOSTIC
_attr_state_class = SensorStateClass.MEASUREMENT
def __init__(self, coordinator: OmniDataUpdateCoordinator) -> None:
super().__init__(coordinator)
self._attr_unique_id = f"{coordinator.unique_id}-programs"
self._attr_name = "Panel Programs"
self._attr_device_info = coordinator.device_info
@property
def native_value(self) -> int:
return len(self.coordinator.data.programs)
@property
def extra_state_attributes(self) -> dict[str, Any]:
from omni_pca.programs import ProgramType
summaries: list[dict[str, Any]] = []
for slot in sorted(self.coordinator.data.programs):
p = self.coordinator.data.programs[slot]
try:
type_name = ProgramType(p.prog_type).name
except ValueError:
type_name = f"UNKNOWN({p.prog_type})"
summaries.append(
{
"slot": slot,
"type": type_name,
"cmd": p.cmd,
"par": p.par,
"pr2": p.pr2,
"month": p.month,
"day": p.day,
"days": p.days,
"hour": p.hour,
"minute": p.minute,
}
)
return {"programs": summaries}

View File

@ -3,11 +3,14 @@
"step": {
"user": {
"title": "Connect to your Omni panel",
"description": "Enter the IP/hostname of your HAI/Leviton Omni Pro II controller and the 32-character hex Controller Key. Use `omni-pca decode-pca <file>.pca --field controller_key` to extract the key from a PC Access export.",
"description": "Enter the IP/hostname of your HAI/Leviton Omni Pro II controller and the 32-character hex Controller Key. Use `omni-pca decode-pca <file>.pca --field controller_key` to extract the key from a PC Access export. Optionally provide a path to a saved `.pca` file to load panel programs from disk instead of streaming them from the controller.",
"data": {
"host": "Host",
"port": "Port",
"controller_key": "Controller Key (32 hex chars)"
"controller_key": "Controller Key (32 hex chars)",
"transport": "Transport",
"pca_path": ".pca file path (optional)",
"pca_key": ".pca per-install key (integer; 0 if plain-text)"
}
},
"reauth_confirm": {
@ -22,6 +25,8 @@
"cannot_connect": "Could not reach the panel. Check the host, port, and that TCP/4369 is open.",
"invalid_auth": "The Controller Key was rejected by the panel.",
"invalid_key": "Controller Key must be exactly 32 hexadecimal characters (16 bytes).",
"pca_not_found": "No file at the supplied .pca path.",
"pca_decode_failed": "Could not decode the .pca file. Check the per-install key.",
"unknown": "An unexpected error occurred. Check the Home Assistant logs."
},
"abort": {

View File

@ -3,11 +3,14 @@
"step": {
"user": {
"title": "Connect to your Omni panel",
"description": "Enter the IP/hostname of your HAI/Leviton Omni Pro II controller and the 32-character hex Controller Key. Use `omni-pca decode-pca <file>.pca --field controller_key` to extract the key from a PC Access export.",
"description": "Enter the IP/hostname of your HAI/Leviton Omni Pro II controller and the 32-character hex Controller Key. Use `omni-pca decode-pca <file>.pca --field controller_key` to extract the key from a PC Access export. Optionally provide a path to a saved `.pca` file to load panel programs from disk instead of streaming them from the controller.",
"data": {
"host": "Host",
"port": "Port",
"controller_key": "Controller Key (32 hex chars)"
"controller_key": "Controller Key (32 hex chars)",
"transport": "Transport",
"pca_path": ".pca file path (optional)",
"pca_key": ".pca per-install key (integer; 0 if plain-text)"
}
},
"reauth_confirm": {
@ -22,6 +25,8 @@
"cannot_connect": "Could not reach the panel. Check the host, port, and that TCP/4369 is open.",
"invalid_auth": "The Controller Key was rejected by the panel.",
"invalid_key": "Controller Key must be exactly 32 hexadecimal characters (16 bytes).",
"pca_not_found": "No file at the supplied .pca path.",
"pca_decode_failed": "Could not decode the .pca file. Check the per-install key.",
"unknown": "An unexpected error occurred. Check the Home Assistant logs."
},
"abort": {

View File

@ -607,6 +607,45 @@ class OmniClient:
"""
await self.execute_command(Command.CLEAR_MESSAGE, parameter2=index)
# ---- program enumeration --------------------------------------------
async def iter_programs(self) -> AsyncIterator["Program"]:
"""Stream every defined program from the panel.
v2 has no bulk "send all programs" opcode; instead the panel
exposes an iterator semantic via ``UploadProgram`` with
``request_reason=1`` ("next defined after this slot"). We seed
with slot 0 and follow each reply's ``ProgramNumber`` back into
the next request until the panel sends EOD.
Mirrors the C# ReadConfig loop at ``clsHAC.OL2ReadConfigProcessProgramData``
(clsHAC.cs:5323-5332) and the seed call at clsHAC.cs:4985.
Yields decoded :class:`omni_pca.programs.Program` instances, one
per defined slot in ascending slot order. Empty slots are
skipped by the panel the iterator only sees defined programs.
"""
from .programs import Program # local import: avoids cycle in __init__
slot = 0
while True:
payload = bytes([(slot >> 8) & 0xFF, slot & 0xFF, 1])
reply = await self._conn.request(
OmniLink2MessageType.UploadProgram, payload
)
if reply.opcode == int(OmniLink2MessageType.EOD):
return
if reply.opcode != int(OmniLink2MessageType.ProgramData):
raise OmniConnectionError(
f"unexpected opcode {reply.opcode} during UploadProgram iteration "
f"(expected {int(OmniLink2MessageType.ProgramData)})"
)
if len(reply.payload) < 2 + 14:
raise OmniConnectionError(
f"ProgramData payload too short ({len(reply.payload)} bytes)"
)
slot = (reply.payload[0] << 8) | reply.payload[1]
yield Program.from_wire_bytes(reply.payload[2 : 2 + 14], slot=slot)
# ---- helpers (status) -----------------------------------------------
async def _fetch_status_range(

View File

@ -52,7 +52,7 @@ import secrets
from collections.abc import AsyncIterator, Callable
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from typing import ClassVar
from typing import Any, ClassVar
from .commands import Command
from .crypto import (
@ -266,6 +266,52 @@ class MockState:
self.thermostats = _promote_dict(self.thermostats, MockThermostatState)
self.buttons = _promote_dict(self.buttons, MockButtonState)
@classmethod
def from_pca(
cls,
path_or_bytes: str | bytes,
key: int,
**overrides: Any,
) -> MockState:
"""Build a MockState seeded from a real .pca file.
Populated from the .pca:
* ``model_byte`` + ``firmware_*`` drive SystemInformation replies
so a connected client sees the panel the .pca came from.
* ``programs`` every non-empty Program record from the 1500-slot
table, encoded back to wire bytes so UploadProgram / UploadPrograms
serve them exactly as a real panel would.
Everything else uses MockState defaults (or whatever the caller
passes as ``**overrides``). Per-object name/state tables aren't
in the .pca header that pca_file currently extracts, so zones /
units / areas / thermostats default to empty unless the caller
explicitly provides them.
``key=0`` only works for files where the export keystream was
already applied (e.g., the result of ``decrypt_pca_bytes`` with
the same key); use ``KEY_EXPORT`` (391549495) for unmodified
PC Access exports.
"""
from .pca_file import parse_pca_file
acct = parse_pca_file(path_or_bytes, key=key)
programs = {
p.slot: p.encode_wire_bytes()
for p in acct.programs
if p.slot is not None and not p.is_empty()
}
defaults: dict[str, Any] = {
"model_byte": acct.model,
"firmware_major": acct.firmware_major,
"firmware_minor": acct.firmware_minor,
"firmware_revision": acct.firmware_revision,
"programs": programs,
}
defaults.update(overrides)
return cls(**defaults)
# ---- name-bytes helpers (kept for back-compat with old callers) -----
def zone_name_bytes(self, idx: int) -> bytes:
@ -344,6 +390,9 @@ class MockPanel:
# v1 UploadNames cursor: index into self._v1_name_stream() while a
# streaming download is in flight, ``None`` when no stream active.
self._upload_names_cursor: int | None = None
# v1 UploadPrograms cursor: index into self._v1_program_stream() while
# a streaming download is in flight, ``None`` when no stream active.
self._upload_programs_cursor: int | None = None
# -------- public observables (handy in tests) --------
@ -709,18 +758,37 @@ class MockPanel:
return _build_nak(opcode), ()
def _reply_program_data(self, payload: bytes) -> Message:
"""Single-shot v2 program read.
"""v2 program read — single-slot OR iterator.
Request payload: ``[number_hi, number_lo, request_reason]`` (3 bytes
per ``clsOL2MsgUploadProgram``). Reply payload: ``[number_hi,
number_lo] + raw_14_byte_body`` per ``clsOL2MsgProgramData``.
If the slot is missing from ``state.programs`` we serve 14 zero
bytes same as a real panel reporting an empty slot.
``request_reason`` semantics mirror the C# ReadConfig flow at
clsHAC.cs:4985 / 5331:
0 return the exact requested slot (zero body if undefined).
1 "next defined": return the lowest slot strictly greater
than the requested number. If none, return EOD. The
C# client iterates by feeding back each received slot
number with reason=1 until EOD.
Any other reason value is treated as reason=0 (we have no other
captures showing alternate semantics).
"""
if len(payload) < 2:
return _build_nak(OmniLink2MessageType.UploadProgram)
number = (payload[0] << 8) | payload[1]
reason = payload[2] if len(payload) >= 3 else 0
if reason == 1:
# "Next defined after this slot." If start_slot=0 (initial
# call) and no programs are defined, we fall straight to EOD.
next_slot = min(
(s for s in self.state.programs if s > number), default=None
)
if next_slot is None:
return encode_v2(OmniLink2MessageType.EOD, b"")
number = next_slot
body = self.state.programs.get(number, b"\x00" * 14)
if len(body) != 14:
return _build_nak(OmniLink2MessageType.UploadProgram)
@ -1236,10 +1304,14 @@ class MockPanel:
return self._v1_reply_auxiliary_status(payload), ()
if opcode == OmniLinkMessageType.UploadNames:
return self._v1_start_upload_names_stream(), ()
if opcode == OmniLinkMessageType.UploadPrograms:
return self._v1_start_upload_programs_stream(), ()
if opcode == OmniLinkMessageType.Ack:
# During an active UploadNames stream, each client Ack
# advances the cursor. With no active stream, drop silently
# (Ack as a request opcode is only meaningful mid-stream).
# During an active stream, each client Ack advances the
# appropriate cursor. With no active stream, Ack as a request
# opcode is only meaningful mid-stream — NAK it.
if self._upload_programs_cursor is not None:
return self._v1_advance_upload_programs_stream(), ()
if self._upload_names_cursor is not None:
return self._v1_advance_upload_names_stream(), ()
return _build_v1_nak(opcode), ()
@ -1450,6 +1522,46 @@ class MockPanel:
t, n, name = names[self._upload_names_cursor]
return self._v1_namedata_msg(t, n, name)
# ---- UploadPrograms streaming ----
#
# Wire flow per clsHAC.OL1ReadConfig (clsHAC.cs:4403, 4538-4540, 4642-4651):
# client → UploadPrograms (bare)
# panel → ProgramData (slot N body)
# client → Ack
# panel → ProgramData (slot N+1 body) ...
# panel → EOD
#
# ProgramData body layout matches v2 exactly (clsOLMsgProgramData
# mirrors clsOL2MsgProgramData byte-for-byte) — both prepend a 2-byte
# BE ProgramNumber to the 14-byte wire body. Only the outer envelope
# opcode differs (v1 vs v2).
def _v1_program_stream(self) -> list[int]:
"""Sorted list of defined program slot numbers."""
return sorted(self.state.programs)
def _v1_programdata_msg(self, slot: int) -> Message:
body = self.state.programs.get(slot, b"\x00" * 14)
payload = bytes([(slot >> 8) & 0xFF, slot & 0xFF]) + body
return encode_v1(OmniLinkMessageType.ProgramData, payload)
def _v1_start_upload_programs_stream(self) -> Message:
slots = self._v1_program_stream()
if not slots:
self._upload_programs_cursor = None
return _build_v1_eod()
self._upload_programs_cursor = 0
return self._v1_programdata_msg(slots[0])
def _v1_advance_upload_programs_stream(self) -> Message:
slots = self._v1_program_stream()
assert self._upload_programs_cursor is not None
self._upload_programs_cursor += 1
if self._upload_programs_cursor >= len(slots):
self._upload_programs_cursor = None
return _build_v1_eod()
return self._v1_programdata_msg(slots[self._upload_programs_cursor])
# ---- v1 Command / ExecuteSecurityCommand wrappers ----
# The wire payload format is byte-identical to v2 (clsOLMsgCommand.cs
# vs clsOL2MsgCommand.cs); only the outer opcode and the reply Ack

View File

@ -172,6 +172,16 @@ class OmniClientV1Adapter:
async def list_message_names(self) -> dict[int, str]:
return (await self._ensure_names()).get(7, {}) # NameType.MESSAGE
# ---- programs ------------------------------------------------------
def iter_programs(self):
"""Forward to OmniClientV1.iter_programs (streaming UploadPrograms).
Same async-iterator shape as :meth:`OmniClient.iter_programs` so the
coordinator does not need a transport branch.
"""
return self._client.iter_programs()
# ---- properties synthesis ------------------------------------------
async def get_object_properties(

View File

@ -208,6 +208,39 @@ class OmniClientV1:
async def list_button_names(self) -> dict[int, str]:
return (await self.list_all_names()).get(int(NameType.BUTTON), {})
# ---- programs (streaming UploadPrograms) -----------------------------
async def iter_programs(self) -> AsyncIterator["Program"]:
"""Stream every defined program from the panel.
v1 has no per-slot request a bare ``UploadPrograms`` triggers
the panel to dump every defined program in ascending slot order,
each as a separate ``ProgramData`` reply that we must
``Acknowledge`` to advance.
Reference: clsHAC.cs:4403 (bare UploadPrograms send), 4642-4651
(per-reply ack-walk), 4538-4540 (dispatch).
Yields decoded :class:`omni_pca.programs.Program` instances.
Empty slots are not transmitted the iterator only sees defined
programs.
"""
from ..programs import Program
async for reply in self._conn.iter_streaming(
OmniLinkMessageType.UploadPrograms
):
if reply.opcode != int(OmniLinkMessageType.ProgramData):
raise OmniProtocolError(
f"unexpected opcode {reply.opcode} during UploadPrograms stream "
f"(expected {int(OmniLinkMessageType.ProgramData)})"
)
if len(reply.payload) < 2 + 14:
raise OmniProtocolError(
f"ProgramData payload too short ({len(reply.payload)} bytes)"
)
slot = (reply.payload[0] << 8) | reply.payload[1]
yield Program.from_wire_bytes(reply.payload[2 : 2 + 14], slot=slot)
# ---- write methods (Command + ExecuteSecurityCommand) ----------------
#
# The Command and ExecuteSecurityCommand payloads are byte-identical

View File

@ -64,6 +64,26 @@ def _short_scan_interval(monkeypatch: pytest.MonkeyPatch) -> None:
@pytest.fixture
def populated_state() -> MockState:
"""A lightly-populated mock state covering every entity platform."""
from omni_pca.programs import Days, Program, ProgramType
programs = {
slot: prog.encode_wire_bytes()
for slot, prog in {
12: Program(
slot=12, prog_type=int(ProgramType.TIMED),
cmd=3, hour=6, minute=0,
days=int(Days.MONDAY | Days.FRIDAY),
),
42: Program(
slot=42, prog_type=int(ProgramType.TIMED),
cmd=4, hour=22, minute=30,
days=int(Days.SUNDAY),
),
99: Program(
slot=99, prog_type=int(ProgramType.EVENT),
cmd=5, month=5, day=12,
),
}.items()
}
return MockState(
zones={
1: MockZoneState(name="FRONT_DOOR"),
@ -84,6 +104,7 @@ def populated_state() -> MockState:
1: MockButtonState(name="GOOD_MORNING"),
},
user_codes={1: 1234},
programs=programs,
)

View File

@ -0,0 +1,131 @@
"""HA-side integration: optional .pca file source for panel programs.
When ``CONF_PCA_PATH`` is set in the entry data, the coordinator should
parse the .pca file at that path (with ``CONF_PCA_KEY`` as the per-install
key) and use those programs *instead* of streaming them over the wire.
The wire-based discovery for everything else (zones, units, etc.) is
unaffected.
"""
from __future__ import annotations
from collections.abc import AsyncIterator
from pathlib import Path
from typing import Any
import pytest
from custom_components.omni_pca.const import (
CONF_CONTROLLER_KEY,
CONF_PCA_KEY,
CONF_PCA_PATH,
DOMAIN,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_HOST, CONF_PORT
from homeassistant.core import HomeAssistant
from pytest_homeassistant_custom_component.common import MockConfigEntry
from .conftest import CONTROLLER_KEY_HEX
LIVE_FIXTURE_PLAIN = Path(
"/home/kdm/home-auto/HAI/pca-re/extracted/Our_House.pca.plain"
)
def _materialize_encrypted_fixture(tmp_path: Path) -> tuple[Path, int]:
"""Re-encrypt the plain fixture so parse_pca_file can decrypt it.
parse_pca_file always runs the XOR keystream. The plain dump bypasses
that, so we re-apply the keystream with KEY_EXPORT and write the
result to tmp_path. Returns (file_path, key) the coordinator should use.
"""
from omni_pca.pca_file import KEY_EXPORT, decrypt_pca_bytes
plain = LIVE_FIXTURE_PLAIN.read_bytes()
# XOR is symmetric — "decrypt" of plain bytes with the export key
# produces a valid encrypted .pca that parse_pca_file can read back.
encrypted = decrypt_pca_bytes(plain, KEY_EXPORT)
fixture = tmp_path / "Test_House.pca"
fixture.write_bytes(encrypted)
return fixture, KEY_EXPORT
@pytest.fixture
async def configured_with_pca(
hass: HomeAssistant, panel: tuple[Any, str, int], tmp_path: Path
) -> AsyncIterator[ConfigEntry]:
"""Config entry pointing at a .pca file fixture for programs."""
if not LIVE_FIXTURE_PLAIN.is_file():
pytest.skip(f"live .pca fixture missing: {LIVE_FIXTURE_PLAIN}")
fixture_path, pca_key = _materialize_encrypted_fixture(tmp_path)
_, host, port = panel
entry = MockConfigEntry(
domain=DOMAIN,
data={
CONF_HOST: host,
CONF_PORT: port,
CONF_CONTROLLER_KEY: CONTROLLER_KEY_HEX,
CONF_PCA_PATH: str(fixture_path),
CONF_PCA_KEY: pca_key,
},
title=f"Mock Omni @ {host}:{port} (with .pca)",
unique_id=f"{host}:{port}",
)
entry.add_to_hass(hass)
assert await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
try:
yield entry
finally:
if entry.entry_id in hass.data.get(DOMAIN, {}):
await hass.config_entries.async_unload(entry.entry_id)
await hass.async_block_till_done()
async def test_pca_source_overrides_wire_programs(
hass: HomeAssistant, configured_with_pca: ConfigEntry
) -> None:
"""The fixture .pca has 330 defined programs (Phase 1 recon). The mock
panel only seeded 3 in conftest. When pca_path is set, the .pca count
wins proving the coordinator routed through _discover_programs_from_pca,
not iter_programs."""
coordinator = hass.data[DOMAIN][configured_with_pca.entry_id]
assert len(coordinator.data.programs) == 330
# Sanity: the diagnostic sensor reflects the .pca count, not the mock seed.
sensors = [
s for s in hass.states.async_all("sensor")
if "panel_programs" in s.entity_id
]
assert len(sensors) == 1
assert int(sensors[0].state) == 330
async def test_pca_path_validation_rejects_missing_file(
hass: HomeAssistant, tmp_path: Path
) -> None:
"""The config-flow validator returns ``pca_not_found`` for an absent
file. We exercise the helper directly to avoid spinning a full mock
panel just for the validation branch."""
from custom_components.omni_pca.config_flow import OmniConfigFlow
flow = OmniConfigFlow()
flow.hass = hass
err = await flow._validate_pca(str(tmp_path / "does-not-exist.pca"), 0)
assert err == "pca_not_found"
async def test_pca_path_validation_rejects_garbage(
hass: HomeAssistant, tmp_path: Path
) -> None:
"""A file that doesn't decode as a .pca returns ``pca_decode_failed``."""
from custom_components.omni_pca.config_flow import OmniConfigFlow
garbage = tmp_path / "garbage.pca"
garbage.write_bytes(b"not a real pca file" * 1000)
flow = OmniConfigFlow()
flow.hass = hass
err = await flow._validate_pca(str(garbage), 0)
assert err == "pca_decode_failed"

View File

@ -78,6 +78,35 @@ async def test_button_entity_for_panel_button(
assert len(states) == 1
async def test_programs_sensor_reflects_seeded_panel(
hass: HomeAssistant, configured_panel
) -> None:
"""The diagnostic Panel Programs sensor enumerates discovered programs.
Mock seed has 3 programs at slots 12 / 42 / 99 (see
:func:`populated_state`); after discovery the coordinator stashes them
on ``OmniData.programs`` and the sensor surfaces a count + per-slot
summary attribute.
"""
programs_states = [
s for s in hass.states.async_all("sensor")
if s.entity_id.endswith("_panel_programs") or "panel_programs" in s.entity_id
]
assert len(programs_states) == 1, (
f"expected one panel_programs sensor, got {[s.entity_id for s in programs_states]}"
)
s = programs_states[0]
assert int(s.state) == 3
summaries = s.attributes["programs"]
assert [p["slot"] for p in summaries] == [12, 42, 99]
# Spot-check the per-record fields landed in summary form.
by_slot = {p["slot"]: p for p in summaries}
assert by_slot[12]["type"] == "TIMED"
assert by_slot[12]["hour"] == 6 and by_slot[12]["minute"] == 0
assert by_slot[99]["type"] == "EVENT"
assert by_slot[99]["month"] == 5 and by_slot[99]["day"] == 12
async def test_event_entity_per_panel(
hass: HomeAssistant, configured_panel
) -> None:

View File

@ -1,10 +1,16 @@
"""End-to-end wire round-trip: client → MockPanel → program decoded.
Seeds the MockPanel with a known :class:`Program`, drives the v2
``UploadProgram`` opcode through a real TCP socket, and asserts the
decoded round-trip equals the seeded Program. Proves the on-the-wire
framing (2-byte BE ProgramNumber header + 14-byte body wrapped in a
``ProgramData`` reply) lines up with our decoder.
Seeds the MockPanel with known :class:`Program` records, exercises
both wire dialects, and asserts the decoded result equals what was
seeded.
* v2 (TCP, request/response per slot): drives ``UploadProgram`` once
per slot. Proves the per-program framing (2-byte BE ProgramNumber +
14-byte body wrapped in a ``ProgramData`` reply).
* v1 (UDP, streaming): drives bare ``UploadPrograms``, ack-walks the
streamed ``ProgramData`` replies to ``EOD``. Proves the streaming
lock-step matches the panel's behaviour described in
``clsHAC.OL1ReadConfig`` (clsHAC.cs:4403, 4538-4540, 4642-4651).
"""
from __future__ import annotations
@ -15,8 +21,9 @@ import pytest
from omni_pca.connection import OmniConnection
from omni_pca.mock_panel import MockPanel, MockState
from omni_pca.opcodes import OmniLink2MessageType
from omni_pca.opcodes import OmniLink2MessageType, OmniLinkMessageType
from omni_pca.programs import Days, Program, ProgramType
from omni_pca.v1 import OmniClientV1
CONTROLLER_KEY = bytes.fromhex("000102030405060708090a0b0c0d0e0f")
@ -134,3 +141,246 @@ async def test_v2_upload_program_event_type_no_swap_on_wire() -> None:
decoded = Program.from_wire_bytes(body, slot=7)
assert decoded.month == 5
assert decoded.day == 12
# ---- v1 streaming -------------------------------------------------------
def _decode_v1_programdata(payload: bytes) -> tuple[int, Program]:
"""Strip the BE ProgramNumber prefix from a v1 ``ProgramData`` payload,
decode the 14-byte body. Mirrors the v2 helper inline above."""
assert len(payload) >= 2 + 14
slot = (payload[0] << 8) | payload[1]
return slot, Program.from_wire_bytes(payload[2 : 2 + 14], slot=slot)
@pytest.mark.asyncio
async def test_v1_upload_programs_streams_all_seeded_slots() -> None:
"""The v1 ``UploadPrograms`` opcode is bare; the panel streams one
``ProgramData`` reply per defined slot, each followed by a client Ack,
terminated by ``EOD``. Order is by ascending slot index which is
what we feed back from ``sorted(state.programs)``."""
seeded = {
12: Program(slot=12, prog_type=int(ProgramType.TIMED), cmd=3, hour=6, minute=0,
days=int(Days.MONDAY | Days.FRIDAY)),
42: Program(slot=42, prog_type=int(ProgramType.TIMED), cond=0x8D09, cond2=0x9B09,
cmd=0x44, par=3, pr2=0x0100, month=8, day=12,
days=int(Days.MONDAY), hour=7, minute=15),
99: Program(slot=99, prog_type=int(ProgramType.EVENT), cmd=5, month=5, day=12),
}
panel = MockPanel(
controller_key=CONTROLLER_KEY,
state=MockState(programs={s: p.encode_wire_bytes() for s, p in seeded.items()}),
)
async with panel.serve(transport="udp") as (host, port):
async with OmniClientV1(
host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0
) as c:
received: dict[int, Program] = {}
async for reply in c.connection.iter_streaming(
OmniLinkMessageType.UploadPrograms
):
assert reply.opcode == int(OmniLinkMessageType.ProgramData)
slot, prog = _decode_v1_programdata(reply.payload)
received[slot] = prog
assert set(received) == set(seeded)
for slot, want in seeded.items():
got = received[slot]
# Field-by-field — same checks as the v2 test, plus a slot equality.
assert got.slot == slot
assert got.prog_type == want.prog_type
assert got.cond == want.cond
assert got.cond2 == want.cond2
assert got.cmd == want.cmd
assert got.par == want.par
assert got.pr2 == want.pr2
assert got.month == want.month
assert got.day == want.day
assert got.days == want.days
assert got.hour == want.hour
assert got.minute == want.minute
@pytest.mark.asyncio
async def test_v1_upload_programs_empty_state_yields_immediate_eod() -> None:
"""No programs defined → the streaming iterator terminates without
yielding anything (the panel jumps straight to EOD)."""
panel = MockPanel(controller_key=CONTROLLER_KEY, state=MockState())
async with panel.serve(transport="udp") as (host, port):
async with OmniClientV1(
host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0
) as c:
replies = [
r async for r in c.connection.iter_streaming(
OmniLinkMessageType.UploadPrograms
)
]
assert replies == []
# ---- v2 iter_programs (reason=1 "next defined" iteration) ---------------
@pytest.mark.asyncio
async def test_v2_upload_program_reason1_returns_next_defined_slot() -> None:
"""``request_reason=1`` should return the lowest defined slot strictly
greater than the requested number the C# panel uses this to iterate
(clsHAC.cs:5331)."""
seeded = {
5: Program(slot=5, prog_type=int(ProgramType.TIMED), cmd=3),
12: Program(slot=12, prog_type=int(ProgramType.TIMED), cmd=3),
99: Program(slot=99, prog_type=int(ProgramType.EVENT), cmd=5),
}
panel = MockPanel(
controller_key=CONTROLLER_KEY,
state=MockState(programs={s: p.encode_wire_bytes() for s, p in seeded.items()}),
)
async with (
panel.serve(transport="tcp") as (host, port),
OmniConnection(
host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0
) as conn,
):
# Seed slot 0 with reason=1 → first defined slot (5).
reply = await conn.request(
OmniLink2MessageType.UploadProgram, struct.pack(">HB", 0, 1)
)
assert reply.opcode == int(OmniLink2MessageType.ProgramData)
assert (reply.payload[0] << 8) | reply.payload[1] == 5
# From slot 5 with reason=1 → slot 12.
reply = await conn.request(
OmniLink2MessageType.UploadProgram, struct.pack(">HB", 5, 1)
)
assert (reply.payload[0] << 8) | reply.payload[1] == 12
# From slot 12 with reason=1 → slot 99.
reply = await conn.request(
OmniLink2MessageType.UploadProgram, struct.pack(">HB", 12, 1)
)
assert (reply.payload[0] << 8) | reply.payload[1] == 99
# From slot 99 with reason=1 → EOD (no more).
reply = await conn.request(
OmniLink2MessageType.UploadProgram, struct.pack(">HB", 99, 1)
)
assert reply.opcode == int(OmniLink2MessageType.EOD)
@pytest.mark.asyncio
async def test_v2_client_iter_programs_enumerates_all_seeded() -> None:
"""High-level OmniClient.iter_programs() drives the reason=1 iteration
and yields decoded Program records in slot-ascending order."""
from omni_pca.client import OmniClient
seeded = {
12: Program(slot=12, prog_type=int(ProgramType.TIMED), cmd=3, hour=6, minute=0,
days=int(Days.MONDAY | Days.FRIDAY)),
42: Program(slot=42, prog_type=int(ProgramType.TIMED), cond=0x8D09, cond2=0x9B09,
cmd=0x44, par=3, pr2=0x0100, month=8, day=12,
days=int(Days.MONDAY), hour=7, minute=15),
99: Program(slot=99, prog_type=int(ProgramType.EVENT), cmd=5, month=5, day=12),
}
panel = MockPanel(
controller_key=CONTROLLER_KEY,
state=MockState(programs={s: p.encode_wire_bytes() for s, p in seeded.items()}),
)
async with panel.serve(transport="tcp") as (host, port):
async with OmniClient(
host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0
) as c:
received = [p async for p in c.iter_programs()]
assert [p.slot for p in received] == [12, 42, 99]
for got, want in zip(received, seeded.values()):
assert got.prog_type == want.prog_type
assert got.cmd == want.cmd
assert got.hour == want.hour
assert got.minute == want.minute
@pytest.mark.asyncio
async def test_v2_client_iter_programs_empty_state_yields_nothing() -> None:
from omni_pca.client import OmniClient
panel = MockPanel(controller_key=CONTROLLER_KEY, state=MockState())
async with panel.serve(transport="tcp") as (host, port):
async with OmniClient(
host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0
) as c:
received = [p async for p in c.iter_programs()]
assert received == []
# ---- v1 client iter_programs (high-level wrapper over iter_streaming) ----
@pytest.mark.asyncio
async def test_mockstate_from_pca_serves_real_panel_programs() -> None:
"""End-to-end: build MockState from the live .pca, drive iter_programs
over v2 wire, decode every yielded Program. This exercises the full
file mock wire decoder pipeline with real on-disk data.
The fixture is the same plain-text dump tests/test_pca_file.py uses;
we re-encrypt with KEY_EXPORT on the fly so parse_pca_file accepts it.
"""
from pathlib import Path
from omni_pca.client import OmniClient
from omni_pca.mock_panel import MockState
from omni_pca.pca_file import KEY_EXPORT, decrypt_pca_bytes
plain = Path("/home/kdm/home-auto/HAI/pca-re/extracted/Our_House.pca.plain")
if not plain.is_file():
pytest.skip(f"live fixture missing: {plain}")
encrypted = decrypt_pca_bytes(plain.read_bytes(), KEY_EXPORT)
state = MockState.from_pca(encrypted, key=KEY_EXPORT)
# SystemInfo fields were populated from the .pca header.
assert state.model_byte == 16 # OMNI_PRO_II
assert state.firmware_major == 2
# Programs: 330 defined per Phase 1 recon.
assert len(state.programs) == 330
panel = MockPanel(controller_key=CONTROLLER_KEY, state=state)
async with panel.serve(transport="tcp") as (host, port):
async with OmniClient(
host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0
) as c:
decoded = [p async for p in c.iter_programs()]
# Every defined slot streamed back, in ascending slot order.
assert len(decoded) == 330
assert [p.slot for p in decoded] == sorted(state.programs)
# Spot check: every decoded record has a known ProgramType.
for p in decoded:
assert p.prog_type in {1, 2, 3} # TIMED / EVENT / YEARLY from this fixture
@pytest.mark.asyncio
async def test_v1_client_iter_programs_enumerates_all_seeded() -> None:
seeded = {
12: Program(slot=12, prog_type=int(ProgramType.TIMED), cmd=3, hour=6, minute=0,
days=int(Days.MONDAY | Days.FRIDAY)),
42: Program(slot=42, prog_type=int(ProgramType.TIMED), cond=0x8D09, cond2=0x9B09,
cmd=0x44, par=3, pr2=0x0100, month=8, day=12,
days=int(Days.MONDAY), hour=7, minute=15),
99: Program(slot=99, prog_type=int(ProgramType.EVENT), cmd=5, month=5, day=12),
}
panel = MockPanel(
controller_key=CONTROLLER_KEY,
state=MockState(programs={s: p.encode_wire_bytes() for s, p in seeded.items()}),
)
async with panel.serve(transport="udp") as (host, port):
async with OmniClientV1(
host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0
) as c:
received = [p async for p in c.iter_programs()]
assert [p.slot for p in received] == [12, 42, 99]
for got, want in zip(received, seeded.values()):
assert got.prog_type == want.prog_type
assert got.cmd == want.cmd
assert got.cond == want.cond
assert got.cond2 == want.cond2
assert got.hour == want.hour
assert got.minute == want.minute