v1-over-UDP: parallel OmniClientV1 for panels that listen UDP-only
Some Omni network modules are configured for UDP, in which case PC Access
falls back to the v1 wire protocol (OmniLinkMessage outer = 0x10, inner
StartChar 0x5A, typed Request*Status opcodes) instead of v2's TCP path
(OmniLink2Message + StartChar 0x21 + parameterised RequestProperties).
This adds a parallel implementation rather than overloading the v2 path.
omni_pca/v1/
connection.py UDP-only OmniConnectionV1; reuses crypto + handshake,
routes post-handshake messages through OmniLinkMessage
(0x10) wrapping v1 inner format. Adds iter_streaming
for the lock-step UploadNames/Acknowledge/EOD pattern.
messages.py Block parsers for the typed v1 status replies (zone,
unit, thermostat, aux), v1 SystemStatus, and NameData
(handles both one-byte and two-byte NameNumber forms).
client.py OmniClientV1: read API (get_system_information,
get_*_status), discovery (iter_names + list_*_names),
write API (execute_command, execute_security_command,
turn_unit_*, set_unit_level, bypass/restore_zone,
execute_button, set_thermostat_*). acknowledge_alerts
is a no-op (v1 has no equivalent opcode).
Discovery uses bare UploadNames; panel streams every defined name across
all types in a fixed order with per-record Acknowledge. Verified against
firmware 2.12 — pulled 16 zones, 44 units, 16 buttons, 8 codes,
2 thermostats, 8 messages in one stream.
src/omni_pca/message.py
Fix flipped START_CHAR_V1_* constants. enuOmniLinkMessageFormat says
Addressable=0x41 and NonAddressable=0x5A; our names had them swapped.
Wire bytes were unchanged, so existing tests kept passing — but
encode_v1() with no serial_address now correctly emits 0x5A, which
is what UDP needs.
tests/
test_v1_messages.py 22 cases; payloads are real wire captures
from a firmware-2.12 panel via probe_v1_recon.
test_v1_client_commands.py 20 cases; payload-packing for the Command
and ExecuteSecurityCommand opcodes,
including BE u16 parameter2 and the
digit-by-digit security code form.
dev/
probe_v1.py Phase-1 smoke: handshake + RequestSystemInformation.
probe_v1_recon.py Raw opcode dump for protocol reconnaissance.
probe_v1_stream.py Streaming UploadNames flow exploration.
probe_v1_client.py Full read-path smoke test via OmniClientV1.
probe_v1_write.py Live no-op execute_command round-trip.
.gitignore: ignore dev/.omni_key (probe scripts read controller key from
this file as one fallback option).
Discovery on firmware 2.12: Request*ExtendedStatus opcodes (63/65/69)
NAK on this firmware — only the basic Request*Status opcodes are
implemented, so OmniClientV1 uses those (3 bytes/unit, 7 bytes/tstat,
4 bytes/aux records). HA still gets enough signal for polling; full
properties discovery uses streaming UploadNames instead.
Test totals: 387 passed, 1 skipped (existing fixture skip).
This commit is contained in:
parent
d91561a6d2
commit
92c8b695b4
1
.gitignore
vendored
1
.gitignore
vendored
@ -41,3 +41,4 @@ panel_key*
|
||||
.wine-pca/
|
||||
ha-config/
|
||||
dist/
|
||||
dev/.omni_key
|
||||
|
||||
151
dev/probe_v1.py
Normal file
151
dev/probe_v1.py
Normal file
@ -0,0 +1,151 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Phase-1 smoke test: v1-over-UDP handshake + RequestSystemInformation.
|
||||
|
||||
Run inside the project venv:
|
||||
cd /home/kdm/home-auto/omni-pca
|
||||
uv run python dev/probe_v1.py [--host 192.168.1.9] [--port 4369]
|
||||
|
||||
Requires the panel's controller key. Picks it up from (in order):
|
||||
1. ``--key 32hex`` on the command line
|
||||
2. ``OMNI_KEY`` env var
|
||||
3. ``dev/.omni_key`` file (gitignored)
|
||||
4. The bundled ``.pca`` plain fixture (developer-only fallback)
|
||||
|
||||
Success criteria: panel returns a v1 SystemInformation message (opcode 18)
|
||||
within the timeout. Failure modes we want to distinguish:
|
||||
* UDP socket fails to open → routing / firewall
|
||||
* Handshake step 2 timeout → wrong port, wrong panel
|
||||
* Handshake step 4 termination → wrong controller key
|
||||
* SystemInformation timeout → v1 path isn't doing what we think
|
||||
* SystemInformation reply → v1-over-UDP is real, proceed to Phase 2
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
from omni_pca.v1.connection import (
|
||||
HandshakeError,
|
||||
InvalidEncryptionKeyError,
|
||||
OmniConnectionV1,
|
||||
RequestTimeoutError,
|
||||
)
|
||||
from omni_pca.opcodes import OmniLinkMessageType
|
||||
|
||||
|
||||
def _load_key(arg_key: str | None) -> bytes:
|
||||
if arg_key:
|
||||
return bytes.fromhex(arg_key)
|
||||
env = os.environ.get("OMNI_KEY")
|
||||
if env:
|
||||
return bytes.fromhex(env)
|
||||
keyfile = Path(__file__).parent / ".omni_key"
|
||||
if keyfile.exists():
|
||||
return bytes.fromhex(keyfile.read_text().strip())
|
||||
fixture = Path("/home/kdm/home-auto/HAI/pca-re/extracted/Our_House.pca.plain")
|
||||
if fixture.exists():
|
||||
from omni_pca.pca_file import (
|
||||
PcaReader,
|
||||
_CAP_OMNI_PRO_II,
|
||||
_parse_header,
|
||||
_walk_to_connection,
|
||||
)
|
||||
|
||||
r = PcaReader(fixture.read_bytes())
|
||||
_parse_header(r)
|
||||
_walk_to_connection(r, _CAP_OMNI_PRO_II)
|
||||
r.string8_fixed(120) # network_address
|
||||
r.string8_fixed(5) # port
|
||||
return bytes.fromhex(r.string8_fixed(32).ljust(32, "0")[:32])
|
||||
raise SystemExit("no controller key: pass --key, set OMNI_KEY, or create dev/.omni_key")
|
||||
|
||||
|
||||
def _decode_system_information(payload: bytes) -> dict[str, object]:
|
||||
"""Parse the v1 SystemInformation payload (mirrors clsOLMsgSystemInformation)."""
|
||||
if len(payload) < 29:
|
||||
raise ValueError(f"SystemInformation payload too short: {len(payload)} bytes")
|
||||
return {
|
||||
"opcode": payload[0],
|
||||
"model": payload[1],
|
||||
"fw_major": payload[2],
|
||||
"fw_minor": payload[3],
|
||||
"fw_revision": int.from_bytes(payload[4:5], "big", signed=True),
|
||||
"local_phone": payload[5:29].rstrip(b"\x00").decode("ascii", errors="replace"),
|
||||
}
|
||||
|
||||
|
||||
async def amain(args: argparse.Namespace) -> int:
|
||||
key = _load_key(args.key)
|
||||
print(f"[probe] target {args.host}:{args.port} key=...{key[-2:].hex()} (16 B)")
|
||||
|
||||
try:
|
||||
async with OmniConnectionV1(
|
||||
host=args.host,
|
||||
port=args.port,
|
||||
controller_key=key,
|
||||
timeout=args.timeout,
|
||||
retry_count=args.retries,
|
||||
) as conn:
|
||||
print(f"[probe] handshake OK state={conn.state.name} "
|
||||
f"session_key=...{conn.session_key[-2:].hex() if conn.session_key else 'n/a'}")
|
||||
|
||||
print("[probe] sending v1 RequestSystemInformation (opcode 17)")
|
||||
reply = await conn.request(OmniLinkMessageType.RequestSystemInformation)
|
||||
print(f"[probe] reply: start_char={reply.start_char:#04x} "
|
||||
f"opcode={reply.opcode} payload={reply.data.hex()}")
|
||||
|
||||
if reply.opcode != int(OmniLinkMessageType.SystemInformation):
|
||||
print(f"[probe] WARNING: expected opcode 18 (SystemInformation), "
|
||||
f"got {reply.opcode}")
|
||||
return 2
|
||||
|
||||
info = _decode_system_information(reply.data)
|
||||
print(f"[probe] ✓ v1-over-UDP works")
|
||||
print(f" model = {info['model']}")
|
||||
print(f" firmware = {info['fw_major']}.{info['fw_minor']}.{info['fw_revision']}")
|
||||
print(f" phone = {info['local_phone']!r}")
|
||||
|
||||
except InvalidEncryptionKeyError as exc:
|
||||
print(f"[probe] handshake terminated: wrong controller key? ({exc})")
|
||||
return 1
|
||||
except HandshakeError as exc:
|
||||
print(f"[probe] handshake failed: {exc}")
|
||||
return 1
|
||||
except RequestTimeoutError as exc:
|
||||
print(f"[probe] no reply to RequestSystemInformation: {exc}")
|
||||
print(" → handshake worked but v1 path isn't responding. "
|
||||
"Check tcpdump for what's on the wire.")
|
||||
return 2
|
||||
except OSError as exc:
|
||||
print(f"[probe] socket error: {exc}")
|
||||
return 1
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser(description=__doc__)
|
||||
parser.add_argument("--host", default="192.168.1.9")
|
||||
parser.add_argument("--port", type=int, default=4369)
|
||||
parser.add_argument("--key", help="32 hex chars; overrides env/.omni_key")
|
||||
parser.add_argument("--timeout", type=float, default=5.0)
|
||||
parser.add_argument("--retries", type=int, default=2)
|
||||
parser.add_argument("--debug", action="store_true",
|
||||
help="enable DEBUG logging (TX/RX packet dump)")
|
||||
args = parser.parse_args()
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG if args.debug else logging.INFO,
|
||||
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
|
||||
)
|
||||
|
||||
return asyncio.run(amain(args))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
122
dev/probe_v1_client.py
Normal file
122
dev/probe_v1_client.py
Normal file
@ -0,0 +1,122 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Phase-2a smoke test: drive OmniClientV1 against the real panel.
|
||||
|
||||
Hits the read-only methods we care about for HA polling. Compares parsed
|
||||
values against the recon dump so we catch off-by-one byte errors fast.
|
||||
|
||||
Run:
|
||||
cd /home/kdm/home-auto/omni-pca
|
||||
uv run python dev/probe_v1_client.py
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import logging
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent))
|
||||
from probe_v1 import _load_key # type: ignore # noqa: E402
|
||||
|
||||
from omni_pca.v1 import OmniClientV1, OmniNakError
|
||||
|
||||
|
||||
async def amain(args: argparse.Namespace) -> int:
|
||||
key = _load_key(args.key)
|
||||
print(f"[client probe] target {args.host}:{args.port}\n")
|
||||
|
||||
async with OmniClientV1(
|
||||
host=args.host, port=args.port, controller_key=key, timeout=4.0,
|
||||
) as c:
|
||||
info = await c.get_system_information()
|
||||
print(f"system: model={info.model_name} fw={info.firmware_version} "
|
||||
f"phone={info.local_phone!r}")
|
||||
|
||||
print("\n--- discovery (streaming UploadNames) ---")
|
||||
all_names = await c.list_all_names()
|
||||
for type_byte in sorted(all_names):
|
||||
try:
|
||||
from omni_pca.v1 import NameType
|
||||
label = NameType(type_byte).name
|
||||
except ValueError:
|
||||
label = f"type{type_byte}"
|
||||
print(f" {label} ({len(all_names[type_byte])} entries)")
|
||||
for num in sorted(all_names[type_byte]):
|
||||
print(f" #{num}: {all_names[type_byte][num]!r}")
|
||||
|
||||
try:
|
||||
sysstatus = await c.get_system_status()
|
||||
print(f"status: time={sysstatus.panel_time} "
|
||||
f"battery=0x{sysstatus.battery_reading:02x} "
|
||||
f"sunrise={sysstatus.sunrise_hour:02d}:{sysstatus.sunrise_minute:02d} "
|
||||
f"sunset={sysstatus.sunset_hour:02d}:{sysstatus.sunset_minute:02d} "
|
||||
f"area_modes={[m for m, _ in sysstatus.area_alarms]}")
|
||||
except Exception as exc:
|
||||
print(f"system status failed: {type(exc).__name__}: {exc}")
|
||||
|
||||
print("\n--- zones 1..16 ---")
|
||||
zones = await c.get_zone_status(1, 16)
|
||||
for idx in sorted(zones):
|
||||
z = zones[idx]
|
||||
flags = []
|
||||
if z.is_open: flags.append("open")
|
||||
if z.is_in_alarm: flags.append("alarm")
|
||||
if z.is_bypassed: flags.append("bypass")
|
||||
if z.is_trouble: flags.append("trouble")
|
||||
tag = ",".join(flags) or "secure"
|
||||
print(f" zone {idx:2d}: status=0x{z.raw_status:02x} loop=0x{z.loop:02x} ({tag})")
|
||||
|
||||
print("\n--- units 1..16 ---")
|
||||
units = await c.get_unit_status(1, 16)
|
||||
for idx in sorted(units):
|
||||
u = units[idx]
|
||||
br = u.brightness
|
||||
br_s = f"{br}%" if br is not None else "n/a"
|
||||
print(f" unit {idx:2d}: state=0x{u.state:02x} ({br_s}) "
|
||||
f"time_remaining={u.time_remaining_secs}s")
|
||||
|
||||
print("\n--- thermostats 1..4 ---")
|
||||
try:
|
||||
tstats = await c.get_thermostat_status(1, 4)
|
||||
for idx in sorted(tstats):
|
||||
t = tstats[idx]
|
||||
print(f" tstat {idx}: status=0x{t.status:02x} "
|
||||
f"temp_F={t.temperature_f:.1f} "
|
||||
f"heat={t.heat_setpoint_f:.0f} cool={t.cool_setpoint_f:.0f} "
|
||||
f"mode=0x{t.system_mode:02x} fan=0x{t.fan_mode:02x} "
|
||||
f"hold=0x{t.hold_mode:02x}")
|
||||
except OmniNakError as exc:
|
||||
print(f" no thermostats configured: {exc}")
|
||||
|
||||
print("\n--- aux 1..8 ---")
|
||||
try:
|
||||
auxes = await c.get_aux_status(1, 8)
|
||||
for idx in sorted(auxes):
|
||||
a = auxes[idx]
|
||||
print(f" aux {idx}: output=0x{a.output:02x} value=0x{a.value_raw:02x} "
|
||||
f"low=0x{a.low_raw:02x} high=0x{a.high_raw:02x}")
|
||||
except OmniNakError as exc:
|
||||
print(f" no aux sensors: {exc}")
|
||||
|
||||
print("\n[client probe] ✓ disconnected cleanly")
|
||||
return 0
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser(description=__doc__)
|
||||
parser.add_argument("--host", default="192.168.1.9")
|
||||
parser.add_argument("--port", type=int, default=4369)
|
||||
parser.add_argument("--key", help="32 hex chars; overrides env/.omni_key")
|
||||
parser.add_argument("--debug", action="store_true")
|
||||
args = parser.parse_args()
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG if args.debug else logging.WARNING,
|
||||
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
|
||||
)
|
||||
return asyncio.run(amain(args))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
149
dev/probe_v1_recon.py
Normal file
149
dev/probe_v1_recon.py
Normal file
@ -0,0 +1,149 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Phase-2 reconnaissance: fetch v1 status replies from the real panel.
|
||||
|
||||
Doesn't parse — just dumps the raw payload bytes for each known v1 opcode
|
||||
so we can match them against the C# message classes before writing
|
||||
parsers. Builds the picture of what your panel actually has configured.
|
||||
|
||||
Run:
|
||||
cd /home/kdm/home-auto/omni-pca
|
||||
uv run python dev/probe_v1_recon.py [--debug]
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import logging
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
# Reuse the key loader from probe_v1.
|
||||
sys.path.insert(0, str(Path(__file__).parent))
|
||||
from probe_v1 import _load_key # type: ignore # noqa: E402
|
||||
|
||||
from omni_pca.opcodes import OmniLinkMessageType
|
||||
from omni_pca.v1.connection import OmniConnectionV1, RequestTimeoutError
|
||||
|
||||
|
||||
async def _request_or_warn(
|
||||
conn: OmniConnectionV1,
|
||||
label: str,
|
||||
opcode: OmniLinkMessageType,
|
||||
payload: bytes = b"",
|
||||
expected_opcode: int | None = None,
|
||||
) -> None:
|
||||
print(f"--- {label} (req opcode {int(opcode)}, payload {payload.hex() or '<empty>'}) ---")
|
||||
try:
|
||||
reply = await conn.request(opcode, payload, timeout=4.0)
|
||||
except RequestTimeoutError as exc:
|
||||
print(f" TIMEOUT: {exc}")
|
||||
return
|
||||
except Exception as exc:
|
||||
print(f" ERROR: {type(exc).__name__}: {exc}")
|
||||
return
|
||||
print(f" reply opcode = {reply.opcode}")
|
||||
print(f" payload ({len(reply.payload)} B) = {reply.payload.hex()}")
|
||||
if expected_opcode is not None and reply.opcode != expected_opcode:
|
||||
print(f" NOTE: expected opcode {expected_opcode}, got {reply.opcode}")
|
||||
|
||||
|
||||
async def amain(args: argparse.Namespace) -> int:
|
||||
key = _load_key(args.key)
|
||||
print(f"[recon] target {args.host}:{args.port}\n")
|
||||
|
||||
async with OmniConnectionV1(
|
||||
host=args.host,
|
||||
port=args.port,
|
||||
controller_key=key,
|
||||
timeout=4.0,
|
||||
retry_count=1,
|
||||
) as conn:
|
||||
print(f"handshake OK state={conn.state.name}\n")
|
||||
|
||||
# --- panel-wide ---
|
||||
await _request_or_warn(
|
||||
conn, "SystemInformation", OmniLinkMessageType.RequestSystemInformation,
|
||||
expected_opcode=int(OmniLinkMessageType.SystemInformation),
|
||||
)
|
||||
await _request_or_warn(
|
||||
conn, "SystemStatus", OmniLinkMessageType.RequestSystemStatus,
|
||||
expected_opcode=int(OmniLinkMessageType.SystemStatus),
|
||||
)
|
||||
await _request_or_warn(
|
||||
conn, "StatusSummary", OmniLinkMessageType.RequestStatusSummary,
|
||||
expected_opcode=int(OmniLinkMessageType.StatusSummary),
|
||||
)
|
||||
|
||||
# --- bulk status, small ranges so we can read the bytes ---
|
||||
await _request_or_warn(
|
||||
conn, "ZoneStatus[1..8]", OmniLinkMessageType.RequestZoneStatus,
|
||||
payload=bytes([1, 8]),
|
||||
expected_opcode=int(OmniLinkMessageType.ZoneStatus),
|
||||
)
|
||||
await _request_or_warn(
|
||||
conn, "ZoneExtendedStatus[1..8]", OmniLinkMessageType.RequestZoneExtendedStatus,
|
||||
payload=bytes([1, 8]),
|
||||
expected_opcode=int(OmniLinkMessageType.ZoneExtendedStatus),
|
||||
)
|
||||
await _request_or_warn(
|
||||
conn, "UnitStatus[1..8]", OmniLinkMessageType.RequestUnitStatus,
|
||||
payload=bytes([1, 8]),
|
||||
expected_opcode=int(OmniLinkMessageType.UnitStatus),
|
||||
)
|
||||
await _request_or_warn(
|
||||
conn, "UnitExtendedStatus[1..8]", OmniLinkMessageType.RequestUnitExtendedStatus,
|
||||
payload=bytes([1, 8]),
|
||||
expected_opcode=int(OmniLinkMessageType.UnitExtendedStatus),
|
||||
)
|
||||
await _request_or_warn(
|
||||
conn, "ThermostatStatus[1..4]", OmniLinkMessageType.RequestThermostatStatus,
|
||||
payload=bytes([1, 4]),
|
||||
expected_opcode=int(OmniLinkMessageType.ThermostatStatus),
|
||||
)
|
||||
await _request_or_warn(
|
||||
conn, "ThermostatExtendedStatus[1..4]", OmniLinkMessageType.RequestThermostatExtendedStatus,
|
||||
payload=bytes([1, 4]),
|
||||
expected_opcode=int(OmniLinkMessageType.ThermostatExtendedStatus),
|
||||
)
|
||||
await _request_or_warn(
|
||||
conn, "AuxiliaryStatus[1..8]", OmniLinkMessageType.RequestAuxiliaryStatus,
|
||||
payload=bytes([1, 8]),
|
||||
expected_opcode=int(OmniLinkMessageType.AuxiliaryStatus),
|
||||
)
|
||||
|
||||
# --- discovery: UploadNames is the READ request; DownloadNames is the
|
||||
# WRITE direction (panel <- client). Reply payload is NameData with the
|
||||
# next defined object's number + name.
|
||||
# Per clsOL2MsgUploadNames: [type, num_hi, num_lo, relative_direction].
|
||||
# type: 1=Zone 2=Unit 3=Button 4=Code 5=Thermostat 6=Area 7=Message
|
||||
# relative_direction: +1=next after num, -1=prev before num, 0=exact
|
||||
for type_byte, type_name in [(1, "Zone"), (2, "Unit"), (5, "Thermostat"), (6, "Area")]:
|
||||
await _request_or_warn(
|
||||
conn,
|
||||
f"UploadNames[type={type_name}, after=0, dir=+1]",
|
||||
OmniLinkMessageType.UploadNames,
|
||||
payload=bytes([type_byte, 0, 0, 1]),
|
||||
expected_opcode=int(OmniLinkMessageType.NameData),
|
||||
)
|
||||
|
||||
print("\n--- recon complete, session closed cleanly ---")
|
||||
return 0
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser(description=__doc__)
|
||||
parser.add_argument("--host", default="192.168.1.9")
|
||||
parser.add_argument("--port", type=int, default=4369)
|
||||
parser.add_argument("--key", help="32 hex chars; overrides env/.omni_key")
|
||||
parser.add_argument("--debug", action="store_true")
|
||||
args = parser.parse_args()
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG if args.debug else logging.WARNING,
|
||||
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
|
||||
)
|
||||
return asyncio.run(amain(args))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
118
dev/probe_v1_stream.py
Normal file
118
dev/probe_v1_stream.py
Normal file
@ -0,0 +1,118 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Probe the v1 UploadNames streaming flow.
|
||||
|
||||
Sends UploadNames (no payload), then a series of Acknowledge messages,
|
||||
dumping each reply until we get an EOD or 30 records (whichever comes
|
||||
first). Confirms the lock-step pattern PC Access uses for bulk reads.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import logging
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent))
|
||||
from probe_v1 import _load_key # type: ignore # noqa: E402
|
||||
|
||||
from omni_pca.opcodes import OmniLinkMessageType
|
||||
from omni_pca.v1 import OmniConnectionV1
|
||||
|
||||
|
||||
_NAME_TYPE_LABELS = {
|
||||
1: "Zone", 2: "Unit", 3: "Button", 4: "Code",
|
||||
5: "Thermostat", 6: "Area", 7: "Message",
|
||||
}
|
||||
|
||||
|
||||
def _decode_namedata(payload: bytes) -> str:
|
||||
"""Best-effort decode of a NameData payload for display."""
|
||||
if len(payload) < 3:
|
||||
return f"<short payload: {payload.hex()}>"
|
||||
name_type = payload[0]
|
||||
# Heuristic: zones/messages are 15-char names, others 12. With one-byte
|
||||
# NameNumber, payload length = 1 (type) + 1 (num) + L (name) + 1 (term).
|
||||
# With two-byte NameNumber: 1 + 2 + L + 1.
|
||||
L_15 = 15 + 3 # one-byte form, 15-char name
|
||||
L_12 = 12 + 3 # one-byte form, 12-char name
|
||||
if len(payload) == L_15 or len(payload) == L_15 + 1:
|
||||
# 15-char name (Zone or Message), one-byte num.
|
||||
num = payload[1]
|
||||
name = payload[2:2 + 15].rstrip(b"\x00").decode("utf-8", errors="replace")
|
||||
elif len(payload) == L_12 or len(payload) == L_12 + 1:
|
||||
# 12-char name, one-byte num.
|
||||
num = payload[1]
|
||||
name = payload[2:2 + 12].rstrip(b"\x00").decode("utf-8", errors="replace")
|
||||
else:
|
||||
# Two-byte num form (NameNumber > 255): payload[1..2] = BE u16, then name.
|
||||
num = (payload[1] << 8) | payload[2]
|
||||
name = payload[3:].rstrip(b"\x00").decode("utf-8", errors="replace")
|
||||
|
||||
label = _NAME_TYPE_LABELS.get(name_type, f"type{name_type}")
|
||||
return f"{label} #{num}: {name!r}"
|
||||
|
||||
|
||||
async def amain(args: argparse.Namespace) -> int:
|
||||
key = _load_key(args.key)
|
||||
print(f"[stream probe] target {args.host}:{args.port}\n")
|
||||
|
||||
async with OmniConnectionV1(
|
||||
host=args.host, port=args.port, controller_key=key, timeout=4.0
|
||||
) as conn:
|
||||
from omni_pca.message import Message, START_CHAR_V1_UNADDRESSED
|
||||
|
||||
# Step 1: bare UploadNames.
|
||||
upload = Message(
|
||||
start_char=START_CHAR_V1_UNADDRESSED,
|
||||
data=bytes([int(OmniLinkMessageType.UploadNames)]),
|
||||
)
|
||||
seq, fut = conn._send_encrypted(upload)
|
||||
reply = conn._decode_inner(await fut)
|
||||
print(f"reply 1 (seq={seq}): opcode={reply.opcode} {_decode_namedata(reply.payload) if reply.opcode == int(OmniLinkMessageType.NameData) else f'(payload={reply.payload.hex()!r})'}")
|
||||
|
||||
if reply.opcode != int(OmniLinkMessageType.NameData):
|
||||
print("panel didn't reply with NameData — streaming flow may not apply here")
|
||||
return 0
|
||||
|
||||
# Step 2..N: Acknowledge → next NameData (or EOD).
|
||||
for i in range(2, args.max + 1):
|
||||
ack = Message(
|
||||
start_char=START_CHAR_V1_UNADDRESSED,
|
||||
data=bytes([int(OmniLinkMessageType.Ack)]),
|
||||
)
|
||||
seq, fut = conn._send_encrypted(ack)
|
||||
reply = conn._decode_inner(await fut)
|
||||
|
||||
if reply.opcode == int(OmniLinkMessageType.EOD):
|
||||
print(f"reply {i} (seq={seq}): EOD — end of stream after {i-1} records")
|
||||
return 0
|
||||
if reply.opcode == int(OmniLinkMessageType.NameData):
|
||||
print(f"reply {i} (seq={seq}): {_decode_namedata(reply.payload)}")
|
||||
else:
|
||||
print(f"reply {i} (seq={seq}): unexpected opcode {reply.opcode}, "
|
||||
f"payload={reply.payload.hex()}")
|
||||
return 1
|
||||
|
||||
print(f"\nstopped after {args.max} replies (no EOD seen)")
|
||||
return 0
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser(description=__doc__)
|
||||
parser.add_argument("--host", default="192.168.1.9")
|
||||
parser.add_argument("--port", type=int, default=4369)
|
||||
parser.add_argument("--key", help="32 hex chars; overrides env/.omni_key")
|
||||
parser.add_argument("--max", type=int, default=20, help="stop after N replies")
|
||||
parser.add_argument("--debug", action="store_true")
|
||||
args = parser.parse_args()
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG if args.debug else logging.WARNING,
|
||||
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
|
||||
)
|
||||
return asyncio.run(amain(args))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
93
dev/probe_v1_write.py
Normal file
93
dev/probe_v1_write.py
Normal file
@ -0,0 +1,93 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Phase-2c live write smoke test: round-trip a no-op unit command.
|
||||
|
||||
Reads the current state of one unit, sends a command that should yield
|
||||
the same observable result, then re-reads to confirm. Proves that
|
||||
:meth:`OmniClientV1.execute_command` actually flows through the v1
|
||||
Command opcode against the real panel without changing anything visible.
|
||||
|
||||
Run:
|
||||
cd /home/kdm/home-auto/omni-pca
|
||||
uv run python dev/probe_v1_write.py [--index N]
|
||||
|
||||
Default target is unit #4 ('STAIRS' per current panel config).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import logging
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent))
|
||||
from probe_v1 import _load_key # type: ignore # noqa: E402
|
||||
|
||||
from omni_pca.v1 import OmniClientV1
|
||||
|
||||
|
||||
async def amain(args: argparse.Namespace) -> int:
|
||||
key = _load_key(args.key)
|
||||
print(f"[write probe] target {args.host}:{args.port} unit #{args.index}\n")
|
||||
|
||||
async with OmniClientV1(
|
||||
host=args.host, port=args.port, controller_key=key, timeout=4.0
|
||||
) as c:
|
||||
before = (await c.get_unit_status(args.index, args.index))[args.index]
|
||||
print(f"BEFORE: state=0x{before.state:02x} "
|
||||
f"brightness={before.brightness!r} "
|
||||
f"time_remaining={before.time_remaining_secs}s")
|
||||
|
||||
# Pick the safest no-op command for the unit's current state:
|
||||
# - state == 0 → send UNIT_OFF (already off, panel acks)
|
||||
# - state == 1 → send UNIT_ON (already on, panel acks)
|
||||
# - 100 <= state <= 200 → set_unit_level(percent) at the current level
|
||||
# - otherwise (scene/dim/etc.) → fall back to UNIT_ON which is harmless
|
||||
if before.state == 0:
|
||||
print("ACTION: turn_unit_off (already off, expecting Ack)")
|
||||
await c.turn_unit_off(args.index)
|
||||
elif before.state == 1:
|
||||
print("ACTION: turn_unit_on (already on, expecting Ack)")
|
||||
await c.turn_unit_on(args.index)
|
||||
elif 100 <= before.state <= 200:
|
||||
level = before.state - 100
|
||||
print(f"ACTION: set_unit_level({level}%) (already at this level)")
|
||||
await c.set_unit_level(args.index, level)
|
||||
else:
|
||||
print(f"ACTION: turn_unit_on (state=0x{before.state:02x} is exotic; safe ack expected)")
|
||||
await c.turn_unit_on(args.index)
|
||||
|
||||
# Give the panel ~250ms to settle if it does pulse anything.
|
||||
await asyncio.sleep(0.25)
|
||||
|
||||
after = (await c.get_unit_status(args.index, args.index))[args.index]
|
||||
print(f"AFTER: state=0x{after.state:02x} "
|
||||
f"brightness={after.brightness!r} "
|
||||
f"time_remaining={after.time_remaining_secs}s")
|
||||
|
||||
if after.state == before.state:
|
||||
print("\n✓ panel acked the Command, state unchanged — wire path verified")
|
||||
else:
|
||||
print(f"\n⚠ state changed (0x{before.state:02x} → 0x{after.state:02x}). "
|
||||
"Probably harmless but worth investigating.")
|
||||
return 0
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser(description=__doc__)
|
||||
parser.add_argument("--host", default="192.168.1.9")
|
||||
parser.add_argument("--port", type=int, default=4369)
|
||||
parser.add_argument("--key", help="32 hex chars; overrides env/.omni_key")
|
||||
parser.add_argument("--index", type=int, default=4)
|
||||
parser.add_argument("--debug", action="store_true")
|
||||
args = parser.parse_args()
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG if args.debug else logging.WARNING,
|
||||
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
|
||||
)
|
||||
return asyncio.run(amain(args))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
@ -3,7 +3,7 @@
|
||||
Wire layout (non-addressable):
|
||||
``[start_char][length][...data...][crc_lo][crc_hi]``
|
||||
|
||||
For v1 addressable messages (StartChar=0x5A) a single SerialAddress byte
|
||||
For v1 addressable messages (StartChar=0x41) a single SerialAddress byte
|
||||
is interleaved between start_char and length.
|
||||
|
||||
CRC is CRC-16/MODBUS (poly 0xA001, init 0, reflected) computed over the
|
||||
@ -13,6 +13,8 @@ on the wire (CRC1 = low byte, CRC2 = high byte).
|
||||
References:
|
||||
clsOmniLinkMessage.cs (lines 9, 164-186, 273-289) — frame + CRC
|
||||
clsOmniLink2Message.cs (lines 17-23) — v2 StartChar = 0x21
|
||||
enuOmniLinkMessageFormat.cs — Addressable=0x41, NonAddressable=0x5A,
|
||||
OmniLink2=0x21
|
||||
clsOL2MsgLogin.cs / clsOLMsgLogin.cs — example payloads
|
||||
"""
|
||||
|
||||
@ -23,8 +25,8 @@ from dataclasses import dataclass, field
|
||||
from .opcodes import OmniLink2MessageType, OmniLinkMessageType
|
||||
|
||||
START_CHAR_V2 = 0x21
|
||||
START_CHAR_V1_UNADDRESSED = 0x41
|
||||
START_CHAR_V1_ADDRESSABLE = 0x5A
|
||||
START_CHAR_V1_ADDRESSABLE = 0x41
|
||||
START_CHAR_V1_UNADDRESSED = 0x5A
|
||||
|
||||
_CRC_POLY_REFLECTED = 0xA001
|
||||
|
||||
|
||||
50
src/omni_pca/v1/__init__.py
Normal file
50
src/omni_pca/v1/__init__.py
Normal file
@ -0,0 +1,50 @@
|
||||
"""V1 (legacy) Omni-Link protocol over UDP.
|
||||
|
||||
The v2 path in :mod:`omni_pca` (TCP, OmniLink2Message, StartChar 0x21,
|
||||
parameterised RequestProperties / RequestExtendedStatus) is what most
|
||||
modern firmware speaks. This subpackage exists because some panels are
|
||||
configured at the network module to listen on **UDP only**, in which case
|
||||
PC Access falls back to the v1 wire protocol (typed RequestZoneStatus,
|
||||
RequestUnitStatus, etc., StartChar 0x5A, OmniLinkMessage outer = 0x10).
|
||||
|
||||
Reference: clsOmniLinkConnection.cs:353-360 (ConnectionProtocol() returns
|
||||
V1 for Modem/UDP/Serial, V2 only for TCP).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from .client import OmniClientV1, OmniNakError, OmniProtocolError
|
||||
from .connection import (
|
||||
HandshakeError,
|
||||
InvalidEncryptionKeyError,
|
||||
OmniConnectionV1,
|
||||
RequestTimeoutError,
|
||||
)
|
||||
from .messages import (
|
||||
NameRecord,
|
||||
NameType,
|
||||
parse_v1_aux_status,
|
||||
parse_v1_namedata,
|
||||
parse_v1_system_status,
|
||||
parse_v1_thermostat_status,
|
||||
parse_v1_unit_status,
|
||||
parse_v1_zone_status,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"HandshakeError",
|
||||
"InvalidEncryptionKeyError",
|
||||
"NameRecord",
|
||||
"NameType",
|
||||
"OmniClientV1",
|
||||
"OmniConnectionV1",
|
||||
"OmniNakError",
|
||||
"OmniProtocolError",
|
||||
"RequestTimeoutError",
|
||||
"parse_v1_aux_status",
|
||||
"parse_v1_namedata",
|
||||
"parse_v1_system_status",
|
||||
"parse_v1_thermostat_status",
|
||||
"parse_v1_unit_status",
|
||||
"parse_v1_zone_status",
|
||||
]
|
||||
452
src/omni_pca/v1/client.py
Normal file
452
src/omni_pca/v1/client.py
Normal file
@ -0,0 +1,452 @@
|
||||
"""High-level read-only client for v1-over-UDP Omni-Link panels.
|
||||
|
||||
Mirrors the v2 :class:`omni_pca.client.OmniClient` API where the v1 wire
|
||||
protocol can satisfy the same call. Methods that require v2-only opcodes
|
||||
(e.g. ``RequestProperties``, ``AcknowledgeAlerts``) are intentionally
|
||||
absent until Phase 2b/2c add their v1 equivalents (streaming
|
||||
``UploadNames``, no-op or alternate dispatch).
|
||||
|
||||
API parity goals (this module):
|
||||
get_system_information() — same dataclass as v2
|
||||
get_system_status() — same dataclass as v2
|
||||
get_zone_status(start, end) -> dict — uses v1 ZoneStatus
|
||||
get_unit_status(start, end) -> dict — uses v1 UnitStatus
|
||||
get_thermostat_status(start, end) -> dict — uses v1 ThermostatStatus
|
||||
get_aux_status(start, end) -> dict — uses v1 AuxiliaryStatus
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import struct
|
||||
from collections.abc import AsyncIterator, Callable
|
||||
from typing import Self
|
||||
|
||||
from ..commands import Command, CommandFailedError, SecurityCommandResponse
|
||||
from ..models import (
|
||||
AuxSensorStatus,
|
||||
SecurityMode,
|
||||
SystemInformation,
|
||||
SystemStatus,
|
||||
ThermostatStatus,
|
||||
UnitStatus,
|
||||
ZoneStatus,
|
||||
)
|
||||
from ..opcodes import OmniLinkMessageType
|
||||
from .connection import OmniConnectionV1
|
||||
from .messages import (
|
||||
NameRecord,
|
||||
NameType,
|
||||
parse_v1_aux_status,
|
||||
parse_v1_namedata,
|
||||
parse_v1_system_status,
|
||||
parse_v1_thermostat_status,
|
||||
parse_v1_unit_status,
|
||||
parse_v1_zone_status,
|
||||
)
|
||||
|
||||
_DEFAULT_PORT = 4369
|
||||
|
||||
|
||||
class OmniClientV1:
|
||||
"""Read-only v1-over-UDP Omni-Link client.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
async with OmniClientV1("192.168.1.9", controller_key=key) as c:
|
||||
info = await c.get_system_information()
|
||||
zones = await c.get_zone_status(1, 16)
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
host: str,
|
||||
port: int = _DEFAULT_PORT,
|
||||
controller_key: bytes = b"",
|
||||
timeout: float = 5.0,
|
||||
retry_count: int = 3,
|
||||
) -> None:
|
||||
self._conn = OmniConnectionV1(
|
||||
host=host,
|
||||
port=port,
|
||||
controller_key=controller_key,
|
||||
timeout=timeout,
|
||||
retry_count=retry_count,
|
||||
)
|
||||
|
||||
@property
|
||||
def connection(self) -> OmniConnectionV1:
|
||||
return self._conn
|
||||
|
||||
async def __aenter__(self) -> Self:
|
||||
await self._conn.connect()
|
||||
return self
|
||||
|
||||
async def __aexit__(self, exc_type, exc, tb) -> None:
|
||||
await self._conn.close()
|
||||
|
||||
# ---- panel-wide ----------------------------------------------------
|
||||
|
||||
async def get_system_information(self) -> SystemInformation:
|
||||
"""Fetch model + firmware + dialer phone number.
|
||||
|
||||
Wire format identical to v2 (verified per
|
||||
clsOLMsgSystemInformation.cs vs clsOL2MsgSystemInformation.cs);
|
||||
we reuse the existing dataclass parser unchanged.
|
||||
"""
|
||||
reply = await self._conn.request(
|
||||
OmniLinkMessageType.RequestSystemInformation
|
||||
)
|
||||
self._expect(reply.opcode, OmniLinkMessageType.SystemInformation)
|
||||
return SystemInformation.parse(reply.payload)
|
||||
|
||||
async def get_system_status(self) -> SystemStatus:
|
||||
"""Fetch panel time, sunrise/sunset, battery reading, area modes."""
|
||||
reply = await self._conn.request(
|
||||
OmniLinkMessageType.RequestSystemStatus
|
||||
)
|
||||
self._expect(reply.opcode, OmniLinkMessageType.SystemStatus)
|
||||
return parse_v1_system_status(reply.payload)
|
||||
|
||||
# ---- bulk per-object status ----------------------------------------
|
||||
|
||||
async def get_zone_status(
|
||||
self, start: int, end: int
|
||||
) -> dict[int, ZoneStatus]:
|
||||
return await self._range_status(
|
||||
OmniLinkMessageType.RequestZoneStatus,
|
||||
OmniLinkMessageType.ZoneStatus,
|
||||
start,
|
||||
end,
|
||||
parse_v1_zone_status,
|
||||
)
|
||||
|
||||
async def get_unit_status(
|
||||
self, start: int, end: int
|
||||
) -> dict[int, UnitStatus]:
|
||||
return await self._range_status(
|
||||
OmniLinkMessageType.RequestUnitStatus,
|
||||
OmniLinkMessageType.UnitStatus,
|
||||
start,
|
||||
end,
|
||||
parse_v1_unit_status,
|
||||
)
|
||||
|
||||
async def get_thermostat_status(
|
||||
self, start: int, end: int
|
||||
) -> dict[int, ThermostatStatus]:
|
||||
return await self._range_status(
|
||||
OmniLinkMessageType.RequestThermostatStatus,
|
||||
OmniLinkMessageType.ThermostatStatus,
|
||||
start,
|
||||
end,
|
||||
parse_v1_thermostat_status,
|
||||
)
|
||||
|
||||
async def get_aux_status(
|
||||
self, start: int, end: int
|
||||
) -> dict[int, AuxSensorStatus]:
|
||||
return await self._range_status(
|
||||
OmniLinkMessageType.RequestAuxiliaryStatus,
|
||||
OmniLinkMessageType.AuxiliaryStatus,
|
||||
start,
|
||||
end,
|
||||
parse_v1_aux_status,
|
||||
)
|
||||
|
||||
# ---- discovery (streaming UploadNames) ------------------------------
|
||||
|
||||
async def iter_names(self) -> AsyncIterator[NameRecord]:
|
||||
"""Stream every defined name from the panel.
|
||||
|
||||
v1 has no per-type name request — a bare ``UploadNames`` triggers
|
||||
the panel to dump *all* defined names of *all* types in a fixed
|
||||
order (Zone, Unit, Button, Code, Area, Thermostat, Message, …),
|
||||
each as a separate ``NameData`` reply that the client must
|
||||
``Acknowledge`` to advance. This iterator handles the lock-step
|
||||
protocol and yields each record as it arrives.
|
||||
|
||||
Reference: clsHAC.cs:4418 (sends bare UploadNames),
|
||||
OL1ReadConfigHandleResponse (loops over NameData/EOD).
|
||||
"""
|
||||
async for reply in self._conn.iter_streaming(
|
||||
OmniLinkMessageType.UploadNames
|
||||
):
|
||||
if reply.opcode != int(OmniLinkMessageType.NameData):
|
||||
# Defensive — iter_streaming normally only yields
|
||||
# non-EOD/NAK replies, so this is a wire-format fault.
|
||||
raise OmniProtocolError(
|
||||
f"unexpected opcode {reply.opcode} during UploadNames stream "
|
||||
f"(expected {int(OmniLinkMessageType.NameData)})"
|
||||
)
|
||||
yield parse_v1_namedata(reply.payload)
|
||||
|
||||
async def list_all_names(self) -> dict[int, dict[int, str]]:
|
||||
"""Bucket every defined name by ``NameType``.
|
||||
|
||||
Returns ``{name_type: {object_number: name}}``. Useful when HA
|
||||
needs all four (zones+units+areas+thermostats) in one pass —
|
||||
cheaper than four separate streams since the panel only supports
|
||||
one streaming session at a time anyway.
|
||||
"""
|
||||
out: dict[int, dict[int, str]] = {}
|
||||
async for rec in self.iter_names():
|
||||
out.setdefault(rec.name_type, {})[rec.number] = rec.name
|
||||
return out
|
||||
|
||||
async def list_zone_names(self) -> dict[int, str]:
|
||||
return (await self.list_all_names()).get(int(NameType.ZONE), {})
|
||||
|
||||
async def list_unit_names(self) -> dict[int, str]:
|
||||
return (await self.list_all_names()).get(int(NameType.UNIT), {})
|
||||
|
||||
async def list_area_names(self) -> dict[int, str]:
|
||||
return (await self.list_all_names()).get(int(NameType.AREA), {})
|
||||
|
||||
async def list_thermostat_names(self) -> dict[int, str]:
|
||||
return (await self.list_all_names()).get(int(NameType.THERMOSTAT), {})
|
||||
|
||||
async def list_button_names(self) -> dict[int, str]:
|
||||
return (await self.list_all_names()).get(int(NameType.BUTTON), {})
|
||||
|
||||
# ---- write methods (Command + ExecuteSecurityCommand) ----------------
|
||||
#
|
||||
# The Command and ExecuteSecurityCommand payloads are byte-identical
|
||||
# between v1 and v2 — only the outer opcode differs (15 vs 20 for
|
||||
# Command, 102 vs 74 for ExecuteSecurityCommand). So these methods are
|
||||
# near-duplicates of OmniClient's, just routed through the v1 opcodes.
|
||||
# Reference: clsOLMsgCommand.cs, clsOLMsgExecuteSecurityCommand.cs.
|
||||
|
||||
async def execute_command(
|
||||
self,
|
||||
command: Command,
|
||||
parameter1: int = 0,
|
||||
parameter2: int = 0,
|
||||
) -> None:
|
||||
"""Send a generic Command (v1 opcode 15).
|
||||
|
||||
Wire payload (4 bytes, identical to v2 form):
|
||||
[0] command byte (enuUnitCommand value)
|
||||
[1] parameter1 (single byte; brightness, mode, code index, ...)
|
||||
[2] parameter2 high byte (BE u16)
|
||||
[3] parameter2 low byte (object number for nearly every command)
|
||||
|
||||
Panel acks with v1 Ack (opcode 5) on success, Nak (6) on failure.
|
||||
"""
|
||||
if not 0 <= parameter1 <= 0xFF:
|
||||
raise ValueError(f"parameter1 must fit in a byte: {parameter1}")
|
||||
if not 0 <= parameter2 <= 0xFFFF:
|
||||
raise ValueError(f"parameter2 must fit in u16: {parameter2}")
|
||||
payload = struct.pack(
|
||||
">BBH", int(command), parameter1 & 0xFF, parameter2 & 0xFFFF
|
||||
)
|
||||
reply = await self._conn.request(OmniLinkMessageType.Command, payload)
|
||||
if reply.opcode == int(OmniLinkMessageType.Nak):
|
||||
raise CommandFailedError(
|
||||
f"panel NAK'd Command {command.name} "
|
||||
f"(p1={parameter1}, p2={parameter2})"
|
||||
)
|
||||
if reply.opcode != int(OmniLinkMessageType.Ack):
|
||||
raise CommandFailedError(
|
||||
f"unexpected reply to Command {command.name}: opcode={reply.opcode}"
|
||||
)
|
||||
|
||||
async def execute_security_command(
|
||||
self,
|
||||
area: int,
|
||||
mode: SecurityMode,
|
||||
code: int,
|
||||
) -> None:
|
||||
"""Arm or disarm a security area (v1 opcode 102).
|
||||
|
||||
Wire payload (6 bytes, identical to v2 form — clsOLMsgExecuteSecurityCommand.cs):
|
||||
[0] area number (1-based)
|
||||
[1] security mode byte (raw enuSecurityMode 0..7)
|
||||
[2] code digit 1 (thousands)
|
||||
[3] code digit 2 (hundreds)
|
||||
[4] code digit 3 (tens)
|
||||
[5] code digit 4 (ones)
|
||||
|
||||
Panel responds with:
|
||||
* ``ExecuteSecurityCommandResponse`` (103) carrying a status byte
|
||||
(0 = success, see :class:`SecurityCommandResponse` for others), or
|
||||
* ``Ack`` (5) on success without structured response, or
|
||||
* ``Nak`` (6) on flat-out refusal.
|
||||
|
||||
Raises:
|
||||
ValueError: ``area`` not 1..255 or ``code`` not 0..9999.
|
||||
CommandFailedError: panel Nak'd OR response status was non-zero;
|
||||
``failure_code`` carries the raw status byte when present.
|
||||
"""
|
||||
if not 1 <= area <= 0xFF:
|
||||
raise ValueError(f"area out of range: {area}")
|
||||
if not 0 <= code <= 9999:
|
||||
raise ValueError(f"code out of range (0000-9999): {code}")
|
||||
d1 = (code // 1000) % 10
|
||||
d2 = (code // 100) % 10
|
||||
d3 = (code // 10) % 10
|
||||
d4 = code % 10
|
||||
payload = bytes([area & 0xFF, int(mode) & 0xFF, d1, d2, d3, d4])
|
||||
reply = await self._conn.request(
|
||||
OmniLinkMessageType.ExecuteSecurityCommand, payload
|
||||
)
|
||||
if reply.opcode == int(OmniLinkMessageType.Nak):
|
||||
raise CommandFailedError(
|
||||
f"panel NAK'd ExecuteSecurityCommand "
|
||||
f"(area={area}, mode={mode.name})"
|
||||
)
|
||||
if reply.opcode == int(OmniLinkMessageType.ExecuteSecurityCommandResponse):
|
||||
if not reply.payload:
|
||||
raise CommandFailedError(
|
||||
"ExecuteSecurityCommandResponse with empty payload"
|
||||
)
|
||||
status = reply.payload[0]
|
||||
if status != int(SecurityCommandResponse.SUCCESS):
|
||||
try:
|
||||
label = SecurityCommandResponse(status).name
|
||||
except ValueError:
|
||||
label = f"unknown({status})"
|
||||
raise CommandFailedError(
|
||||
f"ExecuteSecurityCommand failed: {label}",
|
||||
failure_code=status,
|
||||
)
|
||||
return
|
||||
if reply.opcode == int(OmniLinkMessageType.Ack):
|
||||
return
|
||||
raise CommandFailedError(
|
||||
f"unexpected reply to ExecuteSecurityCommand: opcode={reply.opcode}"
|
||||
)
|
||||
|
||||
async def acknowledge_alerts(self) -> None:
|
||||
"""V1 has no AcknowledgeAlerts opcode — silently no-op.
|
||||
|
||||
v2 introduced :attr:`OmniLink2MessageType.AcknowledgeAlerts` (60)
|
||||
as a dedicated panel-wide ack; v1 panels expect alerts to be
|
||||
cleared by per-area arming or by user action at the keypad. To
|
||||
keep the v1↔v2 method shape parallel, this method is a no-op so
|
||||
HA service callers don't need a per-transport branch.
|
||||
"""
|
||||
return
|
||||
|
||||
# ---- thin command wrappers (one-liner conveniences) ------------------
|
||||
|
||||
async def turn_unit_on(self, index: int) -> None:
|
||||
await self.execute_command(Command.UNIT_ON, parameter2=index)
|
||||
|
||||
async def turn_unit_off(self, index: int) -> None:
|
||||
await self.execute_command(Command.UNIT_OFF, parameter2=index)
|
||||
|
||||
async def set_unit_level(self, index: int, percent: int) -> None:
|
||||
if not 0 <= percent <= 100:
|
||||
raise ValueError(f"percent must be 0..100: {percent}")
|
||||
await self.execute_command(
|
||||
Command.UNIT_LEVEL, parameter1=percent, parameter2=index
|
||||
)
|
||||
|
||||
async def bypass_zone(self, index: int, code: int = 0) -> None:
|
||||
await self.execute_command(
|
||||
Command.BYPASS_ZONE, parameter1=code, parameter2=index
|
||||
)
|
||||
|
||||
async def restore_zone(self, index: int, code: int = 0) -> None:
|
||||
await self.execute_command(
|
||||
Command.RESTORE_ZONE, parameter1=code, parameter2=index
|
||||
)
|
||||
|
||||
async def execute_button(self, index: int) -> None:
|
||||
await self.execute_command(Command.EXECUTE_BUTTON, parameter2=index)
|
||||
|
||||
async def set_thermostat_system_mode(self, index: int, mode_value: int) -> None:
|
||||
if not 0 <= mode_value <= 0xFF:
|
||||
raise ValueError(f"mode value must fit in a byte: {mode_value}")
|
||||
await self.execute_command(
|
||||
Command.SET_THERMOSTAT_SYSTEM_MODE,
|
||||
parameter1=mode_value,
|
||||
parameter2=index,
|
||||
)
|
||||
|
||||
async def set_thermostat_fan_mode(self, index: int, mode_value: int) -> None:
|
||||
await self.execute_command(
|
||||
Command.SET_THERMOSTAT_FAN_MODE,
|
||||
parameter1=mode_value,
|
||||
parameter2=index,
|
||||
)
|
||||
|
||||
async def set_thermostat_hold_mode(self, index: int, mode_value: int) -> None:
|
||||
await self.execute_command(
|
||||
Command.SET_THERMOSTAT_HOLD_MODE,
|
||||
parameter1=mode_value,
|
||||
parameter2=index,
|
||||
)
|
||||
|
||||
async def set_thermostat_heat_setpoint_raw(
|
||||
self, index: int, raw_temp: int
|
||||
) -> None:
|
||||
"""Set the heat setpoint by raw byte value (Omni temperature scale).
|
||||
|
||||
Use the same :func:`omni_temp_to_celsius` family of helpers from
|
||||
:mod:`omni_pca.models` to convert from °C/°F if needed.
|
||||
"""
|
||||
if not 0 <= raw_temp <= 0xFF:
|
||||
raise ValueError(f"raw_temp must fit in a byte: {raw_temp}")
|
||||
await self.execute_command(
|
||||
Command.SET_THERMOSTAT_HEAT_SETPOINT,
|
||||
parameter1=raw_temp,
|
||||
parameter2=index,
|
||||
)
|
||||
|
||||
async def set_thermostat_cool_setpoint_raw(
|
||||
self, index: int, raw_temp: int
|
||||
) -> None:
|
||||
if not 0 <= raw_temp <= 0xFF:
|
||||
raise ValueError(f"raw_temp must fit in a byte: {raw_temp}")
|
||||
await self.execute_command(
|
||||
Command.SET_THERMOSTAT_COOL_SETPOINT,
|
||||
parameter1=raw_temp,
|
||||
parameter2=index,
|
||||
)
|
||||
|
||||
# ---- helpers --------------------------------------------------------
|
||||
|
||||
async def _range_status[T](
|
||||
self,
|
||||
request_op: OmniLinkMessageType,
|
||||
reply_op: OmniLinkMessageType,
|
||||
start: int,
|
||||
end: int,
|
||||
parser: Callable[[bytes, int], list[T]],
|
||||
) -> dict[int, T]:
|
||||
if not 1 <= start <= end <= 0xFF:
|
||||
raise ValueError(
|
||||
f"invalid range: start={start}, end={end} (must be 1..255 with start<=end)"
|
||||
)
|
||||
payload = bytes([start, end])
|
||||
reply = await self._conn.request(request_op, payload)
|
||||
self._expect(reply.opcode, reply_op)
|
||||
records = parser(reply.payload, start)
|
||||
return {r.index: r for r in records} # type: ignore[attr-defined]
|
||||
|
||||
@staticmethod
|
||||
def _expect(actual: int, expected: OmniLinkMessageType) -> None:
|
||||
if actual == int(OmniLinkMessageType.Nak):
|
||||
raise OmniNakError(
|
||||
f"panel NAK'd request expecting opcode {int(expected)} "
|
||||
f"({expected.name})"
|
||||
)
|
||||
if actual != int(expected):
|
||||
raise OmniProtocolError(
|
||||
f"unexpected reply opcode {actual}, want {int(expected)} "
|
||||
f"({expected.name})"
|
||||
)
|
||||
|
||||
|
||||
class OmniNakError(RuntimeError):
|
||||
"""Panel returned the v1 Nak opcode (6) instead of the expected reply.
|
||||
|
||||
Thrown when a feature the panel doesn't support is requested — e.g.
|
||||
``RequestZoneExtendedStatus`` on firmware 2.12 NAKs because only the
|
||||
non-extended ``RequestZoneStatus`` is supported.
|
||||
"""
|
||||
|
||||
|
||||
class OmniProtocolError(RuntimeError):
|
||||
"""Panel returned a reply opcode neither matching nor a NAK."""
|
||||
522
src/omni_pca/v1/connection.py
Normal file
522
src/omni_pca/v1/connection.py
Normal file
@ -0,0 +1,522 @@
|
||||
"""Async UDP connection to an Omni-Link controller speaking the v1 wire protocol.
|
||||
|
||||
Differs from :class:`omni_pca.connection.OmniConnection` in three ways:
|
||||
|
||||
1. **Transport**: UDP only. Each datagram carries exactly one outer Packet.
|
||||
2. **Outer packet type for messages**: ``OmniLinkMessage`` (0x10), not
|
||||
``OmniLink2Message`` (0x20). The 4-step handshake packets are identical.
|
||||
3. **Inner message format**: v1 ``Message`` with ``StartChar = 0x5A``
|
||||
(NonAddressable) carrying a v1 opcode, not the v2 ``StartChar = 0x21``
|
||||
carrying a v2 opcode.
|
||||
|
||||
The handshake itself (ClientRequestNewSession → ControllerAckNewSession →
|
||||
ClientRequestSecureSession → ControllerAckSecureSession) and the AES-128
|
||||
session key derivation are protocol-agnostic and we reuse the same crypto
|
||||
primitives.
|
||||
|
||||
Reference: clsOmniLinkConnection.cs (UDP path):
|
||||
udpConnect lines 1239-1295 open + queue ClientRequestNewSession
|
||||
udpListen lines 1298-1399 receive loop, dispatches replies
|
||||
udpHandleRequestNewSession lines 1401-1459 step 2 → step 3
|
||||
udpHandleRequestSecureSession lines 1461-1487 step 4 → OnlineSecure
|
||||
udpSend lines 1514-1560 outer PacketType = OmniLinkMessage (16)
|
||||
EncryptPacket lines 372-401 same crypto as TCP
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import contextlib
|
||||
import logging
|
||||
from collections.abc import AsyncIterator
|
||||
from enum import IntEnum
|
||||
from types import TracebackType
|
||||
|
||||
from ..crypto import (
|
||||
BLOCK_SIZE,
|
||||
decrypt_message_payload,
|
||||
derive_session_key,
|
||||
encrypt_message_payload,
|
||||
)
|
||||
from ..message import (
|
||||
START_CHAR_V1_UNADDRESSED,
|
||||
Message,
|
||||
MessageCrcError,
|
||||
)
|
||||
from ..opcodes import OmniLinkMessageType, PacketType
|
||||
from ..packet import Packet
|
||||
|
||||
_log = logging.getLogger(__name__)
|
||||
|
||||
_DEFAULT_PORT = 4369
|
||||
_SESSION_ID_LEN = 5
|
||||
_PROTO_VERSION = (0x00, 0x01)
|
||||
_MAX_SEQ = 0xFFFF
|
||||
|
||||
|
||||
class ConnectionState(IntEnum):
|
||||
DISCONNECTED = 0
|
||||
CONNECTING = 1
|
||||
NEW_SESSION = 2
|
||||
SECURE = 3
|
||||
ONLINE = 4
|
||||
|
||||
|
||||
class ConnectionError(OSError): # noqa: A001 - intentional shadow at module scope
|
||||
pass
|
||||
|
||||
|
||||
class HandshakeError(ConnectionError):
|
||||
pass
|
||||
|
||||
|
||||
class InvalidEncryptionKeyError(HandshakeError):
|
||||
"""Controller answered ``ControllerSessionTerminated`` during handshake."""
|
||||
|
||||
|
||||
class ProtocolError(ValueError):
|
||||
pass
|
||||
|
||||
|
||||
class RequestTimeoutError(TimeoutError):
|
||||
pass
|
||||
|
||||
|
||||
class OmniConnectionV1:
|
||||
"""UDP + v1-wire-format connection to an Omni-Link controller."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
host: str,
|
||||
port: int = _DEFAULT_PORT,
|
||||
controller_key: bytes = b"",
|
||||
timeout: float = 5.0,
|
||||
retry_count: int = 3,
|
||||
) -> None:
|
||||
if len(controller_key) != 16:
|
||||
raise ValueError(
|
||||
f"controller_key must be 16 bytes, got {len(controller_key)}"
|
||||
)
|
||||
self._host = host
|
||||
self._port = port
|
||||
self._controller_key = bytes(controller_key)
|
||||
self._default_timeout = timeout
|
||||
self._retry_count = max(0, retry_count)
|
||||
|
||||
self._udp_transport: asyncio.DatagramTransport | None = None
|
||||
self._udp_protocol: _OmniDatagramProtocol | None = None
|
||||
|
||||
self._state = ConnectionState.DISCONNECTED
|
||||
self._session_id: bytes | None = None
|
||||
self._session_key: bytes | None = None
|
||||
|
||||
# First wire packet uses seq=1; wraparound skips 0 (reserved for
|
||||
# unsolicited inbound). See clsOmniLinkConnection.cs:1251 (UDP
|
||||
# init pktSequence=1, then udpSend pre-increments).
|
||||
self._next_seq: int = 1
|
||||
|
||||
self._pending: dict[int, asyncio.Future[Packet]] = {}
|
||||
self._unsolicited_queue: asyncio.Queue[Message] = asyncio.Queue()
|
||||
|
||||
self._handshake_event: asyncio.Event = asyncio.Event()
|
||||
self._handshake_packet: Packet | None = None
|
||||
self._handshake_error: Exception | None = None
|
||||
|
||||
self._closed = False
|
||||
|
||||
@property
|
||||
def state(self) -> ConnectionState:
|
||||
return self._state
|
||||
|
||||
@property
|
||||
def session_key(self) -> bytes | None:
|
||||
return self._session_key
|
||||
|
||||
async def __aenter__(self) -> OmniConnectionV1:
|
||||
await self.connect()
|
||||
return self
|
||||
|
||||
async def __aexit__(
|
||||
self,
|
||||
exc_type: type[BaseException] | None,
|
||||
exc: BaseException | None,
|
||||
tb: TracebackType | None,
|
||||
) -> None:
|
||||
await self.close()
|
||||
|
||||
async def connect(self) -> None:
|
||||
if self._state is not ConnectionState.DISCONNECTED:
|
||||
raise ConnectionError(
|
||||
f"already connecting/connected (state={self._state})"
|
||||
)
|
||||
self._state = ConnectionState.CONNECTING
|
||||
try:
|
||||
loop = asyncio.get_running_loop()
|
||||
self._udp_transport, self._udp_protocol = (
|
||||
await loop.create_datagram_endpoint(
|
||||
lambda: _OmniDatagramProtocol(self),
|
||||
remote_addr=(self._host, self._port),
|
||||
)
|
||||
)
|
||||
except (TimeoutError, OSError) as exc:
|
||||
self._state = ConnectionState.DISCONNECTED
|
||||
raise ConnectionError(f"failed to open UDP socket: {exc}") from exc
|
||||
|
||||
try:
|
||||
await self._do_handshake()
|
||||
except BaseException:
|
||||
await self.close()
|
||||
raise
|
||||
|
||||
async def close(self) -> None:
|
||||
"""Tear down. Politely terminate the panel session first.
|
||||
|
||||
Without ClientSessionTerminated the panel keeps our slot allocated
|
||||
until its idle timeout — and rejects subsequent connect attempts
|
||||
with ControllerCannotStartNewSession (0x07).
|
||||
"""
|
||||
if self._closed:
|
||||
return
|
||||
self._closed = True
|
||||
previous_state = self._state
|
||||
self._state = ConnectionState.DISCONNECTED
|
||||
|
||||
if previous_state in (
|
||||
ConnectionState.NEW_SESSION,
|
||||
ConnectionState.SECURE,
|
||||
ConnectionState.ONLINE,
|
||||
):
|
||||
try:
|
||||
term = Packet(
|
||||
seq=self._claim_seq(),
|
||||
type=PacketType.ClientSessionTerminated,
|
||||
data=b"",
|
||||
)
|
||||
self._write_packet(term)
|
||||
except Exception as exc: # noqa: BLE001 - close() must be idempotent
|
||||
_log.debug("close: failed to send ClientSessionTerminated: %s", exc)
|
||||
|
||||
for fut in self._pending.values():
|
||||
if not fut.done():
|
||||
fut.set_exception(ConnectionError("connection closed"))
|
||||
self._pending.clear()
|
||||
|
||||
if self._udp_transport is not None:
|
||||
with contextlib.suppress(OSError):
|
||||
self._udp_transport.close()
|
||||
self._udp_transport = None
|
||||
self._udp_protocol = None
|
||||
|
||||
# ---- public request API ---------------------------------------------
|
||||
|
||||
async def request(
|
||||
self,
|
||||
opcode: OmniLinkMessageType | int,
|
||||
payload: bytes = b"",
|
||||
timeout: float | None = None,
|
||||
) -> Message:
|
||||
"""Send a v1 request, await the matching reply, return the inner Message."""
|
||||
if self._state is not ConnectionState.ONLINE:
|
||||
raise ConnectionError(
|
||||
f"cannot send request, connection state={self._state.name}"
|
||||
)
|
||||
message = Message(
|
||||
start_char=START_CHAR_V1_UNADDRESSED,
|
||||
data=bytes([int(opcode)]) + payload,
|
||||
)
|
||||
per_attempt_timeout = timeout if timeout is not None else self._default_timeout
|
||||
max_attempts = 1 + self._retry_count
|
||||
last_exc: Exception | None = None
|
||||
|
||||
for attempt in range(1, max_attempts + 1):
|
||||
seq, fut = self._send_encrypted(message)
|
||||
try:
|
||||
reply_packet = await asyncio.wait_for(fut, per_attempt_timeout)
|
||||
except TimeoutError as exc:
|
||||
last_exc = exc
|
||||
self._pending.pop(seq, None)
|
||||
if attempt < max_attempts:
|
||||
_log.debug(
|
||||
"udp v1 retry %d/%d on opcode=%d seq=%d",
|
||||
attempt, max_attempts, int(opcode), seq,
|
||||
)
|
||||
continue
|
||||
raise RequestTimeoutError(
|
||||
f"no v1 reply for opcode={int(opcode)} "
|
||||
f"after {max_attempts} attempt(s)"
|
||||
) from last_exc
|
||||
return self._decode_inner(reply_packet)
|
||||
raise RequestTimeoutError(
|
||||
f"request loop exited without reply for opcode={int(opcode)}"
|
||||
)
|
||||
|
||||
async def iter_streaming(
|
||||
self,
|
||||
initial_op: OmniLinkMessageType | int,
|
||||
*,
|
||||
ack_op: OmniLinkMessageType | int = OmniLinkMessageType.Ack,
|
||||
end_op: OmniLinkMessageType | int = OmniLinkMessageType.EOD,
|
||||
nak_op: OmniLinkMessageType | int = OmniLinkMessageType.Nak,
|
||||
timeout: float | None = None,
|
||||
) -> AsyncIterator[Message]:
|
||||
"""Drive a v1 lock-step streaming download (UploadNames / UploadSetup / etc).
|
||||
|
||||
Sends ``initial_op`` (no payload), yields each ``ack_op``-elicited
|
||||
reply, and stops when the panel sends ``end_op``. ``nak_op`` is
|
||||
treated as an immediate end-of-stream — no exception (some
|
||||
firmwares use NAK to signal "no records to upload").
|
||||
|
||||
Unlike :meth:`request` we don't retry on timeout — losing a
|
||||
reply mid-stream desynchronises the conversation, so the right
|
||||
answer is to surface the timeout and let the caller restart.
|
||||
"""
|
||||
if self._state is not ConnectionState.ONLINE:
|
||||
raise ConnectionError(
|
||||
f"cannot stream, connection state={self._state.name}"
|
||||
)
|
||||
per_reply_timeout = timeout if timeout is not None else self._default_timeout
|
||||
|
||||
# Step 1: send the initial bare-opcode request, wait for first reply.
|
||||
first_msg = Message(
|
||||
start_char=START_CHAR_V1_UNADDRESSED,
|
||||
data=bytes([int(initial_op)]),
|
||||
)
|
||||
seq, fut = self._send_encrypted(first_msg)
|
||||
try:
|
||||
reply_pkt = await asyncio.wait_for(fut, per_reply_timeout)
|
||||
except TimeoutError as exc:
|
||||
self._pending.pop(seq, None)
|
||||
raise RequestTimeoutError(
|
||||
f"no first reply to streaming opcode={int(initial_op)}"
|
||||
) from exc
|
||||
reply = self._decode_inner(reply_pkt)
|
||||
|
||||
# Step 2..N: ack-and-receive until end_op or nak_op.
|
||||
while True:
|
||||
if reply.opcode == int(end_op) or reply.opcode == int(nak_op):
|
||||
return
|
||||
yield reply
|
||||
|
||||
ack_msg = Message(
|
||||
start_char=START_CHAR_V1_UNADDRESSED,
|
||||
data=bytes([int(ack_op)]),
|
||||
)
|
||||
seq, fut = self._send_encrypted(ack_msg)
|
||||
try:
|
||||
reply_pkt = await asyncio.wait_for(fut, per_reply_timeout)
|
||||
except TimeoutError as exc:
|
||||
self._pending.pop(seq, None)
|
||||
raise RequestTimeoutError(
|
||||
f"no reply after streaming Ack (seq={seq})"
|
||||
) from exc
|
||||
reply = self._decode_inner(reply_pkt)
|
||||
|
||||
def unsolicited(self) -> AsyncIterator[Message]:
|
||||
queue = self._unsolicited_queue
|
||||
|
||||
async def _gen() -> AsyncIterator[Message]:
|
||||
while True:
|
||||
yield await queue.get()
|
||||
|
||||
return _gen()
|
||||
|
||||
# ---- handshake -------------------------------------------------------
|
||||
|
||||
async def _do_handshake(self) -> None:
|
||||
# Step 1: empty ClientRequestNewSession.
|
||||
self._state = ConnectionState.NEW_SESSION
|
||||
step1 = Packet(
|
||||
seq=self._claim_seq(),
|
||||
type=PacketType.ClientRequestNewSession,
|
||||
data=b"",
|
||||
)
|
||||
self._write_packet(step1)
|
||||
|
||||
# Step 2: ControllerAckNewSession (carries protocol version + SessionID).
|
||||
ack1 = await self._await_handshake_packet()
|
||||
if ack1.type is PacketType.ControllerCannotStartNewSession:
|
||||
raise HandshakeError("controller cannot start new session (busy?)")
|
||||
if ack1.type is not PacketType.ControllerAckNewSession:
|
||||
raise HandshakeError(f"unexpected step-2 packet type {ack1.type.name}")
|
||||
if len(ack1.data) < 7:
|
||||
raise HandshakeError(
|
||||
f"ControllerAckNewSession payload too short: {len(ack1.data)} bytes"
|
||||
)
|
||||
if (ack1.data[0], ack1.data[1]) != _PROTO_VERSION:
|
||||
raise HandshakeError(
|
||||
f"unsupported protocol version {ack1.data[0]:#04x}{ack1.data[1]:02x}"
|
||||
)
|
||||
self._session_id = bytes(ack1.data[2 : 2 + _SESSION_ID_LEN])
|
||||
self._session_key = derive_session_key(self._controller_key, self._session_id)
|
||||
|
||||
# Step 3: encrypted ClientRequestSecureSession echoing SessionID.
|
||||
self._state = ConnectionState.SECURE
|
||||
step3_seq = self._claim_seq()
|
||||
step3_ct = encrypt_message_payload(
|
||||
self._session_id, step3_seq, self._session_key
|
||||
)
|
||||
step3 = Packet(
|
||||
seq=step3_seq,
|
||||
type=PacketType.ClientRequestSecureSession,
|
||||
data=step3_ct,
|
||||
)
|
||||
self._write_packet(step3)
|
||||
|
||||
# Step 4: ControllerAckSecureSession (or termination).
|
||||
ack2 = await self._await_handshake_packet()
|
||||
if ack2.type is PacketType.ControllerSessionTerminated:
|
||||
raise InvalidEncryptionKeyError(
|
||||
"controller terminated session during handshake (wrong ControllerKey?)"
|
||||
)
|
||||
if ack2.type is not PacketType.ControllerAckSecureSession:
|
||||
raise HandshakeError(
|
||||
f"unexpected step-4 packet type {ack2.type.name}"
|
||||
)
|
||||
self._state = ConnectionState.ONLINE
|
||||
|
||||
async def _await_handshake_packet(self) -> Packet:
|
||||
try:
|
||||
await asyncio.wait_for(
|
||||
self._handshake_event.wait(), self._default_timeout
|
||||
)
|
||||
except TimeoutError as exc:
|
||||
raise HandshakeError(
|
||||
"timeout waiting for controller handshake reply"
|
||||
) from exc
|
||||
if self._handshake_error is not None:
|
||||
err = self._handshake_error
|
||||
self._handshake_error = None
|
||||
raise err
|
||||
pkt = self._handshake_packet
|
||||
self._handshake_packet = None
|
||||
self._handshake_event.clear()
|
||||
if pkt is None:
|
||||
raise HandshakeError("handshake event fired with no packet")
|
||||
return pkt
|
||||
|
||||
# ---- send / receive helpers -----------------------------------------
|
||||
|
||||
def _claim_seq(self) -> int:
|
||||
seq = self._next_seq
|
||||
nxt = seq + 1
|
||||
if nxt > _MAX_SEQ or nxt == 0:
|
||||
nxt = 1
|
||||
self._next_seq = nxt
|
||||
return seq
|
||||
|
||||
def _send_encrypted(
|
||||
self, inner: Message
|
||||
) -> tuple[int, asyncio.Future[Packet]]:
|
||||
if self._session_key is None:
|
||||
raise ConnectionError("no session key (handshake not complete)")
|
||||
seq = self._claim_seq()
|
||||
plaintext = inner.encode()
|
||||
ciphertext = encrypt_message_payload(plaintext, seq, self._session_key)
|
||||
# KEY DIFFERENCE FROM V2: outer type is OmniLinkMessage (0x10),
|
||||
# not OmniLink2Message (0x20). See clsOmniLinkConnection.cs:1536.
|
||||
pkt = Packet(seq=seq, type=PacketType.OmniLinkMessage, data=ciphertext)
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
fut: asyncio.Future[Packet] = loop.create_future()
|
||||
self._pending[seq] = fut
|
||||
self._write_packet(pkt)
|
||||
return seq, fut
|
||||
|
||||
def _write_packet(self, pkt: Packet) -> None:
|
||||
if self._udp_transport is None:
|
||||
raise ConnectionError("transport not open")
|
||||
wire = pkt.encode()
|
||||
_log.debug(
|
||||
"TX seq=%d type=%s len=%d", pkt.seq, pkt.type.name, len(pkt.data)
|
||||
)
|
||||
self._udp_transport.sendto(wire)
|
||||
|
||||
def _decode_inner(self, pkt: Packet) -> Message:
|
||||
if self._session_key is None:
|
||||
raise ConnectionError("no session key")
|
||||
if not pkt.data:
|
||||
raise ProtocolError("empty packet data")
|
||||
plaintext = decrypt_message_payload(pkt.data, pkt.seq, self._session_key)
|
||||
try:
|
||||
return Message.decode(plaintext)
|
||||
except MessageCrcError as exc:
|
||||
raise ProtocolError(f"inner v1 message CRC mismatch: {exc}") from exc
|
||||
|
||||
# ---- inbound dispatch (called from the datagram protocol) -----------
|
||||
|
||||
def _dispatch(self, pkt: Packet) -> None:
|
||||
if pkt.data is None:
|
||||
pkt = Packet(seq=pkt.seq, type=pkt.type, data=b"")
|
||||
|
||||
if self._state in (ConnectionState.NEW_SESSION, ConnectionState.SECURE):
|
||||
handshake_types = {
|
||||
PacketType.ControllerAckNewSession,
|
||||
PacketType.ControllerAckSecureSession,
|
||||
PacketType.ControllerSessionTerminated,
|
||||
PacketType.ControllerCannotStartNewSession,
|
||||
}
|
||||
if pkt.type in handshake_types:
|
||||
self._handshake_packet = pkt
|
||||
self._handshake_event.set()
|
||||
return
|
||||
|
||||
if pkt.seq == 0:
|
||||
if pkt.type is PacketType.OmniLinkMessage:
|
||||
try:
|
||||
msg = self._decode_inner(pkt)
|
||||
except (ProtocolError, ConnectionError) as exc:
|
||||
_log.warning(
|
||||
"dropping malformed unsolicited v1 packet: %s", exc
|
||||
)
|
||||
return
|
||||
try:
|
||||
self._unsolicited_queue.put_nowait(msg)
|
||||
except asyncio.QueueFull: # pragma: no cover - unbounded queue
|
||||
_log.warning("unsolicited queue full; dropping message")
|
||||
return
|
||||
|
||||
fut = self._pending.pop(pkt.seq, None)
|
||||
if fut is None:
|
||||
_log.debug(
|
||||
"no waiter for seq=%d type=%s; dropping",
|
||||
pkt.seq, pkt.type.name,
|
||||
)
|
||||
return
|
||||
if pkt.type is PacketType.ControllerSessionTerminated:
|
||||
fut.set_exception(ConnectionError("controller terminated session"))
|
||||
return
|
||||
if not fut.done():
|
||||
fut.set_result(pkt)
|
||||
|
||||
|
||||
class _OmniDatagramProtocol(asyncio.DatagramProtocol):
|
||||
"""asyncio.DatagramProtocol bound to a single OmniConnectionV1.
|
||||
|
||||
Each datagram is one complete Packet. We decode it and hand it to the
|
||||
connection's dispatcher; the dispatcher already knows how to sort
|
||||
handshake / solicited / unsolicited paths.
|
||||
"""
|
||||
|
||||
def __init__(self, conn: OmniConnectionV1) -> None:
|
||||
self._conn = conn
|
||||
|
||||
def connection_made(self, transport: asyncio.BaseTransport) -> None:
|
||||
pass
|
||||
|
||||
def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None:
|
||||
try:
|
||||
pkt = Packet.decode(data)
|
||||
except Exception as exc:
|
||||
_log.warning("dropping malformed UDP datagram: %s", exc)
|
||||
return
|
||||
try:
|
||||
self._conn._dispatch(pkt)
|
||||
except Exception:
|
||||
_log.exception("UDP v1 dispatch crashed for seq=%d", pkt.seq)
|
||||
|
||||
def error_received(self, exc: Exception) -> None:
|
||||
_log.warning("UDP v1 socket error: %s", exc)
|
||||
|
||||
def connection_lost(self, exc: Exception | None) -> None:
|
||||
if exc is not None:
|
||||
_log.warning("UDP v1 transport lost: %s", exc)
|
||||
310
src/omni_pca/v1/messages.py
Normal file
310
src/omni_pca/v1/messages.py
Normal file
@ -0,0 +1,310 @@
|
||||
"""V1 status-reply and name parsers.
|
||||
|
||||
The v1 wire protocol's typed status messages (ZoneStatus, UnitStatus,
|
||||
ThermostatStatus, AuxiliaryStatus) carry one record per object in the
|
||||
range the client requested — but, unlike v2's ExtendedStatus, the records
|
||||
do **not** include the object number. The starting index is implicit
|
||||
from the request payload, and each record is at a fixed offset.
|
||||
|
||||
This module supplies "block" parsers that take both the reply payload
|
||||
and the starting index, and produce a list of the existing top-level
|
||||
dataclasses (:class:`omni_pca.models.ZoneStatus` etc) so HA entity code
|
||||
doesn't need a v1-specific schema. The :func:`parse_v1_namedata` helper
|
||||
decodes the bulk-name-download replies streamed by ``UploadNames``.
|
||||
|
||||
Per-record byte counts (verified against firmware 2.12 over UDP):
|
||||
ZoneStatus 2 bytes per zone (status, analog_loop)
|
||||
UnitStatus 3 bytes per unit (status, time_hi, time_lo)
|
||||
ThermostatStatus 7 bytes per tstat (status, current_t, heat_sp,
|
||||
cool_sp, sys_mode, fan_mode,
|
||||
hold_mode)
|
||||
AuxiliaryStatus 4 bytes per aux (relay, current, low_sp,
|
||||
high_sp)
|
||||
|
||||
References:
|
||||
clsOLMsgZoneStatus.cs / clsOLMsgRequestZoneStatus.cs
|
||||
clsOLMsgUnitStatus.cs / clsOLMsgRequestUnitStatus.cs
|
||||
clsOLMsgThermostatStatus.cs / clsOLMsgRequestThermostatStatus.cs
|
||||
clsOLMsgAuxiliaryStatus.cs / clsOLMsgRequestAuxiliaryStatus.cs
|
||||
clsOLMsgSystemStatus.cs — v1 byte 14 = battery, then per-area Mode
|
||||
clsOLMsgNameData.cs — bulk name download record format
|
||||
enuNameType.cs — Zone=1 Unit=2 Button=3 Code=4 Area=5
|
||||
Tstat=6 Message=7 UserSetting=8
|
||||
AccessControlReader=9
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from enum import IntEnum
|
||||
|
||||
from ..models import (
|
||||
AuxSensorStatus,
|
||||
SystemStatus,
|
||||
ThermostatStatus,
|
||||
UnitStatus,
|
||||
ZoneStatus,
|
||||
)
|
||||
|
||||
_ZONE_RECORD_BYTES = 2
|
||||
_UNIT_RECORD_BYTES = 3
|
||||
_THERMOSTAT_RECORD_BYTES = 7
|
||||
_AUX_RECORD_BYTES = 4
|
||||
|
||||
|
||||
def parse_v1_zone_status(payload: bytes, start_index: int) -> list[ZoneStatus]:
|
||||
"""Parse a v1 ZoneStatus reply payload into per-zone dataclasses.
|
||||
|
||||
``payload`` is the inner Message ``payload`` (data minus opcode byte);
|
||||
its length must be a multiple of ``_ZONE_RECORD_BYTES``.
|
||||
"""
|
||||
if len(payload) % _ZONE_RECORD_BYTES != 0:
|
||||
raise ValueError(
|
||||
f"v1 ZoneStatus payload length {len(payload)} not a multiple of "
|
||||
f"{_ZONE_RECORD_BYTES}"
|
||||
)
|
||||
out: list[ZoneStatus] = []
|
||||
for i, off in enumerate(range(0, len(payload), _ZONE_RECORD_BYTES)):
|
||||
out.append(
|
||||
ZoneStatus(
|
||||
index=start_index + i,
|
||||
raw_status=payload[off],
|
||||
loop=payload[off + 1],
|
||||
)
|
||||
)
|
||||
return out
|
||||
|
||||
|
||||
def parse_v1_unit_status(payload: bytes, start_index: int) -> list[UnitStatus]:
|
||||
"""Parse a v1 UnitStatus reply payload into per-unit dataclasses."""
|
||||
if len(payload) % _UNIT_RECORD_BYTES != 0:
|
||||
raise ValueError(
|
||||
f"v1 UnitStatus payload length {len(payload)} not a multiple of "
|
||||
f"{_UNIT_RECORD_BYTES}"
|
||||
)
|
||||
out: list[UnitStatus] = []
|
||||
for i, off in enumerate(range(0, len(payload), _UNIT_RECORD_BYTES)):
|
||||
out.append(
|
||||
UnitStatus(
|
||||
index=start_index + i,
|
||||
state=payload[off],
|
||||
time_remaining_secs=(payload[off + 1] << 8) | payload[off + 2],
|
||||
)
|
||||
)
|
||||
return out
|
||||
|
||||
|
||||
def parse_v1_thermostat_status(
|
||||
payload: bytes, start_index: int
|
||||
) -> list[ThermostatStatus]:
|
||||
"""Parse a v1 ThermostatStatus reply payload into per-tstat dataclasses.
|
||||
|
||||
The v1 record only carries 7 fields; the v2 dataclass has 4 more
|
||||
(humidity, humidify_setpoint, dehumidify_setpoint, outdoor_temp,
|
||||
horc_status). We zero-fill those — HA's climate platform doesn't
|
||||
require them and an explicit 0 is more honest than a fake value.
|
||||
"""
|
||||
if len(payload) % _THERMOSTAT_RECORD_BYTES != 0:
|
||||
raise ValueError(
|
||||
f"v1 ThermostatStatus payload length {len(payload)} not a multiple "
|
||||
f"of {_THERMOSTAT_RECORD_BYTES}"
|
||||
)
|
||||
out: list[ThermostatStatus] = []
|
||||
for i, off in enumerate(range(0, len(payload), _THERMOSTAT_RECORD_BYTES)):
|
||||
out.append(
|
||||
ThermostatStatus(
|
||||
index=start_index + i,
|
||||
status=payload[off],
|
||||
temperature_raw=payload[off + 1],
|
||||
heat_setpoint_raw=payload[off + 2],
|
||||
cool_setpoint_raw=payload[off + 3],
|
||||
system_mode=payload[off + 4],
|
||||
fan_mode=payload[off + 5],
|
||||
hold_mode=payload[off + 6],
|
||||
humidity_raw=0,
|
||||
humidify_setpoint_raw=0,
|
||||
dehumidify_setpoint_raw=0,
|
||||
outdoor_temperature_raw=0,
|
||||
horc_status=0,
|
||||
)
|
||||
)
|
||||
return out
|
||||
|
||||
|
||||
def parse_v1_aux_status(payload: bytes, start_index: int) -> list[AuxSensorStatus]:
|
||||
"""Parse a v1 AuxiliaryStatus reply payload into per-aux dataclasses."""
|
||||
if len(payload) % _AUX_RECORD_BYTES != 0:
|
||||
raise ValueError(
|
||||
f"v1 AuxiliaryStatus payload length {len(payload)} not a multiple "
|
||||
f"of {_AUX_RECORD_BYTES}"
|
||||
)
|
||||
out: list[AuxSensorStatus] = []
|
||||
for i, off in enumerate(range(0, len(payload), _AUX_RECORD_BYTES)):
|
||||
out.append(
|
||||
AuxSensorStatus(
|
||||
index=start_index + i,
|
||||
output=payload[off],
|
||||
value_raw=payload[off + 1],
|
||||
low_raw=payload[off + 2],
|
||||
high_raw=payload[off + 3],
|
||||
)
|
||||
)
|
||||
return out
|
||||
|
||||
|
||||
def parse_v1_system_status(payload: bytes) -> SystemStatus:
|
||||
"""Parse a v1 SystemStatus reply.
|
||||
|
||||
Bytes 0..13 are byte-identical to v2 (time/date + sunrise/sunset +
|
||||
battery). After byte 13 v1 carries per-area Mode bytes (1 byte each)
|
||||
while v2 carries 2-byte alarm-flag pairs. We translate to the v2
|
||||
dataclass's ``area_alarms`` shape by promoting each v1 mode byte to
|
||||
a ``(mode, 0)`` tuple — that way HA code that already consumes
|
||||
:class:`SystemStatus` keeps working without a v1-specific branch.
|
||||
"""
|
||||
if len(payload) < 14:
|
||||
raise ValueError(
|
||||
f"v1 SystemStatus payload too short: {len(payload)} bytes"
|
||||
)
|
||||
time_valid = payload[0] != 0
|
||||
year = payload[1]
|
||||
month = payload[2]
|
||||
day = payload[3]
|
||||
# day_of_week = payload[4]
|
||||
hour = payload[5]
|
||||
minute = payload[6]
|
||||
second = payload[7]
|
||||
# daylight = payload[8]
|
||||
sunrise_h = payload[9]
|
||||
sunrise_m = payload[10]
|
||||
sunset_h = payload[11]
|
||||
sunset_m = payload[12]
|
||||
battery = payload[13]
|
||||
|
||||
panel_time: datetime | None = None
|
||||
if time_valid:
|
||||
try:
|
||||
panel_time = datetime(
|
||||
year=2000 + year,
|
||||
month=month,
|
||||
day=day,
|
||||
hour=hour,
|
||||
minute=minute,
|
||||
second=second,
|
||||
)
|
||||
except ValueError:
|
||||
panel_time = None
|
||||
|
||||
# Promote each v1 per-area mode byte to a (mode, 0) pair so the v2
|
||||
# area_alarms tuple shape carries the same information without a
|
||||
# second dataclass.
|
||||
mode_bytes = payload[14:]
|
||||
area_alarms = tuple((b, 0) for b in mode_bytes)
|
||||
|
||||
return SystemStatus(
|
||||
time_valid=time_valid,
|
||||
panel_time=panel_time,
|
||||
sunrise_hour=sunrise_h,
|
||||
sunrise_minute=sunrise_m,
|
||||
sunset_hour=sunset_h,
|
||||
sunset_minute=sunset_m,
|
||||
battery_reading=battery,
|
||||
area_alarms=area_alarms,
|
||||
)
|
||||
|
||||
|
||||
# ---- NameData --------------------------------------------------------------
|
||||
|
||||
|
||||
class NameType(IntEnum):
|
||||
"""Categories of named objects panels can stream over UploadNames.
|
||||
|
||||
Reference: enuNameType.cs.
|
||||
"""
|
||||
|
||||
ZONE = 1
|
||||
UNIT = 2
|
||||
BUTTON = 3
|
||||
CODE = 4
|
||||
AREA = 5
|
||||
THERMOSTAT = 6
|
||||
MESSAGE = 7
|
||||
USER_SETTING = 8
|
||||
ACCESS_CONTROL_READER = 9
|
||||
|
||||
|
||||
# Per-type max name length (clsCapOMNI_PRO_II.cs lines 55-71).
|
||||
# Other Omni models share these numbers — the few exceptions are
|
||||
# documented but not relevant for the panels we know speak v1+UDP.
|
||||
_NAME_TYPE_LENGTH: dict[int, int] = {
|
||||
NameType.ZONE: 15,
|
||||
NameType.UNIT: 12,
|
||||
NameType.BUTTON: 12,
|
||||
NameType.CODE: 12,
|
||||
NameType.AREA: 12,
|
||||
NameType.THERMOSTAT: 12,
|
||||
NameType.MESSAGE: 15,
|
||||
NameType.USER_SETTING: 15,
|
||||
NameType.ACCESS_CONTROL_READER: 15,
|
||||
}
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
class NameRecord:
|
||||
"""One name record from a v1 ``NameData`` reply (opcode 11)."""
|
||||
|
||||
name_type: int
|
||||
number: int
|
||||
name: str
|
||||
|
||||
@property
|
||||
def name_type_label(self) -> str:
|
||||
try:
|
||||
return NameType(self.name_type).name
|
||||
except ValueError:
|
||||
return f"Unknown({self.name_type})"
|
||||
|
||||
|
||||
def parse_v1_namedata(payload: bytes) -> NameRecord:
|
||||
"""Decode a v1 ``NameData`` payload (opcode 11) into a :class:`NameRecord`.
|
||||
|
||||
Wire layout (per clsOLMsgNameData.cs, MessageLength is the
|
||||
full Data byte count including the opcode):
|
||||
|
||||
* One-byte form (NameNumber ≤ 255), MessageLength = 4 + NameTypeLen:
|
||||
``[opcode][type][num][name×L][\\0]`` — one trailing reserved byte.
|
||||
* Two-byte form (NameNumber > 255), MessageLength = 5 + NameTypeLen:
|
||||
``[opcode][type][num_hi][num_lo][name×L][\\0]``.
|
||||
|
||||
``payload`` here is the *inner* :attr:`Message.payload` (data minus
|
||||
the leading opcode), so the lengths to compare against are L+3 and
|
||||
L+4 respectively.
|
||||
"""
|
||||
if len(payload) < 3:
|
||||
raise ValueError(f"NameData payload too short: {len(payload)} bytes")
|
||||
name_type = payload[0]
|
||||
name_len = _NAME_TYPE_LENGTH.get(name_type)
|
||||
|
||||
if name_len is not None:
|
||||
# Disambiguate by payload length against the expected forms.
|
||||
one_byte_len = name_len + 3 # type + num + name + 1 trailing
|
||||
two_byte_len = name_len + 4 # type + num_hi + num_lo + name + 1 trailing
|
||||
if len(payload) >= two_byte_len:
|
||||
number = (payload[1] << 8) | payload[2]
|
||||
name_bytes = payload[3 : 3 + name_len]
|
||||
elif len(payload) >= one_byte_len:
|
||||
number = payload[1]
|
||||
name_bytes = payload[2 : 2 + name_len]
|
||||
else:
|
||||
# Short payload — best-effort one-byte decode of whatever is left.
|
||||
number = payload[1]
|
||||
name_bytes = payload[2:]
|
||||
else:
|
||||
# Unknown type — can't tell the form. Assume one-byte and consume
|
||||
# the rest; HA filters by known type anyway.
|
||||
number = payload[1]
|
||||
name_bytes = payload[2:]
|
||||
|
||||
name = name_bytes.split(b"\x00", 1)[0].decode("utf-8", errors="replace")
|
||||
return NameRecord(name_type=name_type, number=number, name=name)
|
||||
290
tests/test_v1_client_commands.py
Normal file
290
tests/test_v1_client_commands.py
Normal file
@ -0,0 +1,290 @@
|
||||
"""Unit tests for the OmniClientV1 write methods.
|
||||
|
||||
These exercise wire-payload construction by monkey-patching the
|
||||
connection's ``request`` method so we never have to open a UDP socket.
|
||||
The contract under test:
|
||||
|
||||
* :meth:`OmniClientV1.execute_command` packs ``[cmd][p1][p2_hi][p2_lo]``.
|
||||
* :meth:`OmniClientV1.execute_security_command` packs
|
||||
``[area][mode][d1][d2][d3][d4]`` with the C# digit-by-digit form.
|
||||
* Convenience wrappers (``turn_unit_on`` etc) route through
|
||||
:meth:`execute_command` with the right Command enum values.
|
||||
* Replies are interpreted: Ack → return, Nak → CommandFailedError,
|
||||
non-zero SecurityCommandResponse → CommandFailedError with code.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import struct
|
||||
|
||||
import pytest
|
||||
|
||||
from omni_pca.commands import (
|
||||
Command,
|
||||
CommandFailedError,
|
||||
SecurityCommandResponse,
|
||||
)
|
||||
from omni_pca.message import START_CHAR_V1_UNADDRESSED, Message
|
||||
from omni_pca.models import SecurityMode
|
||||
from omni_pca.opcodes import OmniLinkMessageType
|
||||
from omni_pca.v1.client import OmniClientV1
|
||||
|
||||
|
||||
class _FakeConn:
|
||||
"""Records each request, returns a canned reply.
|
||||
|
||||
Tests construct one with a list of (opcode, payload_bytes) replies in
|
||||
order; each call to :meth:`request` consumes one.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
replies: list[tuple[int, bytes]] | None = None,
|
||||
) -> None:
|
||||
self.replies = replies or []
|
||||
self.calls: list[tuple[int, bytes]] = []
|
||||
|
||||
async def request(
|
||||
self,
|
||||
opcode: int,
|
||||
payload: bytes = b"",
|
||||
timeout: float | None = None,
|
||||
) -> Message:
|
||||
self.calls.append((int(opcode), bytes(payload)))
|
||||
if not self.replies:
|
||||
# Default: panel ack — works for the boring success path.
|
||||
return Message(
|
||||
start_char=START_CHAR_V1_UNADDRESSED,
|
||||
data=bytes([int(OmniLinkMessageType.Ack)]),
|
||||
)
|
||||
reply_op, reply_payload = self.replies.pop(0)
|
||||
return Message(
|
||||
start_char=START_CHAR_V1_UNADDRESSED,
|
||||
data=bytes([reply_op]) + reply_payload,
|
||||
)
|
||||
|
||||
|
||||
def _make_client(replies: list[tuple[int, bytes]] | None = None) -> tuple[OmniClientV1, _FakeConn]:
|
||||
client = OmniClientV1(
|
||||
host="127.0.0.1",
|
||||
controller_key=b"\x00" * 16,
|
||||
)
|
||||
fake = _FakeConn(replies)
|
||||
# Swap out the real connection with our recorder.
|
||||
client._conn = fake # type: ignore[assignment]
|
||||
return client, fake
|
||||
|
||||
|
||||
# ---- execute_command ---------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_execute_command_packs_payload_be() -> None:
|
||||
client, fake = _make_client()
|
||||
await client.execute_command(Command.UNIT_LEVEL, parameter1=42, parameter2=0x1234)
|
||||
assert len(fake.calls) == 1
|
||||
opcode, payload = fake.calls[0]
|
||||
assert opcode == int(OmniLinkMessageType.Command)
|
||||
# [cmd][p1][p2_hi][p2_lo]
|
||||
assert payload == struct.pack(">BBH", int(Command.UNIT_LEVEL), 42, 0x1234)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_execute_command_rejects_oversized_parameters() -> None:
|
||||
client, _ = _make_client()
|
||||
with pytest.raises(ValueError, match="parameter1"):
|
||||
await client.execute_command(Command.UNIT_LEVEL, parameter1=256, parameter2=1)
|
||||
with pytest.raises(ValueError, match="parameter2"):
|
||||
await client.execute_command(Command.UNIT_LEVEL, parameter1=0, parameter2=0x10000)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_execute_command_nak_raises_command_failed() -> None:
|
||||
client, _ = _make_client([(int(OmniLinkMessageType.Nak), b"")])
|
||||
with pytest.raises(CommandFailedError, match="NAK"):
|
||||
await client.execute_command(Command.UNIT_ON, parameter2=5)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_execute_command_unexpected_reply_raises() -> None:
|
||||
# Panel returns SystemInformation reply to a Command request — that's bogus.
|
||||
client, _ = _make_client(
|
||||
[(int(OmniLinkMessageType.SystemInformation), b"\x00")]
|
||||
)
|
||||
with pytest.raises(CommandFailedError, match="unexpected reply"):
|
||||
await client.execute_command(Command.UNIT_ON, parameter2=5)
|
||||
|
||||
|
||||
# ---- thin wrappers -----------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_turn_unit_on_sends_unit_on_command() -> None:
|
||||
client, fake = _make_client()
|
||||
await client.turn_unit_on(7)
|
||||
opcode, payload = fake.calls[0]
|
||||
assert opcode == int(OmniLinkMessageType.Command)
|
||||
assert payload[0] == int(Command.UNIT_ON)
|
||||
assert (payload[2] << 8) | payload[3] == 7
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_turn_unit_off_sends_unit_off_command() -> None:
|
||||
client, fake = _make_client()
|
||||
await client.turn_unit_off(255)
|
||||
payload = fake.calls[0][1]
|
||||
assert payload[0] == int(Command.UNIT_OFF)
|
||||
assert (payload[2] << 8) | payload[3] == 255
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_set_unit_level_packs_percent_as_p1() -> None:
|
||||
client, fake = _make_client()
|
||||
await client.set_unit_level(3, 75)
|
||||
payload = fake.calls[0][1]
|
||||
assert payload[0] == int(Command.UNIT_LEVEL)
|
||||
assert payload[1] == 75
|
||||
assert (payload[2] << 8) | payload[3] == 3
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_set_unit_level_rejects_out_of_range_percent() -> None:
|
||||
client, _ = _make_client()
|
||||
with pytest.raises(ValueError, match="0..100"):
|
||||
await client.set_unit_level(1, 101)
|
||||
with pytest.raises(ValueError, match="0..100"):
|
||||
await client.set_unit_level(1, -1)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_bypass_zone_packs_code_as_p1_and_zone_as_p2() -> None:
|
||||
client, fake = _make_client()
|
||||
await client.bypass_zone(12, code=5)
|
||||
payload = fake.calls[0][1]
|
||||
assert payload[0] == int(Command.BYPASS_ZONE)
|
||||
assert payload[1] == 5
|
||||
assert (payload[2] << 8) | payload[3] == 12
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_restore_zone_packs_code_and_zone() -> None:
|
||||
client, fake = _make_client()
|
||||
await client.restore_zone(99, code=3)
|
||||
payload = fake.calls[0][1]
|
||||
assert payload[0] == int(Command.RESTORE_ZONE)
|
||||
assert payload[1] == 3
|
||||
assert (payload[2] << 8) | payload[3] == 99
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_execute_button() -> None:
|
||||
client, fake = _make_client()
|
||||
await client.execute_button(15)
|
||||
payload = fake.calls[0][1]
|
||||
assert payload[0] == int(Command.EXECUTE_BUTTON)
|
||||
assert (payload[2] << 8) | payload[3] == 15
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_set_thermostat_modes_route_through_command() -> None:
|
||||
client, fake = _make_client()
|
||||
await client.set_thermostat_system_mode(2, 1) # 1 = Heat
|
||||
await client.set_thermostat_fan_mode(2, 2) # 2 = On
|
||||
await client.set_thermostat_hold_mode(2, 1) # 1 = Hold
|
||||
cmds = [p[1][0] for p in fake.calls]
|
||||
assert cmds == [
|
||||
int(Command.SET_THERMOSTAT_SYSTEM_MODE),
|
||||
int(Command.SET_THERMOSTAT_FAN_MODE),
|
||||
int(Command.SET_THERMOSTAT_HOLD_MODE),
|
||||
]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_set_thermostat_setpoint_raw_validates_byte_range() -> None:
|
||||
client, _ = _make_client()
|
||||
with pytest.raises(ValueError, match="raw_temp"):
|
||||
await client.set_thermostat_heat_setpoint_raw(1, 256)
|
||||
with pytest.raises(ValueError, match="raw_temp"):
|
||||
await client.set_thermostat_cool_setpoint_raw(1, -1)
|
||||
|
||||
|
||||
# ---- execute_security_command ------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_execute_security_command_digit_packing() -> None:
|
||||
# Code 1234 → digits 1, 2, 3, 4.
|
||||
client, fake = _make_client([(int(OmniLinkMessageType.Ack), b"")])
|
||||
await client.execute_security_command(area=1, mode=SecurityMode.OFF, code=1234)
|
||||
opcode, payload = fake.calls[0]
|
||||
assert opcode == int(OmniLinkMessageType.ExecuteSecurityCommand)
|
||||
assert payload == bytes([1, int(SecurityMode.OFF), 1, 2, 3, 4])
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_execute_security_command_pads_short_codes() -> None:
|
||||
# Code 7 → digits 0, 0, 0, 7.
|
||||
client, fake = _make_client([(int(OmniLinkMessageType.Ack), b"")])
|
||||
await client.execute_security_command(area=8, mode=SecurityMode.AWAY, code=7)
|
||||
payload = fake.calls[0][1]
|
||||
assert payload == bytes([8, int(SecurityMode.AWAY), 0, 0, 0, 7])
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_execute_security_command_response_success_returns() -> None:
|
||||
# Panel returns ExecuteSecurityCommandResponse with status=0 (success).
|
||||
client, _ = _make_client(
|
||||
[(
|
||||
int(OmniLinkMessageType.ExecuteSecurityCommandResponse),
|
||||
bytes([int(SecurityCommandResponse.SUCCESS)]),
|
||||
)]
|
||||
)
|
||||
await client.execute_security_command(area=1, mode=SecurityMode.OFF, code=0)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_execute_security_command_response_failure_raises() -> None:
|
||||
# Panel returns ExecuteSecurityCommandResponse with status=
|
||||
# SecureSystem (1) — wrong code or area not enabled for this code.
|
||||
client, _ = _make_client(
|
||||
[(
|
||||
int(OmniLinkMessageType.ExecuteSecurityCommandResponse),
|
||||
bytes([int(SecurityCommandResponse.INVALID_CODE)]),
|
||||
)]
|
||||
)
|
||||
with pytest.raises(CommandFailedError) as ei:
|
||||
await client.execute_security_command(
|
||||
area=1, mode=SecurityMode.AWAY, code=9999
|
||||
)
|
||||
assert ei.value.failure_code == int(SecurityCommandResponse.INVALID_CODE)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_execute_security_command_nak_raises() -> None:
|
||||
client, _ = _make_client([(int(OmniLinkMessageType.Nak), b"")])
|
||||
with pytest.raises(CommandFailedError, match="NAK"):
|
||||
await client.execute_security_command(
|
||||
area=1, mode=SecurityMode.OFF, code=0
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_execute_security_command_rejects_bad_inputs() -> None:
|
||||
client, _ = _make_client()
|
||||
with pytest.raises(ValueError, match="area"):
|
||||
await client.execute_security_command(area=0, mode=SecurityMode.OFF, code=0)
|
||||
with pytest.raises(ValueError, match="code"):
|
||||
await client.execute_security_command(
|
||||
area=1, mode=SecurityMode.OFF, code=10000
|
||||
)
|
||||
|
||||
|
||||
# ---- acknowledge_alerts -------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_acknowledge_alerts_is_noop_on_v1() -> None:
|
||||
"""v1 has no AcknowledgeAlerts opcode — method should not call request."""
|
||||
client, fake = _make_client()
|
||||
await client.acknowledge_alerts()
|
||||
assert fake.calls == []
|
||||
276
tests/test_v1_messages.py
Normal file
276
tests/test_v1_messages.py
Normal file
@ -0,0 +1,276 @@
|
||||
"""Unit tests for omni_pca.v1.messages parsers.
|
||||
|
||||
Test vectors are real wire payloads captured from a firmware-2.12 Omni
|
||||
Pro II panel via dev/probe_v1_recon.py — see the comment above each
|
||||
test for the inputs that produced it.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from omni_pca.v1.messages import (
|
||||
parse_v1_aux_status,
|
||||
parse_v1_system_status,
|
||||
parse_v1_thermostat_status,
|
||||
parse_v1_unit_status,
|
||||
parse_v1_zone_status,
|
||||
)
|
||||
|
||||
|
||||
# ---- ZoneStatus ---------------------------------------------------------
|
||||
|
||||
|
||||
def test_v1_zone_status_secure_and_open() -> None:
|
||||
# Captured: RequestZoneStatus(1, 8) → 16-byte payload, 8 zones × 2 bytes.
|
||||
# zone 6 raw_status=0x01 (open), all others 0x00.
|
||||
payload = bytes.fromhex("0080007f007f0080008001fd00810080")
|
||||
zones = parse_v1_zone_status(payload, start_index=1)
|
||||
assert len(zones) == 8
|
||||
assert {z.index for z in zones} == set(range(1, 9))
|
||||
assert zones[0].is_secure # zone 1
|
||||
assert zones[5].is_open # zone 6
|
||||
assert zones[5].raw_status == 0x01
|
||||
assert zones[5].loop == 0xFD
|
||||
|
||||
|
||||
def test_v1_zone_status_indexes_offset_by_start() -> None:
|
||||
# If we requested zones 17..24, the same 16-byte payload should
|
||||
# produce indexes 17..24.
|
||||
payload = bytes.fromhex("0080007f007f0080008001fd00810080")
|
||||
zones = parse_v1_zone_status(payload, start_index=17)
|
||||
assert {z.index for z in zones} == set(range(17, 25))
|
||||
|
||||
|
||||
def test_v1_zone_status_invalid_length() -> None:
|
||||
with pytest.raises(ValueError, match="multiple of 2"):
|
||||
parse_v1_zone_status(b"\x00\x00\x00", start_index=1)
|
||||
|
||||
|
||||
# ---- UnitStatus ---------------------------------------------------------
|
||||
|
||||
|
||||
def test_v1_unit_status_dimmer_levels() -> None:
|
||||
# Captured: RequestUnitStatus(1, 8) → 24-byte payload, 8 units × 3 bytes.
|
||||
# state bytes: 01, 01, 69, 96, 69, 00, 73, 00 → 100%, 100%, 5%, 50%, 5%, 0%, 15%, 0%
|
||||
payload = bytes.fromhex("010000010000690000960000690000000000730000000000")
|
||||
units = parse_v1_unit_status(payload, start_index=1)
|
||||
assert len(units) == 8
|
||||
assert units[0].is_on and units[0].brightness == 100 # state=0x01
|
||||
assert units[2].brightness == 5 # state=0x69 = 105 → -100 = 5%
|
||||
assert units[3].brightness == 50 # state=0x96 = 150 → -100 = 50%
|
||||
assert not units[5].is_on # state=0x00
|
||||
assert units[6].brightness == 15 # state=0x73 = 115 → -100 = 15%
|
||||
|
||||
|
||||
def test_v1_unit_status_time_remaining_be_u16() -> None:
|
||||
# Single record with remaining=0x1234.
|
||||
payload = bytes([0x01, 0x12, 0x34])
|
||||
units = parse_v1_unit_status(payload, start_index=42)
|
||||
assert len(units) == 1
|
||||
assert units[0].index == 42
|
||||
assert units[0].time_remaining_secs == 0x1234
|
||||
|
||||
|
||||
def test_v1_unit_status_invalid_length() -> None:
|
||||
with pytest.raises(ValueError, match="multiple of 3"):
|
||||
parse_v1_unit_status(b"\x00\x00", start_index=1)
|
||||
|
||||
|
||||
# ---- ThermostatStatus ---------------------------------------------------
|
||||
|
||||
|
||||
def test_v1_thermostat_status_unconfigured() -> None:
|
||||
# Captured: RequestThermostatStatus(1, 4) → 28 B, all values 0/0/0/0/0/0/0
|
||||
# except byte 0 of records 0-1 which is 0x01 (status). The "raw=0" temps
|
||||
# decode to -40°C / -40°F per omni_temp_to_fahrenheit.
|
||||
payload = bytes.fromhex(
|
||||
"01000000000000010000000000000000000000000000000000000000"
|
||||
)
|
||||
tstats = parse_v1_thermostat_status(payload, start_index=1)
|
||||
assert len(tstats) == 4
|
||||
assert tstats[0].status == 0x01
|
||||
assert tstats[2].status == 0x00
|
||||
assert tstats[0].humidity_raw == 0 # zero-filled (v1 doesn't carry it)
|
||||
assert tstats[0].outdoor_temperature_raw == 0
|
||||
assert tstats[0].horc_status == 0
|
||||
|
||||
|
||||
def test_v1_thermostat_full_record() -> None:
|
||||
# Hand-constructed: status=0x01, temp=170 (=45°F), heat=140 (30°F),
|
||||
# cool=200 (60°F), mode=1, fan=2, hold=3.
|
||||
payload = bytes([0x01, 170, 140, 200, 1, 2, 3])
|
||||
tstats = parse_v1_thermostat_status(payload, start_index=5)
|
||||
assert len(tstats) == 1
|
||||
t = tstats[0]
|
||||
assert t.index == 5
|
||||
assert t.status == 0x01
|
||||
assert t.temperature_raw == 170
|
||||
assert t.heat_setpoint_raw == 140
|
||||
assert t.cool_setpoint_raw == 200
|
||||
assert t.system_mode == 1
|
||||
assert t.fan_mode == 2
|
||||
assert t.hold_mode == 3
|
||||
|
||||
|
||||
def test_v1_thermostat_invalid_length() -> None:
|
||||
with pytest.raises(ValueError, match="multiple of 7"):
|
||||
parse_v1_thermostat_status(b"\x00" * 6, start_index=1)
|
||||
|
||||
|
||||
# ---- AuxiliaryStatus ----------------------------------------------------
|
||||
|
||||
|
||||
def test_v1_aux_status_all_zero() -> None:
|
||||
# Captured: RequestAuxiliaryStatus(1, 8) → 32 B all zeros.
|
||||
payload = bytes(32)
|
||||
auxes = parse_v1_aux_status(payload, start_index=1)
|
||||
assert len(auxes) == 8
|
||||
assert all(a.output == 0 and a.value_raw == 0 for a in auxes)
|
||||
|
||||
|
||||
def test_v1_aux_status_record_field_order() -> None:
|
||||
# Single record: output=11, value=22, low=33, high=44
|
||||
payload = bytes([11, 22, 33, 44])
|
||||
auxes = parse_v1_aux_status(payload, start_index=99)
|
||||
assert len(auxes) == 1
|
||||
a = auxes[0]
|
||||
assert a.index == 99
|
||||
assert a.output == 11
|
||||
assert a.value_raw == 22
|
||||
assert a.low_raw == 33
|
||||
assert a.high_raw == 44
|
||||
|
||||
|
||||
def test_v1_aux_invalid_length() -> None:
|
||||
with pytest.raises(ValueError, match="multiple of 4"):
|
||||
parse_v1_aux_status(b"\x00\x00\x00", start_index=1)
|
||||
|
||||
|
||||
# ---- SystemStatus -------------------------------------------------------
|
||||
|
||||
|
||||
def test_v1_system_status_full_payload() -> None:
|
||||
# Captured: RequestSystemStatus → 38 B payload from firmware 2.12.
|
||||
# Bytes: 011a050a07163b1c01061c150003 + 24 area-mode bytes
|
||||
# decode: time_valid=1, year=26 (=2026), month=05, day=10,
|
||||
# dow=07, hour=22, min=59, sec=28, dst=01, sun_h=06, sun_m=28,
|
||||
# sun_h2=21, sun_m2=21, battery=0x00, then area modes.
|
||||
# Note: the 14th byte (0x03) is the BATTERY reading = 3, not 0.
|
||||
payload = bytes.fromhex(
|
||||
"011a050a07163b1c01061c150003000000000000000002090000000000000000000000000000"
|
||||
)
|
||||
s = parse_v1_system_status(payload)
|
||||
assert s.time_valid is True
|
||||
assert s.panel_time is not None
|
||||
assert s.panel_time.year == 2000 + 0x1A # 2026
|
||||
assert s.panel_time.month == 0x05
|
||||
assert s.panel_time.day == 0x0A
|
||||
assert s.sunrise_hour == 0x06
|
||||
assert s.sunrise_minute == 0x1C # 28
|
||||
assert s.sunset_hour == 0x15 # 21
|
||||
assert s.sunset_minute == 0x00
|
||||
assert s.battery_reading == 0x03
|
||||
# 24 trailing bytes promoted to area_alarms tuples (mode_byte, 0).
|
||||
assert len(s.area_alarms) == 24
|
||||
assert s.area_alarms[0] == (0, 0)
|
||||
# Area 9 in this capture had mode=2.
|
||||
assert s.area_alarms[8] == (2, 0)
|
||||
|
||||
|
||||
def test_v1_system_status_minimum_payload() -> None:
|
||||
# Just the 14 header bytes, no area modes.
|
||||
payload = bytes(14)
|
||||
s = parse_v1_system_status(payload)
|
||||
assert s.time_valid is False
|
||||
assert s.panel_time is None
|
||||
assert s.battery_reading == 0
|
||||
assert s.area_alarms == ()
|
||||
|
||||
|
||||
def test_v1_system_status_too_short_raises() -> None:
|
||||
with pytest.raises(ValueError, match="too short"):
|
||||
parse_v1_system_status(b"\x00" * 13)
|
||||
|
||||
|
||||
# ---- NameData -----------------------------------------------------------
|
||||
|
||||
|
||||
from omni_pca.v1.messages import NameType, parse_v1_namedata # noqa: E402
|
||||
|
||||
|
||||
def test_v1_namedata_zone_one_byte_form() -> None:
|
||||
# Captured: UploadNames stream → first reply = Zone #1 'GARAGE ENTRY'.
|
||||
# Payload 18 B = type(1) + num(1) + name(15) + reserved(1).
|
||||
payload = bytes.fromhex("010147415241474520454e54525900000000")
|
||||
rec = parse_v1_namedata(payload)
|
||||
assert rec.name_type == int(NameType.ZONE)
|
||||
assert rec.name_type_label == "ZONE"
|
||||
assert rec.number == 1
|
||||
assert rec.name == "GARAGE ENTRY"
|
||||
|
||||
|
||||
def test_v1_namedata_unit_one_byte_form() -> None:
|
||||
# Hand-crafted: Unit #5 = "GARAGE ENTRY" (12-char name slot, no padding need).
|
||||
name = "GARAGE ENTRY"
|
||||
payload = (
|
||||
bytes([int(NameType.UNIT), 5])
|
||||
+ name.encode("ascii").ljust(12, b"\x00")
|
||||
+ b"\x00" # reserved trailing byte
|
||||
)
|
||||
rec = parse_v1_namedata(payload)
|
||||
assert rec.name_type == int(NameType.UNIT)
|
||||
assert rec.number == 5
|
||||
assert rec.name == name
|
||||
|
||||
|
||||
def test_v1_namedata_unit_two_byte_form() -> None:
|
||||
# Unit #257 = 'Z1-LANDSCAPE' — captured from the real panel after the
|
||||
# numbered units rolled over 256.
|
||||
payload = (
|
||||
bytes([int(NameType.UNIT), 0x01, 0x01]) # type, num_hi=1, num_lo=1
|
||||
+ b"Z1-LANDSCAPE".ljust(12, b"\x00") # 12-char name
|
||||
+ b"\x00"
|
||||
)
|
||||
rec = parse_v1_namedata(payload)
|
||||
assert rec.name_type == int(NameType.UNIT)
|
||||
assert rec.number == 257
|
||||
assert rec.name == "Z1-LANDSCAPE"
|
||||
|
||||
|
||||
def test_v1_namedata_thermostat() -> None:
|
||||
payload = (
|
||||
bytes([int(NameType.THERMOSTAT), 1])
|
||||
+ b"DOWNSTAIRS".ljust(12, b"\x00")
|
||||
+ b"\x00"
|
||||
)
|
||||
rec = parse_v1_namedata(payload)
|
||||
assert rec.name_type == int(NameType.THERMOSTAT)
|
||||
assert rec.number == 1
|
||||
assert rec.name == "DOWNSTAIRS"
|
||||
|
||||
|
||||
def test_v1_namedata_strips_trailing_nulls() -> None:
|
||||
payload = (
|
||||
bytes([int(NameType.ZONE), 9])
|
||||
+ b"HALL MOTION".ljust(15, b"\x00")
|
||||
+ b"\x00"
|
||||
)
|
||||
rec = parse_v1_namedata(payload)
|
||||
assert rec.name == "HALL MOTION" # no embedded nulls in result
|
||||
|
||||
|
||||
def test_v1_namedata_unknown_type_falls_through() -> None:
|
||||
# Unknown name type — parser should still return SOMETHING by
|
||||
# consuming the rest as the name. HA filters by NameType anyway.
|
||||
payload = bytes([99, 7]) + b"WHATEVER\x00\x00"
|
||||
rec = parse_v1_namedata(payload)
|
||||
assert rec.name_type == 99
|
||||
assert rec.name_type_label == "Unknown(99)"
|
||||
assert rec.number == 7
|
||||
assert rec.name == "WHATEVER"
|
||||
|
||||
|
||||
def test_v1_namedata_short_payload_raises() -> None:
|
||||
with pytest.raises(ValueError, match="too short"):
|
||||
parse_v1_namedata(b"\x01\x00")
|
||||
Loading…
x
Reference in New Issue
Block a user