From 7db9616a34a9d982ff1dc1efc32e230f5b8d6e92 Mon Sep 17 00:00:00 2001 From: Ryan Malloy Date: Tue, 12 May 2026 20:34:00 -0600 Subject: [PATCH] pca_file: extract Zone/Unit/Button/Code/Tstat/Area/Message names MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Names block (between SetupData and Voices) was previously walked as opaque bytes. It's actually a sequence of seven object-family tables, each storing N × String8(L) per the clsAbstractNamedItem.ReadName / clsPcaCryptFileStream.ReadString8(out S, byte L) pattern. Per-slot layout is [1 byte actual length][L bytes name], with length 0 meaning "unused". New PcaAccount fields: * zone_names, unit_names, button_names, code_names, thermostat_names, area_names, message_names — each is {1-based slot: name}, only non-empty slots. Object *properties* (zone_type, area_membership, etc.) aren't extracted yet — those live in SetupData, which remains opaque. Names alone unlock the biggest win: meaningful entity labels in HA from a .pca snapshot. MockState.from_pca now seeds zones/units/areas/thermostats/buttons with MockZoneState/MockUnitState/etc. instances carrying just the name. Defaults handle everything else. A connected client sees the real panel's names through normal wire discovery (UploadNames streams them back, properties synth fills the rest). New end-to-end test verifies the HA integration discovers all 16 zones, 44 units, 16 buttons, 2 thermostats from the live fixture when the MockPanel is built via MockState.from_pca — proving the full file → mock → wire → HA pipeline. Live fixture: 16 zones, 44 units, 16 buttons, 8 codes, 2 thermostats, 0 areas, 8 messages, 330 programs. (Areas in this v1 install have no user-assigned names — expected.) Full suite: 499 passed, 1 skipped (fixture-gated). --- src/omni_pca/mock_panel.py | 31 +++++-- src/omni_pca/pca_file.py | 107 ++++++++++++++++++++---- tests/ha_integration/test_pca_source.py | 55 ++++++++++++ tests/test_e2e_program_echo.py | 10 +++ tests/test_pca_file.py | 2 +- 5 files changed, 179 insertions(+), 26 deletions(-) diff --git a/src/omni_pca/mock_panel.py b/src/omni_pca/mock_panel.py index 4de942c..b26ba12 100644 --- a/src/omni_pca/mock_panel.py +++ b/src/omni_pca/mock_panel.py @@ -280,14 +280,21 @@ class MockState: * ``model_byte`` + ``firmware_*`` — drive SystemInformation replies so a connected client sees the panel the .pca came from. * ``programs`` — every non-empty Program record from the 1500-slot - table, encoded back to wire bytes so UploadProgram / UploadPrograms - serve them exactly as a real panel would. + table, encoded back to wire bytes so UploadProgram / + UploadPrograms serve them exactly as a real panel would. + * ``zones`` / ``units`` / ``areas`` / ``thermostats`` / ``buttons`` + — populated with the names from the .pca's Names section. Each + entry is a ``MockZoneState`` / ``MockUnitState`` / etc. with + only ``name`` set; other fields (zone_type, area assignment, + thermostat type, …) default to 0 because those properties live + in SetupData, which we don't decode yet. - Everything else uses MockState defaults (or whatever the caller - passes as ``**overrides``). Per-object name/state tables aren't - in the .pca header that pca_file currently extracts, so zones / - units / areas / thermostats default to empty unless the caller - explicitly provides them. + ``user_codes`` is not seeded — the .pca only stores code *names*, + not the PIN values; the panel keeps PINs in SetupData. Override + explicitly if a test needs them. + + Anything else uses MockState defaults. Pass kwargs to override + any seeded field. ``key=0`` only works for files where the export keystream was already applied (e.g., the result of ``decrypt_pca_bytes`` with @@ -308,6 +315,16 @@ class MockState: "firmware_minor": acct.firmware_minor, "firmware_revision": acct.firmware_revision, "programs": programs, + "zones": {i: MockZoneState(name=n) for i, n in acct.zone_names.items()}, + "units": {i: MockUnitState(name=n) for i, n in acct.unit_names.items()}, + "areas": {i: MockAreaState(name=n) for i, n in acct.area_names.items()}, + "thermostats": { + i: MockThermostatState(name=n) + for i, n in acct.thermostat_names.items() + }, + "buttons": { + i: MockButtonState(name=n) for i, n in acct.button_names.items() + }, } defaults.update(overrides) return cls(**defaults) diff --git a/src/omni_pca/pca_file.py b/src/omni_pca/pca_file.py index bcab822..da1e286 100644 --- a/src/omni_pca/pca_file.py +++ b/src/omni_pca/pca_file.py @@ -214,6 +214,20 @@ class PcaAccount: basis — failure here doesn't break Connection extraction. """ + # Per-object name tables — populated from the Names block between + # SetupData and Voices. Keys are 1-based slot numbers; empty slots + # are omitted entirely (the panel stores them as length-0 String8 + # blobs, which we filter at read time). Properties beyond the name + # (zone_type, area assignment, thermostat type, etc.) live in + # SetupData and aren't extracted yet. + zone_names: dict[int, str] = field(default_factory=dict) + unit_names: dict[int, str] = field(default_factory=dict) + button_names: dict[int, str] = field(default_factory=dict) + code_names: dict[int, str] = field(default_factory=dict) + thermostat_names: dict[int, str] = field(default_factory=dict) + area_names: dict[int, str] = field(default_factory=dict) + message_names: dict[int, str] = field(default_factory=dict) + def parse_pca01_cfg(data: bytes, key: int = KEY_PC01) -> PcaConfig: """Decrypt ``data`` (raw PCA01.CFG bytes) and parse per clsPcaCfg.Read().""" @@ -345,25 +359,66 @@ def _walk_to_remarks(r: PcaReader) -> dict[int, str]: return {} -def _walk_to_connection(r: PcaReader, cap: dict[str, int]) -> bytes: +@dataclass +class _ConnectionWalk: + """Side-channel output of :func:`_walk_to_connection`. + + Captures the per-object name tables on the way past so the caller + can attach them to :class:`PcaAccount`. Each ``*_names`` dict is + ``{1-based slot: name}`` with only non-empty slots present — + matches the "iter_defined" convention used for programs. + """ + + programs_blob: bytes + zone_names: dict[int, str] = field(default_factory=dict) + unit_names: dict[int, str] = field(default_factory=dict) + button_names: dict[int, str] = field(default_factory=dict) + code_names: dict[int, str] = field(default_factory=dict) + thermostat_names: dict[int, str] = field(default_factory=dict) + area_names: dict[int, str] = field(default_factory=dict) + message_names: dict[int, str] = field(default_factory=dict) + + +def _read_name_table(r: PcaReader, count: int, name_len: int) -> dict[int, str]: + """Read ``count`` String8(name_len) slots; return only non-empty ones. + + Per-slot layout per ``clsAbstractNamedItem.ReadName`` / + ``clsPcaCryptFileStream.ReadString8(out S, byte L)``: + + ``[1 byte actual length][name_len bytes name]`` + + The length byte is 0 for unused slots. We use ``string8_fixed`` to + consume exactly ``1 + name_len`` bytes per slot regardless. + """ + out: dict[int, str] = {} + for i in range(1, count + 1): + name = r.string8_fixed(name_len) + if name: + out[i] = name + return out + + +def _walk_to_connection(r: PcaReader, cap: dict[str, int]) -> _ConnectionWalk: """Walk SetupData, flags, Names, Voices, Programs, EventLog so the - next read lands on the Connection block. Returns the raw 21,000-byte - Programs blob so the caller can decode it; everything else is still - skipped as before. Mirrors clsHAC.cs:7995-8044.""" + next read lands on the Connection block. Captures per-object name + tables on the way past and returns them alongside the Programs blob. + + Mirrors clsHAC.cs:7995-8044. The per-object names are read via + clsAbstractNamedItem.ReadName → String8(L) — see + :func:`_read_name_table` for the per-slot layout. + """ r.bytes_(cap["lenSetupData"]) r.bytes_(10) # bool + bool + u16 + u16 + u32 - name_specs = [ - (cap["max_zones"], cap["lenZoneName"]), - (cap["max_units"], cap["lenUnitName"]), - (cap["max_buttons"], cap["lenButtonName"]), - (cap["max_codes"], cap["lenCodeName"]), - (cap["max_tstats"], cap["lenTstatName"]), - (cap["max_areas"], cap["lenAreaName"]), - (cap["max_messages"], cap["lenMessageName"]), - ] - for max_slots, name_len in name_specs: - r.bytes_(max_slots * (1 + name_len)) + # Object family order per clsHAC body layout: + # Zones → Units → Buttons → Codes → Thermostats → Areas → Messages. + zone_names = _read_name_table(r, cap["max_zones"], cap["lenZoneName"]) + unit_names = _read_name_table(r, cap["max_units"], cap["lenUnitName"]) + button_names = _read_name_table(r, cap["max_buttons"], cap["lenButtonName"]) + code_names = _read_name_table(r, cap["max_codes"], cap["lenCodeName"]) + thermostat_names = _read_name_table(r, cap["max_tstats"], cap["lenTstatName"]) + area_names = _read_name_table(r, cap["max_areas"], cap["lenAreaName"]) + message_names = _read_name_table(r, cap["max_messages"], cap["lenMessageName"]) # Voices: structured slots are 12 B (LargeVocabulary), skip slots 6 B. voice_specs = [ @@ -384,7 +439,16 @@ def _walk_to_connection(r: PcaReader, cap: dict[str, int]) -> bytes: programs_blob = r.bytes_(cap["max_programs"] * cap["program_bytes"]) r.bytes_(cap["max_event_log"] * cap["event_bytes"]) - return programs_blob + return _ConnectionWalk( + programs_blob=programs_blob, + zone_names=zone_names, + unit_names=unit_names, + button_names=button_names, + code_names=code_names, + thermostat_names=thermostat_names, + area_names=area_names, + message_names=message_names, + ) def parse_pca_file(path_or_bytes: str | os.PathLike[str] | bytes, key: int) -> PcaAccount: @@ -435,7 +499,7 @@ def parse_pca_file(path_or_bytes: str | os.PathLike[str] | bytes, key: int) -> P return account try: - programs_blob = _walk_to_connection(r, _CAP_OMNI_PRO_II) + walk = _walk_to_connection(r, _CAP_OMNI_PRO_II) network_address = r.string8_fixed(120) port_str = r.string8_fixed(5) try: @@ -450,7 +514,7 @@ def parse_pca_file(path_or_bytes: str | os.PathLike[str] | bytes, key: int) -> P # walker misaligned for a non-OMNI_PRO_II model, in which case # leaving programs=() is the honest answer. try: - programs: tuple[Program, ...] = decode_program_table(programs_blob) + programs: tuple[Program, ...] = decode_program_table(walk.programs_blob) except Exception: _log.warning("failed to decode Programs block", exc_info=True) programs = () @@ -464,6 +528,13 @@ def parse_pca_file(path_or_bytes: str | os.PathLike[str] | bytes, key: int) -> P account.network_port = network_port account.controller_key = controller_key account.programs = programs + account.zone_names = walk.zone_names + account.unit_names = walk.unit_names + account.button_names = walk.button_names + account.code_names = walk.code_names + account.thermostat_names = walk.thermostat_names + account.area_names = walk.area_names + account.message_names = walk.message_names # PCA03+ continues past Connection with ModemBaud flags + nine # Description blocks + the Remarks table. We walk it on a diff --git a/tests/ha_integration/test_pca_source.py b/tests/ha_integration/test_pca_source.py index 6ee267b..d09b860 100644 --- a/tests/ha_integration/test_pca_source.py +++ b/tests/ha_integration/test_pca_source.py @@ -103,6 +103,61 @@ async def test_pca_source_overrides_wire_programs( assert int(sensors[0].state) == 330 +async def test_mockpanel_from_pca_drives_full_ha_discovery( + hass: HomeAssistant, tmp_path: Path +) -> None: + """End-to-end: build a MockPanel state straight from the live .pca, + then point HA at that mock with no other configuration. The + integration should discover *every* named zone / unit / button / + thermostat from the .pca via the normal wire path — no .pca config + needed, because the mock is now serving real data. + """ + if not LIVE_FIXTURE_PLAIN.is_file(): + pytest.skip(f"live .pca fixture missing: {LIVE_FIXTURE_PLAIN}") + + from custom_components.omni_pca.const import CONF_CONTROLLER_KEY + from omni_pca.mock_panel import MockPanel, MockState + from omni_pca.pca_file import KEY_EXPORT, decrypt_pca_bytes + + encrypted = decrypt_pca_bytes(LIVE_FIXTURE_PLAIN.read_bytes(), KEY_EXPORT) + state = MockState.from_pca(encrypted, key=KEY_EXPORT) + # Sanity — the from_pca seeding matches the live fixture's names. + assert len(state.zones) == 16 + assert len(state.units) == 44 + + panel = MockPanel( + controller_key=bytes(range(16)), # matches CONTROLLER_KEY_HEX + state=state, + ) + async with panel.serve(host="127.0.0.1") as (host, port): + entry = MockConfigEntry( + domain=DOMAIN, + data={ + CONF_HOST: host, + CONF_PORT: port, + CONF_CONTROLLER_KEY: CONTROLLER_KEY_HEX, + }, + title=f"Mock Omni @ {host}:{port} (from .pca)", + unique_id=f"{host}:{port}", + ) + entry.add_to_hass(hass) + assert await hass.config_entries.async_setup(entry.entry_id) + await hass.async_block_till_done() + try: + coordinator = hass.data[DOMAIN][entry.entry_id] + # All 16 zones surfaced through normal wire discovery. + assert len(coordinator.data.zones) == 16 + # Units, buttons, thermostats too. + assert len(coordinator.data.units) == 44 + assert len(coordinator.data.buttons) == 16 + assert len(coordinator.data.thermostats) == 2 + # And the programs sensor reflects 330 from wire iter_programs. + assert len(coordinator.data.programs) == 330 + finally: + await hass.config_entries.async_unload(entry.entry_id) + await hass.async_block_till_done() + + async def test_pca_path_validation_rejects_missing_file( hass: HomeAssistant, tmp_path: Path ) -> None: diff --git a/tests/test_e2e_program_echo.py b/tests/test_e2e_program_echo.py index b5ff52f..a57ee87 100644 --- a/tests/test_e2e_program_echo.py +++ b/tests/test_e2e_program_echo.py @@ -340,6 +340,16 @@ async def test_mockstate_from_pca_serves_real_panel_programs() -> None: assert state.firmware_major == 2 # Programs: 330 defined per Phase 1 recon. assert len(state.programs) == 330 + # Names: per the live fixture's reconnaissance dump. + assert len(state.zones) == 16 + assert len(state.units) == 44 + assert len(state.buttons) == 16 + assert len(state.thermostats) == 2 + # Areas in this fixture have no names — that's fine, just verify. + assert len(state.areas) == 0 + assert state.zones[1].name == "GARAGE ENTRY" + assert state.units[1].name == "ROOM ONE" + assert state.thermostats[1].name == "DOWNSTAIRS" panel = MockPanel(controller_key=CONTROLLER_KEY, state=state) async with panel.serve(transport="tcp") as (host, port): diff --git a/tests/test_pca_file.py b/tests/test_pca_file.py index c854a8d..89f7e5d 100644 --- a/tests/test_pca_file.py +++ b/tests/test_pca_file.py @@ -119,7 +119,7 @@ def _load_programs_blob_or_skip() -> bytes: r = PcaReader(p.read_bytes()) _parse_header(r) - return _walk_to_connection(r, _CAP_OMNI_PRO_II) + return _walk_to_connection(r, _CAP_OMNI_PRO_II).programs_blob def test_programs_block_decodes_against_live_fixture() -> None: