Compare commits
5 Commits
290ba5a78d
...
390f3a9dc0
| Author | SHA1 | Date | |
|---|---|---|---|
| 390f3a9dc0 | |||
| e57fbc41e3 | |||
| b412dc0f37 | |||
| 4ad20c9350 | |||
| 933d326dd3 |
@ -16,6 +16,8 @@ from homeassistant.exceptions import ConfigEntryNotReady
|
||||
|
||||
from .const import (
|
||||
CONF_CONTROLLER_KEY,
|
||||
CONF_PCA_KEY,
|
||||
CONF_PCA_PATH,
|
||||
CONF_TRANSPORT,
|
||||
DEFAULT_TRANSPORT,
|
||||
DOMAIN,
|
||||
@ -57,6 +59,8 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||
return False
|
||||
|
||||
transport: str = entry.data.get(CONF_TRANSPORT, DEFAULT_TRANSPORT)
|
||||
pca_path: str = entry.data.get(CONF_PCA_PATH, "") or ""
|
||||
pca_key: int = entry.data.get(CONF_PCA_KEY, 0)
|
||||
coordinator = OmniDataUpdateCoordinator(
|
||||
hass,
|
||||
entry,
|
||||
@ -64,6 +68,8 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||
port=port,
|
||||
controller_key=controller_key,
|
||||
transport=transport,
|
||||
pca_path=pca_path or None,
|
||||
pca_key=pca_key,
|
||||
)
|
||||
|
||||
try:
|
||||
|
||||
@ -20,6 +20,8 @@ from omni_pca.connection import (
|
||||
|
||||
from .const import (
|
||||
CONF_CONTROLLER_KEY,
|
||||
CONF_PCA_KEY,
|
||||
CONF_PCA_PATH,
|
||||
CONF_TRANSPORT,
|
||||
CONTROLLER_KEY_HEX_LEN,
|
||||
DEFAULT_PORT,
|
||||
@ -70,6 +72,15 @@ _USER_SCHEMA = vol.Schema(
|
||||
vol.Required(CONF_TRANSPORT, default=DEFAULT_TRANSPORT): vol.In(
|
||||
[TRANSPORT_TCP, TRANSPORT_UDP]
|
||||
),
|
||||
# Optional: load panel programs from a saved .pca file on the HA
|
||||
# filesystem (e.g. /config/pca/My_House.pca) instead of streaming
|
||||
# them from the panel on every restart. Useful when wire
|
||||
# enumeration is slow or unreliable. CONF_PCA_KEY is the
|
||||
# per-install key from PCA01.CFG (a uint32, 0 for plain-text).
|
||||
vol.Optional(CONF_PCA_PATH, default=""): str,
|
||||
vol.Optional(CONF_PCA_KEY, default=0): vol.All(
|
||||
vol.Coerce(int), vol.Range(min=0, max=0xFFFFFFFF)
|
||||
),
|
||||
}
|
||||
)
|
||||
|
||||
@ -90,6 +101,8 @@ class OmniConfigFlow(ConfigFlow, domain=DOMAIN):
|
||||
host: str = user_input[CONF_HOST].strip()
|
||||
port: int = user_input[CONF_PORT]
|
||||
transport: str = user_input.get(CONF_TRANSPORT, DEFAULT_TRANSPORT)
|
||||
pca_path: str = (user_input.get(CONF_PCA_PATH) or "").strip()
|
||||
pca_key: int = user_input.get(CONF_PCA_KEY, 0)
|
||||
unique_id = f"{host}:{port}"
|
||||
|
||||
await self.async_set_unique_id(unique_id)
|
||||
@ -101,19 +114,24 @@ class OmniConfigFlow(ConfigFlow, domain=DOMAIN):
|
||||
LOGGER.debug("controller key rejected: %s", err)
|
||||
errors[CONF_CONTROLLER_KEY] = "invalid_key"
|
||||
else:
|
||||
title, error = await self._probe(host, port, key, transport)
|
||||
if error is not None:
|
||||
errors["base"] = error
|
||||
else:
|
||||
return self.async_create_entry(
|
||||
title=title or f"Omni Panel ({host})",
|
||||
data={
|
||||
CONF_HOST: host,
|
||||
CONF_PORT: port,
|
||||
CONF_CONTROLLER_KEY: key.hex(),
|
||||
CONF_TRANSPORT: transport,
|
||||
},
|
||||
)
|
||||
if pca_path and (pca_err := await self._validate_pca(pca_path, pca_key)):
|
||||
errors[CONF_PCA_PATH] = pca_err
|
||||
if not errors:
|
||||
title, error = await self._probe(host, port, key, transport)
|
||||
if error is not None:
|
||||
errors["base"] = error
|
||||
else:
|
||||
return self.async_create_entry(
|
||||
title=title or f"Omni Panel ({host})",
|
||||
data={
|
||||
CONF_HOST: host,
|
||||
CONF_PORT: port,
|
||||
CONF_CONTROLLER_KEY: key.hex(),
|
||||
CONF_TRANSPORT: transport,
|
||||
CONF_PCA_PATH: pca_path,
|
||||
CONF_PCA_KEY: pca_key,
|
||||
},
|
||||
)
|
||||
|
||||
return self.async_show_form(
|
||||
step_id="user",
|
||||
@ -121,6 +139,33 @@ class OmniConfigFlow(ConfigFlow, domain=DOMAIN):
|
||||
errors=errors,
|
||||
)
|
||||
|
||||
async def _validate_pca(self, path: str, key: int) -> str | None:
|
||||
"""Validate the user-supplied .pca path is readable and decrypts.
|
||||
|
||||
Returns ``None`` on success or an error code string on failure
|
||||
(matches the {code: message} keys in strings.json).
|
||||
"""
|
||||
from pathlib import Path
|
||||
|
||||
from omni_pca.pca_file import parse_pca_file
|
||||
|
||||
p = Path(path)
|
||||
if not p.is_file():
|
||||
return "pca_not_found"
|
||||
try:
|
||||
data = await self.hass.async_add_executor_job(p.read_bytes)
|
||||
acct = await self.hass.async_add_executor_job(
|
||||
lambda: parse_pca_file(data, key=key)
|
||||
)
|
||||
except Exception as err:
|
||||
LOGGER.debug("pca file rejected: %s", err)
|
||||
return "pca_decode_failed"
|
||||
# Sanity: programs block decoded cleanly. Empty is allowed
|
||||
# (legitimate brand-new install with no programs).
|
||||
if not isinstance(acct.programs, tuple):
|
||||
return "pca_decode_failed"
|
||||
return None
|
||||
|
||||
async def async_step_reauth(
|
||||
self, entry_data: Mapping[str, Any]
|
||||
) -> ConfigFlowResult:
|
||||
|
||||
@ -14,6 +14,13 @@ DEFAULT_TIMEOUT: Final = 5.0
|
||||
CONF_CONTROLLER_KEY: Final = "controller_key"
|
||||
CONF_TRANSPORT: Final = "transport"
|
||||
|
||||
# Optional: when set, load panel programs from a .pca file at this path
|
||||
# instead of enumerating them over the wire on every entry refresh. The
|
||||
# .pca file is decrypted with CONF_PCA_KEY (the per-install key from
|
||||
# PCA01.CFG, or 0 for a plain-text dump). Both must be set together.
|
||||
CONF_PCA_PATH: Final = "pca_path"
|
||||
CONF_PCA_KEY: Final = "pca_key"
|
||||
|
||||
TRANSPORT_TCP: Final = "tcp"
|
||||
TRANSPORT_UDP: Final = "udp"
|
||||
DEFAULT_TRANSPORT: Final = TRANSPORT_TCP
|
||||
|
||||
@ -62,7 +62,6 @@ from omni_pca.models import (
|
||||
AreaStatus,
|
||||
ButtonProperties,
|
||||
ObjectType,
|
||||
ProgramProperties,
|
||||
SystemInformation,
|
||||
SystemStatus,
|
||||
ThermostatProperties,
|
||||
@ -73,6 +72,7 @@ from omni_pca.models import (
|
||||
ZoneStatus,
|
||||
)
|
||||
from omni_pca.opcodes import OmniLink2MessageType
|
||||
from omni_pca.programs import Program
|
||||
|
||||
from .const import (
|
||||
DOMAIN,
|
||||
@ -108,7 +108,7 @@ class OmniData:
|
||||
areas: dict[int, AreaProperties] = field(default_factory=dict)
|
||||
thermostats: dict[int, ThermostatProperties] = field(default_factory=dict)
|
||||
buttons: dict[int, ButtonProperties] = field(default_factory=dict)
|
||||
programs: dict[int, ProgramProperties] = field(default_factory=dict)
|
||||
programs: dict[int, Program] = field(default_factory=dict)
|
||||
|
||||
zone_status: dict[int, ZoneStatus] = field(default_factory=dict)
|
||||
unit_status: dict[int, UnitStatus] = field(default_factory=dict)
|
||||
@ -138,6 +138,8 @@ class OmniDataUpdateCoordinator(DataUpdateCoordinator[OmniData]):
|
||||
port: int,
|
||||
controller_key: bytes,
|
||||
transport: str = "tcp",
|
||||
pca_path: str | None = None,
|
||||
pca_key: int = 0,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
hass,
|
||||
@ -150,6 +152,8 @@ class OmniDataUpdateCoordinator(DataUpdateCoordinator[OmniData]):
|
||||
self._port = port
|
||||
self._controller_key = controller_key
|
||||
self._transport = transport
|
||||
self._pca_path = pca_path
|
||||
self._pca_key = pca_key
|
||||
self._client: OmniClient | None = None
|
||||
self._discovery_done = False
|
||||
self._discovered: OmniData | None = None
|
||||
@ -382,15 +386,70 @@ class OmniDataUpdateCoordinator(DataUpdateCoordinator[OmniData]):
|
||||
|
||||
async def _discover_programs(
|
||||
self, client: OmniClient
|
||||
) -> dict[int, ProgramProperties]:
|
||||
# Programs aren't reachable via the Properties opcode (the C# side
|
||||
# uses a separate request/reply pair), so we just return an empty
|
||||
# dict. We keep the field on OmniData so Phase B can plug in real
|
||||
# discovery the moment the library exposes it. AMBIGUITY: the spec
|
||||
# asks for "named programs" — there's no on-the-wire path for that
|
||||
# in v1.0 of omni_pca, so an empty mapping is the honest answer.
|
||||
_ = client, ProgramProperties
|
||||
return {}
|
||||
) -> dict[int, Program]:
|
||||
"""Enumerate defined panel programs.
|
||||
|
||||
Two sources, in order of preference:
|
||||
|
||||
1. ``CONF_PCA_PATH`` is configured → parse the .pca file and
|
||||
extract the programs block. Avoids streaming 1500 records on
|
||||
every entry refresh and works against an offline snapshot.
|
||||
2. Otherwise → enumerate over the wire:
|
||||
* v2 (TCP): ``client.iter_programs()`` drives UploadProgram
|
||||
with request_reason=1 ("next defined after slot").
|
||||
* v1 (UDP): adapter forwards to OmniClientV1.iter_programs(),
|
||||
a bare UploadPrograms stream ack-walked to EOD.
|
||||
|
||||
Both paths yield :class:`omni_pca.programs.Program` and skip
|
||||
empty slots. Errors are logged and swallowed — programs are
|
||||
non-critical discovery, so a partial list beats blocking setup.
|
||||
"""
|
||||
if self._pca_path:
|
||||
return await self._discover_programs_from_pca()
|
||||
out: dict[int, Program] = {}
|
||||
try:
|
||||
async for prog in client.iter_programs():
|
||||
if prog.slot is not None:
|
||||
out[prog.slot] = prog
|
||||
except (OmniConnectionError, RequestTimeoutError):
|
||||
raise
|
||||
except Exception:
|
||||
LOGGER.debug(
|
||||
"program enumeration interrupted (kept %d)", len(out), exc_info=True
|
||||
)
|
||||
return out
|
||||
|
||||
async def _discover_programs_from_pca(self) -> dict[int, Program]:
|
||||
"""Parse the configured .pca file and pull out its programs block.
|
||||
|
||||
Runs the disk I/O on the executor since :mod:`pca_file` does
|
||||
sync reads. Any failure (missing file, bad key, malformed block)
|
||||
is logged and downgraded to an empty dict — the rest of
|
||||
discovery still works.
|
||||
"""
|
||||
from pathlib import Path
|
||||
|
||||
from omni_pca.pca_file import parse_pca_file
|
||||
|
||||
try:
|
||||
data = await self.hass.async_add_executor_job(
|
||||
Path(self._pca_path).read_bytes
|
||||
)
|
||||
acct = await self.hass.async_add_executor_job(
|
||||
lambda: parse_pca_file(data, key=self._pca_key)
|
||||
)
|
||||
except Exception:
|
||||
LOGGER.warning(
|
||||
"failed to load programs from %s — falling back to empty list",
|
||||
self._pca_path, exc_info=True,
|
||||
)
|
||||
return {}
|
||||
out: dict[int, Program] = {}
|
||||
for prog in acct.programs:
|
||||
if prog.slot is None or prog.is_empty():
|
||||
continue
|
||||
out[prog.slot] = prog
|
||||
return out
|
||||
|
||||
async def _walk_properties(
|
||||
self,
|
||||
|
||||
@ -69,6 +69,7 @@ async def async_setup_entry(
|
||||
|
||||
entities.append(OmniSystemModelSensor(coordinator))
|
||||
entities.append(OmniLastEventSensor(coordinator))
|
||||
entities.append(OmniProgramsSensor(coordinator))
|
||||
|
||||
async_add_entities(entities)
|
||||
|
||||
@ -261,3 +262,55 @@ class OmniLastEventSensor(
|
||||
if hasattr(ev, key):
|
||||
result[key] = getattr(ev, key)
|
||||
return result
|
||||
|
||||
|
||||
class OmniProgramsSensor(
|
||||
CoordinatorEntity[OmniDataUpdateCoordinator], SensorEntity
|
||||
):
|
||||
"""Diagnostic sensor exposing the panel's automation programs.
|
||||
|
||||
State value is the count of defined programs. The ``programs``
|
||||
attribute carries a list of per-program summaries — a stable,
|
||||
JSON-serializable view automations and template sensors can read.
|
||||
"""
|
||||
|
||||
_attr_has_entity_name = True
|
||||
_attr_entity_category = EntityCategory.DIAGNOSTIC
|
||||
_attr_state_class = SensorStateClass.MEASUREMENT
|
||||
|
||||
def __init__(self, coordinator: OmniDataUpdateCoordinator) -> None:
|
||||
super().__init__(coordinator)
|
||||
self._attr_unique_id = f"{coordinator.unique_id}-programs"
|
||||
self._attr_name = "Panel Programs"
|
||||
self._attr_device_info = coordinator.device_info
|
||||
|
||||
@property
|
||||
def native_value(self) -> int:
|
||||
return len(self.coordinator.data.programs)
|
||||
|
||||
@property
|
||||
def extra_state_attributes(self) -> dict[str, Any]:
|
||||
from omni_pca.programs import ProgramType
|
||||
|
||||
summaries: list[dict[str, Any]] = []
|
||||
for slot in sorted(self.coordinator.data.programs):
|
||||
p = self.coordinator.data.programs[slot]
|
||||
try:
|
||||
type_name = ProgramType(p.prog_type).name
|
||||
except ValueError:
|
||||
type_name = f"UNKNOWN({p.prog_type})"
|
||||
summaries.append(
|
||||
{
|
||||
"slot": slot,
|
||||
"type": type_name,
|
||||
"cmd": p.cmd,
|
||||
"par": p.par,
|
||||
"pr2": p.pr2,
|
||||
"month": p.month,
|
||||
"day": p.day,
|
||||
"days": p.days,
|
||||
"hour": p.hour,
|
||||
"minute": p.minute,
|
||||
}
|
||||
)
|
||||
return {"programs": summaries}
|
||||
|
||||
@ -3,11 +3,14 @@
|
||||
"step": {
|
||||
"user": {
|
||||
"title": "Connect to your Omni panel",
|
||||
"description": "Enter the IP/hostname of your HAI/Leviton Omni Pro II controller and the 32-character hex Controller Key. Use `omni-pca decode-pca <file>.pca --field controller_key` to extract the key from a PC Access export.",
|
||||
"description": "Enter the IP/hostname of your HAI/Leviton Omni Pro II controller and the 32-character hex Controller Key. Use `omni-pca decode-pca <file>.pca --field controller_key` to extract the key from a PC Access export. Optionally provide a path to a saved `.pca` file to load panel programs from disk instead of streaming them from the controller.",
|
||||
"data": {
|
||||
"host": "Host",
|
||||
"port": "Port",
|
||||
"controller_key": "Controller Key (32 hex chars)"
|
||||
"controller_key": "Controller Key (32 hex chars)",
|
||||
"transport": "Transport",
|
||||
"pca_path": ".pca file path (optional)",
|
||||
"pca_key": ".pca per-install key (integer; 0 if plain-text)"
|
||||
}
|
||||
},
|
||||
"reauth_confirm": {
|
||||
@ -22,6 +25,8 @@
|
||||
"cannot_connect": "Could not reach the panel. Check the host, port, and that TCP/4369 is open.",
|
||||
"invalid_auth": "The Controller Key was rejected by the panel.",
|
||||
"invalid_key": "Controller Key must be exactly 32 hexadecimal characters (16 bytes).",
|
||||
"pca_not_found": "No file at the supplied .pca path.",
|
||||
"pca_decode_failed": "Could not decode the .pca file. Check the per-install key.",
|
||||
"unknown": "An unexpected error occurred. Check the Home Assistant logs."
|
||||
},
|
||||
"abort": {
|
||||
|
||||
@ -3,11 +3,14 @@
|
||||
"step": {
|
||||
"user": {
|
||||
"title": "Connect to your Omni panel",
|
||||
"description": "Enter the IP/hostname of your HAI/Leviton Omni Pro II controller and the 32-character hex Controller Key. Use `omni-pca decode-pca <file>.pca --field controller_key` to extract the key from a PC Access export.",
|
||||
"description": "Enter the IP/hostname of your HAI/Leviton Omni Pro II controller and the 32-character hex Controller Key. Use `omni-pca decode-pca <file>.pca --field controller_key` to extract the key from a PC Access export. Optionally provide a path to a saved `.pca` file to load panel programs from disk instead of streaming them from the controller.",
|
||||
"data": {
|
||||
"host": "Host",
|
||||
"port": "Port",
|
||||
"controller_key": "Controller Key (32 hex chars)"
|
||||
"controller_key": "Controller Key (32 hex chars)",
|
||||
"transport": "Transport",
|
||||
"pca_path": ".pca file path (optional)",
|
||||
"pca_key": ".pca per-install key (integer; 0 if plain-text)"
|
||||
}
|
||||
},
|
||||
"reauth_confirm": {
|
||||
@ -22,6 +25,8 @@
|
||||
"cannot_connect": "Could not reach the panel. Check the host, port, and that TCP/4369 is open.",
|
||||
"invalid_auth": "The Controller Key was rejected by the panel.",
|
||||
"invalid_key": "Controller Key must be exactly 32 hexadecimal characters (16 bytes).",
|
||||
"pca_not_found": "No file at the supplied .pca path.",
|
||||
"pca_decode_failed": "Could not decode the .pca file. Check the per-install key.",
|
||||
"unknown": "An unexpected error occurred. Check the Home Assistant logs."
|
||||
},
|
||||
"abort": {
|
||||
|
||||
@ -607,6 +607,45 @@ class OmniClient:
|
||||
"""
|
||||
await self.execute_command(Command.CLEAR_MESSAGE, parameter2=index)
|
||||
|
||||
# ---- program enumeration --------------------------------------------
|
||||
|
||||
async def iter_programs(self) -> AsyncIterator["Program"]:
|
||||
"""Stream every defined program from the panel.
|
||||
|
||||
v2 has no bulk "send all programs" opcode; instead the panel
|
||||
exposes an iterator semantic via ``UploadProgram`` with
|
||||
``request_reason=1`` ("next defined after this slot"). We seed
|
||||
with slot 0 and follow each reply's ``ProgramNumber`` back into
|
||||
the next request until the panel sends EOD.
|
||||
|
||||
Mirrors the C# ReadConfig loop at ``clsHAC.OL2ReadConfigProcessProgramData``
|
||||
(clsHAC.cs:5323-5332) and the seed call at clsHAC.cs:4985.
|
||||
|
||||
Yields decoded :class:`omni_pca.programs.Program` instances, one
|
||||
per defined slot in ascending slot order. Empty slots are
|
||||
skipped by the panel — the iterator only sees defined programs.
|
||||
"""
|
||||
from .programs import Program # local import: avoids cycle in __init__
|
||||
slot = 0
|
||||
while True:
|
||||
payload = bytes([(slot >> 8) & 0xFF, slot & 0xFF, 1])
|
||||
reply = await self._conn.request(
|
||||
OmniLink2MessageType.UploadProgram, payload
|
||||
)
|
||||
if reply.opcode == int(OmniLink2MessageType.EOD):
|
||||
return
|
||||
if reply.opcode != int(OmniLink2MessageType.ProgramData):
|
||||
raise OmniConnectionError(
|
||||
f"unexpected opcode {reply.opcode} during UploadProgram iteration "
|
||||
f"(expected {int(OmniLink2MessageType.ProgramData)})"
|
||||
)
|
||||
if len(reply.payload) < 2 + 14:
|
||||
raise OmniConnectionError(
|
||||
f"ProgramData payload too short ({len(reply.payload)} bytes)"
|
||||
)
|
||||
slot = (reply.payload[0] << 8) | reply.payload[1]
|
||||
yield Program.from_wire_bytes(reply.payload[2 : 2 + 14], slot=slot)
|
||||
|
||||
# ---- helpers (status) -----------------------------------------------
|
||||
|
||||
async def _fetch_status_range(
|
||||
|
||||
@ -52,7 +52,7 @@ import secrets
|
||||
from collections.abc import AsyncIterator, Callable
|
||||
from contextlib import asynccontextmanager
|
||||
from dataclasses import dataclass, field
|
||||
from typing import ClassVar
|
||||
from typing import Any, ClassVar
|
||||
|
||||
from .commands import Command
|
||||
from .crypto import (
|
||||
@ -266,6 +266,52 @@ class MockState:
|
||||
self.thermostats = _promote_dict(self.thermostats, MockThermostatState)
|
||||
self.buttons = _promote_dict(self.buttons, MockButtonState)
|
||||
|
||||
@classmethod
|
||||
def from_pca(
|
||||
cls,
|
||||
path_or_bytes: str | bytes,
|
||||
key: int,
|
||||
**overrides: Any,
|
||||
) -> MockState:
|
||||
"""Build a MockState seeded from a real .pca file.
|
||||
|
||||
Populated from the .pca:
|
||||
|
||||
* ``model_byte`` + ``firmware_*`` — drive SystemInformation replies
|
||||
so a connected client sees the panel the .pca came from.
|
||||
* ``programs`` — every non-empty Program record from the 1500-slot
|
||||
table, encoded back to wire bytes so UploadProgram / UploadPrograms
|
||||
serve them exactly as a real panel would.
|
||||
|
||||
Everything else uses MockState defaults (or whatever the caller
|
||||
passes as ``**overrides``). Per-object name/state tables aren't
|
||||
in the .pca header that pca_file currently extracts, so zones /
|
||||
units / areas / thermostats default to empty unless the caller
|
||||
explicitly provides them.
|
||||
|
||||
``key=0`` only works for files where the export keystream was
|
||||
already applied (e.g., the result of ``decrypt_pca_bytes`` with
|
||||
the same key); use ``KEY_EXPORT`` (391549495) for unmodified
|
||||
PC Access exports.
|
||||
"""
|
||||
from .pca_file import parse_pca_file
|
||||
|
||||
acct = parse_pca_file(path_or_bytes, key=key)
|
||||
programs = {
|
||||
p.slot: p.encode_wire_bytes()
|
||||
for p in acct.programs
|
||||
if p.slot is not None and not p.is_empty()
|
||||
}
|
||||
defaults: dict[str, Any] = {
|
||||
"model_byte": acct.model,
|
||||
"firmware_major": acct.firmware_major,
|
||||
"firmware_minor": acct.firmware_minor,
|
||||
"firmware_revision": acct.firmware_revision,
|
||||
"programs": programs,
|
||||
}
|
||||
defaults.update(overrides)
|
||||
return cls(**defaults)
|
||||
|
||||
# ---- name-bytes helpers (kept for back-compat with old callers) -----
|
||||
|
||||
def zone_name_bytes(self, idx: int) -> bytes:
|
||||
@ -344,6 +390,9 @@ class MockPanel:
|
||||
# v1 UploadNames cursor: index into self._v1_name_stream() while a
|
||||
# streaming download is in flight, ``None`` when no stream active.
|
||||
self._upload_names_cursor: int | None = None
|
||||
# v1 UploadPrograms cursor: index into self._v1_program_stream() while
|
||||
# a streaming download is in flight, ``None`` when no stream active.
|
||||
self._upload_programs_cursor: int | None = None
|
||||
|
||||
# -------- public observables (handy in tests) --------
|
||||
|
||||
@ -709,18 +758,37 @@ class MockPanel:
|
||||
return _build_nak(opcode), ()
|
||||
|
||||
def _reply_program_data(self, payload: bytes) -> Message:
|
||||
"""Single-shot v2 program read.
|
||||
"""v2 program read — single-slot OR iterator.
|
||||
|
||||
Request payload: ``[number_hi, number_lo, request_reason]`` (3 bytes
|
||||
per ``clsOL2MsgUploadProgram``). Reply payload: ``[number_hi,
|
||||
number_lo] + raw_14_byte_body`` per ``clsOL2MsgProgramData``.
|
||||
|
||||
If the slot is missing from ``state.programs`` we serve 14 zero
|
||||
bytes — same as a real panel reporting an empty slot.
|
||||
``request_reason`` semantics mirror the C# ReadConfig flow at
|
||||
clsHAC.cs:4985 / 5331:
|
||||
|
||||
0 → return the exact requested slot (zero body if undefined).
|
||||
1 → "next defined": return the lowest slot strictly greater
|
||||
than the requested number. If none, return EOD. The
|
||||
C# client iterates by feeding back each received slot
|
||||
number with reason=1 until EOD.
|
||||
|
||||
Any other reason value is treated as reason=0 (we have no other
|
||||
captures showing alternate semantics).
|
||||
"""
|
||||
if len(payload) < 2:
|
||||
return _build_nak(OmniLink2MessageType.UploadProgram)
|
||||
number = (payload[0] << 8) | payload[1]
|
||||
reason = payload[2] if len(payload) >= 3 else 0
|
||||
if reason == 1:
|
||||
# "Next defined after this slot." If start_slot=0 (initial
|
||||
# call) and no programs are defined, we fall straight to EOD.
|
||||
next_slot = min(
|
||||
(s for s in self.state.programs if s > number), default=None
|
||||
)
|
||||
if next_slot is None:
|
||||
return encode_v2(OmniLink2MessageType.EOD, b"")
|
||||
number = next_slot
|
||||
body = self.state.programs.get(number, b"\x00" * 14)
|
||||
if len(body) != 14:
|
||||
return _build_nak(OmniLink2MessageType.UploadProgram)
|
||||
@ -1236,10 +1304,14 @@ class MockPanel:
|
||||
return self._v1_reply_auxiliary_status(payload), ()
|
||||
if opcode == OmniLinkMessageType.UploadNames:
|
||||
return self._v1_start_upload_names_stream(), ()
|
||||
if opcode == OmniLinkMessageType.UploadPrograms:
|
||||
return self._v1_start_upload_programs_stream(), ()
|
||||
if opcode == OmniLinkMessageType.Ack:
|
||||
# During an active UploadNames stream, each client Ack
|
||||
# advances the cursor. With no active stream, drop silently
|
||||
# (Ack as a request opcode is only meaningful mid-stream).
|
||||
# During an active stream, each client Ack advances the
|
||||
# appropriate cursor. With no active stream, Ack as a request
|
||||
# opcode is only meaningful mid-stream — NAK it.
|
||||
if self._upload_programs_cursor is not None:
|
||||
return self._v1_advance_upload_programs_stream(), ()
|
||||
if self._upload_names_cursor is not None:
|
||||
return self._v1_advance_upload_names_stream(), ()
|
||||
return _build_v1_nak(opcode), ()
|
||||
@ -1450,6 +1522,46 @@ class MockPanel:
|
||||
t, n, name = names[self._upload_names_cursor]
|
||||
return self._v1_namedata_msg(t, n, name)
|
||||
|
||||
# ---- UploadPrograms streaming ----
|
||||
#
|
||||
# Wire flow per clsHAC.OL1ReadConfig (clsHAC.cs:4403, 4538-4540, 4642-4651):
|
||||
# client → UploadPrograms (bare)
|
||||
# panel → ProgramData (slot N body)
|
||||
# client → Ack
|
||||
# panel → ProgramData (slot N+1 body) ...
|
||||
# panel → EOD
|
||||
#
|
||||
# ProgramData body layout matches v2 exactly (clsOLMsgProgramData
|
||||
# mirrors clsOL2MsgProgramData byte-for-byte) — both prepend a 2-byte
|
||||
# BE ProgramNumber to the 14-byte wire body. Only the outer envelope
|
||||
# opcode differs (v1 vs v2).
|
||||
|
||||
def _v1_program_stream(self) -> list[int]:
|
||||
"""Sorted list of defined program slot numbers."""
|
||||
return sorted(self.state.programs)
|
||||
|
||||
def _v1_programdata_msg(self, slot: int) -> Message:
|
||||
body = self.state.programs.get(slot, b"\x00" * 14)
|
||||
payload = bytes([(slot >> 8) & 0xFF, slot & 0xFF]) + body
|
||||
return encode_v1(OmniLinkMessageType.ProgramData, payload)
|
||||
|
||||
def _v1_start_upload_programs_stream(self) -> Message:
|
||||
slots = self._v1_program_stream()
|
||||
if not slots:
|
||||
self._upload_programs_cursor = None
|
||||
return _build_v1_eod()
|
||||
self._upload_programs_cursor = 0
|
||||
return self._v1_programdata_msg(slots[0])
|
||||
|
||||
def _v1_advance_upload_programs_stream(self) -> Message:
|
||||
slots = self._v1_program_stream()
|
||||
assert self._upload_programs_cursor is not None
|
||||
self._upload_programs_cursor += 1
|
||||
if self._upload_programs_cursor >= len(slots):
|
||||
self._upload_programs_cursor = None
|
||||
return _build_v1_eod()
|
||||
return self._v1_programdata_msg(slots[self._upload_programs_cursor])
|
||||
|
||||
# ---- v1 Command / ExecuteSecurityCommand wrappers ----
|
||||
# The wire payload format is byte-identical to v2 (clsOLMsgCommand.cs
|
||||
# vs clsOL2MsgCommand.cs); only the outer opcode and the reply Ack
|
||||
|
||||
@ -172,6 +172,16 @@ class OmniClientV1Adapter:
|
||||
async def list_message_names(self) -> dict[int, str]:
|
||||
return (await self._ensure_names()).get(7, {}) # NameType.MESSAGE
|
||||
|
||||
# ---- programs ------------------------------------------------------
|
||||
|
||||
def iter_programs(self):
|
||||
"""Forward to OmniClientV1.iter_programs (streaming UploadPrograms).
|
||||
|
||||
Same async-iterator shape as :meth:`OmniClient.iter_programs` so the
|
||||
coordinator does not need a transport branch.
|
||||
"""
|
||||
return self._client.iter_programs()
|
||||
|
||||
# ---- properties synthesis ------------------------------------------
|
||||
|
||||
async def get_object_properties(
|
||||
|
||||
@ -208,6 +208,39 @@ class OmniClientV1:
|
||||
async def list_button_names(self) -> dict[int, str]:
|
||||
return (await self.list_all_names()).get(int(NameType.BUTTON), {})
|
||||
|
||||
# ---- programs (streaming UploadPrograms) -----------------------------
|
||||
|
||||
async def iter_programs(self) -> AsyncIterator["Program"]:
|
||||
"""Stream every defined program from the panel.
|
||||
|
||||
v1 has no per-slot request — a bare ``UploadPrograms`` triggers
|
||||
the panel to dump every defined program in ascending slot order,
|
||||
each as a separate ``ProgramData`` reply that we must
|
||||
``Acknowledge`` to advance.
|
||||
|
||||
Reference: clsHAC.cs:4403 (bare UploadPrograms send), 4642-4651
|
||||
(per-reply ack-walk), 4538-4540 (dispatch).
|
||||
|
||||
Yields decoded :class:`omni_pca.programs.Program` instances.
|
||||
Empty slots are not transmitted — the iterator only sees defined
|
||||
programs.
|
||||
"""
|
||||
from ..programs import Program
|
||||
async for reply in self._conn.iter_streaming(
|
||||
OmniLinkMessageType.UploadPrograms
|
||||
):
|
||||
if reply.opcode != int(OmniLinkMessageType.ProgramData):
|
||||
raise OmniProtocolError(
|
||||
f"unexpected opcode {reply.opcode} during UploadPrograms stream "
|
||||
f"(expected {int(OmniLinkMessageType.ProgramData)})"
|
||||
)
|
||||
if len(reply.payload) < 2 + 14:
|
||||
raise OmniProtocolError(
|
||||
f"ProgramData payload too short ({len(reply.payload)} bytes)"
|
||||
)
|
||||
slot = (reply.payload[0] << 8) | reply.payload[1]
|
||||
yield Program.from_wire_bytes(reply.payload[2 : 2 + 14], slot=slot)
|
||||
|
||||
# ---- write methods (Command + ExecuteSecurityCommand) ----------------
|
||||
#
|
||||
# The Command and ExecuteSecurityCommand payloads are byte-identical
|
||||
|
||||
@ -64,6 +64,26 @@ def _short_scan_interval(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
@pytest.fixture
|
||||
def populated_state() -> MockState:
|
||||
"""A lightly-populated mock state covering every entity platform."""
|
||||
from omni_pca.programs import Days, Program, ProgramType
|
||||
programs = {
|
||||
slot: prog.encode_wire_bytes()
|
||||
for slot, prog in {
|
||||
12: Program(
|
||||
slot=12, prog_type=int(ProgramType.TIMED),
|
||||
cmd=3, hour=6, minute=0,
|
||||
days=int(Days.MONDAY | Days.FRIDAY),
|
||||
),
|
||||
42: Program(
|
||||
slot=42, prog_type=int(ProgramType.TIMED),
|
||||
cmd=4, hour=22, minute=30,
|
||||
days=int(Days.SUNDAY),
|
||||
),
|
||||
99: Program(
|
||||
slot=99, prog_type=int(ProgramType.EVENT),
|
||||
cmd=5, month=5, day=12,
|
||||
),
|
||||
}.items()
|
||||
}
|
||||
return MockState(
|
||||
zones={
|
||||
1: MockZoneState(name="FRONT_DOOR"),
|
||||
@ -84,6 +104,7 @@ def populated_state() -> MockState:
|
||||
1: MockButtonState(name="GOOD_MORNING"),
|
||||
},
|
||||
user_codes={1: 1234},
|
||||
programs=programs,
|
||||
)
|
||||
|
||||
|
||||
|
||||
131
tests/ha_integration/test_pca_source.py
Normal file
131
tests/ha_integration/test_pca_source.py
Normal file
@ -0,0 +1,131 @@
|
||||
"""HA-side integration: optional .pca file source for panel programs.
|
||||
|
||||
When ``CONF_PCA_PATH`` is set in the entry data, the coordinator should
|
||||
parse the .pca file at that path (with ``CONF_PCA_KEY`` as the per-install
|
||||
key) and use those programs *instead* of streaming them over the wire.
|
||||
The wire-based discovery for everything else (zones, units, etc.) is
|
||||
unaffected.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import AsyncIterator
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
from custom_components.omni_pca.const import (
|
||||
CONF_CONTROLLER_KEY,
|
||||
CONF_PCA_KEY,
|
||||
CONF_PCA_PATH,
|
||||
DOMAIN,
|
||||
)
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.const import CONF_HOST, CONF_PORT
|
||||
from homeassistant.core import HomeAssistant
|
||||
from pytest_homeassistant_custom_component.common import MockConfigEntry
|
||||
|
||||
from .conftest import CONTROLLER_KEY_HEX
|
||||
|
||||
LIVE_FIXTURE_PLAIN = Path(
|
||||
"/home/kdm/home-auto/HAI/pca-re/extracted/Our_House.pca.plain"
|
||||
)
|
||||
|
||||
|
||||
def _materialize_encrypted_fixture(tmp_path: Path) -> tuple[Path, int]:
|
||||
"""Re-encrypt the plain fixture so parse_pca_file can decrypt it.
|
||||
|
||||
parse_pca_file always runs the XOR keystream. The plain dump bypasses
|
||||
that, so we re-apply the keystream with KEY_EXPORT and write the
|
||||
result to tmp_path. Returns (file_path, key) the coordinator should use.
|
||||
"""
|
||||
from omni_pca.pca_file import KEY_EXPORT, decrypt_pca_bytes
|
||||
|
||||
plain = LIVE_FIXTURE_PLAIN.read_bytes()
|
||||
# XOR is symmetric — "decrypt" of plain bytes with the export key
|
||||
# produces a valid encrypted .pca that parse_pca_file can read back.
|
||||
encrypted = decrypt_pca_bytes(plain, KEY_EXPORT)
|
||||
fixture = tmp_path / "Test_House.pca"
|
||||
fixture.write_bytes(encrypted)
|
||||
return fixture, KEY_EXPORT
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def configured_with_pca(
|
||||
hass: HomeAssistant, panel: tuple[Any, str, int], tmp_path: Path
|
||||
) -> AsyncIterator[ConfigEntry]:
|
||||
"""Config entry pointing at a .pca file fixture for programs."""
|
||||
if not LIVE_FIXTURE_PLAIN.is_file():
|
||||
pytest.skip(f"live .pca fixture missing: {LIVE_FIXTURE_PLAIN}")
|
||||
|
||||
fixture_path, pca_key = _materialize_encrypted_fixture(tmp_path)
|
||||
|
||||
_, host, port = panel
|
||||
entry = MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
data={
|
||||
CONF_HOST: host,
|
||||
CONF_PORT: port,
|
||||
CONF_CONTROLLER_KEY: CONTROLLER_KEY_HEX,
|
||||
CONF_PCA_PATH: str(fixture_path),
|
||||
CONF_PCA_KEY: pca_key,
|
||||
},
|
||||
title=f"Mock Omni @ {host}:{port} (with .pca)",
|
||||
unique_id=f"{host}:{port}",
|
||||
)
|
||||
entry.add_to_hass(hass)
|
||||
assert await hass.config_entries.async_setup(entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
try:
|
||||
yield entry
|
||||
finally:
|
||||
if entry.entry_id in hass.data.get(DOMAIN, {}):
|
||||
await hass.config_entries.async_unload(entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
|
||||
async def test_pca_source_overrides_wire_programs(
|
||||
hass: HomeAssistant, configured_with_pca: ConfigEntry
|
||||
) -> None:
|
||||
"""The fixture .pca has 330 defined programs (Phase 1 recon). The mock
|
||||
panel only seeded 3 in conftest. When pca_path is set, the .pca count
|
||||
wins — proving the coordinator routed through _discover_programs_from_pca,
|
||||
not iter_programs."""
|
||||
coordinator = hass.data[DOMAIN][configured_with_pca.entry_id]
|
||||
assert len(coordinator.data.programs) == 330
|
||||
|
||||
# Sanity: the diagnostic sensor reflects the .pca count, not the mock seed.
|
||||
sensors = [
|
||||
s for s in hass.states.async_all("sensor")
|
||||
if "panel_programs" in s.entity_id
|
||||
]
|
||||
assert len(sensors) == 1
|
||||
assert int(sensors[0].state) == 330
|
||||
|
||||
|
||||
async def test_pca_path_validation_rejects_missing_file(
|
||||
hass: HomeAssistant, tmp_path: Path
|
||||
) -> None:
|
||||
"""The config-flow validator returns ``pca_not_found`` for an absent
|
||||
file. We exercise the helper directly to avoid spinning a full mock
|
||||
panel just for the validation branch."""
|
||||
from custom_components.omni_pca.config_flow import OmniConfigFlow
|
||||
|
||||
flow = OmniConfigFlow()
|
||||
flow.hass = hass
|
||||
err = await flow._validate_pca(str(tmp_path / "does-not-exist.pca"), 0)
|
||||
assert err == "pca_not_found"
|
||||
|
||||
|
||||
async def test_pca_path_validation_rejects_garbage(
|
||||
hass: HomeAssistant, tmp_path: Path
|
||||
) -> None:
|
||||
"""A file that doesn't decode as a .pca returns ``pca_decode_failed``."""
|
||||
from custom_components.omni_pca.config_flow import OmniConfigFlow
|
||||
|
||||
garbage = tmp_path / "garbage.pca"
|
||||
garbage.write_bytes(b"not a real pca file" * 1000)
|
||||
flow = OmniConfigFlow()
|
||||
flow.hass = hass
|
||||
err = await flow._validate_pca(str(garbage), 0)
|
||||
assert err == "pca_decode_failed"
|
||||
@ -78,6 +78,35 @@ async def test_button_entity_for_panel_button(
|
||||
assert len(states) == 1
|
||||
|
||||
|
||||
async def test_programs_sensor_reflects_seeded_panel(
|
||||
hass: HomeAssistant, configured_panel
|
||||
) -> None:
|
||||
"""The diagnostic Panel Programs sensor enumerates discovered programs.
|
||||
|
||||
Mock seed has 3 programs at slots 12 / 42 / 99 (see
|
||||
:func:`populated_state`); after discovery the coordinator stashes them
|
||||
on ``OmniData.programs`` and the sensor surfaces a count + per-slot
|
||||
summary attribute.
|
||||
"""
|
||||
programs_states = [
|
||||
s for s in hass.states.async_all("sensor")
|
||||
if s.entity_id.endswith("_panel_programs") or "panel_programs" in s.entity_id
|
||||
]
|
||||
assert len(programs_states) == 1, (
|
||||
f"expected one panel_programs sensor, got {[s.entity_id for s in programs_states]}"
|
||||
)
|
||||
s = programs_states[0]
|
||||
assert int(s.state) == 3
|
||||
summaries = s.attributes["programs"]
|
||||
assert [p["slot"] for p in summaries] == [12, 42, 99]
|
||||
# Spot-check the per-record fields landed in summary form.
|
||||
by_slot = {p["slot"]: p for p in summaries}
|
||||
assert by_slot[12]["type"] == "TIMED"
|
||||
assert by_slot[12]["hour"] == 6 and by_slot[12]["minute"] == 0
|
||||
assert by_slot[99]["type"] == "EVENT"
|
||||
assert by_slot[99]["month"] == 5 and by_slot[99]["day"] == 12
|
||||
|
||||
|
||||
async def test_event_entity_per_panel(
|
||||
hass: HomeAssistant, configured_panel
|
||||
) -> None:
|
||||
|
||||
@ -1,10 +1,16 @@
|
||||
"""End-to-end wire round-trip: client → MockPanel → program decoded.
|
||||
|
||||
Seeds the MockPanel with a known :class:`Program`, drives the v2
|
||||
``UploadProgram`` opcode through a real TCP socket, and asserts the
|
||||
decoded round-trip equals the seeded Program. Proves the on-the-wire
|
||||
framing (2-byte BE ProgramNumber header + 14-byte body wrapped in a
|
||||
``ProgramData`` reply) lines up with our decoder.
|
||||
Seeds the MockPanel with known :class:`Program` records, exercises
|
||||
both wire dialects, and asserts the decoded result equals what was
|
||||
seeded.
|
||||
|
||||
* v2 (TCP, request/response per slot): drives ``UploadProgram`` once
|
||||
per slot. Proves the per-program framing (2-byte BE ProgramNumber +
|
||||
14-byte body wrapped in a ``ProgramData`` reply).
|
||||
* v1 (UDP, streaming): drives bare ``UploadPrograms``, ack-walks the
|
||||
streamed ``ProgramData`` replies to ``EOD``. Proves the streaming
|
||||
lock-step matches the panel's behaviour described in
|
||||
``clsHAC.OL1ReadConfig`` (clsHAC.cs:4403, 4538-4540, 4642-4651).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@ -15,8 +21,9 @@ import pytest
|
||||
|
||||
from omni_pca.connection import OmniConnection
|
||||
from omni_pca.mock_panel import MockPanel, MockState
|
||||
from omni_pca.opcodes import OmniLink2MessageType
|
||||
from omni_pca.opcodes import OmniLink2MessageType, OmniLinkMessageType
|
||||
from omni_pca.programs import Days, Program, ProgramType
|
||||
from omni_pca.v1 import OmniClientV1
|
||||
|
||||
CONTROLLER_KEY = bytes.fromhex("000102030405060708090a0b0c0d0e0f")
|
||||
|
||||
@ -134,3 +141,246 @@ async def test_v2_upload_program_event_type_no_swap_on_wire() -> None:
|
||||
decoded = Program.from_wire_bytes(body, slot=7)
|
||||
assert decoded.month == 5
|
||||
assert decoded.day == 12
|
||||
|
||||
|
||||
# ---- v1 streaming -------------------------------------------------------
|
||||
|
||||
|
||||
def _decode_v1_programdata(payload: bytes) -> tuple[int, Program]:
|
||||
"""Strip the BE ProgramNumber prefix from a v1 ``ProgramData`` payload,
|
||||
decode the 14-byte body. Mirrors the v2 helper inline above."""
|
||||
assert len(payload) >= 2 + 14
|
||||
slot = (payload[0] << 8) | payload[1]
|
||||
return slot, Program.from_wire_bytes(payload[2 : 2 + 14], slot=slot)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_v1_upload_programs_streams_all_seeded_slots() -> None:
|
||||
"""The v1 ``UploadPrograms`` opcode is bare; the panel streams one
|
||||
``ProgramData`` reply per defined slot, each followed by a client Ack,
|
||||
terminated by ``EOD``. Order is by ascending slot index — which is
|
||||
what we feed back from ``sorted(state.programs)``."""
|
||||
seeded = {
|
||||
12: Program(slot=12, prog_type=int(ProgramType.TIMED), cmd=3, hour=6, minute=0,
|
||||
days=int(Days.MONDAY | Days.FRIDAY)),
|
||||
42: Program(slot=42, prog_type=int(ProgramType.TIMED), cond=0x8D09, cond2=0x9B09,
|
||||
cmd=0x44, par=3, pr2=0x0100, month=8, day=12,
|
||||
days=int(Days.MONDAY), hour=7, minute=15),
|
||||
99: Program(slot=99, prog_type=int(ProgramType.EVENT), cmd=5, month=5, day=12),
|
||||
}
|
||||
panel = MockPanel(
|
||||
controller_key=CONTROLLER_KEY,
|
||||
state=MockState(programs={s: p.encode_wire_bytes() for s, p in seeded.items()}),
|
||||
)
|
||||
async with panel.serve(transport="udp") as (host, port):
|
||||
async with OmniClientV1(
|
||||
host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0
|
||||
) as c:
|
||||
received: dict[int, Program] = {}
|
||||
async for reply in c.connection.iter_streaming(
|
||||
OmniLinkMessageType.UploadPrograms
|
||||
):
|
||||
assert reply.opcode == int(OmniLinkMessageType.ProgramData)
|
||||
slot, prog = _decode_v1_programdata(reply.payload)
|
||||
received[slot] = prog
|
||||
|
||||
assert set(received) == set(seeded)
|
||||
for slot, want in seeded.items():
|
||||
got = received[slot]
|
||||
# Field-by-field — same checks as the v2 test, plus a slot equality.
|
||||
assert got.slot == slot
|
||||
assert got.prog_type == want.prog_type
|
||||
assert got.cond == want.cond
|
||||
assert got.cond2 == want.cond2
|
||||
assert got.cmd == want.cmd
|
||||
assert got.par == want.par
|
||||
assert got.pr2 == want.pr2
|
||||
assert got.month == want.month
|
||||
assert got.day == want.day
|
||||
assert got.days == want.days
|
||||
assert got.hour == want.hour
|
||||
assert got.minute == want.minute
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_v1_upload_programs_empty_state_yields_immediate_eod() -> None:
|
||||
"""No programs defined → the streaming iterator terminates without
|
||||
yielding anything (the panel jumps straight to EOD)."""
|
||||
panel = MockPanel(controller_key=CONTROLLER_KEY, state=MockState())
|
||||
async with panel.serve(transport="udp") as (host, port):
|
||||
async with OmniClientV1(
|
||||
host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0
|
||||
) as c:
|
||||
replies = [
|
||||
r async for r in c.connection.iter_streaming(
|
||||
OmniLinkMessageType.UploadPrograms
|
||||
)
|
||||
]
|
||||
assert replies == []
|
||||
|
||||
|
||||
# ---- v2 iter_programs (reason=1 "next defined" iteration) ---------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_v2_upload_program_reason1_returns_next_defined_slot() -> None:
|
||||
"""``request_reason=1`` should return the lowest defined slot strictly
|
||||
greater than the requested number — the C# panel uses this to iterate
|
||||
(clsHAC.cs:5331)."""
|
||||
seeded = {
|
||||
5: Program(slot=5, prog_type=int(ProgramType.TIMED), cmd=3),
|
||||
12: Program(slot=12, prog_type=int(ProgramType.TIMED), cmd=3),
|
||||
99: Program(slot=99, prog_type=int(ProgramType.EVENT), cmd=5),
|
||||
}
|
||||
panel = MockPanel(
|
||||
controller_key=CONTROLLER_KEY,
|
||||
state=MockState(programs={s: p.encode_wire_bytes() for s, p in seeded.items()}),
|
||||
)
|
||||
async with (
|
||||
panel.serve(transport="tcp") as (host, port),
|
||||
OmniConnection(
|
||||
host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0
|
||||
) as conn,
|
||||
):
|
||||
# Seed slot 0 with reason=1 → first defined slot (5).
|
||||
reply = await conn.request(
|
||||
OmniLink2MessageType.UploadProgram, struct.pack(">HB", 0, 1)
|
||||
)
|
||||
assert reply.opcode == int(OmniLink2MessageType.ProgramData)
|
||||
assert (reply.payload[0] << 8) | reply.payload[1] == 5
|
||||
|
||||
# From slot 5 with reason=1 → slot 12.
|
||||
reply = await conn.request(
|
||||
OmniLink2MessageType.UploadProgram, struct.pack(">HB", 5, 1)
|
||||
)
|
||||
assert (reply.payload[0] << 8) | reply.payload[1] == 12
|
||||
|
||||
# From slot 12 with reason=1 → slot 99.
|
||||
reply = await conn.request(
|
||||
OmniLink2MessageType.UploadProgram, struct.pack(">HB", 12, 1)
|
||||
)
|
||||
assert (reply.payload[0] << 8) | reply.payload[1] == 99
|
||||
|
||||
# From slot 99 with reason=1 → EOD (no more).
|
||||
reply = await conn.request(
|
||||
OmniLink2MessageType.UploadProgram, struct.pack(">HB", 99, 1)
|
||||
)
|
||||
assert reply.opcode == int(OmniLink2MessageType.EOD)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_v2_client_iter_programs_enumerates_all_seeded() -> None:
|
||||
"""High-level OmniClient.iter_programs() drives the reason=1 iteration
|
||||
and yields decoded Program records in slot-ascending order."""
|
||||
from omni_pca.client import OmniClient
|
||||
seeded = {
|
||||
12: Program(slot=12, prog_type=int(ProgramType.TIMED), cmd=3, hour=6, minute=0,
|
||||
days=int(Days.MONDAY | Days.FRIDAY)),
|
||||
42: Program(slot=42, prog_type=int(ProgramType.TIMED), cond=0x8D09, cond2=0x9B09,
|
||||
cmd=0x44, par=3, pr2=0x0100, month=8, day=12,
|
||||
days=int(Days.MONDAY), hour=7, minute=15),
|
||||
99: Program(slot=99, prog_type=int(ProgramType.EVENT), cmd=5, month=5, day=12),
|
||||
}
|
||||
panel = MockPanel(
|
||||
controller_key=CONTROLLER_KEY,
|
||||
state=MockState(programs={s: p.encode_wire_bytes() for s, p in seeded.items()}),
|
||||
)
|
||||
async with panel.serve(transport="tcp") as (host, port):
|
||||
async with OmniClient(
|
||||
host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0
|
||||
) as c:
|
||||
received = [p async for p in c.iter_programs()]
|
||||
|
||||
assert [p.slot for p in received] == [12, 42, 99]
|
||||
for got, want in zip(received, seeded.values()):
|
||||
assert got.prog_type == want.prog_type
|
||||
assert got.cmd == want.cmd
|
||||
assert got.hour == want.hour
|
||||
assert got.minute == want.minute
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_v2_client_iter_programs_empty_state_yields_nothing() -> None:
|
||||
from omni_pca.client import OmniClient
|
||||
panel = MockPanel(controller_key=CONTROLLER_KEY, state=MockState())
|
||||
async with panel.serve(transport="tcp") as (host, port):
|
||||
async with OmniClient(
|
||||
host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0
|
||||
) as c:
|
||||
received = [p async for p in c.iter_programs()]
|
||||
assert received == []
|
||||
|
||||
|
||||
# ---- v1 client iter_programs (high-level wrapper over iter_streaming) ----
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_mockstate_from_pca_serves_real_panel_programs() -> None:
|
||||
"""End-to-end: build MockState from the live .pca, drive iter_programs
|
||||
over v2 wire, decode every yielded Program. This exercises the full
|
||||
file → mock → wire → decoder pipeline with real on-disk data.
|
||||
|
||||
The fixture is the same plain-text dump tests/test_pca_file.py uses;
|
||||
we re-encrypt with KEY_EXPORT on the fly so parse_pca_file accepts it.
|
||||
"""
|
||||
from pathlib import Path
|
||||
|
||||
from omni_pca.client import OmniClient
|
||||
from omni_pca.mock_panel import MockState
|
||||
from omni_pca.pca_file import KEY_EXPORT, decrypt_pca_bytes
|
||||
|
||||
plain = Path("/home/kdm/home-auto/HAI/pca-re/extracted/Our_House.pca.plain")
|
||||
if not plain.is_file():
|
||||
pytest.skip(f"live fixture missing: {plain}")
|
||||
encrypted = decrypt_pca_bytes(plain.read_bytes(), KEY_EXPORT)
|
||||
|
||||
state = MockState.from_pca(encrypted, key=KEY_EXPORT)
|
||||
# SystemInfo fields were populated from the .pca header.
|
||||
assert state.model_byte == 16 # OMNI_PRO_II
|
||||
assert state.firmware_major == 2
|
||||
# Programs: 330 defined per Phase 1 recon.
|
||||
assert len(state.programs) == 330
|
||||
|
||||
panel = MockPanel(controller_key=CONTROLLER_KEY, state=state)
|
||||
async with panel.serve(transport="tcp") as (host, port):
|
||||
async with OmniClient(
|
||||
host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0
|
||||
) as c:
|
||||
decoded = [p async for p in c.iter_programs()]
|
||||
|
||||
# Every defined slot streamed back, in ascending slot order.
|
||||
assert len(decoded) == 330
|
||||
assert [p.slot for p in decoded] == sorted(state.programs)
|
||||
# Spot check: every decoded record has a known ProgramType.
|
||||
for p in decoded:
|
||||
assert p.prog_type in {1, 2, 3} # TIMED / EVENT / YEARLY from this fixture
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_v1_client_iter_programs_enumerates_all_seeded() -> None:
|
||||
seeded = {
|
||||
12: Program(slot=12, prog_type=int(ProgramType.TIMED), cmd=3, hour=6, minute=0,
|
||||
days=int(Days.MONDAY | Days.FRIDAY)),
|
||||
42: Program(slot=42, prog_type=int(ProgramType.TIMED), cond=0x8D09, cond2=0x9B09,
|
||||
cmd=0x44, par=3, pr2=0x0100, month=8, day=12,
|
||||
days=int(Days.MONDAY), hour=7, minute=15),
|
||||
99: Program(slot=99, prog_type=int(ProgramType.EVENT), cmd=5, month=5, day=12),
|
||||
}
|
||||
panel = MockPanel(
|
||||
controller_key=CONTROLLER_KEY,
|
||||
state=MockState(programs={s: p.encode_wire_bytes() for s, p in seeded.items()}),
|
||||
)
|
||||
async with panel.serve(transport="udp") as (host, port):
|
||||
async with OmniClientV1(
|
||||
host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0
|
||||
) as c:
|
||||
received = [p async for p in c.iter_programs()]
|
||||
|
||||
assert [p.slot for p in received] == [12, 42, 99]
|
||||
for got, want in zip(received, seeded.values()):
|
||||
assert got.prog_type == want.prog_type
|
||||
assert got.cmd == want.cmd
|
||||
assert got.cond == want.cond
|
||||
assert got.cond2 == want.cond2
|
||||
assert got.hour == want.hour
|
||||
assert got.minute == want.minute
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user