diff --git a/src/omni_pca/mock_panel.py b/src/omni_pca/mock_panel.py index 9c9313a..4de942c 100644 --- a/src/omni_pca/mock_panel.py +++ b/src/omni_pca/mock_panel.py @@ -52,7 +52,7 @@ import secrets from collections.abc import AsyncIterator, Callable from contextlib import asynccontextmanager from dataclasses import dataclass, field -from typing import ClassVar +from typing import Any, ClassVar from .commands import Command from .crypto import ( @@ -266,6 +266,52 @@ class MockState: self.thermostats = _promote_dict(self.thermostats, MockThermostatState) self.buttons = _promote_dict(self.buttons, MockButtonState) + @classmethod + def from_pca( + cls, + path_or_bytes: str | bytes, + key: int, + **overrides: Any, + ) -> MockState: + """Build a MockState seeded from a real .pca file. + + Populated from the .pca: + + * ``model_byte`` + ``firmware_*`` — drive SystemInformation replies + so a connected client sees the panel the .pca came from. + * ``programs`` — every non-empty Program record from the 1500-slot + table, encoded back to wire bytes so UploadProgram / UploadPrograms + serve them exactly as a real panel would. + + Everything else uses MockState defaults (or whatever the caller + passes as ``**overrides``). Per-object name/state tables aren't + in the .pca header that pca_file currently extracts, so zones / + units / areas / thermostats default to empty unless the caller + explicitly provides them. + + ``key=0`` only works for files where the export keystream was + already applied (e.g., the result of ``decrypt_pca_bytes`` with + the same key); use ``KEY_EXPORT`` (391549495) for unmodified + PC Access exports. + """ + from .pca_file import parse_pca_file + + acct = parse_pca_file(path_or_bytes, key=key) + programs = { + p.slot: p.encode_wire_bytes() + for p in acct.programs + if p.slot is not None and not p.is_empty() + } + defaults: dict[str, Any] = { + "model_byte": acct.model, + "firmware_major": acct.firmware_major, + "firmware_minor": acct.firmware_minor, + "firmware_revision": acct.firmware_revision, + "programs": programs, + } + defaults.update(overrides) + return cls(**defaults) + # ---- name-bytes helpers (kept for back-compat with old callers) ----- def zone_name_bytes(self, idx: int) -> bytes: diff --git a/tests/test_e2e_program_echo.py b/tests/test_e2e_program_echo.py index ca09680..b5ff52f 100644 --- a/tests/test_e2e_program_echo.py +++ b/tests/test_e2e_program_echo.py @@ -314,6 +314,48 @@ async def test_v2_client_iter_programs_empty_state_yields_nothing() -> None: # ---- v1 client iter_programs (high-level wrapper over iter_streaming) ---- +@pytest.mark.asyncio +async def test_mockstate_from_pca_serves_real_panel_programs() -> None: + """End-to-end: build MockState from the live .pca, drive iter_programs + over v2 wire, decode every yielded Program. This exercises the full + file → mock → wire → decoder pipeline with real on-disk data. + + The fixture is the same plain-text dump tests/test_pca_file.py uses; + we re-encrypt with KEY_EXPORT on the fly so parse_pca_file accepts it. + """ + from pathlib import Path + + from omni_pca.client import OmniClient + from omni_pca.mock_panel import MockState + from omni_pca.pca_file import KEY_EXPORT, decrypt_pca_bytes + + plain = Path("/home/kdm/home-auto/HAI/pca-re/extracted/Our_House.pca.plain") + if not plain.is_file(): + pytest.skip(f"live fixture missing: {plain}") + encrypted = decrypt_pca_bytes(plain.read_bytes(), KEY_EXPORT) + + state = MockState.from_pca(encrypted, key=KEY_EXPORT) + # SystemInfo fields were populated from the .pca header. + assert state.model_byte == 16 # OMNI_PRO_II + assert state.firmware_major == 2 + # Programs: 330 defined per Phase 1 recon. + assert len(state.programs) == 330 + + panel = MockPanel(controller_key=CONTROLLER_KEY, state=state) + async with panel.serve(transport="tcp") as (host, port): + async with OmniClient( + host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0 + ) as c: + decoded = [p async for p in c.iter_programs()] + + # Every defined slot streamed back, in ascending slot order. + assert len(decoded) == 330 + assert [p.slot for p in decoded] == sorted(state.programs) + # Spot check: every decoded record has a known ProgramType. + for p in decoded: + assert p.prog_type in {1, 2, 3} # TIMED / EVENT / YEARLY from this fixture + + @pytest.mark.asyncio async def test_v1_client_iter_programs_enumerates_all_seeded() -> None: seeded = {