omni-pca/tests/test_v1_messages.py
Ryan Malloy 92c8b695b4 v1-over-UDP: parallel OmniClientV1 for panels that listen UDP-only
Some Omni network modules are configured for UDP, in which case PC Access
falls back to the v1 wire protocol (OmniLinkMessage outer = 0x10, inner
StartChar 0x5A, typed Request*Status opcodes) instead of v2's TCP path
(OmniLink2Message + StartChar 0x21 + parameterised RequestProperties).
This adds a parallel implementation rather than overloading the v2 path.

omni_pca/v1/
  connection.py   UDP-only OmniConnectionV1; reuses crypto + handshake,
                  routes post-handshake messages through OmniLinkMessage
                  (0x10) wrapping v1 inner format. Adds iter_streaming
                  for the lock-step UploadNames/Acknowledge/EOD pattern.
  messages.py     Block parsers for the typed v1 status replies (zone,
                  unit, thermostat, aux), v1 SystemStatus, and NameData
                  (handles both one-byte and two-byte NameNumber forms).
  client.py       OmniClientV1: read API (get_system_information,
                  get_*_status), discovery (iter_names + list_*_names),
                  write API (execute_command, execute_security_command,
                  turn_unit_*, set_unit_level, bypass/restore_zone,
                  execute_button, set_thermostat_*). acknowledge_alerts
                  is a no-op (v1 has no equivalent opcode).

Discovery uses bare UploadNames; panel streams every defined name across
all types in a fixed order with per-record Acknowledge. Verified against
firmware 2.12 — pulled 16 zones, 44 units, 16 buttons, 8 codes,
2 thermostats, 8 messages in one stream.

src/omni_pca/message.py
  Fix flipped START_CHAR_V1_* constants. enuOmniLinkMessageFormat says
  Addressable=0x41 and NonAddressable=0x5A; our names had them swapped.
  Wire bytes were unchanged, so existing tests kept passing — but
  encode_v1() with no serial_address now correctly emits 0x5A, which
  is what UDP needs.

tests/
  test_v1_messages.py        22 cases; payloads are real wire captures
                              from a firmware-2.12 panel via probe_v1_recon.
  test_v1_client_commands.py 20 cases; payload-packing for the Command
                              and ExecuteSecurityCommand opcodes,
                              including BE u16 parameter2 and the
                              digit-by-digit security code form.

dev/
  probe_v1.py        Phase-1 smoke: handshake + RequestSystemInformation.
  probe_v1_recon.py  Raw opcode dump for protocol reconnaissance.
  probe_v1_stream.py Streaming UploadNames flow exploration.
  probe_v1_client.py Full read-path smoke test via OmniClientV1.
  probe_v1_write.py  Live no-op execute_command round-trip.

.gitignore: ignore dev/.omni_key (probe scripts read controller key from
this file as one fallback option).

Discovery on firmware 2.12: Request*ExtendedStatus opcodes (63/65/69)
NAK on this firmware — only the basic Request*Status opcodes are
implemented, so OmniClientV1 uses those (3 bytes/unit, 7 bytes/tstat,
4 bytes/aux records). HA still gets enough signal for polling; full
properties discovery uses streaming UploadNames instead.

Test totals: 387 passed, 1 skipped (existing fixture skip).
2026-05-11 01:08:01 -06:00

277 lines
9.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Unit tests for omni_pca.v1.messages parsers.
Test vectors are real wire payloads captured from a firmware-2.12 Omni
Pro II panel via dev/probe_v1_recon.py — see the comment above each
test for the inputs that produced it.
"""
from __future__ import annotations
import pytest
from omni_pca.v1.messages import (
parse_v1_aux_status,
parse_v1_system_status,
parse_v1_thermostat_status,
parse_v1_unit_status,
parse_v1_zone_status,
)
# ---- ZoneStatus ---------------------------------------------------------
def test_v1_zone_status_secure_and_open() -> None:
# Captured: RequestZoneStatus(1, 8) → 16-byte payload, 8 zones × 2 bytes.
# zone 6 raw_status=0x01 (open), all others 0x00.
payload = bytes.fromhex("0080007f007f0080008001fd00810080")
zones = parse_v1_zone_status(payload, start_index=1)
assert len(zones) == 8
assert {z.index for z in zones} == set(range(1, 9))
assert zones[0].is_secure # zone 1
assert zones[5].is_open # zone 6
assert zones[5].raw_status == 0x01
assert zones[5].loop == 0xFD
def test_v1_zone_status_indexes_offset_by_start() -> None:
# If we requested zones 17..24, the same 16-byte payload should
# produce indexes 17..24.
payload = bytes.fromhex("0080007f007f0080008001fd00810080")
zones = parse_v1_zone_status(payload, start_index=17)
assert {z.index for z in zones} == set(range(17, 25))
def test_v1_zone_status_invalid_length() -> None:
with pytest.raises(ValueError, match="multiple of 2"):
parse_v1_zone_status(b"\x00\x00\x00", start_index=1)
# ---- UnitStatus ---------------------------------------------------------
def test_v1_unit_status_dimmer_levels() -> None:
# Captured: RequestUnitStatus(1, 8) → 24-byte payload, 8 units × 3 bytes.
# state bytes: 01, 01, 69, 96, 69, 00, 73, 00 → 100%, 100%, 5%, 50%, 5%, 0%, 15%, 0%
payload = bytes.fromhex("010000010000690000960000690000000000730000000000")
units = parse_v1_unit_status(payload, start_index=1)
assert len(units) == 8
assert units[0].is_on and units[0].brightness == 100 # state=0x01
assert units[2].brightness == 5 # state=0x69 = 105 → -100 = 5%
assert units[3].brightness == 50 # state=0x96 = 150 → -100 = 50%
assert not units[5].is_on # state=0x00
assert units[6].brightness == 15 # state=0x73 = 115 → -100 = 15%
def test_v1_unit_status_time_remaining_be_u16() -> None:
# Single record with remaining=0x1234.
payload = bytes([0x01, 0x12, 0x34])
units = parse_v1_unit_status(payload, start_index=42)
assert len(units) == 1
assert units[0].index == 42
assert units[0].time_remaining_secs == 0x1234
def test_v1_unit_status_invalid_length() -> None:
with pytest.raises(ValueError, match="multiple of 3"):
parse_v1_unit_status(b"\x00\x00", start_index=1)
# ---- ThermostatStatus ---------------------------------------------------
def test_v1_thermostat_status_unconfigured() -> None:
# Captured: RequestThermostatStatus(1, 4) → 28 B, all values 0/0/0/0/0/0/0
# except byte 0 of records 0-1 which is 0x01 (status). The "raw=0" temps
# decode to -40°C / -40°F per omni_temp_to_fahrenheit.
payload = bytes.fromhex(
"01000000000000010000000000000000000000000000000000000000"
)
tstats = parse_v1_thermostat_status(payload, start_index=1)
assert len(tstats) == 4
assert tstats[0].status == 0x01
assert tstats[2].status == 0x00
assert tstats[0].humidity_raw == 0 # zero-filled (v1 doesn't carry it)
assert tstats[0].outdoor_temperature_raw == 0
assert tstats[0].horc_status == 0
def test_v1_thermostat_full_record() -> None:
# Hand-constructed: status=0x01, temp=170 (=45°F), heat=140 (30°F),
# cool=200 (60°F), mode=1, fan=2, hold=3.
payload = bytes([0x01, 170, 140, 200, 1, 2, 3])
tstats = parse_v1_thermostat_status(payload, start_index=5)
assert len(tstats) == 1
t = tstats[0]
assert t.index == 5
assert t.status == 0x01
assert t.temperature_raw == 170
assert t.heat_setpoint_raw == 140
assert t.cool_setpoint_raw == 200
assert t.system_mode == 1
assert t.fan_mode == 2
assert t.hold_mode == 3
def test_v1_thermostat_invalid_length() -> None:
with pytest.raises(ValueError, match="multiple of 7"):
parse_v1_thermostat_status(b"\x00" * 6, start_index=1)
# ---- AuxiliaryStatus ----------------------------------------------------
def test_v1_aux_status_all_zero() -> None:
# Captured: RequestAuxiliaryStatus(1, 8) → 32 B all zeros.
payload = bytes(32)
auxes = parse_v1_aux_status(payload, start_index=1)
assert len(auxes) == 8
assert all(a.output == 0 and a.value_raw == 0 for a in auxes)
def test_v1_aux_status_record_field_order() -> None:
# Single record: output=11, value=22, low=33, high=44
payload = bytes([11, 22, 33, 44])
auxes = parse_v1_aux_status(payload, start_index=99)
assert len(auxes) == 1
a = auxes[0]
assert a.index == 99
assert a.output == 11
assert a.value_raw == 22
assert a.low_raw == 33
assert a.high_raw == 44
def test_v1_aux_invalid_length() -> None:
with pytest.raises(ValueError, match="multiple of 4"):
parse_v1_aux_status(b"\x00\x00\x00", start_index=1)
# ---- SystemStatus -------------------------------------------------------
def test_v1_system_status_full_payload() -> None:
# Captured: RequestSystemStatus → 38 B payload from firmware 2.12.
# Bytes: 011a050a07163b1c01061c150003 + 24 area-mode bytes
# decode: time_valid=1, year=26 (=2026), month=05, day=10,
# dow=07, hour=22, min=59, sec=28, dst=01, sun_h=06, sun_m=28,
# sun_h2=21, sun_m2=21, battery=0x00, then area modes.
# Note: the 14th byte (0x03) is the BATTERY reading = 3, not 0.
payload = bytes.fromhex(
"011a050a07163b1c01061c150003000000000000000002090000000000000000000000000000"
)
s = parse_v1_system_status(payload)
assert s.time_valid is True
assert s.panel_time is not None
assert s.panel_time.year == 2000 + 0x1A # 2026
assert s.panel_time.month == 0x05
assert s.panel_time.day == 0x0A
assert s.sunrise_hour == 0x06
assert s.sunrise_minute == 0x1C # 28
assert s.sunset_hour == 0x15 # 21
assert s.sunset_minute == 0x00
assert s.battery_reading == 0x03
# 24 trailing bytes promoted to area_alarms tuples (mode_byte, 0).
assert len(s.area_alarms) == 24
assert s.area_alarms[0] == (0, 0)
# Area 9 in this capture had mode=2.
assert s.area_alarms[8] == (2, 0)
def test_v1_system_status_minimum_payload() -> None:
# Just the 14 header bytes, no area modes.
payload = bytes(14)
s = parse_v1_system_status(payload)
assert s.time_valid is False
assert s.panel_time is None
assert s.battery_reading == 0
assert s.area_alarms == ()
def test_v1_system_status_too_short_raises() -> None:
with pytest.raises(ValueError, match="too short"):
parse_v1_system_status(b"\x00" * 13)
# ---- NameData -----------------------------------------------------------
from omni_pca.v1.messages import NameType, parse_v1_namedata # noqa: E402
def test_v1_namedata_zone_one_byte_form() -> None:
# Captured: UploadNames stream → first reply = Zone #1 'GARAGE ENTRY'.
# Payload 18 B = type(1) + num(1) + name(15) + reserved(1).
payload = bytes.fromhex("010147415241474520454e54525900000000")
rec = parse_v1_namedata(payload)
assert rec.name_type == int(NameType.ZONE)
assert rec.name_type_label == "ZONE"
assert rec.number == 1
assert rec.name == "GARAGE ENTRY"
def test_v1_namedata_unit_one_byte_form() -> None:
# Hand-crafted: Unit #5 = "GARAGE ENTRY" (12-char name slot, no padding need).
name = "GARAGE ENTRY"
payload = (
bytes([int(NameType.UNIT), 5])
+ name.encode("ascii").ljust(12, b"\x00")
+ b"\x00" # reserved trailing byte
)
rec = parse_v1_namedata(payload)
assert rec.name_type == int(NameType.UNIT)
assert rec.number == 5
assert rec.name == name
def test_v1_namedata_unit_two_byte_form() -> None:
# Unit #257 = 'Z1-LANDSCAPE' — captured from the real panel after the
# numbered units rolled over 256.
payload = (
bytes([int(NameType.UNIT), 0x01, 0x01]) # type, num_hi=1, num_lo=1
+ b"Z1-LANDSCAPE".ljust(12, b"\x00") # 12-char name
+ b"\x00"
)
rec = parse_v1_namedata(payload)
assert rec.name_type == int(NameType.UNIT)
assert rec.number == 257
assert rec.name == "Z1-LANDSCAPE"
def test_v1_namedata_thermostat() -> None:
payload = (
bytes([int(NameType.THERMOSTAT), 1])
+ b"DOWNSTAIRS".ljust(12, b"\x00")
+ b"\x00"
)
rec = parse_v1_namedata(payload)
assert rec.name_type == int(NameType.THERMOSTAT)
assert rec.number == 1
assert rec.name == "DOWNSTAIRS"
def test_v1_namedata_strips_trailing_nulls() -> None:
payload = (
bytes([int(NameType.ZONE), 9])
+ b"HALL MOTION".ljust(15, b"\x00")
+ b"\x00"
)
rec = parse_v1_namedata(payload)
assert rec.name == "HALL MOTION" # no embedded nulls in result
def test_v1_namedata_unknown_type_falls_through() -> None:
# Unknown name type — parser should still return SOMETHING by
# consuming the rest as the name. HA filters by NameType anyway.
payload = bytes([99, 7]) + b"WHATEVER\x00\x00"
rec = parse_v1_namedata(payload)
assert rec.name_type == 99
assert rec.name_type_label == "Unknown(99)"
assert rec.number == 7
assert rec.name == "WHATEVER"
def test_v1_namedata_short_payload_raises() -> None:
with pytest.raises(ValueError, match="too short"):
parse_v1_namedata(b"\x01\x00")