Some Omni network modules are configured for UDP, in which case PC Access
falls back to the v1 wire protocol (OmniLinkMessage outer = 0x10, inner
StartChar 0x5A, typed Request*Status opcodes) instead of v2's TCP path
(OmniLink2Message + StartChar 0x21 + parameterised RequestProperties).
This adds a parallel implementation rather than overloading the v2 path.
omni_pca/v1/
connection.py UDP-only OmniConnectionV1; reuses crypto + handshake,
routes post-handshake messages through OmniLinkMessage
(0x10) wrapping v1 inner format. Adds iter_streaming
for the lock-step UploadNames/Acknowledge/EOD pattern.
messages.py Block parsers for the typed v1 status replies (zone,
unit, thermostat, aux), v1 SystemStatus, and NameData
(handles both one-byte and two-byte NameNumber forms).
client.py OmniClientV1: read API (get_system_information,
get_*_status), discovery (iter_names + list_*_names),
write API (execute_command, execute_security_command,
turn_unit_*, set_unit_level, bypass/restore_zone,
execute_button, set_thermostat_*). acknowledge_alerts
is a no-op (v1 has no equivalent opcode).
Discovery uses bare UploadNames; panel streams every defined name across
all types in a fixed order with per-record Acknowledge. Verified against
firmware 2.12 — pulled 16 zones, 44 units, 16 buttons, 8 codes,
2 thermostats, 8 messages in one stream.
src/omni_pca/message.py
Fix flipped START_CHAR_V1_* constants. enuOmniLinkMessageFormat says
Addressable=0x41 and NonAddressable=0x5A; our names had them swapped.
Wire bytes were unchanged, so existing tests kept passing — but
encode_v1() with no serial_address now correctly emits 0x5A, which
is what UDP needs.
tests/
test_v1_messages.py 22 cases; payloads are real wire captures
from a firmware-2.12 panel via probe_v1_recon.
test_v1_client_commands.py 20 cases; payload-packing for the Command
and ExecuteSecurityCommand opcodes,
including BE u16 parameter2 and the
digit-by-digit security code form.
dev/
probe_v1.py Phase-1 smoke: handshake + RequestSystemInformation.
probe_v1_recon.py Raw opcode dump for protocol reconnaissance.
probe_v1_stream.py Streaming UploadNames flow exploration.
probe_v1_client.py Full read-path smoke test via OmniClientV1.
probe_v1_write.py Live no-op execute_command round-trip.
.gitignore: ignore dev/.omni_key (probe scripts read controller key from
this file as one fallback option).
Discovery on firmware 2.12: Request*ExtendedStatus opcodes (63/65/69)
NAK on this firmware — only the basic Request*Status opcodes are
implemented, so OmniClientV1 uses those (3 bytes/unit, 7 bytes/tstat,
4 bytes/aux records). HA still gets enough signal for polling; full
properties discovery uses streaming UploadNames instead.
Test totals: 387 passed, 1 skipped (existing fixture skip).
152 lines
5.5 KiB
Python
152 lines
5.5 KiB
Python
#!/usr/bin/env python3
|
|
"""Phase-1 smoke test: v1-over-UDP handshake + RequestSystemInformation.
|
|
|
|
Run inside the project venv:
|
|
cd /home/kdm/home-auto/omni-pca
|
|
uv run python dev/probe_v1.py [--host 192.168.1.9] [--port 4369]
|
|
|
|
Requires the panel's controller key. Picks it up from (in order):
|
|
1. ``--key 32hex`` on the command line
|
|
2. ``OMNI_KEY`` env var
|
|
3. ``dev/.omni_key`` file (gitignored)
|
|
4. The bundled ``.pca`` plain fixture (developer-only fallback)
|
|
|
|
Success criteria: panel returns a v1 SystemInformation message (opcode 18)
|
|
within the timeout. Failure modes we want to distinguish:
|
|
* UDP socket fails to open → routing / firewall
|
|
* Handshake step 2 timeout → wrong port, wrong panel
|
|
* Handshake step 4 termination → wrong controller key
|
|
* SystemInformation timeout → v1 path isn't doing what we think
|
|
* SystemInformation reply → v1-over-UDP is real, proceed to Phase 2
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
from omni_pca.v1.connection import (
|
|
HandshakeError,
|
|
InvalidEncryptionKeyError,
|
|
OmniConnectionV1,
|
|
RequestTimeoutError,
|
|
)
|
|
from omni_pca.opcodes import OmniLinkMessageType
|
|
|
|
|
|
def _load_key(arg_key: str | None) -> bytes:
|
|
if arg_key:
|
|
return bytes.fromhex(arg_key)
|
|
env = os.environ.get("OMNI_KEY")
|
|
if env:
|
|
return bytes.fromhex(env)
|
|
keyfile = Path(__file__).parent / ".omni_key"
|
|
if keyfile.exists():
|
|
return bytes.fromhex(keyfile.read_text().strip())
|
|
fixture = Path("/home/kdm/home-auto/HAI/pca-re/extracted/Our_House.pca.plain")
|
|
if fixture.exists():
|
|
from omni_pca.pca_file import (
|
|
PcaReader,
|
|
_CAP_OMNI_PRO_II,
|
|
_parse_header,
|
|
_walk_to_connection,
|
|
)
|
|
|
|
r = PcaReader(fixture.read_bytes())
|
|
_parse_header(r)
|
|
_walk_to_connection(r, _CAP_OMNI_PRO_II)
|
|
r.string8_fixed(120) # network_address
|
|
r.string8_fixed(5) # port
|
|
return bytes.fromhex(r.string8_fixed(32).ljust(32, "0")[:32])
|
|
raise SystemExit("no controller key: pass --key, set OMNI_KEY, or create dev/.omni_key")
|
|
|
|
|
|
def _decode_system_information(payload: bytes) -> dict[str, object]:
|
|
"""Parse the v1 SystemInformation payload (mirrors clsOLMsgSystemInformation)."""
|
|
if len(payload) < 29:
|
|
raise ValueError(f"SystemInformation payload too short: {len(payload)} bytes")
|
|
return {
|
|
"opcode": payload[0],
|
|
"model": payload[1],
|
|
"fw_major": payload[2],
|
|
"fw_minor": payload[3],
|
|
"fw_revision": int.from_bytes(payload[4:5], "big", signed=True),
|
|
"local_phone": payload[5:29].rstrip(b"\x00").decode("ascii", errors="replace"),
|
|
}
|
|
|
|
|
|
async def amain(args: argparse.Namespace) -> int:
|
|
key = _load_key(args.key)
|
|
print(f"[probe] target {args.host}:{args.port} key=...{key[-2:].hex()} (16 B)")
|
|
|
|
try:
|
|
async with OmniConnectionV1(
|
|
host=args.host,
|
|
port=args.port,
|
|
controller_key=key,
|
|
timeout=args.timeout,
|
|
retry_count=args.retries,
|
|
) as conn:
|
|
print(f"[probe] handshake OK state={conn.state.name} "
|
|
f"session_key=...{conn.session_key[-2:].hex() if conn.session_key else 'n/a'}")
|
|
|
|
print("[probe] sending v1 RequestSystemInformation (opcode 17)")
|
|
reply = await conn.request(OmniLinkMessageType.RequestSystemInformation)
|
|
print(f"[probe] reply: start_char={reply.start_char:#04x} "
|
|
f"opcode={reply.opcode} payload={reply.data.hex()}")
|
|
|
|
if reply.opcode != int(OmniLinkMessageType.SystemInformation):
|
|
print(f"[probe] WARNING: expected opcode 18 (SystemInformation), "
|
|
f"got {reply.opcode}")
|
|
return 2
|
|
|
|
info = _decode_system_information(reply.data)
|
|
print(f"[probe] ✓ v1-over-UDP works")
|
|
print(f" model = {info['model']}")
|
|
print(f" firmware = {info['fw_major']}.{info['fw_minor']}.{info['fw_revision']}")
|
|
print(f" phone = {info['local_phone']!r}")
|
|
|
|
except InvalidEncryptionKeyError as exc:
|
|
print(f"[probe] handshake terminated: wrong controller key? ({exc})")
|
|
return 1
|
|
except HandshakeError as exc:
|
|
print(f"[probe] handshake failed: {exc}")
|
|
return 1
|
|
except RequestTimeoutError as exc:
|
|
print(f"[probe] no reply to RequestSystemInformation: {exc}")
|
|
print(" → handshake worked but v1 path isn't responding. "
|
|
"Check tcpdump for what's on the wire.")
|
|
return 2
|
|
except OSError as exc:
|
|
print(f"[probe] socket error: {exc}")
|
|
return 1
|
|
|
|
return 0
|
|
|
|
|
|
def main() -> int:
|
|
parser = argparse.ArgumentParser(description=__doc__)
|
|
parser.add_argument("--host", default="192.168.1.9")
|
|
parser.add_argument("--port", type=int, default=4369)
|
|
parser.add_argument("--key", help="32 hex chars; overrides env/.omni_key")
|
|
parser.add_argument("--timeout", type=float, default=5.0)
|
|
parser.add_argument("--retries", type=int, default=2)
|
|
parser.add_argument("--debug", action="store_true",
|
|
help="enable DEBUG logging (TX/RX packet dump)")
|
|
args = parser.parse_args()
|
|
|
|
logging.basicConfig(
|
|
level=logging.DEBUG if args.debug else logging.INFO,
|
|
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
|
|
)
|
|
|
|
return asyncio.run(amain(args))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|