MockPanel: v1 wire dispatch for hermetic OmniClientV1 tests

Adds OmniLinkMessage (0x10) outer-packet handling to the mock so the
v1 path no longer requires a real panel for testing. Exercised over
UDP because OmniClientV1 is UDP-only by design, but the dispatcher
itself is transport-agnostic and the TCP _handle_client routes
OmniLinkMessage packets through the same _dispatch_v1 method too.

Coverage today:
  * RequestSystemInformation (17) -> SystemInformation (18)
  * RequestSystemStatus      (19) -> SystemStatus (20), 8 area mode bytes
  * RequestZoneStatus        (21) -> ZoneStatus (22), short + long form
  * RequestUnitStatus        (23) -> UnitStatus (24), short + long form
                                     (long form auto-selected for indices > 255)
  * RequestThermostatStatus  (30) -> ThermostatStatus (31)
  * RequestAuxiliaryStatus   (25) -> AuxiliaryStatus  (26) (zero records)
  * UploadNames              (12) -> NameData (11) streaming, lock-step
                                     Ack-driven across Zone/Unit/Button/
                                     Area/Thermostat, terminated by EOD (3)
  * Command                  (15) -> Ack (5) / Nak (6), reuses v2 state
                                     mutator so light-on/off, set-level,
                                     bypass-zone, restore-zone all work
  * ExecuteSecurityCommand   (102) -> Ack (5) / ExecuteSecurityCommandResponse
                                      (103) on bad code, with structured
                                      status byte preserved
  * MessageCrcError          -> v1 Nak (opcode 6)

The dispatcher writes replies wrapped in OmniLinkMessage (16) outer
packets (vs OmniLink2Message (32) used by v2) so OmniClientV1 routes
them correctly. The 4-step handshake is shared with v2 -- it's
protocol-version-agnostic at the outer-packet layer.

UploadNames state is panel-instance scoped via _upload_names_cursor
(int | None) -- there is only one active session at a time on the
mock so a single cursor suffices.

tests/test_e2e_v1_mock.py: 13 cases driving OmniClientV1 through the
mock's UDP socket, covering the full read API + UploadNames streaming
+ write methods + structured-failure path on a wrong security code.

Full suite: 400 passed, 1 skipped (was 387 / 1).
This commit is contained in:
Ryan Malloy 2026-05-11 16:32:51 -06:00
parent dd53b2a89a
commit 0e3835d4ff
2 changed files with 705 additions and 2 deletions

View File

@ -52,6 +52,7 @@ import secrets
from collections.abc import AsyncIterator, Callable
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from typing import ClassVar
from .commands import Command
from .crypto import (
@ -60,8 +61,14 @@ from .crypto import (
derive_session_key,
encrypt_message_payload,
)
from .message import Message, MessageCrcError, MessageFormatError, encode_v2
from .opcodes import OmniLink2MessageType, PacketType
from .message import (
Message,
MessageCrcError,
MessageFormatError,
encode_v1,
encode_v2,
)
from .opcodes import OmniLink2MessageType, OmniLinkMessageType, PacketType
from .packet import Packet
_log = logging.getLogger(__name__)
@ -328,6 +335,9 @@ class MockPanel:
self._active_writer: asyncio.StreamWriter | None = None
self._active_session_key: bytes | None = None
self._push_tasks: set[asyncio.Task[None]] = set()
# v1 UploadNames cursor: index into self._v1_name_stream() while a
# streaming download is in flight, ``None`` when no stream active.
self._upload_names_cursor: int | None = None
# -------- public observables (handy in tests) --------
@ -461,6 +471,19 @@ class MockPanel:
if not cont:
break
elif pkt_type is PacketType.OmniLinkMessage:
# v1 wire dialect — UDP-only panels speak this on the
# wire. We accept it over TCP too so the same mock
# server can fixture both transports for tests.
if session_key is None:
_log.debug("mock panel: v1 message before secure session")
break
cont = await self._handle_encrypted_message_v1(
reader, seq, session_key, writer
)
if not cont:
break
else:
_log.debug("mock panel: unhandled packet type %s", pkt_type.name)
break
@ -588,6 +611,68 @@ class MockPanel:
self._schedule_event_push(push_words, session_key, writer)
return True
async def _handle_encrypted_message_v1(
self,
reader: asyncio.StreamReader,
client_seq: int,
session_key: bytes,
writer: asyncio.StreamWriter,
) -> bool:
"""v1 counterpart of :meth:`_handle_encrypted_message`.
Reads, decrypts, decodes a v1 inner message (StartChar 0x5A),
dispatches via :meth:`_dispatch_v1`, and writes the reply back
wrapped in an ``OmniLinkMessage`` (16) outer packet.
"""
first_block = await _read_exact(reader, BLOCK_SIZE)
if first_block is None:
return False
first_plain = decrypt_message_payload(first_block, client_seq, session_key)
# first_plain[0] = StartChar (0x5A), first_plain[1] = MessageLength
msg_length = first_plain[1]
extra_needed = max(0, msg_length + 4 - BLOCK_SIZE)
rem = (-extra_needed) % BLOCK_SIZE
extra_aligned = extra_needed + rem
ciphertext = first_block
if extra_aligned > 0:
extra = await _read_exact(reader, extra_aligned)
if extra is None:
return False
ciphertext = first_block + extra
plaintext = decrypt_message_payload(ciphertext, client_seq, session_key)
try:
inner = Message.decode(plaintext)
except MessageCrcError:
_log.debug("mock panel: v1 inner message CRC failure")
await self._send_v1_reply(
client_seq, _build_v1_nak(0), session_key, writer
)
return True
except MessageFormatError as exc:
_log.debug("mock panel: malformed v1 inner message: %s", exc)
return False
opcode = inner.opcode
self._last_request_opcode = opcode
try:
opcode_name = OmniLinkMessageType(opcode).name
except ValueError:
opcode_name = f"Unknown({opcode})"
_log.debug(
"mock panel: v1 dispatch opcode=%s payload=%d bytes",
opcode_name, len(inner.payload),
)
reply, push_words = self._dispatch_v1(opcode, inner.payload)
await self._send_v1_reply(client_seq, reply, session_key, writer)
# v1 push events use opcode 35 instead of 55; the existing
# _schedule_event_push helper is v2-shaped and would emit a
# frame the v1 client can't parse. Skip pushes on v1 for now
# -- streaming UploadNames is the dominant flow we care about.
_ = push_words
return True
def _dispatch_v2(
self, opcode: int, payload: bytes
) -> tuple[Message, tuple[int, ...]]:
@ -1091,6 +1176,299 @@ class MockPanel:
writer.write(pkt.encode())
await writer.drain()
# ============================================================
# v1 wire-dialect dispatch (panels listening UDP-only)
# ============================================================
#
# Same crypto + handshake as v2; only the outer ``PacketType``
# (OmniLinkMessage = 16 vs OmniLink2Message = 32) and the inner
# ``Message.start_char`` (0x5A NonAddressable vs 0x21 OmniLink2)
# differ. The dispatch table here mirrors what ``OmniClientV1``
# actually sends — see omni_pca.v1.client.
def _dispatch_v1(
self, opcode: int, payload: bytes
) -> tuple[Message, tuple[int, ...]]:
"""Dispatch a single v1 request, return (reply_msg, push_words).
Mirrors :meth:`_dispatch_v2` but for v1 opcodes (see enuOmniLinkMessageType).
"""
if opcode == OmniLinkMessageType.RequestSystemInformation:
return self._v1_reply_system_information(), ()
if opcode == OmniLinkMessageType.RequestSystemStatus:
return self._v1_reply_system_status(), ()
if opcode == OmniLinkMessageType.RequestZoneStatus:
return self._v1_reply_zone_status(payload), ()
if opcode == OmniLinkMessageType.RequestUnitStatus:
return self._v1_reply_unit_status(payload), ()
if opcode == OmniLinkMessageType.RequestThermostatStatus:
return self._v1_reply_thermostat_status(payload), ()
if opcode == OmniLinkMessageType.RequestAuxiliaryStatus:
return self._v1_reply_auxiliary_status(payload), ()
if opcode == OmniLinkMessageType.UploadNames:
return self._v1_start_upload_names_stream(), ()
if opcode == OmniLinkMessageType.Ack:
# During an active UploadNames stream, each client Ack
# advances the cursor. With no active stream, drop silently
# (Ack as a request opcode is only meaningful mid-stream).
if self._upload_names_cursor is not None:
return self._v1_advance_upload_names_stream(), ()
return _build_v1_nak(opcode), ()
if opcode == OmniLinkMessageType.Command:
return self._v1_handle_command(payload)
if opcode == OmniLinkMessageType.ExecuteSecurityCommand:
return self._v1_handle_execute_security_command(payload)
return _build_v1_nak(opcode), ()
# ---- v1 reply builders ----
def _v1_reply_system_information(self) -> Message:
# Wire layout is byte-identical to v2 (clsOLMsgSystemInformation.cs
# vs clsOL2MsgSystemInformation.cs) -- only the opcode differs.
s = self.state
body = bytes(
[
s.model_byte & 0xFF,
s.firmware_major & 0xFF,
s.firmware_minor & 0xFF,
s.firmware_revision & 0xFF,
]
) + _name_bytes(s.local_phone, _PHONE_LEN)
return encode_v1(OmniLinkMessageType.SystemInformation, body)
def _v1_reply_system_status(self) -> Message:
# Bytes 0..13 byte-identical to v2 SystemStatus.
# After byte 13 the v1 wire carries per-area Mode bytes (one byte
# each) instead of v2's 2-byte alarm pairs. We emit eight zero
# mode bytes so OmniClientV1 sees 8 areas reporting OFF.
s = self.state
body = bytes(
[
1 if s.time_set else 0,
s.year & 0xFF, s.month & 0xFF, s.day & 0xFF,
s.day_of_week & 0xFF,
s.hour & 0xFF, s.minute & 0xFF, s.second & 0xFF,
s.daylight_saving & 0xFF,
s.sunrise_hour & 0xFF, s.sunrise_minute & 0xFF,
s.sunset_hour & 0xFF, s.sunset_minute & 0xFF,
s.battery & 0xFF,
]
)
# Per-area mode bytes (8 areas on Omni Pro II).
for idx in range(1, 9):
area = self.state.areas.get(idx)
body += bytes([area.mode if area else 0])
return encode_v1(OmniLinkMessageType.SystemStatus, body)
@staticmethod
def _v1_decode_range(payload: bytes) -> tuple[int, int] | None:
"""Decode RequestUnitStatus / RequestZoneStatus / ... range payload.
Short form (both 255): 2 bytes [start, end].
Long form (either > 255): 4 bytes [start_hi, start_lo, end_hi, end_lo].
See clsOLMsgRequestUnitStatus.cs:18-31.
"""
if len(payload) == 2:
return payload[0], payload[1]
if len(payload) == 4:
return (payload[0] << 8) | payload[1], (payload[2] << 8) | payload[3]
return None
def _v1_reply_zone_status(self, payload: bytes) -> Message:
rng = self._v1_decode_range(payload)
if rng is None or rng[0] > rng[1]:
return _build_v1_nak(OmniLinkMessageType.RequestZoneStatus)
start, end = rng
records = b""
for idx in range(start, end + 1):
z = self.state.zones.get(idx)
if z is not None:
records += bytes([z.status_byte, z.loop])
else:
# Slots without a defined zone still respond with
# zero bytes -- real panels do this too.
records += b"\x00\x00"
return encode_v1(OmniLinkMessageType.ZoneStatus, records)
def _v1_reply_unit_status(self, payload: bytes) -> Message:
rng = self._v1_decode_range(payload)
if rng is None or rng[0] > rng[1]:
return _build_v1_nak(OmniLinkMessageType.RequestUnitStatus)
start, end = rng
records = b""
for idx in range(start, end + 1):
u = self.state.units.get(idx)
if u is not None:
records += bytes(
[u.state, (u.time_remaining >> 8) & 0xFF, u.time_remaining & 0xFF]
)
else:
records += b"\x00\x00\x00"
return encode_v1(OmniLinkMessageType.UnitStatus, records)
def _v1_reply_thermostat_status(self, payload: bytes) -> Message:
rng = self._v1_decode_range(payload)
if rng is None or rng[0] > rng[1]:
return _build_v1_nak(OmniLinkMessageType.RequestThermostatStatus)
start, end = rng
records = b""
for idx in range(start, end + 1):
t = self.state.thermostats.get(idx)
if t is not None:
records += bytes(
[
1, # communicating
t.temperature_raw, t.heat_setpoint_raw,
t.cool_setpoint_raw,
t.system_mode, t.fan_mode, t.hold_mode,
]
)
else:
records += b"\x00" * 7
return encode_v1(OmniLinkMessageType.ThermostatStatus, records)
def _v1_reply_auxiliary_status(self, payload: bytes) -> Message:
rng = self._v1_decode_range(payload)
if rng is None or rng[0] > rng[1]:
return _build_v1_nak(OmniLinkMessageType.RequestAuxiliaryStatus)
start, end = rng
# MockState has no aux sensors -- return zero records.
records = b"\x00\x00\x00\x00" * (end - start + 1)
return encode_v1(OmniLinkMessageType.AuxiliaryStatus, records)
# ---- UploadNames streaming ----
# NameType enum (from omni_pca.v1.messages.NameType -- duplicated here
# so the mock doesn't depend on the v1 subpackage at import time).
_V1_NAME_TYPE_ZONE: ClassVar[int] = 1
_V1_NAME_TYPE_UNIT: ClassVar[int] = 2
_V1_NAME_TYPE_BUTTON: ClassVar[int] = 3
_V1_NAME_TYPE_AREA: ClassVar[int] = 5
_V1_NAME_TYPE_THERMOSTAT: ClassVar[int] = 6
_V1_NAME_TYPE_LENGTH: ClassVar[dict[int, int]] = {
1: 15, # Zone
2: 12, # Unit
3: 12, # Button
4: 12, # Code (unused by mock)
5: 12, # Area
6: 12, # Thermostat
7: 15, # Message (unused by mock)
}
def _v1_name_stream(self) -> list[tuple[int, int, str]]:
"""Flat list of (NameType, number, name) tuples — the panel emits
these in the order Zone Unit Button Code Area
Thermostat Message during ``UploadNames`` streaming.
Empty-named objects are skipped (matches real-panel behavior).
"""
items: list[tuple[int, int, str]] = []
for idx in sorted(self.state.zones):
n = self.state.zones[idx].name
if n:
items.append((self._V1_NAME_TYPE_ZONE, idx, n))
for idx in sorted(self.state.units):
n = self.state.units[idx].name
if n:
items.append((self._V1_NAME_TYPE_UNIT, idx, n))
for idx in sorted(self.state.buttons):
n = self.state.buttons[idx].name
if n:
items.append((self._V1_NAME_TYPE_BUTTON, idx, n))
for idx in sorted(self.state.areas):
n = self.state.areas[idx].name
if n:
items.append((self._V1_NAME_TYPE_AREA, idx, n))
for idx in sorted(self.state.thermostats):
n = self.state.thermostats[idx].name
if n:
items.append((self._V1_NAME_TYPE_THERMOSTAT, idx, n))
return items
def _v1_namedata_msg(self, type_byte: int, num: int, name: str) -> Message:
"""Encode a single NameData reply payload (clsOLMsgNameData.cs)."""
L = self._V1_NAME_TYPE_LENGTH.get(type_byte, 12)
encoded = name.encode("utf-8")[:L].ljust(L, b"\x00")
if num <= 0xFF:
body = bytes([type_byte, num]) + encoded + b"\x00"
else:
body = bytes(
[type_byte, (num >> 8) & 0xFF, num & 0xFF]
) + encoded + b"\x00"
return encode_v1(OmniLinkMessageType.NameData, body)
def _v1_start_upload_names_stream(self) -> Message:
"""Handle bare ``UploadNames`` request -- send first NameData
(or EOD immediately if no defined names)."""
names = self._v1_name_stream()
if not names:
self._upload_names_cursor = None
return _build_v1_eod()
self._upload_names_cursor = 0
t, n, name = names[0]
return self._v1_namedata_msg(t, n, name)
def _v1_advance_upload_names_stream(self) -> Message:
"""Handle client ``Acknowledge`` during an active stream -- send
the next NameData or EOD when exhausted."""
names = self._v1_name_stream()
# _upload_names_cursor != None implied by caller
assert self._upload_names_cursor is not None
self._upload_names_cursor += 1
if self._upload_names_cursor >= len(names):
self._upload_names_cursor = None
return _build_v1_eod()
t, n, name = names[self._upload_names_cursor]
return self._v1_namedata_msg(t, n, name)
# ---- v1 Command / ExecuteSecurityCommand wrappers ----
# The wire payload format is byte-identical to v2 (clsOLMsgCommand.cs
# vs clsOL2MsgCommand.cs); only the outer opcode and the reply Ack
# opcode (v1=5 vs v2=1) differ. We reuse the v2 state-mutation
# helper and just wrap the reply.
def _v1_handle_command(
self, payload: bytes
) -> tuple[Message, tuple[int, ...]]:
v2_reply, push_words = self._handle_command(payload)
if v2_reply.opcode == int(OmniLink2MessageType.Ack):
return _build_v1_ack(), push_words
# Pass-through Nak (no state mutation push when command refused).
return _build_v1_nak(OmniLinkMessageType.Command), ()
def _v1_handle_execute_security_command(
self, payload: bytes
) -> tuple[Message, tuple[int, ...]]:
v2_reply, push_words = self._handle_execute_security_command(payload)
if v2_reply.opcode == int(OmniLink2MessageType.Ack):
return _build_v1_ack(), push_words
if v2_reply.opcode == int(
OmniLink2MessageType.ExecuteSecurityCommandResponse
):
# Preserve the structured response (status byte) but rebuild
# with v1 opcode so OmniClientV1 sees opcode 103, not 75.
return (
encode_v1(
OmniLinkMessageType.ExecuteSecurityCommandResponse,
v2_reply.payload,
),
push_words,
)
return _build_v1_nak(OmniLinkMessageType.ExecuteSecurityCommand), ()
async def _send_v1_reply(
self,
client_seq: int,
message: Message,
session_key: bytes,
writer: asyncio.StreamWriter,
) -> None:
plaintext = message.encode()
ciphertext = encrypt_message_payload(plaintext, client_seq, session_key)
pkt = Packet(seq=client_seq, type=PacketType.OmniLinkMessage, data=ciphertext)
writer.write(pkt.encode())
await writer.drain()
# --------------------------------------------------------------------------
# Status / ExtendedStatus per-record builders
@ -1271,6 +1649,26 @@ def _build_nak(in_reply_to_opcode: int) -> Message:
return encode_v2(OmniLink2MessageType.Nak, bytes([in_reply_to_opcode & 0xFF]))
# ---- v1 wire-dialect counterparts ----
def _build_v1_ack() -> Message:
"""Build a v1 Ack (opcode 5) with no payload."""
return encode_v1(OmniLinkMessageType.Ack, b"")
def _build_v1_nak(in_reply_to_opcode: int) -> Message:
"""Build a v1 Nak (opcode 6) carrying the offending opcode byte."""
return encode_v1(
OmniLinkMessageType.Nak, bytes([int(in_reply_to_opcode) & 0xFF])
)
def _build_v1_eod() -> Message:
"""Build a v1 EOD (opcode 3) -- the end-of-stream marker for bulk
downloads like ``UploadNames`` and ``UploadSetup``."""
return encode_v1(OmniLinkMessageType.EOD, b"")
async def _read_exact(reader: asyncio.StreamReader, n: int) -> bytes | None:
"""Read exactly ``n`` bytes or return None if EOF arrives early."""
try:
@ -1411,6 +1809,15 @@ class _MockServerDatagramProtocol(asyncio.DatagramProtocol):
await self._handle_encrypted_udp(pkt, addr)
return
if pkt.type is PacketType.OmniLinkMessage:
# v1 wire dialect — the canonical UDP-only dialect real
# panels speak. Routes through MockPanel._dispatch_v1.
if self._session_key is None:
_log.debug("mock panel (udp) v1 message before secure session")
return
await self._handle_encrypted_udp_v1(pkt, addr)
return
_log.debug("mock panel (udp) unhandled packet type %s", pkt.type.name)
async def _handle_encrypted_udp(
@ -1440,6 +1847,34 @@ class _MockServerDatagramProtocol(asyncio.DatagramProtocol):
if push_words:
self._schedule_udp_push(push_words, addr)
async def _handle_encrypted_udp_v1(
self, pkt: Packet, addr: tuple[str, int]
) -> None:
"""v1 UDP counterpart of :meth:`_handle_encrypted_udp`.
Same crypto, different inner-message dialect (StartChar 0x5A,
v1 opcodes) and different outer reply type (``OmniLinkMessage``
= 16, not 32).
"""
assert self._session_key is not None
try:
plaintext = decrypt_message_payload(
pkt.data, pkt.seq, self._session_key
)
except Exception:
_log.debug("mock panel (udp) failed to decrypt v1 message")
return
try:
inner = Message.decode(plaintext)
except (MessageCrcError, MessageFormatError):
await self._send_reply_v1(pkt.seq, _build_v1_nak(0), addr)
return
opcode = inner.opcode
self._panel._last_request_opcode = opcode
reply, _push_words = self._panel._dispatch_v1(opcode, inner.payload)
await self._send_reply_v1(pkt.seq, reply, addr)
def _schedule_udp_push(
self, words: tuple[int, ...], addr: tuple[str, int]
) -> None:
@ -1480,3 +1915,19 @@ class _MockServerDatagramProtocol(asyncio.DatagramProtocol):
data=ciphertext,
)
self._send(pkt, addr)
async def _send_reply_v1(
self, client_seq: int, message: Message, addr: tuple[str, int]
) -> None:
"""v1 counterpart of :meth:`_send_reply` -- wraps the encrypted
reply in an ``OmniLinkMessage`` (16) outer packet instead of
``OmniLink2Message`` (32)."""
assert self._session_key is not None
plaintext = message.encode()
ciphertext = encrypt_message_payload(plaintext, client_seq, self._session_key)
pkt = Packet(
seq=client_seq,
type=PacketType.OmniLinkMessage,
data=ciphertext,
)
self._send(pkt, addr)

252
tests/test_e2e_v1_mock.py Normal file
View File

@ -0,0 +1,252 @@
"""End-to-end: OmniClientV1 ↔ MockPanel speaking the v1 wire dialect.
Exercises the MockPanel's new ``_dispatch_v1`` path over UDP (which
is what ``OmniClientV1`` opens see :class:`omni_pca.v1.connection.
OmniConnectionV1`). The packets travel ``127.0.0.1`` so there is no
real packet-loss risk; we still set a 2 s per-reply timeout to fail
fast if the dispatcher hangs.
"""
from __future__ import annotations
import pytest
from omni_pca.commands import CommandFailedError
from omni_pca.mock_panel import (
MockAreaState,
MockButtonState,
MockPanel,
MockState,
MockThermostatState,
MockUnitState,
MockZoneState,
)
from omni_pca.models import SecurityMode
from omni_pca.v1 import NameType, OmniClientV1
CONTROLLER_KEY = bytes.fromhex("6ba7b4e9b4656de3cd7edd4c650cdb09")
def _populated_state() -> MockState:
return MockState(
zones={
1: MockZoneState(name="FRONT DOOR"),
2: MockZoneState(name="BACK DOOR"),
3: MockZoneState(name="LIVING MOT", current_state=1, loop=0xFD),
},
units={
1: MockUnitState(name="FRONT PORCH", state=1), # on
2: MockUnitState(name="LIVING LAMP", state=0x96), # 50% brightness
},
areas={1: MockAreaState(name="MAIN", mode=int(SecurityMode.OFF))},
thermostats={
1: MockThermostatState(
name="DOWNSTAIRS",
temperature_raw=170, heat_setpoint_raw=140,
cool_setpoint_raw=200, system_mode=1, fan_mode=0, hold_mode=0,
),
},
buttons={1: MockButtonState(name="GOOD MORNING")},
user_codes={1: 1234},
)
# ---- handshake + read API ------------------------------------------------
@pytest.mark.asyncio
async def test_v1_handshake_and_system_information() -> None:
panel = MockPanel(controller_key=CONTROLLER_KEY, state=_populated_state())
async with panel.serve(transport="udp") as (host, port):
async with OmniClientV1(
host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0
) as c:
info = await c.get_system_information()
assert info.model_name == "Omni Pro II"
assert info.firmware_version == "2.12r1"
@pytest.mark.asyncio
async def test_v1_get_system_status_reports_areas() -> None:
panel = MockPanel(controller_key=CONTROLLER_KEY, state=_populated_state())
async with panel.serve(transport="udp") as (host, port):
async with OmniClientV1(
host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0
) as c:
status = await c.get_system_status()
# Mock emits 8 area mode bytes (Omni Pro II cap).
assert len(status.area_alarms) == 8
# Each tuple is (mode, 0); area 1 was OFF (0).
assert status.area_alarms[0] == (0, 0)
@pytest.mark.asyncio
async def test_v1_zone_status_short_form() -> None:
panel = MockPanel(controller_key=CONTROLLER_KEY, state=_populated_state())
async with panel.serve(transport="udp") as (host, port):
async with OmniClientV1(
host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0
) as c:
zones = await c.get_zone_status(1, 8)
assert len(zones) == 8
assert zones[1].is_secure
# Zone 3 has current_state=1 (NotReady -> open).
assert zones[3].is_open
assert zones[3].loop == 0xFD
@pytest.mark.asyncio
async def test_v1_unit_status_short_form() -> None:
panel = MockPanel(controller_key=CONTROLLER_KEY, state=_populated_state())
async with panel.serve(transport="udp") as (host, port):
async with OmniClientV1(
host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0
) as c:
units = await c.get_unit_status(1, 4)
assert units[1].is_on
assert units[2].brightness == 50 # state=0x96 == 150 -> 50%
assert not units[3].is_on # undefined slot, defaults
assert not units[4].is_on
@pytest.mark.asyncio
async def test_v1_unit_status_long_form() -> None:
"""Force the BE-u16 wire form by including indices > 255."""
state = _populated_state()
state.units[300] = MockUnitState(name="SPRINKLER-Z3", state=1)
panel = MockPanel(controller_key=CONTROLLER_KEY, state=state)
async with panel.serve(transport="udp") as (host, port):
async with OmniClientV1(
host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0
) as c:
units = await c.get_unit_status(298, 302)
assert len(units) == 5
assert units[300].is_on
@pytest.mark.asyncio
async def test_v1_thermostat_status() -> None:
panel = MockPanel(controller_key=CONTROLLER_KEY, state=_populated_state())
async with panel.serve(transport="udp") as (host, port):
async with OmniClientV1(
host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0
) as c:
tstats = await c.get_thermostat_status(1, 1)
t = tstats[1]
assert t.temperature_raw == 170
assert t.heat_setpoint_raw == 140
assert t.cool_setpoint_raw == 200
assert t.system_mode == 1
assert t.fan_mode == 0
assert t.hold_mode == 0
# ---- UploadNames streaming ----------------------------------------------
@pytest.mark.asyncio
async def test_v1_upload_names_streams_all_objects() -> None:
panel = MockPanel(controller_key=CONTROLLER_KEY, state=_populated_state())
async with panel.serve(transport="udp") as (host, port):
async with OmniClientV1(
host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0
) as c:
all_names = await c.list_all_names()
# Expected: Zones 1-3, Units 1-2, Button 1, Area 1, Thermostat 1.
assert set(all_names.keys()) == {
int(NameType.ZONE),
int(NameType.UNIT),
int(NameType.BUTTON),
int(NameType.AREA),
int(NameType.THERMOSTAT),
}
assert all_names[int(NameType.ZONE)] == {
1: "FRONT DOOR", 2: "BACK DOOR", 3: "LIVING MOT",
}
assert all_names[int(NameType.UNIT)] == {
1: "FRONT PORCH", 2: "LIVING LAMP",
}
assert all_names[int(NameType.BUTTON)] == {1: "GOOD MORNING"}
assert all_names[int(NameType.AREA)] == {1: "MAIN"}
assert all_names[int(NameType.THERMOSTAT)] == {1: "DOWNSTAIRS"}
@pytest.mark.asyncio
async def test_v1_upload_names_empty_panel_returns_no_records() -> None:
panel = MockPanel(controller_key=CONTROLLER_KEY)
async with panel.serve(transport="udp") as (host, port):
async with OmniClientV1(
host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0
) as c:
all_names = await c.list_all_names()
assert all_names == {}
@pytest.mark.asyncio
async def test_v1_upload_names_two_byte_form_for_high_indices() -> None:
state = _populated_state()
state.units[300] = MockUnitState(name="Z-LANDSCAPE") # > 255
panel = MockPanel(controller_key=CONTROLLER_KEY, state=state)
async with panel.serve(transport="udp") as (host, port):
async with OmniClientV1(
host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0
) as c:
all_names = await c.list_all_names()
assert all_names[int(NameType.UNIT)][300] == "Z-LANDSCAPE"
# ---- write methods ------------------------------------------------------
@pytest.mark.asyncio
async def test_v1_turn_unit_on_mutates_mock_state() -> None:
panel = MockPanel(controller_key=CONTROLLER_KEY, state=_populated_state())
async with panel.serve(transport="udp") as (host, port):
async with OmniClientV1(
host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0
) as c:
assert panel.state.units[2].state == 0x96 # 50%
await c.set_unit_level(2, 75)
assert panel.state.units[2].state == 100 + 75 # 175 = 75%
@pytest.mark.asyncio
async def test_v1_bypass_and_restore_zone() -> None:
panel = MockPanel(controller_key=CONTROLLER_KEY, state=_populated_state())
async with panel.serve(transport="udp") as (host, port):
async with OmniClientV1(
host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0
) as c:
await c.bypass_zone(1, code=1)
assert panel.state.zones[1].is_bypassed
await c.restore_zone(1, code=1)
assert not panel.state.zones[1].is_bypassed
@pytest.mark.asyncio
async def test_v1_execute_security_command_arm_away() -> None:
panel = MockPanel(controller_key=CONTROLLER_KEY, state=_populated_state())
async with panel.serve(transport="udp") as (host, port):
async with OmniClientV1(
host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0
) as c:
await c.execute_security_command(
area=1, mode=SecurityMode.AWAY, code=1234
)
assert panel.state.areas[1].mode == int(SecurityMode.AWAY)
@pytest.mark.asyncio
async def test_v1_execute_security_command_wrong_code() -> None:
panel = MockPanel(controller_key=CONTROLLER_KEY, state=_populated_state())
async with panel.serve(transport="udp") as (host, port):
async with OmniClientV1(
host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0
) as c:
with pytest.raises(CommandFailedError):
await c.execute_security_command(
area=1, mode=SecurityMode.AWAY, code=9999
)
# State unchanged after failed command.
assert panel.state.areas[1].mode == int(SecurityMode.OFF)