From 0e3835d4ff89be14642d9a3b55413cfe2c1ba992 Mon Sep 17 00:00:00 2001 From: Ryan Malloy Date: Mon, 11 May 2026 16:32:51 -0600 Subject: [PATCH] MockPanel: v1 wire dispatch for hermetic OmniClientV1 tests Adds OmniLinkMessage (0x10) outer-packet handling to the mock so the v1 path no longer requires a real panel for testing. Exercised over UDP because OmniClientV1 is UDP-only by design, but the dispatcher itself is transport-agnostic and the TCP _handle_client routes OmniLinkMessage packets through the same _dispatch_v1 method too. Coverage today: * RequestSystemInformation (17) -> SystemInformation (18) * RequestSystemStatus (19) -> SystemStatus (20), 8 area mode bytes * RequestZoneStatus (21) -> ZoneStatus (22), short + long form * RequestUnitStatus (23) -> UnitStatus (24), short + long form (long form auto-selected for indices > 255) * RequestThermostatStatus (30) -> ThermostatStatus (31) * RequestAuxiliaryStatus (25) -> AuxiliaryStatus (26) (zero records) * UploadNames (12) -> NameData (11) streaming, lock-step Ack-driven across Zone/Unit/Button/ Area/Thermostat, terminated by EOD (3) * Command (15) -> Ack (5) / Nak (6), reuses v2 state mutator so light-on/off, set-level, bypass-zone, restore-zone all work * ExecuteSecurityCommand (102) -> Ack (5) / ExecuteSecurityCommandResponse (103) on bad code, with structured status byte preserved * MessageCrcError -> v1 Nak (opcode 6) The dispatcher writes replies wrapped in OmniLinkMessage (16) outer packets (vs OmniLink2Message (32) used by v2) so OmniClientV1 routes them correctly. The 4-step handshake is shared with v2 -- it's protocol-version-agnostic at the outer-packet layer. UploadNames state is panel-instance scoped via _upload_names_cursor (int | None) -- there is only one active session at a time on the mock so a single cursor suffices. tests/test_e2e_v1_mock.py: 13 cases driving OmniClientV1 through the mock's UDP socket, covering the full read API + UploadNames streaming + write methods + structured-failure path on a wrong security code. Full suite: 400 passed, 1 skipped (was 387 / 1). --- src/omni_pca/mock_panel.py | 455 ++++++++++++++++++++++++++++++++++++- tests/test_e2e_v1_mock.py | 252 ++++++++++++++++++++ 2 files changed, 705 insertions(+), 2 deletions(-) create mode 100644 tests/test_e2e_v1_mock.py diff --git a/src/omni_pca/mock_panel.py b/src/omni_pca/mock_panel.py index ea5968d..04afa46 100644 --- a/src/omni_pca/mock_panel.py +++ b/src/omni_pca/mock_panel.py @@ -52,6 +52,7 @@ import secrets from collections.abc import AsyncIterator, Callable from contextlib import asynccontextmanager from dataclasses import dataclass, field +from typing import ClassVar from .commands import Command from .crypto import ( @@ -60,8 +61,14 @@ from .crypto import ( derive_session_key, encrypt_message_payload, ) -from .message import Message, MessageCrcError, MessageFormatError, encode_v2 -from .opcodes import OmniLink2MessageType, PacketType +from .message import ( + Message, + MessageCrcError, + MessageFormatError, + encode_v1, + encode_v2, +) +from .opcodes import OmniLink2MessageType, OmniLinkMessageType, PacketType from .packet import Packet _log = logging.getLogger(__name__) @@ -328,6 +335,9 @@ class MockPanel: self._active_writer: asyncio.StreamWriter | None = None self._active_session_key: bytes | None = None self._push_tasks: set[asyncio.Task[None]] = set() + # v1 UploadNames cursor: index into self._v1_name_stream() while a + # streaming download is in flight, ``None`` when no stream active. + self._upload_names_cursor: int | None = None # -------- public observables (handy in tests) -------- @@ -461,6 +471,19 @@ class MockPanel: if not cont: break + elif pkt_type is PacketType.OmniLinkMessage: + # v1 wire dialect — UDP-only panels speak this on the + # wire. We accept it over TCP too so the same mock + # server can fixture both transports for tests. + if session_key is None: + _log.debug("mock panel: v1 message before secure session") + break + cont = await self._handle_encrypted_message_v1( + reader, seq, session_key, writer + ) + if not cont: + break + else: _log.debug("mock panel: unhandled packet type %s", pkt_type.name) break @@ -588,6 +611,68 @@ class MockPanel: self._schedule_event_push(push_words, session_key, writer) return True + async def _handle_encrypted_message_v1( + self, + reader: asyncio.StreamReader, + client_seq: int, + session_key: bytes, + writer: asyncio.StreamWriter, + ) -> bool: + """v1 counterpart of :meth:`_handle_encrypted_message`. + + Reads, decrypts, decodes a v1 inner message (StartChar 0x5A), + dispatches via :meth:`_dispatch_v1`, and writes the reply back + wrapped in an ``OmniLinkMessage`` (16) outer packet. + """ + first_block = await _read_exact(reader, BLOCK_SIZE) + if first_block is None: + return False + first_plain = decrypt_message_payload(first_block, client_seq, session_key) + # first_plain[0] = StartChar (0x5A), first_plain[1] = MessageLength + msg_length = first_plain[1] + extra_needed = max(0, msg_length + 4 - BLOCK_SIZE) + rem = (-extra_needed) % BLOCK_SIZE + extra_aligned = extra_needed + rem + ciphertext = first_block + if extra_aligned > 0: + extra = await _read_exact(reader, extra_aligned) + if extra is None: + return False + ciphertext = first_block + extra + plaintext = decrypt_message_payload(ciphertext, client_seq, session_key) + + try: + inner = Message.decode(plaintext) + except MessageCrcError: + _log.debug("mock panel: v1 inner message CRC failure") + await self._send_v1_reply( + client_seq, _build_v1_nak(0), session_key, writer + ) + return True + except MessageFormatError as exc: + _log.debug("mock panel: malformed v1 inner message: %s", exc) + return False + + opcode = inner.opcode + self._last_request_opcode = opcode + try: + opcode_name = OmniLinkMessageType(opcode).name + except ValueError: + opcode_name = f"Unknown({opcode})" + _log.debug( + "mock panel: v1 dispatch opcode=%s payload=%d bytes", + opcode_name, len(inner.payload), + ) + + reply, push_words = self._dispatch_v1(opcode, inner.payload) + await self._send_v1_reply(client_seq, reply, session_key, writer) + # v1 push events use opcode 35 instead of 55; the existing + # _schedule_event_push helper is v2-shaped and would emit a + # frame the v1 client can't parse. Skip pushes on v1 for now + # -- streaming UploadNames is the dominant flow we care about. + _ = push_words + return True + def _dispatch_v2( self, opcode: int, payload: bytes ) -> tuple[Message, tuple[int, ...]]: @@ -1091,6 +1176,299 @@ class MockPanel: writer.write(pkt.encode()) await writer.drain() + # ============================================================ + # v1 wire-dialect dispatch (panels listening UDP-only) + # ============================================================ + # + # Same crypto + handshake as v2; only the outer ``PacketType`` + # (OmniLinkMessage = 16 vs OmniLink2Message = 32) and the inner + # ``Message.start_char`` (0x5A NonAddressable vs 0x21 OmniLink2) + # differ. The dispatch table here mirrors what ``OmniClientV1`` + # actually sends — see omni_pca.v1.client. + + def _dispatch_v1( + self, opcode: int, payload: bytes + ) -> tuple[Message, tuple[int, ...]]: + """Dispatch a single v1 request, return (reply_msg, push_words). + + Mirrors :meth:`_dispatch_v2` but for v1 opcodes (see enuOmniLinkMessageType). + """ + if opcode == OmniLinkMessageType.RequestSystemInformation: + return self._v1_reply_system_information(), () + if opcode == OmniLinkMessageType.RequestSystemStatus: + return self._v1_reply_system_status(), () + if opcode == OmniLinkMessageType.RequestZoneStatus: + return self._v1_reply_zone_status(payload), () + if opcode == OmniLinkMessageType.RequestUnitStatus: + return self._v1_reply_unit_status(payload), () + if opcode == OmniLinkMessageType.RequestThermostatStatus: + return self._v1_reply_thermostat_status(payload), () + if opcode == OmniLinkMessageType.RequestAuxiliaryStatus: + return self._v1_reply_auxiliary_status(payload), () + if opcode == OmniLinkMessageType.UploadNames: + return self._v1_start_upload_names_stream(), () + if opcode == OmniLinkMessageType.Ack: + # During an active UploadNames stream, each client Ack + # advances the cursor. With no active stream, drop silently + # (Ack as a request opcode is only meaningful mid-stream). + if self._upload_names_cursor is not None: + return self._v1_advance_upload_names_stream(), () + return _build_v1_nak(opcode), () + if opcode == OmniLinkMessageType.Command: + return self._v1_handle_command(payload) + if opcode == OmniLinkMessageType.ExecuteSecurityCommand: + return self._v1_handle_execute_security_command(payload) + return _build_v1_nak(opcode), () + + # ---- v1 reply builders ---- + + def _v1_reply_system_information(self) -> Message: + # Wire layout is byte-identical to v2 (clsOLMsgSystemInformation.cs + # vs clsOL2MsgSystemInformation.cs) -- only the opcode differs. + s = self.state + body = bytes( + [ + s.model_byte & 0xFF, + s.firmware_major & 0xFF, + s.firmware_minor & 0xFF, + s.firmware_revision & 0xFF, + ] + ) + _name_bytes(s.local_phone, _PHONE_LEN) + return encode_v1(OmniLinkMessageType.SystemInformation, body) + + def _v1_reply_system_status(self) -> Message: + # Bytes 0..13 byte-identical to v2 SystemStatus. + # After byte 13 the v1 wire carries per-area Mode bytes (one byte + # each) instead of v2's 2-byte alarm pairs. We emit eight zero + # mode bytes so OmniClientV1 sees 8 areas reporting OFF. + s = self.state + body = bytes( + [ + 1 if s.time_set else 0, + s.year & 0xFF, s.month & 0xFF, s.day & 0xFF, + s.day_of_week & 0xFF, + s.hour & 0xFF, s.minute & 0xFF, s.second & 0xFF, + s.daylight_saving & 0xFF, + s.sunrise_hour & 0xFF, s.sunrise_minute & 0xFF, + s.sunset_hour & 0xFF, s.sunset_minute & 0xFF, + s.battery & 0xFF, + ] + ) + # Per-area mode bytes (8 areas on Omni Pro II). + for idx in range(1, 9): + area = self.state.areas.get(idx) + body += bytes([area.mode if area else 0]) + return encode_v1(OmniLinkMessageType.SystemStatus, body) + + @staticmethod + def _v1_decode_range(payload: bytes) -> tuple[int, int] | None: + """Decode RequestUnitStatus / RequestZoneStatus / ... range payload. + + Short form (both ≤ 255): 2 bytes [start, end]. + Long form (either > 255): 4 bytes [start_hi, start_lo, end_hi, end_lo]. + See clsOLMsgRequestUnitStatus.cs:18-31. + """ + if len(payload) == 2: + return payload[0], payload[1] + if len(payload) == 4: + return (payload[0] << 8) | payload[1], (payload[2] << 8) | payload[3] + return None + + def _v1_reply_zone_status(self, payload: bytes) -> Message: + rng = self._v1_decode_range(payload) + if rng is None or rng[0] > rng[1]: + return _build_v1_nak(OmniLinkMessageType.RequestZoneStatus) + start, end = rng + records = b"" + for idx in range(start, end + 1): + z = self.state.zones.get(idx) + if z is not None: + records += bytes([z.status_byte, z.loop]) + else: + # Slots without a defined zone still respond with + # zero bytes -- real panels do this too. + records += b"\x00\x00" + return encode_v1(OmniLinkMessageType.ZoneStatus, records) + + def _v1_reply_unit_status(self, payload: bytes) -> Message: + rng = self._v1_decode_range(payload) + if rng is None or rng[0] > rng[1]: + return _build_v1_nak(OmniLinkMessageType.RequestUnitStatus) + start, end = rng + records = b"" + for idx in range(start, end + 1): + u = self.state.units.get(idx) + if u is not None: + records += bytes( + [u.state, (u.time_remaining >> 8) & 0xFF, u.time_remaining & 0xFF] + ) + else: + records += b"\x00\x00\x00" + return encode_v1(OmniLinkMessageType.UnitStatus, records) + + def _v1_reply_thermostat_status(self, payload: bytes) -> Message: + rng = self._v1_decode_range(payload) + if rng is None or rng[0] > rng[1]: + return _build_v1_nak(OmniLinkMessageType.RequestThermostatStatus) + start, end = rng + records = b"" + for idx in range(start, end + 1): + t = self.state.thermostats.get(idx) + if t is not None: + records += bytes( + [ + 1, # communicating + t.temperature_raw, t.heat_setpoint_raw, + t.cool_setpoint_raw, + t.system_mode, t.fan_mode, t.hold_mode, + ] + ) + else: + records += b"\x00" * 7 + return encode_v1(OmniLinkMessageType.ThermostatStatus, records) + + def _v1_reply_auxiliary_status(self, payload: bytes) -> Message: + rng = self._v1_decode_range(payload) + if rng is None or rng[0] > rng[1]: + return _build_v1_nak(OmniLinkMessageType.RequestAuxiliaryStatus) + start, end = rng + # MockState has no aux sensors -- return zero records. + records = b"\x00\x00\x00\x00" * (end - start + 1) + return encode_v1(OmniLinkMessageType.AuxiliaryStatus, records) + + # ---- UploadNames streaming ---- + + # NameType enum (from omni_pca.v1.messages.NameType -- duplicated here + # so the mock doesn't depend on the v1 subpackage at import time). + _V1_NAME_TYPE_ZONE: ClassVar[int] = 1 + _V1_NAME_TYPE_UNIT: ClassVar[int] = 2 + _V1_NAME_TYPE_BUTTON: ClassVar[int] = 3 + _V1_NAME_TYPE_AREA: ClassVar[int] = 5 + _V1_NAME_TYPE_THERMOSTAT: ClassVar[int] = 6 + + _V1_NAME_TYPE_LENGTH: ClassVar[dict[int, int]] = { + 1: 15, # Zone + 2: 12, # Unit + 3: 12, # Button + 4: 12, # Code (unused by mock) + 5: 12, # Area + 6: 12, # Thermostat + 7: 15, # Message (unused by mock) + } + + def _v1_name_stream(self) -> list[tuple[int, int, str]]: + """Flat list of (NameType, number, name) tuples — the panel emits + these in the order Zone → Unit → Button → Code → Area → + Thermostat → Message during ``UploadNames`` streaming. + Empty-named objects are skipped (matches real-panel behavior). + """ + items: list[tuple[int, int, str]] = [] + for idx in sorted(self.state.zones): + n = self.state.zones[idx].name + if n: + items.append((self._V1_NAME_TYPE_ZONE, idx, n)) + for idx in sorted(self.state.units): + n = self.state.units[idx].name + if n: + items.append((self._V1_NAME_TYPE_UNIT, idx, n)) + for idx in sorted(self.state.buttons): + n = self.state.buttons[idx].name + if n: + items.append((self._V1_NAME_TYPE_BUTTON, idx, n)) + for idx in sorted(self.state.areas): + n = self.state.areas[idx].name + if n: + items.append((self._V1_NAME_TYPE_AREA, idx, n)) + for idx in sorted(self.state.thermostats): + n = self.state.thermostats[idx].name + if n: + items.append((self._V1_NAME_TYPE_THERMOSTAT, idx, n)) + return items + + def _v1_namedata_msg(self, type_byte: int, num: int, name: str) -> Message: + """Encode a single NameData reply payload (clsOLMsgNameData.cs).""" + L = self._V1_NAME_TYPE_LENGTH.get(type_byte, 12) + encoded = name.encode("utf-8")[:L].ljust(L, b"\x00") + if num <= 0xFF: + body = bytes([type_byte, num]) + encoded + b"\x00" + else: + body = bytes( + [type_byte, (num >> 8) & 0xFF, num & 0xFF] + ) + encoded + b"\x00" + return encode_v1(OmniLinkMessageType.NameData, body) + + def _v1_start_upload_names_stream(self) -> Message: + """Handle bare ``UploadNames`` request -- send first NameData + (or EOD immediately if no defined names).""" + names = self._v1_name_stream() + if not names: + self._upload_names_cursor = None + return _build_v1_eod() + self._upload_names_cursor = 0 + t, n, name = names[0] + return self._v1_namedata_msg(t, n, name) + + def _v1_advance_upload_names_stream(self) -> Message: + """Handle client ``Acknowledge`` during an active stream -- send + the next NameData or EOD when exhausted.""" + names = self._v1_name_stream() + # _upload_names_cursor != None implied by caller + assert self._upload_names_cursor is not None + self._upload_names_cursor += 1 + if self._upload_names_cursor >= len(names): + self._upload_names_cursor = None + return _build_v1_eod() + t, n, name = names[self._upload_names_cursor] + return self._v1_namedata_msg(t, n, name) + + # ---- v1 Command / ExecuteSecurityCommand wrappers ---- + # The wire payload format is byte-identical to v2 (clsOLMsgCommand.cs + # vs clsOL2MsgCommand.cs); only the outer opcode and the reply Ack + # opcode (v1=5 vs v2=1) differ. We reuse the v2 state-mutation + # helper and just wrap the reply. + + def _v1_handle_command( + self, payload: bytes + ) -> tuple[Message, tuple[int, ...]]: + v2_reply, push_words = self._handle_command(payload) + if v2_reply.opcode == int(OmniLink2MessageType.Ack): + return _build_v1_ack(), push_words + # Pass-through Nak (no state mutation push when command refused). + return _build_v1_nak(OmniLinkMessageType.Command), () + + def _v1_handle_execute_security_command( + self, payload: bytes + ) -> tuple[Message, tuple[int, ...]]: + v2_reply, push_words = self._handle_execute_security_command(payload) + if v2_reply.opcode == int(OmniLink2MessageType.Ack): + return _build_v1_ack(), push_words + if v2_reply.opcode == int( + OmniLink2MessageType.ExecuteSecurityCommandResponse + ): + # Preserve the structured response (status byte) but rebuild + # with v1 opcode so OmniClientV1 sees opcode 103, not 75. + return ( + encode_v1( + OmniLinkMessageType.ExecuteSecurityCommandResponse, + v2_reply.payload, + ), + push_words, + ) + return _build_v1_nak(OmniLinkMessageType.ExecuteSecurityCommand), () + + async def _send_v1_reply( + self, + client_seq: int, + message: Message, + session_key: bytes, + writer: asyncio.StreamWriter, + ) -> None: + plaintext = message.encode() + ciphertext = encrypt_message_payload(plaintext, client_seq, session_key) + pkt = Packet(seq=client_seq, type=PacketType.OmniLinkMessage, data=ciphertext) + writer.write(pkt.encode()) + await writer.drain() + # -------------------------------------------------------------------------- # Status / ExtendedStatus per-record builders @@ -1271,6 +1649,26 @@ def _build_nak(in_reply_to_opcode: int) -> Message: return encode_v2(OmniLink2MessageType.Nak, bytes([in_reply_to_opcode & 0xFF])) +# ---- v1 wire-dialect counterparts ---- + +def _build_v1_ack() -> Message: + """Build a v1 Ack (opcode 5) with no payload.""" + return encode_v1(OmniLinkMessageType.Ack, b"") + + +def _build_v1_nak(in_reply_to_opcode: int) -> Message: + """Build a v1 Nak (opcode 6) carrying the offending opcode byte.""" + return encode_v1( + OmniLinkMessageType.Nak, bytes([int(in_reply_to_opcode) & 0xFF]) + ) + + +def _build_v1_eod() -> Message: + """Build a v1 EOD (opcode 3) -- the end-of-stream marker for bulk + downloads like ``UploadNames`` and ``UploadSetup``.""" + return encode_v1(OmniLinkMessageType.EOD, b"") + + async def _read_exact(reader: asyncio.StreamReader, n: int) -> bytes | None: """Read exactly ``n`` bytes or return None if EOF arrives early.""" try: @@ -1411,6 +1809,15 @@ class _MockServerDatagramProtocol(asyncio.DatagramProtocol): await self._handle_encrypted_udp(pkt, addr) return + if pkt.type is PacketType.OmniLinkMessage: + # v1 wire dialect — the canonical UDP-only dialect real + # panels speak. Routes through MockPanel._dispatch_v1. + if self._session_key is None: + _log.debug("mock panel (udp) v1 message before secure session") + return + await self._handle_encrypted_udp_v1(pkt, addr) + return + _log.debug("mock panel (udp) unhandled packet type %s", pkt.type.name) async def _handle_encrypted_udp( @@ -1440,6 +1847,34 @@ class _MockServerDatagramProtocol(asyncio.DatagramProtocol): if push_words: self._schedule_udp_push(push_words, addr) + async def _handle_encrypted_udp_v1( + self, pkt: Packet, addr: tuple[str, int] + ) -> None: + """v1 UDP counterpart of :meth:`_handle_encrypted_udp`. + + Same crypto, different inner-message dialect (StartChar 0x5A, + v1 opcodes) and different outer reply type (``OmniLinkMessage`` + = 16, not 32). + """ + assert self._session_key is not None + try: + plaintext = decrypt_message_payload( + pkt.data, pkt.seq, self._session_key + ) + except Exception: + _log.debug("mock panel (udp) failed to decrypt v1 message") + return + try: + inner = Message.decode(plaintext) + except (MessageCrcError, MessageFormatError): + await self._send_reply_v1(pkt.seq, _build_v1_nak(0), addr) + return + + opcode = inner.opcode + self._panel._last_request_opcode = opcode + reply, _push_words = self._panel._dispatch_v1(opcode, inner.payload) + await self._send_reply_v1(pkt.seq, reply, addr) + def _schedule_udp_push( self, words: tuple[int, ...], addr: tuple[str, int] ) -> None: @@ -1480,3 +1915,19 @@ class _MockServerDatagramProtocol(asyncio.DatagramProtocol): data=ciphertext, ) self._send(pkt, addr) + + async def _send_reply_v1( + self, client_seq: int, message: Message, addr: tuple[str, int] + ) -> None: + """v1 counterpart of :meth:`_send_reply` -- wraps the encrypted + reply in an ``OmniLinkMessage`` (16) outer packet instead of + ``OmniLink2Message`` (32).""" + assert self._session_key is not None + plaintext = message.encode() + ciphertext = encrypt_message_payload(plaintext, client_seq, self._session_key) + pkt = Packet( + seq=client_seq, + type=PacketType.OmniLinkMessage, + data=ciphertext, + ) + self._send(pkt, addr) diff --git a/tests/test_e2e_v1_mock.py b/tests/test_e2e_v1_mock.py new file mode 100644 index 0000000..7d88f0c --- /dev/null +++ b/tests/test_e2e_v1_mock.py @@ -0,0 +1,252 @@ +"""End-to-end: OmniClientV1 ↔ MockPanel speaking the v1 wire dialect. + +Exercises the MockPanel's new ``_dispatch_v1`` path over UDP (which +is what ``OmniClientV1`` opens — see :class:`omni_pca.v1.connection. +OmniConnectionV1`). The packets travel ``127.0.0.1`` so there is no +real packet-loss risk; we still set a 2 s per-reply timeout to fail +fast if the dispatcher hangs. +""" + +from __future__ import annotations + +import pytest + +from omni_pca.commands import CommandFailedError +from omni_pca.mock_panel import ( + MockAreaState, + MockButtonState, + MockPanel, + MockState, + MockThermostatState, + MockUnitState, + MockZoneState, +) +from omni_pca.models import SecurityMode +from omni_pca.v1 import NameType, OmniClientV1 + +CONTROLLER_KEY = bytes.fromhex("6ba7b4e9b4656de3cd7edd4c650cdb09") + + +def _populated_state() -> MockState: + return MockState( + zones={ + 1: MockZoneState(name="FRONT DOOR"), + 2: MockZoneState(name="BACK DOOR"), + 3: MockZoneState(name="LIVING MOT", current_state=1, loop=0xFD), + }, + units={ + 1: MockUnitState(name="FRONT PORCH", state=1), # on + 2: MockUnitState(name="LIVING LAMP", state=0x96), # 50% brightness + }, + areas={1: MockAreaState(name="MAIN", mode=int(SecurityMode.OFF))}, + thermostats={ + 1: MockThermostatState( + name="DOWNSTAIRS", + temperature_raw=170, heat_setpoint_raw=140, + cool_setpoint_raw=200, system_mode=1, fan_mode=0, hold_mode=0, + ), + }, + buttons={1: MockButtonState(name="GOOD MORNING")}, + user_codes={1: 1234}, + ) + + +# ---- handshake + read API ------------------------------------------------ + + +@pytest.mark.asyncio +async def test_v1_handshake_and_system_information() -> None: + panel = MockPanel(controller_key=CONTROLLER_KEY, state=_populated_state()) + async with panel.serve(transport="udp") as (host, port): + async with OmniClientV1( + host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0 + ) as c: + info = await c.get_system_information() + assert info.model_name == "Omni Pro II" + assert info.firmware_version == "2.12r1" + + +@pytest.mark.asyncio +async def test_v1_get_system_status_reports_areas() -> None: + panel = MockPanel(controller_key=CONTROLLER_KEY, state=_populated_state()) + async with panel.serve(transport="udp") as (host, port): + async with OmniClientV1( + host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0 + ) as c: + status = await c.get_system_status() + # Mock emits 8 area mode bytes (Omni Pro II cap). + assert len(status.area_alarms) == 8 + # Each tuple is (mode, 0); area 1 was OFF (0). + assert status.area_alarms[0] == (0, 0) + + +@pytest.mark.asyncio +async def test_v1_zone_status_short_form() -> None: + panel = MockPanel(controller_key=CONTROLLER_KEY, state=_populated_state()) + async with panel.serve(transport="udp") as (host, port): + async with OmniClientV1( + host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0 + ) as c: + zones = await c.get_zone_status(1, 8) + assert len(zones) == 8 + assert zones[1].is_secure + # Zone 3 has current_state=1 (NotReady -> open). + assert zones[3].is_open + assert zones[3].loop == 0xFD + + +@pytest.mark.asyncio +async def test_v1_unit_status_short_form() -> None: + panel = MockPanel(controller_key=CONTROLLER_KEY, state=_populated_state()) + async with panel.serve(transport="udp") as (host, port): + async with OmniClientV1( + host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0 + ) as c: + units = await c.get_unit_status(1, 4) + assert units[1].is_on + assert units[2].brightness == 50 # state=0x96 == 150 -> 50% + assert not units[3].is_on # undefined slot, defaults + assert not units[4].is_on + + +@pytest.mark.asyncio +async def test_v1_unit_status_long_form() -> None: + """Force the BE-u16 wire form by including indices > 255.""" + state = _populated_state() + state.units[300] = MockUnitState(name="SPRINKLER-Z3", state=1) + panel = MockPanel(controller_key=CONTROLLER_KEY, state=state) + async with panel.serve(transport="udp") as (host, port): + async with OmniClientV1( + host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0 + ) as c: + units = await c.get_unit_status(298, 302) + assert len(units) == 5 + assert units[300].is_on + + +@pytest.mark.asyncio +async def test_v1_thermostat_status() -> None: + panel = MockPanel(controller_key=CONTROLLER_KEY, state=_populated_state()) + async with panel.serve(transport="udp") as (host, port): + async with OmniClientV1( + host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0 + ) as c: + tstats = await c.get_thermostat_status(1, 1) + t = tstats[1] + assert t.temperature_raw == 170 + assert t.heat_setpoint_raw == 140 + assert t.cool_setpoint_raw == 200 + assert t.system_mode == 1 + assert t.fan_mode == 0 + assert t.hold_mode == 0 + + +# ---- UploadNames streaming ---------------------------------------------- + + +@pytest.mark.asyncio +async def test_v1_upload_names_streams_all_objects() -> None: + panel = MockPanel(controller_key=CONTROLLER_KEY, state=_populated_state()) + async with panel.serve(transport="udp") as (host, port): + async with OmniClientV1( + host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0 + ) as c: + all_names = await c.list_all_names() + + # Expected: Zones 1-3, Units 1-2, Button 1, Area 1, Thermostat 1. + assert set(all_names.keys()) == { + int(NameType.ZONE), + int(NameType.UNIT), + int(NameType.BUTTON), + int(NameType.AREA), + int(NameType.THERMOSTAT), + } + assert all_names[int(NameType.ZONE)] == { + 1: "FRONT DOOR", 2: "BACK DOOR", 3: "LIVING MOT", + } + assert all_names[int(NameType.UNIT)] == { + 1: "FRONT PORCH", 2: "LIVING LAMP", + } + assert all_names[int(NameType.BUTTON)] == {1: "GOOD MORNING"} + assert all_names[int(NameType.AREA)] == {1: "MAIN"} + assert all_names[int(NameType.THERMOSTAT)] == {1: "DOWNSTAIRS"} + + +@pytest.mark.asyncio +async def test_v1_upload_names_empty_panel_returns_no_records() -> None: + panel = MockPanel(controller_key=CONTROLLER_KEY) + async with panel.serve(transport="udp") as (host, port): + async with OmniClientV1( + host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0 + ) as c: + all_names = await c.list_all_names() + assert all_names == {} + + +@pytest.mark.asyncio +async def test_v1_upload_names_two_byte_form_for_high_indices() -> None: + state = _populated_state() + state.units[300] = MockUnitState(name="Z-LANDSCAPE") # > 255 + panel = MockPanel(controller_key=CONTROLLER_KEY, state=state) + async with panel.serve(transport="udp") as (host, port): + async with OmniClientV1( + host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0 + ) as c: + all_names = await c.list_all_names() + assert all_names[int(NameType.UNIT)][300] == "Z-LANDSCAPE" + + +# ---- write methods ------------------------------------------------------ + + +@pytest.mark.asyncio +async def test_v1_turn_unit_on_mutates_mock_state() -> None: + panel = MockPanel(controller_key=CONTROLLER_KEY, state=_populated_state()) + async with panel.serve(transport="udp") as (host, port): + async with OmniClientV1( + host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0 + ) as c: + assert panel.state.units[2].state == 0x96 # 50% + await c.set_unit_level(2, 75) + assert panel.state.units[2].state == 100 + 75 # 175 = 75% + + +@pytest.mark.asyncio +async def test_v1_bypass_and_restore_zone() -> None: + panel = MockPanel(controller_key=CONTROLLER_KEY, state=_populated_state()) + async with panel.serve(transport="udp") as (host, port): + async with OmniClientV1( + host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0 + ) as c: + await c.bypass_zone(1, code=1) + assert panel.state.zones[1].is_bypassed + await c.restore_zone(1, code=1) + assert not panel.state.zones[1].is_bypassed + + +@pytest.mark.asyncio +async def test_v1_execute_security_command_arm_away() -> None: + panel = MockPanel(controller_key=CONTROLLER_KEY, state=_populated_state()) + async with panel.serve(transport="udp") as (host, port): + async with OmniClientV1( + host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0 + ) as c: + await c.execute_security_command( + area=1, mode=SecurityMode.AWAY, code=1234 + ) + assert panel.state.areas[1].mode == int(SecurityMode.AWAY) + + +@pytest.mark.asyncio +async def test_v1_execute_security_command_wrong_code() -> None: + panel = MockPanel(controller_key=CONTROLLER_KEY, state=_populated_state()) + async with panel.serve(transport="udp") as (host, port): + async with OmniClientV1( + host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0 + ) as c: + with pytest.raises(CommandFailedError): + await c.execute_security_command( + area=1, mode=SecurityMode.AWAY, code=9999 + ) + # State unchanged after failed command. + assert panel.state.areas[1].mode == int(SecurityMode.OFF)