diff --git a/.gitignore b/.gitignore index 48e324e..da978ae 100644 --- a/.gitignore +++ b/.gitignore @@ -41,3 +41,4 @@ panel_key* .wine-pca/ ha-config/ dist/ +dev/.omni_key diff --git a/dev/probe_v1.py b/dev/probe_v1.py new file mode 100644 index 0000000..ec50d3a --- /dev/null +++ b/dev/probe_v1.py @@ -0,0 +1,151 @@ +#!/usr/bin/env python3 +"""Phase-1 smoke test: v1-over-UDP handshake + RequestSystemInformation. + +Run inside the project venv: + cd /home/kdm/home-auto/omni-pca + uv run python dev/probe_v1.py [--host 192.168.1.9] [--port 4369] + +Requires the panel's controller key. Picks it up from (in order): + 1. ``--key 32hex`` on the command line + 2. ``OMNI_KEY`` env var + 3. ``dev/.omni_key`` file (gitignored) + 4. The bundled ``.pca`` plain fixture (developer-only fallback) + +Success criteria: panel returns a v1 SystemInformation message (opcode 18) +within the timeout. Failure modes we want to distinguish: + * UDP socket fails to open → routing / firewall + * Handshake step 2 timeout → wrong port, wrong panel + * Handshake step 4 termination → wrong controller key + * SystemInformation timeout → v1 path isn't doing what we think + * SystemInformation reply → v1-over-UDP is real, proceed to Phase 2 +""" + +from __future__ import annotations + +import argparse +import asyncio +import logging +import os +import sys +from pathlib import Path + +from omni_pca.v1.connection import ( + HandshakeError, + InvalidEncryptionKeyError, + OmniConnectionV1, + RequestTimeoutError, +) +from omni_pca.opcodes import OmniLinkMessageType + + +def _load_key(arg_key: str | None) -> bytes: + if arg_key: + return bytes.fromhex(arg_key) + env = os.environ.get("OMNI_KEY") + if env: + return bytes.fromhex(env) + keyfile = Path(__file__).parent / ".omni_key" + if keyfile.exists(): + return bytes.fromhex(keyfile.read_text().strip()) + fixture = Path("/home/kdm/home-auto/HAI/pca-re/extracted/Our_House.pca.plain") + if fixture.exists(): + from omni_pca.pca_file import ( + PcaReader, + _CAP_OMNI_PRO_II, + _parse_header, + _walk_to_connection, + ) + + r = PcaReader(fixture.read_bytes()) + _parse_header(r) + _walk_to_connection(r, _CAP_OMNI_PRO_II) + r.string8_fixed(120) # network_address + r.string8_fixed(5) # port + return bytes.fromhex(r.string8_fixed(32).ljust(32, "0")[:32]) + raise SystemExit("no controller key: pass --key, set OMNI_KEY, or create dev/.omni_key") + + +def _decode_system_information(payload: bytes) -> dict[str, object]: + """Parse the v1 SystemInformation payload (mirrors clsOLMsgSystemInformation).""" + if len(payload) < 29: + raise ValueError(f"SystemInformation payload too short: {len(payload)} bytes") + return { + "opcode": payload[0], + "model": payload[1], + "fw_major": payload[2], + "fw_minor": payload[3], + "fw_revision": int.from_bytes(payload[4:5], "big", signed=True), + "local_phone": payload[5:29].rstrip(b"\x00").decode("ascii", errors="replace"), + } + + +async def amain(args: argparse.Namespace) -> int: + key = _load_key(args.key) + print(f"[probe] target {args.host}:{args.port} key=...{key[-2:].hex()} (16 B)") + + try: + async with OmniConnectionV1( + host=args.host, + port=args.port, + controller_key=key, + timeout=args.timeout, + retry_count=args.retries, + ) as conn: + print(f"[probe] handshake OK state={conn.state.name} " + f"session_key=...{conn.session_key[-2:].hex() if conn.session_key else 'n/a'}") + + print("[probe] sending v1 RequestSystemInformation (opcode 17)") + reply = await conn.request(OmniLinkMessageType.RequestSystemInformation) + print(f"[probe] reply: start_char={reply.start_char:#04x} " + f"opcode={reply.opcode} payload={reply.data.hex()}") + + if reply.opcode != int(OmniLinkMessageType.SystemInformation): + print(f"[probe] WARNING: expected opcode 18 (SystemInformation), " + f"got {reply.opcode}") + return 2 + + info = _decode_system_information(reply.data) + print(f"[probe] ✓ v1-over-UDP works") + print(f" model = {info['model']}") + print(f" firmware = {info['fw_major']}.{info['fw_minor']}.{info['fw_revision']}") + print(f" phone = {info['local_phone']!r}") + + except InvalidEncryptionKeyError as exc: + print(f"[probe] handshake terminated: wrong controller key? ({exc})") + return 1 + except HandshakeError as exc: + print(f"[probe] handshake failed: {exc}") + return 1 + except RequestTimeoutError as exc: + print(f"[probe] no reply to RequestSystemInformation: {exc}") + print(" → handshake worked but v1 path isn't responding. " + "Check tcpdump for what's on the wire.") + return 2 + except OSError as exc: + print(f"[probe] socket error: {exc}") + return 1 + + return 0 + + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--host", default="192.168.1.9") + parser.add_argument("--port", type=int, default=4369) + parser.add_argument("--key", help="32 hex chars; overrides env/.omni_key") + parser.add_argument("--timeout", type=float, default=5.0) + parser.add_argument("--retries", type=int, default=2) + parser.add_argument("--debug", action="store_true", + help="enable DEBUG logging (TX/RX packet dump)") + args = parser.parse_args() + + logging.basicConfig( + level=logging.DEBUG if args.debug else logging.INFO, + format="%(asctime)s %(levelname)s %(name)s: %(message)s", + ) + + return asyncio.run(amain(args)) + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/dev/probe_v1_client.py b/dev/probe_v1_client.py new file mode 100644 index 0000000..f612d23 --- /dev/null +++ b/dev/probe_v1_client.py @@ -0,0 +1,122 @@ +#!/usr/bin/env python3 +"""Phase-2a smoke test: drive OmniClientV1 against the real panel. + +Hits the read-only methods we care about for HA polling. Compares parsed +values against the recon dump so we catch off-by-one byte errors fast. + +Run: + cd /home/kdm/home-auto/omni-pca + uv run python dev/probe_v1_client.py +""" + +from __future__ import annotations + +import argparse +import asyncio +import logging +import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).parent)) +from probe_v1 import _load_key # type: ignore # noqa: E402 + +from omni_pca.v1 import OmniClientV1, OmniNakError + + +async def amain(args: argparse.Namespace) -> int: + key = _load_key(args.key) + print(f"[client probe] target {args.host}:{args.port}\n") + + async with OmniClientV1( + host=args.host, port=args.port, controller_key=key, timeout=4.0, + ) as c: + info = await c.get_system_information() + print(f"system: model={info.model_name} fw={info.firmware_version} " + f"phone={info.local_phone!r}") + + print("\n--- discovery (streaming UploadNames) ---") + all_names = await c.list_all_names() + for type_byte in sorted(all_names): + try: + from omni_pca.v1 import NameType + label = NameType(type_byte).name + except ValueError: + label = f"type{type_byte}" + print(f" {label} ({len(all_names[type_byte])} entries)") + for num in sorted(all_names[type_byte]): + print(f" #{num}: {all_names[type_byte][num]!r}") + + try: + sysstatus = await c.get_system_status() + print(f"status: time={sysstatus.panel_time} " + f"battery=0x{sysstatus.battery_reading:02x} " + f"sunrise={sysstatus.sunrise_hour:02d}:{sysstatus.sunrise_minute:02d} " + f"sunset={sysstatus.sunset_hour:02d}:{sysstatus.sunset_minute:02d} " + f"area_modes={[m for m, _ in sysstatus.area_alarms]}") + except Exception as exc: + print(f"system status failed: {type(exc).__name__}: {exc}") + + print("\n--- zones 1..16 ---") + zones = await c.get_zone_status(1, 16) + for idx in sorted(zones): + z = zones[idx] + flags = [] + if z.is_open: flags.append("open") + if z.is_in_alarm: flags.append("alarm") + if z.is_bypassed: flags.append("bypass") + if z.is_trouble: flags.append("trouble") + tag = ",".join(flags) or "secure" + print(f" zone {idx:2d}: status=0x{z.raw_status:02x} loop=0x{z.loop:02x} ({tag})") + + print("\n--- units 1..16 ---") + units = await c.get_unit_status(1, 16) + for idx in sorted(units): + u = units[idx] + br = u.brightness + br_s = f"{br}%" if br is not None else "n/a" + print(f" unit {idx:2d}: state=0x{u.state:02x} ({br_s}) " + f"time_remaining={u.time_remaining_secs}s") + + print("\n--- thermostats 1..4 ---") + try: + tstats = await c.get_thermostat_status(1, 4) + for idx in sorted(tstats): + t = tstats[idx] + print(f" tstat {idx}: status=0x{t.status:02x} " + f"temp_F={t.temperature_f:.1f} " + f"heat={t.heat_setpoint_f:.0f} cool={t.cool_setpoint_f:.0f} " + f"mode=0x{t.system_mode:02x} fan=0x{t.fan_mode:02x} " + f"hold=0x{t.hold_mode:02x}") + except OmniNakError as exc: + print(f" no thermostats configured: {exc}") + + print("\n--- aux 1..8 ---") + try: + auxes = await c.get_aux_status(1, 8) + for idx in sorted(auxes): + a = auxes[idx] + print(f" aux {idx}: output=0x{a.output:02x} value=0x{a.value_raw:02x} " + f"low=0x{a.low_raw:02x} high=0x{a.high_raw:02x}") + except OmniNakError as exc: + print(f" no aux sensors: {exc}") + + print("\n[client probe] ✓ disconnected cleanly") + return 0 + + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--host", default="192.168.1.9") + parser.add_argument("--port", type=int, default=4369) + parser.add_argument("--key", help="32 hex chars; overrides env/.omni_key") + parser.add_argument("--debug", action="store_true") + args = parser.parse_args() + logging.basicConfig( + level=logging.DEBUG if args.debug else logging.WARNING, + format="%(asctime)s %(levelname)s %(name)s: %(message)s", + ) + return asyncio.run(amain(args)) + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/dev/probe_v1_recon.py b/dev/probe_v1_recon.py new file mode 100644 index 0000000..bc1ddc7 --- /dev/null +++ b/dev/probe_v1_recon.py @@ -0,0 +1,149 @@ +#!/usr/bin/env python3 +"""Phase-2 reconnaissance: fetch v1 status replies from the real panel. + +Doesn't parse — just dumps the raw payload bytes for each known v1 opcode +so we can match them against the C# message classes before writing +parsers. Builds the picture of what your panel actually has configured. + +Run: + cd /home/kdm/home-auto/omni-pca + uv run python dev/probe_v1_recon.py [--debug] +""" + +from __future__ import annotations + +import argparse +import asyncio +import logging +import sys +from pathlib import Path + +# Reuse the key loader from probe_v1. +sys.path.insert(0, str(Path(__file__).parent)) +from probe_v1 import _load_key # type: ignore # noqa: E402 + +from omni_pca.opcodes import OmniLinkMessageType +from omni_pca.v1.connection import OmniConnectionV1, RequestTimeoutError + + +async def _request_or_warn( + conn: OmniConnectionV1, + label: str, + opcode: OmniLinkMessageType, + payload: bytes = b"", + expected_opcode: int | None = None, +) -> None: + print(f"--- {label} (req opcode {int(opcode)}, payload {payload.hex() or ''}) ---") + try: + reply = await conn.request(opcode, payload, timeout=4.0) + except RequestTimeoutError as exc: + print(f" TIMEOUT: {exc}") + return + except Exception as exc: + print(f" ERROR: {type(exc).__name__}: {exc}") + return + print(f" reply opcode = {reply.opcode}") + print(f" payload ({len(reply.payload)} B) = {reply.payload.hex()}") + if expected_opcode is not None and reply.opcode != expected_opcode: + print(f" NOTE: expected opcode {expected_opcode}, got {reply.opcode}") + + +async def amain(args: argparse.Namespace) -> int: + key = _load_key(args.key) + print(f"[recon] target {args.host}:{args.port}\n") + + async with OmniConnectionV1( + host=args.host, + port=args.port, + controller_key=key, + timeout=4.0, + retry_count=1, + ) as conn: + print(f"handshake OK state={conn.state.name}\n") + + # --- panel-wide --- + await _request_or_warn( + conn, "SystemInformation", OmniLinkMessageType.RequestSystemInformation, + expected_opcode=int(OmniLinkMessageType.SystemInformation), + ) + await _request_or_warn( + conn, "SystemStatus", OmniLinkMessageType.RequestSystemStatus, + expected_opcode=int(OmniLinkMessageType.SystemStatus), + ) + await _request_or_warn( + conn, "StatusSummary", OmniLinkMessageType.RequestStatusSummary, + expected_opcode=int(OmniLinkMessageType.StatusSummary), + ) + + # --- bulk status, small ranges so we can read the bytes --- + await _request_or_warn( + conn, "ZoneStatus[1..8]", OmniLinkMessageType.RequestZoneStatus, + payload=bytes([1, 8]), + expected_opcode=int(OmniLinkMessageType.ZoneStatus), + ) + await _request_or_warn( + conn, "ZoneExtendedStatus[1..8]", OmniLinkMessageType.RequestZoneExtendedStatus, + payload=bytes([1, 8]), + expected_opcode=int(OmniLinkMessageType.ZoneExtendedStatus), + ) + await _request_or_warn( + conn, "UnitStatus[1..8]", OmniLinkMessageType.RequestUnitStatus, + payload=bytes([1, 8]), + expected_opcode=int(OmniLinkMessageType.UnitStatus), + ) + await _request_or_warn( + conn, "UnitExtendedStatus[1..8]", OmniLinkMessageType.RequestUnitExtendedStatus, + payload=bytes([1, 8]), + expected_opcode=int(OmniLinkMessageType.UnitExtendedStatus), + ) + await _request_or_warn( + conn, "ThermostatStatus[1..4]", OmniLinkMessageType.RequestThermostatStatus, + payload=bytes([1, 4]), + expected_opcode=int(OmniLinkMessageType.ThermostatStatus), + ) + await _request_or_warn( + conn, "ThermostatExtendedStatus[1..4]", OmniLinkMessageType.RequestThermostatExtendedStatus, + payload=bytes([1, 4]), + expected_opcode=int(OmniLinkMessageType.ThermostatExtendedStatus), + ) + await _request_or_warn( + conn, "AuxiliaryStatus[1..8]", OmniLinkMessageType.RequestAuxiliaryStatus, + payload=bytes([1, 8]), + expected_opcode=int(OmniLinkMessageType.AuxiliaryStatus), + ) + + # --- discovery: UploadNames is the READ request; DownloadNames is the + # WRITE direction (panel <- client). Reply payload is NameData with the + # next defined object's number + name. + # Per clsOL2MsgUploadNames: [type, num_hi, num_lo, relative_direction]. + # type: 1=Zone 2=Unit 3=Button 4=Code 5=Thermostat 6=Area 7=Message + # relative_direction: +1=next after num, -1=prev before num, 0=exact + for type_byte, type_name in [(1, "Zone"), (2, "Unit"), (5, "Thermostat"), (6, "Area")]: + await _request_or_warn( + conn, + f"UploadNames[type={type_name}, after=0, dir=+1]", + OmniLinkMessageType.UploadNames, + payload=bytes([type_byte, 0, 0, 1]), + expected_opcode=int(OmniLinkMessageType.NameData), + ) + + print("\n--- recon complete, session closed cleanly ---") + return 0 + + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--host", default="192.168.1.9") + parser.add_argument("--port", type=int, default=4369) + parser.add_argument("--key", help="32 hex chars; overrides env/.omni_key") + parser.add_argument("--debug", action="store_true") + args = parser.parse_args() + logging.basicConfig( + level=logging.DEBUG if args.debug else logging.WARNING, + format="%(asctime)s %(levelname)s %(name)s: %(message)s", + ) + return asyncio.run(amain(args)) + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/dev/probe_v1_stream.py b/dev/probe_v1_stream.py new file mode 100644 index 0000000..dd51a45 --- /dev/null +++ b/dev/probe_v1_stream.py @@ -0,0 +1,118 @@ +#!/usr/bin/env python3 +"""Probe the v1 UploadNames streaming flow. + +Sends UploadNames (no payload), then a series of Acknowledge messages, +dumping each reply until we get an EOD or 30 records (whichever comes +first). Confirms the lock-step pattern PC Access uses for bulk reads. +""" + +from __future__ import annotations + +import argparse +import asyncio +import logging +import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).parent)) +from probe_v1 import _load_key # type: ignore # noqa: E402 + +from omni_pca.opcodes import OmniLinkMessageType +from omni_pca.v1 import OmniConnectionV1 + + +_NAME_TYPE_LABELS = { + 1: "Zone", 2: "Unit", 3: "Button", 4: "Code", + 5: "Thermostat", 6: "Area", 7: "Message", +} + + +def _decode_namedata(payload: bytes) -> str: + """Best-effort decode of a NameData payload for display.""" + if len(payload) < 3: + return f"" + name_type = payload[0] + # Heuristic: zones/messages are 15-char names, others 12. With one-byte + # NameNumber, payload length = 1 (type) + 1 (num) + L (name) + 1 (term). + # With two-byte NameNumber: 1 + 2 + L + 1. + L_15 = 15 + 3 # one-byte form, 15-char name + L_12 = 12 + 3 # one-byte form, 12-char name + if len(payload) == L_15 or len(payload) == L_15 + 1: + # 15-char name (Zone or Message), one-byte num. + num = payload[1] + name = payload[2:2 + 15].rstrip(b"\x00").decode("utf-8", errors="replace") + elif len(payload) == L_12 or len(payload) == L_12 + 1: + # 12-char name, one-byte num. + num = payload[1] + name = payload[2:2 + 12].rstrip(b"\x00").decode("utf-8", errors="replace") + else: + # Two-byte num form (NameNumber > 255): payload[1..2] = BE u16, then name. + num = (payload[1] << 8) | payload[2] + name = payload[3:].rstrip(b"\x00").decode("utf-8", errors="replace") + + label = _NAME_TYPE_LABELS.get(name_type, f"type{name_type}") + return f"{label} #{num}: {name!r}" + + +async def amain(args: argparse.Namespace) -> int: + key = _load_key(args.key) + print(f"[stream probe] target {args.host}:{args.port}\n") + + async with OmniConnectionV1( + host=args.host, port=args.port, controller_key=key, timeout=4.0 + ) as conn: + from omni_pca.message import Message, START_CHAR_V1_UNADDRESSED + + # Step 1: bare UploadNames. + upload = Message( + start_char=START_CHAR_V1_UNADDRESSED, + data=bytes([int(OmniLinkMessageType.UploadNames)]), + ) + seq, fut = conn._send_encrypted(upload) + reply = conn._decode_inner(await fut) + print(f"reply 1 (seq={seq}): opcode={reply.opcode} {_decode_namedata(reply.payload) if reply.opcode == int(OmniLinkMessageType.NameData) else f'(payload={reply.payload.hex()!r})'}") + + if reply.opcode != int(OmniLinkMessageType.NameData): + print("panel didn't reply with NameData — streaming flow may not apply here") + return 0 + + # Step 2..N: Acknowledge → next NameData (or EOD). + for i in range(2, args.max + 1): + ack = Message( + start_char=START_CHAR_V1_UNADDRESSED, + data=bytes([int(OmniLinkMessageType.Ack)]), + ) + seq, fut = conn._send_encrypted(ack) + reply = conn._decode_inner(await fut) + + if reply.opcode == int(OmniLinkMessageType.EOD): + print(f"reply {i} (seq={seq}): EOD — end of stream after {i-1} records") + return 0 + if reply.opcode == int(OmniLinkMessageType.NameData): + print(f"reply {i} (seq={seq}): {_decode_namedata(reply.payload)}") + else: + print(f"reply {i} (seq={seq}): unexpected opcode {reply.opcode}, " + f"payload={reply.payload.hex()}") + return 1 + + print(f"\nstopped after {args.max} replies (no EOD seen)") + return 0 + + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--host", default="192.168.1.9") + parser.add_argument("--port", type=int, default=4369) + parser.add_argument("--key", help="32 hex chars; overrides env/.omni_key") + parser.add_argument("--max", type=int, default=20, help="stop after N replies") + parser.add_argument("--debug", action="store_true") + args = parser.parse_args() + logging.basicConfig( + level=logging.DEBUG if args.debug else logging.WARNING, + format="%(asctime)s %(levelname)s %(name)s: %(message)s", + ) + return asyncio.run(amain(args)) + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/dev/probe_v1_write.py b/dev/probe_v1_write.py new file mode 100644 index 0000000..ee1042d --- /dev/null +++ b/dev/probe_v1_write.py @@ -0,0 +1,93 @@ +#!/usr/bin/env python3 +"""Phase-2c live write smoke test: round-trip a no-op unit command. + +Reads the current state of one unit, sends a command that should yield +the same observable result, then re-reads to confirm. Proves that +:meth:`OmniClientV1.execute_command` actually flows through the v1 +Command opcode against the real panel without changing anything visible. + +Run: + cd /home/kdm/home-auto/omni-pca + uv run python dev/probe_v1_write.py [--index N] + +Default target is unit #4 ('STAIRS' per current panel config). +""" + +from __future__ import annotations + +import argparse +import asyncio +import logging +import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).parent)) +from probe_v1 import _load_key # type: ignore # noqa: E402 + +from omni_pca.v1 import OmniClientV1 + + +async def amain(args: argparse.Namespace) -> int: + key = _load_key(args.key) + print(f"[write probe] target {args.host}:{args.port} unit #{args.index}\n") + + async with OmniClientV1( + host=args.host, port=args.port, controller_key=key, timeout=4.0 + ) as c: + before = (await c.get_unit_status(args.index, args.index))[args.index] + print(f"BEFORE: state=0x{before.state:02x} " + f"brightness={before.brightness!r} " + f"time_remaining={before.time_remaining_secs}s") + + # Pick the safest no-op command for the unit's current state: + # - state == 0 → send UNIT_OFF (already off, panel acks) + # - state == 1 → send UNIT_ON (already on, panel acks) + # - 100 <= state <= 200 → set_unit_level(percent) at the current level + # - otherwise (scene/dim/etc.) → fall back to UNIT_ON which is harmless + if before.state == 0: + print("ACTION: turn_unit_off (already off, expecting Ack)") + await c.turn_unit_off(args.index) + elif before.state == 1: + print("ACTION: turn_unit_on (already on, expecting Ack)") + await c.turn_unit_on(args.index) + elif 100 <= before.state <= 200: + level = before.state - 100 + print(f"ACTION: set_unit_level({level}%) (already at this level)") + await c.set_unit_level(args.index, level) + else: + print(f"ACTION: turn_unit_on (state=0x{before.state:02x} is exotic; safe ack expected)") + await c.turn_unit_on(args.index) + + # Give the panel ~250ms to settle if it does pulse anything. + await asyncio.sleep(0.25) + + after = (await c.get_unit_status(args.index, args.index))[args.index] + print(f"AFTER: state=0x{after.state:02x} " + f"brightness={after.brightness!r} " + f"time_remaining={after.time_remaining_secs}s") + + if after.state == before.state: + print("\n✓ panel acked the Command, state unchanged — wire path verified") + else: + print(f"\n⚠ state changed (0x{before.state:02x} → 0x{after.state:02x}). " + "Probably harmless but worth investigating.") + return 0 + + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--host", default="192.168.1.9") + parser.add_argument("--port", type=int, default=4369) + parser.add_argument("--key", help="32 hex chars; overrides env/.omni_key") + parser.add_argument("--index", type=int, default=4) + parser.add_argument("--debug", action="store_true") + args = parser.parse_args() + logging.basicConfig( + level=logging.DEBUG if args.debug else logging.WARNING, + format="%(asctime)s %(levelname)s %(name)s: %(message)s", + ) + return asyncio.run(amain(args)) + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/src/omni_pca/message.py b/src/omni_pca/message.py index b28c6d3..af5d8df 100644 --- a/src/omni_pca/message.py +++ b/src/omni_pca/message.py @@ -3,7 +3,7 @@ Wire layout (non-addressable): ``[start_char][length][...data...][crc_lo][crc_hi]`` -For v1 addressable messages (StartChar=0x5A) a single SerialAddress byte +For v1 addressable messages (StartChar=0x41) a single SerialAddress byte is interleaved between start_char and length. CRC is CRC-16/MODBUS (poly 0xA001, init 0, reflected) computed over the @@ -13,6 +13,8 @@ on the wire (CRC1 = low byte, CRC2 = high byte). References: clsOmniLinkMessage.cs (lines 9, 164-186, 273-289) — frame + CRC clsOmniLink2Message.cs (lines 17-23) — v2 StartChar = 0x21 + enuOmniLinkMessageFormat.cs — Addressable=0x41, NonAddressable=0x5A, + OmniLink2=0x21 clsOL2MsgLogin.cs / clsOLMsgLogin.cs — example payloads """ @@ -23,8 +25,8 @@ from dataclasses import dataclass, field from .opcodes import OmniLink2MessageType, OmniLinkMessageType START_CHAR_V2 = 0x21 -START_CHAR_V1_UNADDRESSED = 0x41 -START_CHAR_V1_ADDRESSABLE = 0x5A +START_CHAR_V1_ADDRESSABLE = 0x41 +START_CHAR_V1_UNADDRESSED = 0x5A _CRC_POLY_REFLECTED = 0xA001 diff --git a/src/omni_pca/v1/__init__.py b/src/omni_pca/v1/__init__.py new file mode 100644 index 0000000..c6eb7a1 --- /dev/null +++ b/src/omni_pca/v1/__init__.py @@ -0,0 +1,50 @@ +"""V1 (legacy) Omni-Link protocol over UDP. + +The v2 path in :mod:`omni_pca` (TCP, OmniLink2Message, StartChar 0x21, +parameterised RequestProperties / RequestExtendedStatus) is what most +modern firmware speaks. This subpackage exists because some panels are +configured at the network module to listen on **UDP only**, in which case +PC Access falls back to the v1 wire protocol (typed RequestZoneStatus, +RequestUnitStatus, etc., StartChar 0x5A, OmniLinkMessage outer = 0x10). + +Reference: clsOmniLinkConnection.cs:353-360 (ConnectionProtocol() returns +V1 for Modem/UDP/Serial, V2 only for TCP). +""" + +from __future__ import annotations + +from .client import OmniClientV1, OmniNakError, OmniProtocolError +from .connection import ( + HandshakeError, + InvalidEncryptionKeyError, + OmniConnectionV1, + RequestTimeoutError, +) +from .messages import ( + NameRecord, + NameType, + parse_v1_aux_status, + parse_v1_namedata, + parse_v1_system_status, + parse_v1_thermostat_status, + parse_v1_unit_status, + parse_v1_zone_status, +) + +__all__ = [ + "HandshakeError", + "InvalidEncryptionKeyError", + "NameRecord", + "NameType", + "OmniClientV1", + "OmniConnectionV1", + "OmniNakError", + "OmniProtocolError", + "RequestTimeoutError", + "parse_v1_aux_status", + "parse_v1_namedata", + "parse_v1_system_status", + "parse_v1_thermostat_status", + "parse_v1_unit_status", + "parse_v1_zone_status", +] diff --git a/src/omni_pca/v1/client.py b/src/omni_pca/v1/client.py new file mode 100644 index 0000000..250c6b4 --- /dev/null +++ b/src/omni_pca/v1/client.py @@ -0,0 +1,452 @@ +"""High-level read-only client for v1-over-UDP Omni-Link panels. + +Mirrors the v2 :class:`omni_pca.client.OmniClient` API where the v1 wire +protocol can satisfy the same call. Methods that require v2-only opcodes +(e.g. ``RequestProperties``, ``AcknowledgeAlerts``) are intentionally +absent until Phase 2b/2c add their v1 equivalents (streaming +``UploadNames``, no-op or alternate dispatch). + +API parity goals (this module): + get_system_information() — same dataclass as v2 + get_system_status() — same dataclass as v2 + get_zone_status(start, end) -> dict — uses v1 ZoneStatus + get_unit_status(start, end) -> dict — uses v1 UnitStatus + get_thermostat_status(start, end) -> dict — uses v1 ThermostatStatus + get_aux_status(start, end) -> dict — uses v1 AuxiliaryStatus +""" + +from __future__ import annotations + +import struct +from collections.abc import AsyncIterator, Callable +from typing import Self + +from ..commands import Command, CommandFailedError, SecurityCommandResponse +from ..models import ( + AuxSensorStatus, + SecurityMode, + SystemInformation, + SystemStatus, + ThermostatStatus, + UnitStatus, + ZoneStatus, +) +from ..opcodes import OmniLinkMessageType +from .connection import OmniConnectionV1 +from .messages import ( + NameRecord, + NameType, + parse_v1_aux_status, + parse_v1_namedata, + parse_v1_system_status, + parse_v1_thermostat_status, + parse_v1_unit_status, + parse_v1_zone_status, +) + +_DEFAULT_PORT = 4369 + + +class OmniClientV1: + """Read-only v1-over-UDP Omni-Link client. + + .. code-block:: python + + async with OmniClientV1("192.168.1.9", controller_key=key) as c: + info = await c.get_system_information() + zones = await c.get_zone_status(1, 16) + """ + + def __init__( + self, + host: str, + port: int = _DEFAULT_PORT, + controller_key: bytes = b"", + timeout: float = 5.0, + retry_count: int = 3, + ) -> None: + self._conn = OmniConnectionV1( + host=host, + port=port, + controller_key=controller_key, + timeout=timeout, + retry_count=retry_count, + ) + + @property + def connection(self) -> OmniConnectionV1: + return self._conn + + async def __aenter__(self) -> Self: + await self._conn.connect() + return self + + async def __aexit__(self, exc_type, exc, tb) -> None: + await self._conn.close() + + # ---- panel-wide ---------------------------------------------------- + + async def get_system_information(self) -> SystemInformation: + """Fetch model + firmware + dialer phone number. + + Wire format identical to v2 (verified per + clsOLMsgSystemInformation.cs vs clsOL2MsgSystemInformation.cs); + we reuse the existing dataclass parser unchanged. + """ + reply = await self._conn.request( + OmniLinkMessageType.RequestSystemInformation + ) + self._expect(reply.opcode, OmniLinkMessageType.SystemInformation) + return SystemInformation.parse(reply.payload) + + async def get_system_status(self) -> SystemStatus: + """Fetch panel time, sunrise/sunset, battery reading, area modes.""" + reply = await self._conn.request( + OmniLinkMessageType.RequestSystemStatus + ) + self._expect(reply.opcode, OmniLinkMessageType.SystemStatus) + return parse_v1_system_status(reply.payload) + + # ---- bulk per-object status ---------------------------------------- + + async def get_zone_status( + self, start: int, end: int + ) -> dict[int, ZoneStatus]: + return await self._range_status( + OmniLinkMessageType.RequestZoneStatus, + OmniLinkMessageType.ZoneStatus, + start, + end, + parse_v1_zone_status, + ) + + async def get_unit_status( + self, start: int, end: int + ) -> dict[int, UnitStatus]: + return await self._range_status( + OmniLinkMessageType.RequestUnitStatus, + OmniLinkMessageType.UnitStatus, + start, + end, + parse_v1_unit_status, + ) + + async def get_thermostat_status( + self, start: int, end: int + ) -> dict[int, ThermostatStatus]: + return await self._range_status( + OmniLinkMessageType.RequestThermostatStatus, + OmniLinkMessageType.ThermostatStatus, + start, + end, + parse_v1_thermostat_status, + ) + + async def get_aux_status( + self, start: int, end: int + ) -> dict[int, AuxSensorStatus]: + return await self._range_status( + OmniLinkMessageType.RequestAuxiliaryStatus, + OmniLinkMessageType.AuxiliaryStatus, + start, + end, + parse_v1_aux_status, + ) + + # ---- discovery (streaming UploadNames) ------------------------------ + + async def iter_names(self) -> AsyncIterator[NameRecord]: + """Stream every defined name from the panel. + + v1 has no per-type name request — a bare ``UploadNames`` triggers + the panel to dump *all* defined names of *all* types in a fixed + order (Zone, Unit, Button, Code, Area, Thermostat, Message, …), + each as a separate ``NameData`` reply that the client must + ``Acknowledge`` to advance. This iterator handles the lock-step + protocol and yields each record as it arrives. + + Reference: clsHAC.cs:4418 (sends bare UploadNames), + OL1ReadConfigHandleResponse (loops over NameData/EOD). + """ + async for reply in self._conn.iter_streaming( + OmniLinkMessageType.UploadNames + ): + if reply.opcode != int(OmniLinkMessageType.NameData): + # Defensive — iter_streaming normally only yields + # non-EOD/NAK replies, so this is a wire-format fault. + raise OmniProtocolError( + f"unexpected opcode {reply.opcode} during UploadNames stream " + f"(expected {int(OmniLinkMessageType.NameData)})" + ) + yield parse_v1_namedata(reply.payload) + + async def list_all_names(self) -> dict[int, dict[int, str]]: + """Bucket every defined name by ``NameType``. + + Returns ``{name_type: {object_number: name}}``. Useful when HA + needs all four (zones+units+areas+thermostats) in one pass — + cheaper than four separate streams since the panel only supports + one streaming session at a time anyway. + """ + out: dict[int, dict[int, str]] = {} + async for rec in self.iter_names(): + out.setdefault(rec.name_type, {})[rec.number] = rec.name + return out + + async def list_zone_names(self) -> dict[int, str]: + return (await self.list_all_names()).get(int(NameType.ZONE), {}) + + async def list_unit_names(self) -> dict[int, str]: + return (await self.list_all_names()).get(int(NameType.UNIT), {}) + + async def list_area_names(self) -> dict[int, str]: + return (await self.list_all_names()).get(int(NameType.AREA), {}) + + async def list_thermostat_names(self) -> dict[int, str]: + return (await self.list_all_names()).get(int(NameType.THERMOSTAT), {}) + + async def list_button_names(self) -> dict[int, str]: + return (await self.list_all_names()).get(int(NameType.BUTTON), {}) + + # ---- write methods (Command + ExecuteSecurityCommand) ---------------- + # + # The Command and ExecuteSecurityCommand payloads are byte-identical + # between v1 and v2 — only the outer opcode differs (15 vs 20 for + # Command, 102 vs 74 for ExecuteSecurityCommand). So these methods are + # near-duplicates of OmniClient's, just routed through the v1 opcodes. + # Reference: clsOLMsgCommand.cs, clsOLMsgExecuteSecurityCommand.cs. + + async def execute_command( + self, + command: Command, + parameter1: int = 0, + parameter2: int = 0, + ) -> None: + """Send a generic Command (v1 opcode 15). + + Wire payload (4 bytes, identical to v2 form): + [0] command byte (enuUnitCommand value) + [1] parameter1 (single byte; brightness, mode, code index, ...) + [2] parameter2 high byte (BE u16) + [3] parameter2 low byte (object number for nearly every command) + + Panel acks with v1 Ack (opcode 5) on success, Nak (6) on failure. + """ + if not 0 <= parameter1 <= 0xFF: + raise ValueError(f"parameter1 must fit in a byte: {parameter1}") + if not 0 <= parameter2 <= 0xFFFF: + raise ValueError(f"parameter2 must fit in u16: {parameter2}") + payload = struct.pack( + ">BBH", int(command), parameter1 & 0xFF, parameter2 & 0xFFFF + ) + reply = await self._conn.request(OmniLinkMessageType.Command, payload) + if reply.opcode == int(OmniLinkMessageType.Nak): + raise CommandFailedError( + f"panel NAK'd Command {command.name} " + f"(p1={parameter1}, p2={parameter2})" + ) + if reply.opcode != int(OmniLinkMessageType.Ack): + raise CommandFailedError( + f"unexpected reply to Command {command.name}: opcode={reply.opcode}" + ) + + async def execute_security_command( + self, + area: int, + mode: SecurityMode, + code: int, + ) -> None: + """Arm or disarm a security area (v1 opcode 102). + + Wire payload (6 bytes, identical to v2 form — clsOLMsgExecuteSecurityCommand.cs): + [0] area number (1-based) + [1] security mode byte (raw enuSecurityMode 0..7) + [2] code digit 1 (thousands) + [3] code digit 2 (hundreds) + [4] code digit 3 (tens) + [5] code digit 4 (ones) + + Panel responds with: + * ``ExecuteSecurityCommandResponse`` (103) carrying a status byte + (0 = success, see :class:`SecurityCommandResponse` for others), or + * ``Ack`` (5) on success without structured response, or + * ``Nak`` (6) on flat-out refusal. + + Raises: + ValueError: ``area`` not 1..255 or ``code`` not 0..9999. + CommandFailedError: panel Nak'd OR response status was non-zero; + ``failure_code`` carries the raw status byte when present. + """ + if not 1 <= area <= 0xFF: + raise ValueError(f"area out of range: {area}") + if not 0 <= code <= 9999: + raise ValueError(f"code out of range (0000-9999): {code}") + d1 = (code // 1000) % 10 + d2 = (code // 100) % 10 + d3 = (code // 10) % 10 + d4 = code % 10 + payload = bytes([area & 0xFF, int(mode) & 0xFF, d1, d2, d3, d4]) + reply = await self._conn.request( + OmniLinkMessageType.ExecuteSecurityCommand, payload + ) + if reply.opcode == int(OmniLinkMessageType.Nak): + raise CommandFailedError( + f"panel NAK'd ExecuteSecurityCommand " + f"(area={area}, mode={mode.name})" + ) + if reply.opcode == int(OmniLinkMessageType.ExecuteSecurityCommandResponse): + if not reply.payload: + raise CommandFailedError( + "ExecuteSecurityCommandResponse with empty payload" + ) + status = reply.payload[0] + if status != int(SecurityCommandResponse.SUCCESS): + try: + label = SecurityCommandResponse(status).name + except ValueError: + label = f"unknown({status})" + raise CommandFailedError( + f"ExecuteSecurityCommand failed: {label}", + failure_code=status, + ) + return + if reply.opcode == int(OmniLinkMessageType.Ack): + return + raise CommandFailedError( + f"unexpected reply to ExecuteSecurityCommand: opcode={reply.opcode}" + ) + + async def acknowledge_alerts(self) -> None: + """V1 has no AcknowledgeAlerts opcode — silently no-op. + + v2 introduced :attr:`OmniLink2MessageType.AcknowledgeAlerts` (60) + as a dedicated panel-wide ack; v1 panels expect alerts to be + cleared by per-area arming or by user action at the keypad. To + keep the v1↔v2 method shape parallel, this method is a no-op so + HA service callers don't need a per-transport branch. + """ + return + + # ---- thin command wrappers (one-liner conveniences) ------------------ + + async def turn_unit_on(self, index: int) -> None: + await self.execute_command(Command.UNIT_ON, parameter2=index) + + async def turn_unit_off(self, index: int) -> None: + await self.execute_command(Command.UNIT_OFF, parameter2=index) + + async def set_unit_level(self, index: int, percent: int) -> None: + if not 0 <= percent <= 100: + raise ValueError(f"percent must be 0..100: {percent}") + await self.execute_command( + Command.UNIT_LEVEL, parameter1=percent, parameter2=index + ) + + async def bypass_zone(self, index: int, code: int = 0) -> None: + await self.execute_command( + Command.BYPASS_ZONE, parameter1=code, parameter2=index + ) + + async def restore_zone(self, index: int, code: int = 0) -> None: + await self.execute_command( + Command.RESTORE_ZONE, parameter1=code, parameter2=index + ) + + async def execute_button(self, index: int) -> None: + await self.execute_command(Command.EXECUTE_BUTTON, parameter2=index) + + async def set_thermostat_system_mode(self, index: int, mode_value: int) -> None: + if not 0 <= mode_value <= 0xFF: + raise ValueError(f"mode value must fit in a byte: {mode_value}") + await self.execute_command( + Command.SET_THERMOSTAT_SYSTEM_MODE, + parameter1=mode_value, + parameter2=index, + ) + + async def set_thermostat_fan_mode(self, index: int, mode_value: int) -> None: + await self.execute_command( + Command.SET_THERMOSTAT_FAN_MODE, + parameter1=mode_value, + parameter2=index, + ) + + async def set_thermostat_hold_mode(self, index: int, mode_value: int) -> None: + await self.execute_command( + Command.SET_THERMOSTAT_HOLD_MODE, + parameter1=mode_value, + parameter2=index, + ) + + async def set_thermostat_heat_setpoint_raw( + self, index: int, raw_temp: int + ) -> None: + """Set the heat setpoint by raw byte value (Omni temperature scale). + + Use the same :func:`omni_temp_to_celsius` family of helpers from + :mod:`omni_pca.models` to convert from °C/°F if needed. + """ + if not 0 <= raw_temp <= 0xFF: + raise ValueError(f"raw_temp must fit in a byte: {raw_temp}") + await self.execute_command( + Command.SET_THERMOSTAT_HEAT_SETPOINT, + parameter1=raw_temp, + parameter2=index, + ) + + async def set_thermostat_cool_setpoint_raw( + self, index: int, raw_temp: int + ) -> None: + if not 0 <= raw_temp <= 0xFF: + raise ValueError(f"raw_temp must fit in a byte: {raw_temp}") + await self.execute_command( + Command.SET_THERMOSTAT_COOL_SETPOINT, + parameter1=raw_temp, + parameter2=index, + ) + + # ---- helpers -------------------------------------------------------- + + async def _range_status[T]( + self, + request_op: OmniLinkMessageType, + reply_op: OmniLinkMessageType, + start: int, + end: int, + parser: Callable[[bytes, int], list[T]], + ) -> dict[int, T]: + if not 1 <= start <= end <= 0xFF: + raise ValueError( + f"invalid range: start={start}, end={end} (must be 1..255 with start<=end)" + ) + payload = bytes([start, end]) + reply = await self._conn.request(request_op, payload) + self._expect(reply.opcode, reply_op) + records = parser(reply.payload, start) + return {r.index: r for r in records} # type: ignore[attr-defined] + + @staticmethod + def _expect(actual: int, expected: OmniLinkMessageType) -> None: + if actual == int(OmniLinkMessageType.Nak): + raise OmniNakError( + f"panel NAK'd request expecting opcode {int(expected)} " + f"({expected.name})" + ) + if actual != int(expected): + raise OmniProtocolError( + f"unexpected reply opcode {actual}, want {int(expected)} " + f"({expected.name})" + ) + + +class OmniNakError(RuntimeError): + """Panel returned the v1 Nak opcode (6) instead of the expected reply. + + Thrown when a feature the panel doesn't support is requested — e.g. + ``RequestZoneExtendedStatus`` on firmware 2.12 NAKs because only the + non-extended ``RequestZoneStatus`` is supported. + """ + + +class OmniProtocolError(RuntimeError): + """Panel returned a reply opcode neither matching nor a NAK.""" diff --git a/src/omni_pca/v1/connection.py b/src/omni_pca/v1/connection.py new file mode 100644 index 0000000..bc78449 --- /dev/null +++ b/src/omni_pca/v1/connection.py @@ -0,0 +1,522 @@ +"""Async UDP connection to an Omni-Link controller speaking the v1 wire protocol. + +Differs from :class:`omni_pca.connection.OmniConnection` in three ways: + +1. **Transport**: UDP only. Each datagram carries exactly one outer Packet. +2. **Outer packet type for messages**: ``OmniLinkMessage`` (0x10), not + ``OmniLink2Message`` (0x20). The 4-step handshake packets are identical. +3. **Inner message format**: v1 ``Message`` with ``StartChar = 0x5A`` + (NonAddressable) carrying a v1 opcode, not the v2 ``StartChar = 0x21`` + carrying a v2 opcode. + +The handshake itself (ClientRequestNewSession → ControllerAckNewSession → +ClientRequestSecureSession → ControllerAckSecureSession) and the AES-128 +session key derivation are protocol-agnostic and we reuse the same crypto +primitives. + +Reference: clsOmniLinkConnection.cs (UDP path): + udpConnect lines 1239-1295 open + queue ClientRequestNewSession + udpListen lines 1298-1399 receive loop, dispatches replies + udpHandleRequestNewSession lines 1401-1459 step 2 → step 3 + udpHandleRequestSecureSession lines 1461-1487 step 4 → OnlineSecure + udpSend lines 1514-1560 outer PacketType = OmniLinkMessage (16) + EncryptPacket lines 372-401 same crypto as TCP +""" + +from __future__ import annotations + +import asyncio +import contextlib +import logging +from collections.abc import AsyncIterator +from enum import IntEnum +from types import TracebackType + +from ..crypto import ( + BLOCK_SIZE, + decrypt_message_payload, + derive_session_key, + encrypt_message_payload, +) +from ..message import ( + START_CHAR_V1_UNADDRESSED, + Message, + MessageCrcError, +) +from ..opcodes import OmniLinkMessageType, PacketType +from ..packet import Packet + +_log = logging.getLogger(__name__) + +_DEFAULT_PORT = 4369 +_SESSION_ID_LEN = 5 +_PROTO_VERSION = (0x00, 0x01) +_MAX_SEQ = 0xFFFF + + +class ConnectionState(IntEnum): + DISCONNECTED = 0 + CONNECTING = 1 + NEW_SESSION = 2 + SECURE = 3 + ONLINE = 4 + + +class ConnectionError(OSError): # noqa: A001 - intentional shadow at module scope + pass + + +class HandshakeError(ConnectionError): + pass + + +class InvalidEncryptionKeyError(HandshakeError): + """Controller answered ``ControllerSessionTerminated`` during handshake.""" + + +class ProtocolError(ValueError): + pass + + +class RequestTimeoutError(TimeoutError): + pass + + +class OmniConnectionV1: + """UDP + v1-wire-format connection to an Omni-Link controller.""" + + def __init__( + self, + host: str, + port: int = _DEFAULT_PORT, + controller_key: bytes = b"", + timeout: float = 5.0, + retry_count: int = 3, + ) -> None: + if len(controller_key) != 16: + raise ValueError( + f"controller_key must be 16 bytes, got {len(controller_key)}" + ) + self._host = host + self._port = port + self._controller_key = bytes(controller_key) + self._default_timeout = timeout + self._retry_count = max(0, retry_count) + + self._udp_transport: asyncio.DatagramTransport | None = None + self._udp_protocol: _OmniDatagramProtocol | None = None + + self._state = ConnectionState.DISCONNECTED + self._session_id: bytes | None = None + self._session_key: bytes | None = None + + # First wire packet uses seq=1; wraparound skips 0 (reserved for + # unsolicited inbound). See clsOmniLinkConnection.cs:1251 (UDP + # init pktSequence=1, then udpSend pre-increments). + self._next_seq: int = 1 + + self._pending: dict[int, asyncio.Future[Packet]] = {} + self._unsolicited_queue: asyncio.Queue[Message] = asyncio.Queue() + + self._handshake_event: asyncio.Event = asyncio.Event() + self._handshake_packet: Packet | None = None + self._handshake_error: Exception | None = None + + self._closed = False + + @property + def state(self) -> ConnectionState: + return self._state + + @property + def session_key(self) -> bytes | None: + return self._session_key + + async def __aenter__(self) -> OmniConnectionV1: + await self.connect() + return self + + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc: BaseException | None, + tb: TracebackType | None, + ) -> None: + await self.close() + + async def connect(self) -> None: + if self._state is not ConnectionState.DISCONNECTED: + raise ConnectionError( + f"already connecting/connected (state={self._state})" + ) + self._state = ConnectionState.CONNECTING + try: + loop = asyncio.get_running_loop() + self._udp_transport, self._udp_protocol = ( + await loop.create_datagram_endpoint( + lambda: _OmniDatagramProtocol(self), + remote_addr=(self._host, self._port), + ) + ) + except (TimeoutError, OSError) as exc: + self._state = ConnectionState.DISCONNECTED + raise ConnectionError(f"failed to open UDP socket: {exc}") from exc + + try: + await self._do_handshake() + except BaseException: + await self.close() + raise + + async def close(self) -> None: + """Tear down. Politely terminate the panel session first. + + Without ClientSessionTerminated the panel keeps our slot allocated + until its idle timeout — and rejects subsequent connect attempts + with ControllerCannotStartNewSession (0x07). + """ + if self._closed: + return + self._closed = True + previous_state = self._state + self._state = ConnectionState.DISCONNECTED + + if previous_state in ( + ConnectionState.NEW_SESSION, + ConnectionState.SECURE, + ConnectionState.ONLINE, + ): + try: + term = Packet( + seq=self._claim_seq(), + type=PacketType.ClientSessionTerminated, + data=b"", + ) + self._write_packet(term) + except Exception as exc: # noqa: BLE001 - close() must be idempotent + _log.debug("close: failed to send ClientSessionTerminated: %s", exc) + + for fut in self._pending.values(): + if not fut.done(): + fut.set_exception(ConnectionError("connection closed")) + self._pending.clear() + + if self._udp_transport is not None: + with contextlib.suppress(OSError): + self._udp_transport.close() + self._udp_transport = None + self._udp_protocol = None + + # ---- public request API --------------------------------------------- + + async def request( + self, + opcode: OmniLinkMessageType | int, + payload: bytes = b"", + timeout: float | None = None, + ) -> Message: + """Send a v1 request, await the matching reply, return the inner Message.""" + if self._state is not ConnectionState.ONLINE: + raise ConnectionError( + f"cannot send request, connection state={self._state.name}" + ) + message = Message( + start_char=START_CHAR_V1_UNADDRESSED, + data=bytes([int(opcode)]) + payload, + ) + per_attempt_timeout = timeout if timeout is not None else self._default_timeout + max_attempts = 1 + self._retry_count + last_exc: Exception | None = None + + for attempt in range(1, max_attempts + 1): + seq, fut = self._send_encrypted(message) + try: + reply_packet = await asyncio.wait_for(fut, per_attempt_timeout) + except TimeoutError as exc: + last_exc = exc + self._pending.pop(seq, None) + if attempt < max_attempts: + _log.debug( + "udp v1 retry %d/%d on opcode=%d seq=%d", + attempt, max_attempts, int(opcode), seq, + ) + continue + raise RequestTimeoutError( + f"no v1 reply for opcode={int(opcode)} " + f"after {max_attempts} attempt(s)" + ) from last_exc + return self._decode_inner(reply_packet) + raise RequestTimeoutError( + f"request loop exited without reply for opcode={int(opcode)}" + ) + + async def iter_streaming( + self, + initial_op: OmniLinkMessageType | int, + *, + ack_op: OmniLinkMessageType | int = OmniLinkMessageType.Ack, + end_op: OmniLinkMessageType | int = OmniLinkMessageType.EOD, + nak_op: OmniLinkMessageType | int = OmniLinkMessageType.Nak, + timeout: float | None = None, + ) -> AsyncIterator[Message]: + """Drive a v1 lock-step streaming download (UploadNames / UploadSetup / etc). + + Sends ``initial_op`` (no payload), yields each ``ack_op``-elicited + reply, and stops when the panel sends ``end_op``. ``nak_op`` is + treated as an immediate end-of-stream — no exception (some + firmwares use NAK to signal "no records to upload"). + + Unlike :meth:`request` we don't retry on timeout — losing a + reply mid-stream desynchronises the conversation, so the right + answer is to surface the timeout and let the caller restart. + """ + if self._state is not ConnectionState.ONLINE: + raise ConnectionError( + f"cannot stream, connection state={self._state.name}" + ) + per_reply_timeout = timeout if timeout is not None else self._default_timeout + + # Step 1: send the initial bare-opcode request, wait for first reply. + first_msg = Message( + start_char=START_CHAR_V1_UNADDRESSED, + data=bytes([int(initial_op)]), + ) + seq, fut = self._send_encrypted(first_msg) + try: + reply_pkt = await asyncio.wait_for(fut, per_reply_timeout) + except TimeoutError as exc: + self._pending.pop(seq, None) + raise RequestTimeoutError( + f"no first reply to streaming opcode={int(initial_op)}" + ) from exc + reply = self._decode_inner(reply_pkt) + + # Step 2..N: ack-and-receive until end_op or nak_op. + while True: + if reply.opcode == int(end_op) or reply.opcode == int(nak_op): + return + yield reply + + ack_msg = Message( + start_char=START_CHAR_V1_UNADDRESSED, + data=bytes([int(ack_op)]), + ) + seq, fut = self._send_encrypted(ack_msg) + try: + reply_pkt = await asyncio.wait_for(fut, per_reply_timeout) + except TimeoutError as exc: + self._pending.pop(seq, None) + raise RequestTimeoutError( + f"no reply after streaming Ack (seq={seq})" + ) from exc + reply = self._decode_inner(reply_pkt) + + def unsolicited(self) -> AsyncIterator[Message]: + queue = self._unsolicited_queue + + async def _gen() -> AsyncIterator[Message]: + while True: + yield await queue.get() + + return _gen() + + # ---- handshake ------------------------------------------------------- + + async def _do_handshake(self) -> None: + # Step 1: empty ClientRequestNewSession. + self._state = ConnectionState.NEW_SESSION + step1 = Packet( + seq=self._claim_seq(), + type=PacketType.ClientRequestNewSession, + data=b"", + ) + self._write_packet(step1) + + # Step 2: ControllerAckNewSession (carries protocol version + SessionID). + ack1 = await self._await_handshake_packet() + if ack1.type is PacketType.ControllerCannotStartNewSession: + raise HandshakeError("controller cannot start new session (busy?)") + if ack1.type is not PacketType.ControllerAckNewSession: + raise HandshakeError(f"unexpected step-2 packet type {ack1.type.name}") + if len(ack1.data) < 7: + raise HandshakeError( + f"ControllerAckNewSession payload too short: {len(ack1.data)} bytes" + ) + if (ack1.data[0], ack1.data[1]) != _PROTO_VERSION: + raise HandshakeError( + f"unsupported protocol version {ack1.data[0]:#04x}{ack1.data[1]:02x}" + ) + self._session_id = bytes(ack1.data[2 : 2 + _SESSION_ID_LEN]) + self._session_key = derive_session_key(self._controller_key, self._session_id) + + # Step 3: encrypted ClientRequestSecureSession echoing SessionID. + self._state = ConnectionState.SECURE + step3_seq = self._claim_seq() + step3_ct = encrypt_message_payload( + self._session_id, step3_seq, self._session_key + ) + step3 = Packet( + seq=step3_seq, + type=PacketType.ClientRequestSecureSession, + data=step3_ct, + ) + self._write_packet(step3) + + # Step 4: ControllerAckSecureSession (or termination). + ack2 = await self._await_handshake_packet() + if ack2.type is PacketType.ControllerSessionTerminated: + raise InvalidEncryptionKeyError( + "controller terminated session during handshake (wrong ControllerKey?)" + ) + if ack2.type is not PacketType.ControllerAckSecureSession: + raise HandshakeError( + f"unexpected step-4 packet type {ack2.type.name}" + ) + self._state = ConnectionState.ONLINE + + async def _await_handshake_packet(self) -> Packet: + try: + await asyncio.wait_for( + self._handshake_event.wait(), self._default_timeout + ) + except TimeoutError as exc: + raise HandshakeError( + "timeout waiting for controller handshake reply" + ) from exc + if self._handshake_error is not None: + err = self._handshake_error + self._handshake_error = None + raise err + pkt = self._handshake_packet + self._handshake_packet = None + self._handshake_event.clear() + if pkt is None: + raise HandshakeError("handshake event fired with no packet") + return pkt + + # ---- send / receive helpers ----------------------------------------- + + def _claim_seq(self) -> int: + seq = self._next_seq + nxt = seq + 1 + if nxt > _MAX_SEQ or nxt == 0: + nxt = 1 + self._next_seq = nxt + return seq + + def _send_encrypted( + self, inner: Message + ) -> tuple[int, asyncio.Future[Packet]]: + if self._session_key is None: + raise ConnectionError("no session key (handshake not complete)") + seq = self._claim_seq() + plaintext = inner.encode() + ciphertext = encrypt_message_payload(plaintext, seq, self._session_key) + # KEY DIFFERENCE FROM V2: outer type is OmniLinkMessage (0x10), + # not OmniLink2Message (0x20). See clsOmniLinkConnection.cs:1536. + pkt = Packet(seq=seq, type=PacketType.OmniLinkMessage, data=ciphertext) + + loop = asyncio.get_running_loop() + fut: asyncio.Future[Packet] = loop.create_future() + self._pending[seq] = fut + self._write_packet(pkt) + return seq, fut + + def _write_packet(self, pkt: Packet) -> None: + if self._udp_transport is None: + raise ConnectionError("transport not open") + wire = pkt.encode() + _log.debug( + "TX seq=%d type=%s len=%d", pkt.seq, pkt.type.name, len(pkt.data) + ) + self._udp_transport.sendto(wire) + + def _decode_inner(self, pkt: Packet) -> Message: + if self._session_key is None: + raise ConnectionError("no session key") + if not pkt.data: + raise ProtocolError("empty packet data") + plaintext = decrypt_message_payload(pkt.data, pkt.seq, self._session_key) + try: + return Message.decode(plaintext) + except MessageCrcError as exc: + raise ProtocolError(f"inner v1 message CRC mismatch: {exc}") from exc + + # ---- inbound dispatch (called from the datagram protocol) ----------- + + def _dispatch(self, pkt: Packet) -> None: + if pkt.data is None: + pkt = Packet(seq=pkt.seq, type=pkt.type, data=b"") + + if self._state in (ConnectionState.NEW_SESSION, ConnectionState.SECURE): + handshake_types = { + PacketType.ControllerAckNewSession, + PacketType.ControllerAckSecureSession, + PacketType.ControllerSessionTerminated, + PacketType.ControllerCannotStartNewSession, + } + if pkt.type in handshake_types: + self._handshake_packet = pkt + self._handshake_event.set() + return + + if pkt.seq == 0: + if pkt.type is PacketType.OmniLinkMessage: + try: + msg = self._decode_inner(pkt) + except (ProtocolError, ConnectionError) as exc: + _log.warning( + "dropping malformed unsolicited v1 packet: %s", exc + ) + return + try: + self._unsolicited_queue.put_nowait(msg) + except asyncio.QueueFull: # pragma: no cover - unbounded queue + _log.warning("unsolicited queue full; dropping message") + return + + fut = self._pending.pop(pkt.seq, None) + if fut is None: + _log.debug( + "no waiter for seq=%d type=%s; dropping", + pkt.seq, pkt.type.name, + ) + return + if pkt.type is PacketType.ControllerSessionTerminated: + fut.set_exception(ConnectionError("controller terminated session")) + return + if not fut.done(): + fut.set_result(pkt) + + +class _OmniDatagramProtocol(asyncio.DatagramProtocol): + """asyncio.DatagramProtocol bound to a single OmniConnectionV1. + + Each datagram is one complete Packet. We decode it and hand it to the + connection's dispatcher; the dispatcher already knows how to sort + handshake / solicited / unsolicited paths. + """ + + def __init__(self, conn: OmniConnectionV1) -> None: + self._conn = conn + + def connection_made(self, transport: asyncio.BaseTransport) -> None: + pass + + def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None: + try: + pkt = Packet.decode(data) + except Exception as exc: + _log.warning("dropping malformed UDP datagram: %s", exc) + return + try: + self._conn._dispatch(pkt) + except Exception: + _log.exception("UDP v1 dispatch crashed for seq=%d", pkt.seq) + + def error_received(self, exc: Exception) -> None: + _log.warning("UDP v1 socket error: %s", exc) + + def connection_lost(self, exc: Exception | None) -> None: + if exc is not None: + _log.warning("UDP v1 transport lost: %s", exc) diff --git a/src/omni_pca/v1/messages.py b/src/omni_pca/v1/messages.py new file mode 100644 index 0000000..49ed236 --- /dev/null +++ b/src/omni_pca/v1/messages.py @@ -0,0 +1,310 @@ +"""V1 status-reply and name parsers. + +The v1 wire protocol's typed status messages (ZoneStatus, UnitStatus, +ThermostatStatus, AuxiliaryStatus) carry one record per object in the +range the client requested — but, unlike v2's ExtendedStatus, the records +do **not** include the object number. The starting index is implicit +from the request payload, and each record is at a fixed offset. + +This module supplies "block" parsers that take both the reply payload +and the starting index, and produce a list of the existing top-level +dataclasses (:class:`omni_pca.models.ZoneStatus` etc) so HA entity code +doesn't need a v1-specific schema. The :func:`parse_v1_namedata` helper +decodes the bulk-name-download replies streamed by ``UploadNames``. + +Per-record byte counts (verified against firmware 2.12 over UDP): + ZoneStatus 2 bytes per zone (status, analog_loop) + UnitStatus 3 bytes per unit (status, time_hi, time_lo) + ThermostatStatus 7 bytes per tstat (status, current_t, heat_sp, + cool_sp, sys_mode, fan_mode, + hold_mode) + AuxiliaryStatus 4 bytes per aux (relay, current, low_sp, + high_sp) + +References: + clsOLMsgZoneStatus.cs / clsOLMsgRequestZoneStatus.cs + clsOLMsgUnitStatus.cs / clsOLMsgRequestUnitStatus.cs + clsOLMsgThermostatStatus.cs / clsOLMsgRequestThermostatStatus.cs + clsOLMsgAuxiliaryStatus.cs / clsOLMsgRequestAuxiliaryStatus.cs + clsOLMsgSystemStatus.cs — v1 byte 14 = battery, then per-area Mode + clsOLMsgNameData.cs — bulk name download record format + enuNameType.cs — Zone=1 Unit=2 Button=3 Code=4 Area=5 + Tstat=6 Message=7 UserSetting=8 + AccessControlReader=9 +""" + +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime +from enum import IntEnum + +from ..models import ( + AuxSensorStatus, + SystemStatus, + ThermostatStatus, + UnitStatus, + ZoneStatus, +) + +_ZONE_RECORD_BYTES = 2 +_UNIT_RECORD_BYTES = 3 +_THERMOSTAT_RECORD_BYTES = 7 +_AUX_RECORD_BYTES = 4 + + +def parse_v1_zone_status(payload: bytes, start_index: int) -> list[ZoneStatus]: + """Parse a v1 ZoneStatus reply payload into per-zone dataclasses. + + ``payload`` is the inner Message ``payload`` (data minus opcode byte); + its length must be a multiple of ``_ZONE_RECORD_BYTES``. + """ + if len(payload) % _ZONE_RECORD_BYTES != 0: + raise ValueError( + f"v1 ZoneStatus payload length {len(payload)} not a multiple of " + f"{_ZONE_RECORD_BYTES}" + ) + out: list[ZoneStatus] = [] + for i, off in enumerate(range(0, len(payload), _ZONE_RECORD_BYTES)): + out.append( + ZoneStatus( + index=start_index + i, + raw_status=payload[off], + loop=payload[off + 1], + ) + ) + return out + + +def parse_v1_unit_status(payload: bytes, start_index: int) -> list[UnitStatus]: + """Parse a v1 UnitStatus reply payload into per-unit dataclasses.""" + if len(payload) % _UNIT_RECORD_BYTES != 0: + raise ValueError( + f"v1 UnitStatus payload length {len(payload)} not a multiple of " + f"{_UNIT_RECORD_BYTES}" + ) + out: list[UnitStatus] = [] + for i, off in enumerate(range(0, len(payload), _UNIT_RECORD_BYTES)): + out.append( + UnitStatus( + index=start_index + i, + state=payload[off], + time_remaining_secs=(payload[off + 1] << 8) | payload[off + 2], + ) + ) + return out + + +def parse_v1_thermostat_status( + payload: bytes, start_index: int +) -> list[ThermostatStatus]: + """Parse a v1 ThermostatStatus reply payload into per-tstat dataclasses. + + The v1 record only carries 7 fields; the v2 dataclass has 4 more + (humidity, humidify_setpoint, dehumidify_setpoint, outdoor_temp, + horc_status). We zero-fill those — HA's climate platform doesn't + require them and an explicit 0 is more honest than a fake value. + """ + if len(payload) % _THERMOSTAT_RECORD_BYTES != 0: + raise ValueError( + f"v1 ThermostatStatus payload length {len(payload)} not a multiple " + f"of {_THERMOSTAT_RECORD_BYTES}" + ) + out: list[ThermostatStatus] = [] + for i, off in enumerate(range(0, len(payload), _THERMOSTAT_RECORD_BYTES)): + out.append( + ThermostatStatus( + index=start_index + i, + status=payload[off], + temperature_raw=payload[off + 1], + heat_setpoint_raw=payload[off + 2], + cool_setpoint_raw=payload[off + 3], + system_mode=payload[off + 4], + fan_mode=payload[off + 5], + hold_mode=payload[off + 6], + humidity_raw=0, + humidify_setpoint_raw=0, + dehumidify_setpoint_raw=0, + outdoor_temperature_raw=0, + horc_status=0, + ) + ) + return out + + +def parse_v1_aux_status(payload: bytes, start_index: int) -> list[AuxSensorStatus]: + """Parse a v1 AuxiliaryStatus reply payload into per-aux dataclasses.""" + if len(payload) % _AUX_RECORD_BYTES != 0: + raise ValueError( + f"v1 AuxiliaryStatus payload length {len(payload)} not a multiple " + f"of {_AUX_RECORD_BYTES}" + ) + out: list[AuxSensorStatus] = [] + for i, off in enumerate(range(0, len(payload), _AUX_RECORD_BYTES)): + out.append( + AuxSensorStatus( + index=start_index + i, + output=payload[off], + value_raw=payload[off + 1], + low_raw=payload[off + 2], + high_raw=payload[off + 3], + ) + ) + return out + + +def parse_v1_system_status(payload: bytes) -> SystemStatus: + """Parse a v1 SystemStatus reply. + + Bytes 0..13 are byte-identical to v2 (time/date + sunrise/sunset + + battery). After byte 13 v1 carries per-area Mode bytes (1 byte each) + while v2 carries 2-byte alarm-flag pairs. We translate to the v2 + dataclass's ``area_alarms`` shape by promoting each v1 mode byte to + a ``(mode, 0)`` tuple — that way HA code that already consumes + :class:`SystemStatus` keeps working without a v1-specific branch. + """ + if len(payload) < 14: + raise ValueError( + f"v1 SystemStatus payload too short: {len(payload)} bytes" + ) + time_valid = payload[0] != 0 + year = payload[1] + month = payload[2] + day = payload[3] + # day_of_week = payload[4] + hour = payload[5] + minute = payload[6] + second = payload[7] + # daylight = payload[8] + sunrise_h = payload[9] + sunrise_m = payload[10] + sunset_h = payload[11] + sunset_m = payload[12] + battery = payload[13] + + panel_time: datetime | None = None + if time_valid: + try: + panel_time = datetime( + year=2000 + year, + month=month, + day=day, + hour=hour, + minute=minute, + second=second, + ) + except ValueError: + panel_time = None + + # Promote each v1 per-area mode byte to a (mode, 0) pair so the v2 + # area_alarms tuple shape carries the same information without a + # second dataclass. + mode_bytes = payload[14:] + area_alarms = tuple((b, 0) for b in mode_bytes) + + return SystemStatus( + time_valid=time_valid, + panel_time=panel_time, + sunrise_hour=sunrise_h, + sunrise_minute=sunrise_m, + sunset_hour=sunset_h, + sunset_minute=sunset_m, + battery_reading=battery, + area_alarms=area_alarms, + ) + + +# ---- NameData -------------------------------------------------------------- + + +class NameType(IntEnum): + """Categories of named objects panels can stream over UploadNames. + + Reference: enuNameType.cs. + """ + + ZONE = 1 + UNIT = 2 + BUTTON = 3 + CODE = 4 + AREA = 5 + THERMOSTAT = 6 + MESSAGE = 7 + USER_SETTING = 8 + ACCESS_CONTROL_READER = 9 + + +# Per-type max name length (clsCapOMNI_PRO_II.cs lines 55-71). +# Other Omni models share these numbers — the few exceptions are +# documented but not relevant for the panels we know speak v1+UDP. +_NAME_TYPE_LENGTH: dict[int, int] = { + NameType.ZONE: 15, + NameType.UNIT: 12, + NameType.BUTTON: 12, + NameType.CODE: 12, + NameType.AREA: 12, + NameType.THERMOSTAT: 12, + NameType.MESSAGE: 15, + NameType.USER_SETTING: 15, + NameType.ACCESS_CONTROL_READER: 15, +} + + +@dataclass(frozen=True, slots=True) +class NameRecord: + """One name record from a v1 ``NameData`` reply (opcode 11).""" + + name_type: int + number: int + name: str + + @property + def name_type_label(self) -> str: + try: + return NameType(self.name_type).name + except ValueError: + return f"Unknown({self.name_type})" + + +def parse_v1_namedata(payload: bytes) -> NameRecord: + """Decode a v1 ``NameData`` payload (opcode 11) into a :class:`NameRecord`. + + Wire layout (per clsOLMsgNameData.cs, MessageLength is the + full Data byte count including the opcode): + + * One-byte form (NameNumber ≤ 255), MessageLength = 4 + NameTypeLen: + ``[opcode][type][num][name×L][\\0]`` — one trailing reserved byte. + * Two-byte form (NameNumber > 255), MessageLength = 5 + NameTypeLen: + ``[opcode][type][num_hi][num_lo][name×L][\\0]``. + + ``payload`` here is the *inner* :attr:`Message.payload` (data minus + the leading opcode), so the lengths to compare against are L+3 and + L+4 respectively. + """ + if len(payload) < 3: + raise ValueError(f"NameData payload too short: {len(payload)} bytes") + name_type = payload[0] + name_len = _NAME_TYPE_LENGTH.get(name_type) + + if name_len is not None: + # Disambiguate by payload length against the expected forms. + one_byte_len = name_len + 3 # type + num + name + 1 trailing + two_byte_len = name_len + 4 # type + num_hi + num_lo + name + 1 trailing + if len(payload) >= two_byte_len: + number = (payload[1] << 8) | payload[2] + name_bytes = payload[3 : 3 + name_len] + elif len(payload) >= one_byte_len: + number = payload[1] + name_bytes = payload[2 : 2 + name_len] + else: + # Short payload — best-effort one-byte decode of whatever is left. + number = payload[1] + name_bytes = payload[2:] + else: + # Unknown type — can't tell the form. Assume one-byte and consume + # the rest; HA filters by known type anyway. + number = payload[1] + name_bytes = payload[2:] + + name = name_bytes.split(b"\x00", 1)[0].decode("utf-8", errors="replace") + return NameRecord(name_type=name_type, number=number, name=name) diff --git a/tests/test_v1_client_commands.py b/tests/test_v1_client_commands.py new file mode 100644 index 0000000..1ef12be --- /dev/null +++ b/tests/test_v1_client_commands.py @@ -0,0 +1,290 @@ +"""Unit tests for the OmniClientV1 write methods. + +These exercise wire-payload construction by monkey-patching the +connection's ``request`` method so we never have to open a UDP socket. +The contract under test: + +* :meth:`OmniClientV1.execute_command` packs ``[cmd][p1][p2_hi][p2_lo]``. +* :meth:`OmniClientV1.execute_security_command` packs + ``[area][mode][d1][d2][d3][d4]`` with the C# digit-by-digit form. +* Convenience wrappers (``turn_unit_on`` etc) route through + :meth:`execute_command` with the right Command enum values. +* Replies are interpreted: Ack → return, Nak → CommandFailedError, + non-zero SecurityCommandResponse → CommandFailedError with code. +""" + +from __future__ import annotations + +import struct + +import pytest + +from omni_pca.commands import ( + Command, + CommandFailedError, + SecurityCommandResponse, +) +from omni_pca.message import START_CHAR_V1_UNADDRESSED, Message +from omni_pca.models import SecurityMode +from omni_pca.opcodes import OmniLinkMessageType +from omni_pca.v1.client import OmniClientV1 + + +class _FakeConn: + """Records each request, returns a canned reply. + + Tests construct one with a list of (opcode, payload_bytes) replies in + order; each call to :meth:`request` consumes one. + """ + + def __init__( + self, + replies: list[tuple[int, bytes]] | None = None, + ) -> None: + self.replies = replies or [] + self.calls: list[tuple[int, bytes]] = [] + + async def request( + self, + opcode: int, + payload: bytes = b"", + timeout: float | None = None, + ) -> Message: + self.calls.append((int(opcode), bytes(payload))) + if not self.replies: + # Default: panel ack — works for the boring success path. + return Message( + start_char=START_CHAR_V1_UNADDRESSED, + data=bytes([int(OmniLinkMessageType.Ack)]), + ) + reply_op, reply_payload = self.replies.pop(0) + return Message( + start_char=START_CHAR_V1_UNADDRESSED, + data=bytes([reply_op]) + reply_payload, + ) + + +def _make_client(replies: list[tuple[int, bytes]] | None = None) -> tuple[OmniClientV1, _FakeConn]: + client = OmniClientV1( + host="127.0.0.1", + controller_key=b"\x00" * 16, + ) + fake = _FakeConn(replies) + # Swap out the real connection with our recorder. + client._conn = fake # type: ignore[assignment] + return client, fake + + +# ---- execute_command --------------------------------------------------- + + +@pytest.mark.asyncio +async def test_execute_command_packs_payload_be() -> None: + client, fake = _make_client() + await client.execute_command(Command.UNIT_LEVEL, parameter1=42, parameter2=0x1234) + assert len(fake.calls) == 1 + opcode, payload = fake.calls[0] + assert opcode == int(OmniLinkMessageType.Command) + # [cmd][p1][p2_hi][p2_lo] + assert payload == struct.pack(">BBH", int(Command.UNIT_LEVEL), 42, 0x1234) + + +@pytest.mark.asyncio +async def test_execute_command_rejects_oversized_parameters() -> None: + client, _ = _make_client() + with pytest.raises(ValueError, match="parameter1"): + await client.execute_command(Command.UNIT_LEVEL, parameter1=256, parameter2=1) + with pytest.raises(ValueError, match="parameter2"): + await client.execute_command(Command.UNIT_LEVEL, parameter1=0, parameter2=0x10000) + + +@pytest.mark.asyncio +async def test_execute_command_nak_raises_command_failed() -> None: + client, _ = _make_client([(int(OmniLinkMessageType.Nak), b"")]) + with pytest.raises(CommandFailedError, match="NAK"): + await client.execute_command(Command.UNIT_ON, parameter2=5) + + +@pytest.mark.asyncio +async def test_execute_command_unexpected_reply_raises() -> None: + # Panel returns SystemInformation reply to a Command request — that's bogus. + client, _ = _make_client( + [(int(OmniLinkMessageType.SystemInformation), b"\x00")] + ) + with pytest.raises(CommandFailedError, match="unexpected reply"): + await client.execute_command(Command.UNIT_ON, parameter2=5) + + +# ---- thin wrappers ----------------------------------------------------- + + +@pytest.mark.asyncio +async def test_turn_unit_on_sends_unit_on_command() -> None: + client, fake = _make_client() + await client.turn_unit_on(7) + opcode, payload = fake.calls[0] + assert opcode == int(OmniLinkMessageType.Command) + assert payload[0] == int(Command.UNIT_ON) + assert (payload[2] << 8) | payload[3] == 7 + + +@pytest.mark.asyncio +async def test_turn_unit_off_sends_unit_off_command() -> None: + client, fake = _make_client() + await client.turn_unit_off(255) + payload = fake.calls[0][1] + assert payload[0] == int(Command.UNIT_OFF) + assert (payload[2] << 8) | payload[3] == 255 + + +@pytest.mark.asyncio +async def test_set_unit_level_packs_percent_as_p1() -> None: + client, fake = _make_client() + await client.set_unit_level(3, 75) + payload = fake.calls[0][1] + assert payload[0] == int(Command.UNIT_LEVEL) + assert payload[1] == 75 + assert (payload[2] << 8) | payload[3] == 3 + + +@pytest.mark.asyncio +async def test_set_unit_level_rejects_out_of_range_percent() -> None: + client, _ = _make_client() + with pytest.raises(ValueError, match="0..100"): + await client.set_unit_level(1, 101) + with pytest.raises(ValueError, match="0..100"): + await client.set_unit_level(1, -1) + + +@pytest.mark.asyncio +async def test_bypass_zone_packs_code_as_p1_and_zone_as_p2() -> None: + client, fake = _make_client() + await client.bypass_zone(12, code=5) + payload = fake.calls[0][1] + assert payload[0] == int(Command.BYPASS_ZONE) + assert payload[1] == 5 + assert (payload[2] << 8) | payload[3] == 12 + + +@pytest.mark.asyncio +async def test_restore_zone_packs_code_and_zone() -> None: + client, fake = _make_client() + await client.restore_zone(99, code=3) + payload = fake.calls[0][1] + assert payload[0] == int(Command.RESTORE_ZONE) + assert payload[1] == 3 + assert (payload[2] << 8) | payload[3] == 99 + + +@pytest.mark.asyncio +async def test_execute_button() -> None: + client, fake = _make_client() + await client.execute_button(15) + payload = fake.calls[0][1] + assert payload[0] == int(Command.EXECUTE_BUTTON) + assert (payload[2] << 8) | payload[3] == 15 + + +@pytest.mark.asyncio +async def test_set_thermostat_modes_route_through_command() -> None: + client, fake = _make_client() + await client.set_thermostat_system_mode(2, 1) # 1 = Heat + await client.set_thermostat_fan_mode(2, 2) # 2 = On + await client.set_thermostat_hold_mode(2, 1) # 1 = Hold + cmds = [p[1][0] for p in fake.calls] + assert cmds == [ + int(Command.SET_THERMOSTAT_SYSTEM_MODE), + int(Command.SET_THERMOSTAT_FAN_MODE), + int(Command.SET_THERMOSTAT_HOLD_MODE), + ] + + +@pytest.mark.asyncio +async def test_set_thermostat_setpoint_raw_validates_byte_range() -> None: + client, _ = _make_client() + with pytest.raises(ValueError, match="raw_temp"): + await client.set_thermostat_heat_setpoint_raw(1, 256) + with pytest.raises(ValueError, match="raw_temp"): + await client.set_thermostat_cool_setpoint_raw(1, -1) + + +# ---- execute_security_command ------------------------------------------ + + +@pytest.mark.asyncio +async def test_execute_security_command_digit_packing() -> None: + # Code 1234 → digits 1, 2, 3, 4. + client, fake = _make_client([(int(OmniLinkMessageType.Ack), b"")]) + await client.execute_security_command(area=1, mode=SecurityMode.OFF, code=1234) + opcode, payload = fake.calls[0] + assert opcode == int(OmniLinkMessageType.ExecuteSecurityCommand) + assert payload == bytes([1, int(SecurityMode.OFF), 1, 2, 3, 4]) + + +@pytest.mark.asyncio +async def test_execute_security_command_pads_short_codes() -> None: + # Code 7 → digits 0, 0, 0, 7. + client, fake = _make_client([(int(OmniLinkMessageType.Ack), b"")]) + await client.execute_security_command(area=8, mode=SecurityMode.AWAY, code=7) + payload = fake.calls[0][1] + assert payload == bytes([8, int(SecurityMode.AWAY), 0, 0, 0, 7]) + + +@pytest.mark.asyncio +async def test_execute_security_command_response_success_returns() -> None: + # Panel returns ExecuteSecurityCommandResponse with status=0 (success). + client, _ = _make_client( + [( + int(OmniLinkMessageType.ExecuteSecurityCommandResponse), + bytes([int(SecurityCommandResponse.SUCCESS)]), + )] + ) + await client.execute_security_command(area=1, mode=SecurityMode.OFF, code=0) + + +@pytest.mark.asyncio +async def test_execute_security_command_response_failure_raises() -> None: + # Panel returns ExecuteSecurityCommandResponse with status= + # SecureSystem (1) — wrong code or area not enabled for this code. + client, _ = _make_client( + [( + int(OmniLinkMessageType.ExecuteSecurityCommandResponse), + bytes([int(SecurityCommandResponse.INVALID_CODE)]), + )] + ) + with pytest.raises(CommandFailedError) as ei: + await client.execute_security_command( + area=1, mode=SecurityMode.AWAY, code=9999 + ) + assert ei.value.failure_code == int(SecurityCommandResponse.INVALID_CODE) + + +@pytest.mark.asyncio +async def test_execute_security_command_nak_raises() -> None: + client, _ = _make_client([(int(OmniLinkMessageType.Nak), b"")]) + with pytest.raises(CommandFailedError, match="NAK"): + await client.execute_security_command( + area=1, mode=SecurityMode.OFF, code=0 + ) + + +@pytest.mark.asyncio +async def test_execute_security_command_rejects_bad_inputs() -> None: + client, _ = _make_client() + with pytest.raises(ValueError, match="area"): + await client.execute_security_command(area=0, mode=SecurityMode.OFF, code=0) + with pytest.raises(ValueError, match="code"): + await client.execute_security_command( + area=1, mode=SecurityMode.OFF, code=10000 + ) + + +# ---- acknowledge_alerts ------------------------------------------------- + + +@pytest.mark.asyncio +async def test_acknowledge_alerts_is_noop_on_v1() -> None: + """v1 has no AcknowledgeAlerts opcode — method should not call request.""" + client, fake = _make_client() + await client.acknowledge_alerts() + assert fake.calls == [] diff --git a/tests/test_v1_messages.py b/tests/test_v1_messages.py new file mode 100644 index 0000000..f85ce4e --- /dev/null +++ b/tests/test_v1_messages.py @@ -0,0 +1,276 @@ +"""Unit tests for omni_pca.v1.messages parsers. + +Test vectors are real wire payloads captured from a firmware-2.12 Omni +Pro II panel via dev/probe_v1_recon.py — see the comment above each +test for the inputs that produced it. +""" + +from __future__ import annotations + +import pytest + +from omni_pca.v1.messages import ( + parse_v1_aux_status, + parse_v1_system_status, + parse_v1_thermostat_status, + parse_v1_unit_status, + parse_v1_zone_status, +) + + +# ---- ZoneStatus --------------------------------------------------------- + + +def test_v1_zone_status_secure_and_open() -> None: + # Captured: RequestZoneStatus(1, 8) → 16-byte payload, 8 zones × 2 bytes. + # zone 6 raw_status=0x01 (open), all others 0x00. + payload = bytes.fromhex("0080007f007f0080008001fd00810080") + zones = parse_v1_zone_status(payload, start_index=1) + assert len(zones) == 8 + assert {z.index for z in zones} == set(range(1, 9)) + assert zones[0].is_secure # zone 1 + assert zones[5].is_open # zone 6 + assert zones[5].raw_status == 0x01 + assert zones[5].loop == 0xFD + + +def test_v1_zone_status_indexes_offset_by_start() -> None: + # If we requested zones 17..24, the same 16-byte payload should + # produce indexes 17..24. + payload = bytes.fromhex("0080007f007f0080008001fd00810080") + zones = parse_v1_zone_status(payload, start_index=17) + assert {z.index for z in zones} == set(range(17, 25)) + + +def test_v1_zone_status_invalid_length() -> None: + with pytest.raises(ValueError, match="multiple of 2"): + parse_v1_zone_status(b"\x00\x00\x00", start_index=1) + + +# ---- UnitStatus --------------------------------------------------------- + + +def test_v1_unit_status_dimmer_levels() -> None: + # Captured: RequestUnitStatus(1, 8) → 24-byte payload, 8 units × 3 bytes. + # state bytes: 01, 01, 69, 96, 69, 00, 73, 00 → 100%, 100%, 5%, 50%, 5%, 0%, 15%, 0% + payload = bytes.fromhex("010000010000690000960000690000000000730000000000") + units = parse_v1_unit_status(payload, start_index=1) + assert len(units) == 8 + assert units[0].is_on and units[0].brightness == 100 # state=0x01 + assert units[2].brightness == 5 # state=0x69 = 105 → -100 = 5% + assert units[3].brightness == 50 # state=0x96 = 150 → -100 = 50% + assert not units[5].is_on # state=0x00 + assert units[6].brightness == 15 # state=0x73 = 115 → -100 = 15% + + +def test_v1_unit_status_time_remaining_be_u16() -> None: + # Single record with remaining=0x1234. + payload = bytes([0x01, 0x12, 0x34]) + units = parse_v1_unit_status(payload, start_index=42) + assert len(units) == 1 + assert units[0].index == 42 + assert units[0].time_remaining_secs == 0x1234 + + +def test_v1_unit_status_invalid_length() -> None: + with pytest.raises(ValueError, match="multiple of 3"): + parse_v1_unit_status(b"\x00\x00", start_index=1) + + +# ---- ThermostatStatus --------------------------------------------------- + + +def test_v1_thermostat_status_unconfigured() -> None: + # Captured: RequestThermostatStatus(1, 4) → 28 B, all values 0/0/0/0/0/0/0 + # except byte 0 of records 0-1 which is 0x01 (status). The "raw=0" temps + # decode to -40°C / -40°F per omni_temp_to_fahrenheit. + payload = bytes.fromhex( + "01000000000000010000000000000000000000000000000000000000" + ) + tstats = parse_v1_thermostat_status(payload, start_index=1) + assert len(tstats) == 4 + assert tstats[0].status == 0x01 + assert tstats[2].status == 0x00 + assert tstats[0].humidity_raw == 0 # zero-filled (v1 doesn't carry it) + assert tstats[0].outdoor_temperature_raw == 0 + assert tstats[0].horc_status == 0 + + +def test_v1_thermostat_full_record() -> None: + # Hand-constructed: status=0x01, temp=170 (=45°F), heat=140 (30°F), + # cool=200 (60°F), mode=1, fan=2, hold=3. + payload = bytes([0x01, 170, 140, 200, 1, 2, 3]) + tstats = parse_v1_thermostat_status(payload, start_index=5) + assert len(tstats) == 1 + t = tstats[0] + assert t.index == 5 + assert t.status == 0x01 + assert t.temperature_raw == 170 + assert t.heat_setpoint_raw == 140 + assert t.cool_setpoint_raw == 200 + assert t.system_mode == 1 + assert t.fan_mode == 2 + assert t.hold_mode == 3 + + +def test_v1_thermostat_invalid_length() -> None: + with pytest.raises(ValueError, match="multiple of 7"): + parse_v1_thermostat_status(b"\x00" * 6, start_index=1) + + +# ---- AuxiliaryStatus ---------------------------------------------------- + + +def test_v1_aux_status_all_zero() -> None: + # Captured: RequestAuxiliaryStatus(1, 8) → 32 B all zeros. + payload = bytes(32) + auxes = parse_v1_aux_status(payload, start_index=1) + assert len(auxes) == 8 + assert all(a.output == 0 and a.value_raw == 0 for a in auxes) + + +def test_v1_aux_status_record_field_order() -> None: + # Single record: output=11, value=22, low=33, high=44 + payload = bytes([11, 22, 33, 44]) + auxes = parse_v1_aux_status(payload, start_index=99) + assert len(auxes) == 1 + a = auxes[0] + assert a.index == 99 + assert a.output == 11 + assert a.value_raw == 22 + assert a.low_raw == 33 + assert a.high_raw == 44 + + +def test_v1_aux_invalid_length() -> None: + with pytest.raises(ValueError, match="multiple of 4"): + parse_v1_aux_status(b"\x00\x00\x00", start_index=1) + + +# ---- SystemStatus ------------------------------------------------------- + + +def test_v1_system_status_full_payload() -> None: + # Captured: RequestSystemStatus → 38 B payload from firmware 2.12. + # Bytes: 011a050a07163b1c01061c150003 + 24 area-mode bytes + # decode: time_valid=1, year=26 (=2026), month=05, day=10, + # dow=07, hour=22, min=59, sec=28, dst=01, sun_h=06, sun_m=28, + # sun_h2=21, sun_m2=21, battery=0x00, then area modes. + # Note: the 14th byte (0x03) is the BATTERY reading = 3, not 0. + payload = bytes.fromhex( + "011a050a07163b1c01061c150003000000000000000002090000000000000000000000000000" + ) + s = parse_v1_system_status(payload) + assert s.time_valid is True + assert s.panel_time is not None + assert s.panel_time.year == 2000 + 0x1A # 2026 + assert s.panel_time.month == 0x05 + assert s.panel_time.day == 0x0A + assert s.sunrise_hour == 0x06 + assert s.sunrise_minute == 0x1C # 28 + assert s.sunset_hour == 0x15 # 21 + assert s.sunset_minute == 0x00 + assert s.battery_reading == 0x03 + # 24 trailing bytes promoted to area_alarms tuples (mode_byte, 0). + assert len(s.area_alarms) == 24 + assert s.area_alarms[0] == (0, 0) + # Area 9 in this capture had mode=2. + assert s.area_alarms[8] == (2, 0) + + +def test_v1_system_status_minimum_payload() -> None: + # Just the 14 header bytes, no area modes. + payload = bytes(14) + s = parse_v1_system_status(payload) + assert s.time_valid is False + assert s.panel_time is None + assert s.battery_reading == 0 + assert s.area_alarms == () + + +def test_v1_system_status_too_short_raises() -> None: + with pytest.raises(ValueError, match="too short"): + parse_v1_system_status(b"\x00" * 13) + + +# ---- NameData ----------------------------------------------------------- + + +from omni_pca.v1.messages import NameType, parse_v1_namedata # noqa: E402 + + +def test_v1_namedata_zone_one_byte_form() -> None: + # Captured: UploadNames stream → first reply = Zone #1 'GARAGE ENTRY'. + # Payload 18 B = type(1) + num(1) + name(15) + reserved(1). + payload = bytes.fromhex("010147415241474520454e54525900000000") + rec = parse_v1_namedata(payload) + assert rec.name_type == int(NameType.ZONE) + assert rec.name_type_label == "ZONE" + assert rec.number == 1 + assert rec.name == "GARAGE ENTRY" + + +def test_v1_namedata_unit_one_byte_form() -> None: + # Hand-crafted: Unit #5 = "GARAGE ENTRY" (12-char name slot, no padding need). + name = "GARAGE ENTRY" + payload = ( + bytes([int(NameType.UNIT), 5]) + + name.encode("ascii").ljust(12, b"\x00") + + b"\x00" # reserved trailing byte + ) + rec = parse_v1_namedata(payload) + assert rec.name_type == int(NameType.UNIT) + assert rec.number == 5 + assert rec.name == name + + +def test_v1_namedata_unit_two_byte_form() -> None: + # Unit #257 = 'Z1-LANDSCAPE' — captured from the real panel after the + # numbered units rolled over 256. + payload = ( + bytes([int(NameType.UNIT), 0x01, 0x01]) # type, num_hi=1, num_lo=1 + + b"Z1-LANDSCAPE".ljust(12, b"\x00") # 12-char name + + b"\x00" + ) + rec = parse_v1_namedata(payload) + assert rec.name_type == int(NameType.UNIT) + assert rec.number == 257 + assert rec.name == "Z1-LANDSCAPE" + + +def test_v1_namedata_thermostat() -> None: + payload = ( + bytes([int(NameType.THERMOSTAT), 1]) + + b"DOWNSTAIRS".ljust(12, b"\x00") + + b"\x00" + ) + rec = parse_v1_namedata(payload) + assert rec.name_type == int(NameType.THERMOSTAT) + assert rec.number == 1 + assert rec.name == "DOWNSTAIRS" + + +def test_v1_namedata_strips_trailing_nulls() -> None: + payload = ( + bytes([int(NameType.ZONE), 9]) + + b"HALL MOTION".ljust(15, b"\x00") + + b"\x00" + ) + rec = parse_v1_namedata(payload) + assert rec.name == "HALL MOTION" # no embedded nulls in result + + +def test_v1_namedata_unknown_type_falls_through() -> None: + # Unknown name type — parser should still return SOMETHING by + # consuming the rest as the name. HA filters by NameType anyway. + payload = bytes([99, 7]) + b"WHATEVER\x00\x00" + rec = parse_v1_namedata(payload) + assert rec.name_type == 99 + assert rec.name_type_label == "Unknown(99)" + assert rec.number == 7 + assert rec.name == "WHATEVER" + + +def test_v1_namedata_short_payload_raises() -> None: + with pytest.raises(ValueError, match="too short"): + parse_v1_namedata(b"\x01\x00")