Compare commits

...

6 Commits

Author SHA1 Message Date
259c46e558 Release 2026.5.11: v1-over-UDP + HA integration
First PyPI release of the v1 wire path. Wheel published from local
source 2026-05-11 with omni_pca/v1/ subpackage included.

What's in 2026.5.11 vs 2026.5.10 (already on PyPI):
* New omni_pca.v1 subpackage -- OmniConnectionV1, OmniClientV1,
  OmniClientV1Adapter -- for panels that listen on UDP only and
  speak the legacy OmniLink (not OmniLink2) wire dialect.
* HA integration wires the adapter into the coordinator when
  Transport=UDP is selected at config-flow time; v2/TCP path is
  unchanged.
* Streaming UploadNames discovery (bare opcode + lock-step
  Acknowledge until EOD/NAK).
* Long-form RequestUnitStatus for unit indices > 255 (sprinklers,
  named flags, expansion-enclosure outputs).
* Chunked status polls -- firmware 2.12 NAKs at ~63 records per
  request, so we batch in groups of 40.
* OmniConnection.close() now sends ClientSessionTerminated so the
  panel frees our session slot immediately on disconnect.

Verified end-to-end against a firmware 2.12 OmniPro II panel at
192.168.1.9: discovery (16 zones, 44 units, 16 buttons, 8 codes,
2 thermostats, 8 messages) + status polling + execute_command
round-trip all working under HA, side-by-side with the existing
TCP mock-panel path in the dev stack.

README: new "Two wire dialects" section explaining when to pick
TCP/OmniClient vs UDP/OmniClientV1.
manifest.json: requirements bump to omni-pca==2026.5.11.
2026-05-11 13:40:34 -06:00
abf96601e8 dev/screenshot.py: tolerate post-onboarding /api/onboarding 404
After HA finishes its first-run wizard the /api/onboarding endpoint
returns 404 plain-text instead of a JSON step list. The previous
screenshot run blew up trying to json-parse "404: Not Found".

Both call sites (_onboard and _complete_onboarding) now check the
status code first and treat anything non-200 as "already complete --
skip and go to the login path".
2026-05-11 13:35:12 -06:00
df628aa56f dev stack: expose HA at juliet.warehack.ing via caddy-docker-proxy
Adds the homeassistant service to the external caddy network with
labels for juliet.warehack.ing so caddy-docker-proxy issues a public
cert and proxies traffic to port 8123. Uses the same streaming-
friendly transport tuning the docs-site service uses, because HA's
frontend keeps long-lived WebSockets open for lovelace state pushes
and config flows -- without stream_timeout: 24h etc., caddy closes
the socket every ~15s and the UI churns reconnects.

Keeps the 8123 host-port mapping intact for direct localhost dev
access; public traffic flows over the caddy bridge.

dev/ha-config/configuration.yaml (not tracked here -- root-owned in
the HA container) was updated separately to add:

    http:
      use_x_forwarded_for: true
      trusted_proxies:
        - 10.10.16.0/20   # caddy bridge subnet

Without that block HA rejects the OAuth redirect_uri at login because
the auth check sees the internal docker IP instead of the public host.
2026-05-11 12:05:18 -06:00
09e2d83b49 dev stack: pip-install local omni-pca on HA startup
Replace the brittle bind-mount-over-site-packages trick with a proper
``pip install --no-deps /opt/omni-pca-src`` in the HA container's
entrypoint. This gives HA a real ``omni_pca-2026.5.10.dist-info`` so
the manifest's requirement check passes, plus the v1 subpackage that's
not in the published wheel yet (omni-pca==2026.5.10 isn't on PyPI).

Before: ``--force-recreate`` broke the dev stack because the bind mount
overlaid the package contents but left no dist-info, and HA's uv-based
installer can't fetch omni-pca from PyPI.

After: container recreate just works. ``docker compose restart
homeassistant`` re-installs from the latest local source on every
start, so HA + library are always in sync with the working tree.

Header comments updated to mention the real-panel (UDP/v1) config-flow
fields alongside the existing mock-panel ones.
2026-05-11 02:58:19 -06:00
30b482a8cb HA integration: wire v1+UDP into the coordinator + config flow
OmniClientV1Adapter (src/omni_pca/v1/adapter.py)
  V2-shape facade over OmniClientV1. Exposes the OmniClient surface the
  HA coordinator was written against — get_system_information,
  list_*_names, get_object_properties (synthesized from streamed names),
  get_extended_status (chunked, routed to v1 typed status opcodes),
  get_object_status(AREA, ...) (derived from SystemStatus.area_alarms),
  events() (EventStream on v1 SystemEvents opcode 35), plus all the
  write-method shims.

  Chunks unit/zone/thermostat/aux polls per-type because firmware 2.12
  NAKs Request*Status with >~62 records in one shot (verified live).
  Falls back to "Area 1".."Area 8" when the UploadNames stream returns
  zero areas — common on panels where the installer didn't name them.

custom_components/omni_pca/coordinator.py
  _ensure_connected picks OmniClientV1Adapter for transport=udp. New
  _walk_properties_v1 replaces the v2 RequestProperties walk with a
  name-stream + synthesized-Properties pass.

custom_components/omni_pca/config_flow.py
  _probe routes to OmniClientV1Adapter for transport=udp instead of
  trying to drive v2 OmniClient over UDP (which silently dropped after
  handshake, per the earlier diagnosis).

src/omni_pca/events.py
  parse_events / _ensure_system_events / EventStream now take an
  expected_opcode arg (default v2 SystemEvents=55, v1 callers pass 35).
  Word format is byte-identical between v1 and v2, so the typed-event
  decoder is unchanged.

src/omni_pca/v1/client.py
  _range_status supports the long-form RequestUnitStatus (BE u16
  start/end) so panels with unit indices > 255 (sprinklers, flags) work.

Verified end-to-end against firmware 2.12 panel at 192.168.1.9:
  config entries:
    state=loaded  Omni Pro II (host.docker.internal)  (mock)
    state=loaded  Omni Pro II (192.168.1.9)           (real, v1+UDP)
  real-panel entities created in HA: 96 (30 binary_sensor, 26 light,
  15 switch, 13 button, 9 sensor, 3 climate)
  cross-check: light.omni_pro_ii_front_porch_2 = on  (matches live
  probe: unit #2 'FRONT PORCH' state=0x01 brightness=100)

dev/probe_v1_coordinator.py
  Coordinator-shaped end-to-end smoke test against the real panel
  without HA — drives the full discovery + poll cycle through the
  adapter. Useful for regression-checking the v1 wire path.

dev/add_real_panel.py
  Programmatically adds the real-panel config entry to the dev HA
  stack via the REST config-flow endpoints. Idempotent.
2026-05-11 01:30:49 -06:00
92c8b695b4 v1-over-UDP: parallel OmniClientV1 for panels that listen UDP-only
Some Omni network modules are configured for UDP, in which case PC Access
falls back to the v1 wire protocol (OmniLinkMessage outer = 0x10, inner
StartChar 0x5A, typed Request*Status opcodes) instead of v2's TCP path
(OmniLink2Message + StartChar 0x21 + parameterised RequestProperties).
This adds a parallel implementation rather than overloading the v2 path.

omni_pca/v1/
  connection.py   UDP-only OmniConnectionV1; reuses crypto + handshake,
                  routes post-handshake messages through OmniLinkMessage
                  (0x10) wrapping v1 inner format. Adds iter_streaming
                  for the lock-step UploadNames/Acknowledge/EOD pattern.
  messages.py     Block parsers for the typed v1 status replies (zone,
                  unit, thermostat, aux), v1 SystemStatus, and NameData
                  (handles both one-byte and two-byte NameNumber forms).
  client.py       OmniClientV1: read API (get_system_information,
                  get_*_status), discovery (iter_names + list_*_names),
                  write API (execute_command, execute_security_command,
                  turn_unit_*, set_unit_level, bypass/restore_zone,
                  execute_button, set_thermostat_*). acknowledge_alerts
                  is a no-op (v1 has no equivalent opcode).

Discovery uses bare UploadNames; panel streams every defined name across
all types in a fixed order with per-record Acknowledge. Verified against
firmware 2.12 — pulled 16 zones, 44 units, 16 buttons, 8 codes,
2 thermostats, 8 messages in one stream.

src/omni_pca/message.py
  Fix flipped START_CHAR_V1_* constants. enuOmniLinkMessageFormat says
  Addressable=0x41 and NonAddressable=0x5A; our names had them swapped.
  Wire bytes were unchanged, so existing tests kept passing — but
  encode_v1() with no serial_address now correctly emits 0x5A, which
  is what UDP needs.

tests/
  test_v1_messages.py        22 cases; payloads are real wire captures
                              from a firmware-2.12 panel via probe_v1_recon.
  test_v1_client_commands.py 20 cases; payload-packing for the Command
                              and ExecuteSecurityCommand opcodes,
                              including BE u16 parameter2 and the
                              digit-by-digit security code form.

dev/
  probe_v1.py        Phase-1 smoke: handshake + RequestSystemInformation.
  probe_v1_recon.py  Raw opcode dump for protocol reconnaissance.
  probe_v1_stream.py Streaming UploadNames flow exploration.
  probe_v1_client.py Full read-path smoke test via OmniClientV1.
  probe_v1_write.py  Live no-op execute_command round-trip.

.gitignore: ignore dev/.omni_key (probe scripts read controller key from
this file as one fallback option).

Discovery on firmware 2.12: Request*ExtendedStatus opcodes (63/65/69)
NAK on this firmware — only the basic Request*Status opcodes are
implemented, so OmniClientV1 uses those (3 bytes/unit, 7 bytes/tstat,
4 bytes/aux records). HA still gets enough signal for polling; full
properties discovery uses streaming UploadNames instead.

Test totals: 387 passed, 1 skipped (existing fixture skip).
2026-05-11 01:08:01 -06:00
24 changed files with 3486 additions and 43 deletions

1
.gitignore vendored
View File

@ -41,3 +41,4 @@ panel_key*
.wine-pca/
ha-config/
dist/
dev/.omni_key

View File

@ -53,6 +53,25 @@ asyncio.run(main())
For the panel walkthrough — connect, list zones, react to push events — see the [tutorial](https://hai-omni-pro-ii.warehack.ing/tutorials/first-script/).
## Two wire dialects — TCP/v2 vs UDP/v1
The Omni network module is configurable at the panel keypad to listen on **TCP, UDP, or both**. Each transport speaks a different wire dialect — `OmniClient` above handles the TCP path (OmniLink2, the modern wire format used by PC Access ≥ 3); panels configured UDP-only fall back to the legacy v1 protocol with typed `RequestZoneStatus` / `RequestUnitStatus` opcodes, no `RequestProperties`, and streaming name downloads. For those, use [`OmniClientV1`](https://hai-omni-pro-ii.warehack.ing/reference/library-api/#v1-udp-omniclientv1) from the `omni_pca.v1` subpackage:
```python
from omni_pca.v1 import OmniClientV1
async with OmniClientV1(
host="192.168.1.9",
controller_key=bytes.fromhex("..."),
) as panel:
info = await panel.get_system_information() # same dataclass as v2
names = await panel.list_all_names() # streaming UploadNames
zones = await panel.get_zone_status(1, 16) # typed status by range
await panel.execute_security_command(area=1, mode=SecurityMode.AWAY, code=1234)
```
The HA integration picks the right client automatically based on the **Transport** dropdown in the config flow (TCP vs UDP). See [zone & unit numbering](https://hai-omni-pro-ii.warehack.ing/explanation/zone-unit-numbering/) for why v1 panels need the long-form `RequestUnitStatus` for unit indices > 255.
## Quick start (Home Assistant)
```bash

View File

@ -168,15 +168,40 @@ class OmniConfigFlow(ConfigFlow, domain=DOMAIN):
key: bytes,
transport: str = DEFAULT_TRANSPORT,
) -> tuple[str | None, str | None]:
"""Try to connect once. Returns (title, error_code)."""
"""Try to connect once. Returns (title, error_code).
TCP uses :class:`OmniClient` (v2 wire protocol). UDP uses the v1
adapter UDP-listening panels speak the legacy wire protocol,
not OmniLink2 see :mod:`omni_pca.v1.adapter` for the bridge.
"""
try:
async with OmniClient(
host,
port=port,
controller_key=key,
transport=transport, # type: ignore[arg-type]
) as client:
info = await client.get_system_information()
if transport == TRANSPORT_UDP:
from omni_pca.v1 import (
HandshakeError as V1HandshakeError,
)
from omni_pca.v1 import (
InvalidEncryptionKeyError as V1InvalidEncryptionKeyError,
)
from omni_pca.v1 import OmniClientV1Adapter
from omni_pca.v1.connection import (
ConnectionError as V1ConnectionError,
)
try:
async with OmniClientV1Adapter(
host, port=port, controller_key=key,
) as client:
info = await client.get_system_information()
except (V1HandshakeError, V1InvalidEncryptionKeyError):
return None, "invalid_auth"
except (V1ConnectionError, OSError, TimeoutError) as err:
LOGGER.debug("v1 probe failed: %s", err)
return None, "cannot_connect"
else:
async with OmniClient(
host, port=port, controller_key=key, transport=transport, # type: ignore[arg-type]
) as client:
info = await client.get_system_information()
except (HandshakeError, InvalidEncryptionKeyError):
return None, "invalid_auth"
except (OmniConnectionError, OSError, TimeoutError) as err:

View File

@ -234,12 +234,25 @@ class OmniDataUpdateCoordinator(DataUpdateCoordinator[OmniData]):
async def _ensure_connected(self) -> OmniClient:
if self._client is not None:
return self._client
client = OmniClient(
self._host,
port=self._port,
controller_key=self._controller_key,
transport=self._transport, # type: ignore[arg-type]
)
if self._transport == "udp":
# Panels listening UDP-only speak the v1 wire protocol, not
# v2. The adapter exposes the OmniClient API surface this
# coordinator was written against, but underneath it drives
# an OmniConnectionV1 + the typed v1 status/command opcodes.
from omni_pca.v1 import OmniClientV1Adapter
client: OmniClient = OmniClientV1Adapter( # type: ignore[assignment]
self._host,
port=self._port,
controller_key=self._controller_key,
)
else:
client = OmniClient(
self._host,
port=self._port,
controller_key=self._controller_key,
transport=self._transport, # type: ignore[arg-type]
)
# Drive __aenter__ manually so the client survives across update
# cycles; we close it explicitly on shutdown / failure.
await client.__aenter__()
@ -392,9 +405,15 @@ class OmniDataUpdateCoordinator(DataUpdateCoordinator[OmniData]):
client's internal parser table only covers zones/units/areas in
v1.0). We drive ``RequestProperties`` directly on the connection
so we don't have to monkey-patch the library.
On UDP/v1 panels there is no ``RequestProperties`` opcode at all,
so we fall back to the v1 adapter's name-stream-based discovery
(each object's ``Properties`` is synthesized from its name).
"""
if parser is None or OBJECT_TYPE_TO_PROPERTIES.get(int(object_type)) is None:
return {}
if self._transport == "udp":
return await self._walk_properties_v1(client, object_type)
out: dict[int, object] = {}
cursor = 0
conn = client.connection
@ -443,6 +462,42 @@ class OmniDataUpdateCoordinator(DataUpdateCoordinator[OmniData]):
break
return out
async def _walk_properties_v1(
self, client: OmniClient, object_type: ObjectType
) -> dict[int, object]:
"""V1 fallback for :meth:`_walk_properties`.
v1 has no RequestProperties opcode names come from streaming
UploadNames and the rest of the Properties fields can't be
recovered from the wire. We delegate to the adapter's
``get_object_properties`` (which synthesizes a minimal record
from the cached name list) and skip anything it returns ``None``
for.
"""
# Pick the right per-type name lister. The adapter caches the
# UploadNames stream output so these are nearly free after the
# first call this discovery pass.
if object_type == ObjectType.THERMOSTAT:
names = await client.list_thermostat_names() # type: ignore[attr-defined]
elif object_type == ObjectType.BUTTON:
names = await client.list_button_names() # type: ignore[attr-defined]
else:
# Programs / Messages / etc — nothing to walk.
return {}
out: dict[int, object] = {}
for idx in sorted(names):
try:
props = await client.get_object_properties(object_type, idx)
except Exception:
LOGGER.debug(
"v1 properties synth failed for %s #%d",
object_type.name, idx, exc_info=True,
)
continue
if props is not None:
out[idx] = props
return out
@staticmethod
async def _best_effort(coro_fn, *, default):
"""Call ``coro_fn()`` and swallow non-transport errors, returning ``default``.

View File

@ -1,12 +1,12 @@
{
"domain": "omni_pca",
"name": "HAI/Leviton Omni Panel",
"version": "2026.5.10",
"version": "2026.5.11",
"iot_class": "local_push",
"config_flow": true,
"dependencies": [],
"codeowners": ["@rsp2k"],
"requirements": ["omni-pca==2026.5.10"],
"requirements": ["omni-pca==2026.5.11"],
"documentation": "https://git.supported.systems/warehack.ing/omni-pca",
"issue_tracker": "https://git.supported.systems/warehack.ing/omni-pca/issues",
"integration_type": "hub"

150
dev/add_real_panel.py Normal file
View File

@ -0,0 +1,150 @@
#!/usr/bin/env python3
"""Add a *second* omni_pca config entry pointing at the real panel.
The dev stack already has one entry pointing at the mock panel
(``host.docker.internal:14369``). This script adds another entry for
the real panel at ``192.168.1.9:4369`` using ``transport=udp`` and the
controller key from the bundled .pca fixture.
Run inside the project venv:
cd /home/kdm/home-auto/omni-pca
uv run python dev/add_real_panel.py
"""
from __future__ import annotations
import argparse
import asyncio
import sys
from pathlib import Path
import httpx
sys.path.insert(0, str(Path(__file__).parent))
from probe_v1 import _load_key # type: ignore # noqa: E402
DEFAULT_HA_URL = "http://localhost:8123"
PANEL_HOST = "192.168.1.9"
PANEL_PORT = 4369
DEFAULT_USERNAME = "demo"
DEFAULT_PASSWORD = "demo-password-1234"
async def _get_token(ha_url: str) -> str:
"""Re-use the cached access token; otherwise log in via /auth/login_flow."""
token_file = (
Path(__file__).parent / "ha-config" / ".storage" / ".demo_access_token"
)
if token_file.exists():
return token_file.read_text().strip()
async with httpx.AsyncClient(base_url=ha_url, timeout=15.0) as client:
r = await client.post(
"/auth/login_flow",
json={
"client_id": ha_url,
"handler": ["homeassistant", None],
"redirect_uri": ha_url,
},
)
r.raise_for_status()
flow_id = r.json()["flow_id"]
r = await client.post(
f"/auth/login_flow/{flow_id}",
json={
"client_id": ha_url,
"username": DEFAULT_USERNAME,
"password": DEFAULT_PASSWORD,
},
)
r.raise_for_status()
auth_code = r.json()["result"]
r = await client.post(
"/auth/token",
data={
"client_id": ha_url,
"grant_type": "authorization_code",
"code": auth_code,
},
)
r.raise_for_status()
token = r.json()["access_token"]
# Cache for next run.
try:
token_file.write_text(token)
except Exception:
pass
return token
async def amain(args: argparse.Namespace) -> int:
key_bytes = _load_key(None)
key_hex = key_bytes.hex()
print(f"[add-real-panel] target HA: {args.ha_url}")
print(f"[add-real-panel] panel: {PANEL_HOST}:{PANEL_PORT} (UDP)")
print(f"[add-real-panel] key: ...{key_hex[-4:]} (16 bytes)\n")
token = await _get_token(args.ha_url)
headers = {"Authorization": f"Bearer {token}"}
async with httpx.AsyncClient(base_url=args.ha_url, timeout=30.0) as client:
# ---- check if an entry already exists for this host ----
r = await client.get(
"/api/config/config_entries/entry", headers=headers
)
r.raise_for_status()
for entry in r.json():
if entry.get("domain") != "omni_pca":
continue
data = entry.get("data", {})
if data.get("host") == PANEL_HOST and data.get("port") == PANEL_PORT:
print(f" already configured: {entry['title']} ({entry['entry_id']})")
return 0
# ---- start the config flow ----
r = await client.post(
"/api/config/config_entries/flow",
headers=headers,
json={"handler": "omni_pca", "show_advanced_options": False},
)
r.raise_for_status()
flow = r.json()
flow_id = flow["flow_id"]
print(f" flow opened: {flow_id} (step={flow.get('step_id')})")
# ---- submit the form for the real panel ----
r = await client.post(
f"/api/config/config_entries/flow/{flow_id}",
headers=headers,
json={
"host": PANEL_HOST,
"port": PANEL_PORT,
"controller_key": key_hex,
"transport": "udp",
},
timeout=60.0, # the probe round-trip can take a few seconds
)
r.raise_for_status()
result = r.json()
if result.get("type") == "create_entry":
print(f" ✓ entry created: {result.get('title')}")
print(f" entry_id: {result.get('result')}")
elif result.get("type") == "form":
print(f" form re-shown — errors: {result.get('errors')}")
return 1
else:
print(f" unexpected outcome: {result}")
return 1
return 0
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--ha-url", default=DEFAULT_HA_URL)
args = parser.parse_args()
return asyncio.run(amain(args))
if __name__ == "__main__":
sys.exit(main())

View File

@ -6,15 +6,27 @@
# make dev-logs # tail HA logs
# make dev-down # stop and clean
#
# On every container start the HA service pip-installs the local
# `omni-pca` library from ../ into site-packages (the version pinned in
# the integration manifest isn't on PyPI yet, and we want our latest
# v1/ subpackage available either way). Source changes in src/omni_pca
# require a ``docker compose restart homeassistant`` to take effect.
#
# Once running, open http://localhost:8123 and:
# 1. Onboard with any name / location.
# 2. Settings -> Devices & Services -> Add Integration ->
# "HAI/Leviton Omni Panel".
# 3. Use:
# host host.docker.internal
# port 14369
# controller_key 000102030405060708090a0b0c0d0e0f
# (matches scripts/run_mock_panel.py defaults)
# 3. Use one of:
# Mock panel (TCP):
# host host.docker.internal
# port 14369
# transport TCP
# controller_key 000102030405060708090a0b0c0d0e0f
# Real panel (UDP, v1 wire protocol):
# host <panel IP, e.g. 192.168.1.9>
# port 4369
# transport UDP
# controller_key <32 hex chars from the panel's .pca file>
services:
mock-panel:
@ -31,6 +43,8 @@ services:
- "uv pip install --system --quiet cryptography && python /tmp/mock/run_mock_panel.py --host 0.0.0.0 --port 14369"
ports:
- "14369:14369"
networks:
- default
homeassistant:
image: ghcr.io/home-assistant/home-assistant:2026.5
@ -40,9 +54,48 @@ services:
volumes:
- ./ha-config:/config
- ../custom_components/omni_pca:/config/custom_components/omni_pca:ro
# Make the whole library project (pyproject + src/ + dist/) available
# so the entrypoint override below can pip-install from local source
# before /init starts. This gives HA real dist-info for
# ``omni-pca==2026.5.10`` (which isn't on PyPI yet) and ensures the
# v1 subpackage is present.
- ../:/opt/omni-pca-src:ro
# Keep 8123 mapped on localhost for direct access during development;
# public traffic comes in via caddy-docker-proxy on the `caddy` net.
ports:
- "8123:8123"
extra_hosts:
- "host.docker.internal:host-gateway"
environment:
- TZ=America/Boise
networks:
- default
- caddy
labels:
caddy: juliet.warehack.ing
caddy.reverse_proxy: "{{upstreams 8123}}"
# HA uses WebSockets for the frontend (lovelace state updates,
# config flow, etc.) so we need the streaming-friendly settings
# from CLAUDE.md, otherwise caddy closes the socket every ~15s.
caddy.reverse_proxy.flush_interval: "-1"
caddy.reverse_proxy.transport: http
caddy.reverse_proxy.transport.read_timeout: "0"
caddy.reverse_proxy.transport.write_timeout: "0"
caddy.reverse_proxy.transport.keepalive: 5m
caddy.reverse_proxy.transport.keepalive_idle_conns: "10"
caddy.reverse_proxy.stream_timeout: 24h
caddy.reverse_proxy.stream_close_delay: 5s
# HA's image entrypoint is /init (s6-overlay). We pre-install our
# local library against site-packages so HA's manifest-requirement
# check finds it, then exec /init normally.
entrypoint:
- sh
- -c
- |
set -e
pip install --quiet --no-deps --upgrade /opt/omni-pca-src
exec /init
networks:
caddy:
external: true

151
dev/probe_v1.py Normal file
View File

@ -0,0 +1,151 @@
#!/usr/bin/env python3
"""Phase-1 smoke test: v1-over-UDP handshake + RequestSystemInformation.
Run inside the project venv:
cd /home/kdm/home-auto/omni-pca
uv run python dev/probe_v1.py [--host 192.168.1.9] [--port 4369]
Requires the panel's controller key. Picks it up from (in order):
1. ``--key 32hex`` on the command line
2. ``OMNI_KEY`` env var
3. ``dev/.omni_key`` file (gitignored)
4. The bundled ``.pca`` plain fixture (developer-only fallback)
Success criteria: panel returns a v1 SystemInformation message (opcode 18)
within the timeout. Failure modes we want to distinguish:
* UDP socket fails to open routing / firewall
* Handshake step 2 timeout wrong port, wrong panel
* Handshake step 4 termination wrong controller key
* SystemInformation timeout v1 path isn't doing what we think
* SystemInformation reply v1-over-UDP is real, proceed to Phase 2
"""
from __future__ import annotations
import argparse
import asyncio
import logging
import os
import sys
from pathlib import Path
from omni_pca.v1.connection import (
HandshakeError,
InvalidEncryptionKeyError,
OmniConnectionV1,
RequestTimeoutError,
)
from omni_pca.opcodes import OmniLinkMessageType
def _load_key(arg_key: str | None) -> bytes:
if arg_key:
return bytes.fromhex(arg_key)
env = os.environ.get("OMNI_KEY")
if env:
return bytes.fromhex(env)
keyfile = Path(__file__).parent / ".omni_key"
if keyfile.exists():
return bytes.fromhex(keyfile.read_text().strip())
fixture = Path("/home/kdm/home-auto/HAI/pca-re/extracted/Our_House.pca.plain")
if fixture.exists():
from omni_pca.pca_file import (
PcaReader,
_CAP_OMNI_PRO_II,
_parse_header,
_walk_to_connection,
)
r = PcaReader(fixture.read_bytes())
_parse_header(r)
_walk_to_connection(r, _CAP_OMNI_PRO_II)
r.string8_fixed(120) # network_address
r.string8_fixed(5) # port
return bytes.fromhex(r.string8_fixed(32).ljust(32, "0")[:32])
raise SystemExit("no controller key: pass --key, set OMNI_KEY, or create dev/.omni_key")
def _decode_system_information(payload: bytes) -> dict[str, object]:
"""Parse the v1 SystemInformation payload (mirrors clsOLMsgSystemInformation)."""
if len(payload) < 29:
raise ValueError(f"SystemInformation payload too short: {len(payload)} bytes")
return {
"opcode": payload[0],
"model": payload[1],
"fw_major": payload[2],
"fw_minor": payload[3],
"fw_revision": int.from_bytes(payload[4:5], "big", signed=True),
"local_phone": payload[5:29].rstrip(b"\x00").decode("ascii", errors="replace"),
}
async def amain(args: argparse.Namespace) -> int:
key = _load_key(args.key)
print(f"[probe] target {args.host}:{args.port} key=...{key[-2:].hex()} (16 B)")
try:
async with OmniConnectionV1(
host=args.host,
port=args.port,
controller_key=key,
timeout=args.timeout,
retry_count=args.retries,
) as conn:
print(f"[probe] handshake OK state={conn.state.name} "
f"session_key=...{conn.session_key[-2:].hex() if conn.session_key else 'n/a'}")
print("[probe] sending v1 RequestSystemInformation (opcode 17)")
reply = await conn.request(OmniLinkMessageType.RequestSystemInformation)
print(f"[probe] reply: start_char={reply.start_char:#04x} "
f"opcode={reply.opcode} payload={reply.data.hex()}")
if reply.opcode != int(OmniLinkMessageType.SystemInformation):
print(f"[probe] WARNING: expected opcode 18 (SystemInformation), "
f"got {reply.opcode}")
return 2
info = _decode_system_information(reply.data)
print(f"[probe] ✓ v1-over-UDP works")
print(f" model = {info['model']}")
print(f" firmware = {info['fw_major']}.{info['fw_minor']}.{info['fw_revision']}")
print(f" phone = {info['local_phone']!r}")
except InvalidEncryptionKeyError as exc:
print(f"[probe] handshake terminated: wrong controller key? ({exc})")
return 1
except HandshakeError as exc:
print(f"[probe] handshake failed: {exc}")
return 1
except RequestTimeoutError as exc:
print(f"[probe] no reply to RequestSystemInformation: {exc}")
print(" → handshake worked but v1 path isn't responding. "
"Check tcpdump for what's on the wire.")
return 2
except OSError as exc:
print(f"[probe] socket error: {exc}")
return 1
return 0
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--host", default="192.168.1.9")
parser.add_argument("--port", type=int, default=4369)
parser.add_argument("--key", help="32 hex chars; overrides env/.omni_key")
parser.add_argument("--timeout", type=float, default=5.0)
parser.add_argument("--retries", type=int, default=2)
parser.add_argument("--debug", action="store_true",
help="enable DEBUG logging (TX/RX packet dump)")
args = parser.parse_args()
logging.basicConfig(
level=logging.DEBUG if args.debug else logging.INFO,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
return asyncio.run(amain(args))
if __name__ == "__main__":
sys.exit(main())

122
dev/probe_v1_client.py Normal file
View File

@ -0,0 +1,122 @@
#!/usr/bin/env python3
"""Phase-2a smoke test: drive OmniClientV1 against the real panel.
Hits the read-only methods we care about for HA polling. Compares parsed
values against the recon dump so we catch off-by-one byte errors fast.
Run:
cd /home/kdm/home-auto/omni-pca
uv run python dev/probe_v1_client.py
"""
from __future__ import annotations
import argparse
import asyncio
import logging
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from probe_v1 import _load_key # type: ignore # noqa: E402
from omni_pca.v1 import OmniClientV1, OmniNakError
async def amain(args: argparse.Namespace) -> int:
key = _load_key(args.key)
print(f"[client probe] target {args.host}:{args.port}\n")
async with OmniClientV1(
host=args.host, port=args.port, controller_key=key, timeout=4.0,
) as c:
info = await c.get_system_information()
print(f"system: model={info.model_name} fw={info.firmware_version} "
f"phone={info.local_phone!r}")
print("\n--- discovery (streaming UploadNames) ---")
all_names = await c.list_all_names()
for type_byte in sorted(all_names):
try:
from omni_pca.v1 import NameType
label = NameType(type_byte).name
except ValueError:
label = f"type{type_byte}"
print(f" {label} ({len(all_names[type_byte])} entries)")
for num in sorted(all_names[type_byte]):
print(f" #{num}: {all_names[type_byte][num]!r}")
try:
sysstatus = await c.get_system_status()
print(f"status: time={sysstatus.panel_time} "
f"battery=0x{sysstatus.battery_reading:02x} "
f"sunrise={sysstatus.sunrise_hour:02d}:{sysstatus.sunrise_minute:02d} "
f"sunset={sysstatus.sunset_hour:02d}:{sysstatus.sunset_minute:02d} "
f"area_modes={[m for m, _ in sysstatus.area_alarms]}")
except Exception as exc:
print(f"system status failed: {type(exc).__name__}: {exc}")
print("\n--- zones 1..16 ---")
zones = await c.get_zone_status(1, 16)
for idx in sorted(zones):
z = zones[idx]
flags = []
if z.is_open: flags.append("open")
if z.is_in_alarm: flags.append("alarm")
if z.is_bypassed: flags.append("bypass")
if z.is_trouble: flags.append("trouble")
tag = ",".join(flags) or "secure"
print(f" zone {idx:2d}: status=0x{z.raw_status:02x} loop=0x{z.loop:02x} ({tag})")
print("\n--- units 1..16 ---")
units = await c.get_unit_status(1, 16)
for idx in sorted(units):
u = units[idx]
br = u.brightness
br_s = f"{br}%" if br is not None else "n/a"
print(f" unit {idx:2d}: state=0x{u.state:02x} ({br_s}) "
f"time_remaining={u.time_remaining_secs}s")
print("\n--- thermostats 1..4 ---")
try:
tstats = await c.get_thermostat_status(1, 4)
for idx in sorted(tstats):
t = tstats[idx]
print(f" tstat {idx}: status=0x{t.status:02x} "
f"temp_F={t.temperature_f:.1f} "
f"heat={t.heat_setpoint_f:.0f} cool={t.cool_setpoint_f:.0f} "
f"mode=0x{t.system_mode:02x} fan=0x{t.fan_mode:02x} "
f"hold=0x{t.hold_mode:02x}")
except OmniNakError as exc:
print(f" no thermostats configured: {exc}")
print("\n--- aux 1..8 ---")
try:
auxes = await c.get_aux_status(1, 8)
for idx in sorted(auxes):
a = auxes[idx]
print(f" aux {idx}: output=0x{a.output:02x} value=0x{a.value_raw:02x} "
f"low=0x{a.low_raw:02x} high=0x{a.high_raw:02x}")
except OmniNakError as exc:
print(f" no aux sensors: {exc}")
print("\n[client probe] ✓ disconnected cleanly")
return 0
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--host", default="192.168.1.9")
parser.add_argument("--port", type=int, default=4369)
parser.add_argument("--key", help="32 hex chars; overrides env/.omni_key")
parser.add_argument("--debug", action="store_true")
args = parser.parse_args()
logging.basicConfig(
level=logging.DEBUG if args.debug else logging.WARNING,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
return asyncio.run(amain(args))
if __name__ == "__main__":
sys.exit(main())

129
dev/probe_v1_coordinator.py Normal file
View File

@ -0,0 +1,129 @@
#!/usr/bin/env python3
"""Phase-3 smoke test: drive OmniClientV1Adapter through the same
sequence the HA coordinator runs in async_config_entry_first_refresh.
Doesn't pull in HA; just executes the discovery + initial poll pattern
against the real panel and prints what an OmniData snapshot would look
like. If this works, the actual HA coordinator should work too.
Run:
cd /home/kdm/home-auto/omni-pca
uv run python dev/probe_v1_coordinator.py
"""
from __future__ import annotations
import argparse
import asyncio
import logging
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from probe_v1 import _load_key # type: ignore # noqa: E402
from omni_pca.models import ObjectType
from omni_pca.v1 import OmniClientV1Adapter
async def amain(args: argparse.Namespace) -> int:
key = _load_key(args.key)
print(f"[coord probe] target {args.host}:{args.port}\n")
async with OmniClientV1Adapter(
host=args.host, port=args.port, controller_key=key, timeout=10.0
) as c:
# ---- 1. SystemInformation ----
info = await c.get_system_information()
print(f"=== SystemInformation ===\n"
f" model={info.model_name} fw={info.firmware_version}\n")
# ---- 2. Discovery: per-type names + synthesized properties ----
print("=== Discovery (UploadNames stream + synth Properties) ===")
zone_names = await c.list_zone_names()
unit_names = await c.list_unit_names()
area_names = await c.list_area_names()
tstat_names = await c.list_thermostat_names()
button_names = await c.list_button_names()
print(f" zones: {len(zone_names)}")
print(f" units: {len(unit_names)}")
print(f" areas: {len(area_names)} (fallback if 0 streamed)")
print(f" thermostats: {len(tstat_names)}")
print(f" buttons: {len(button_names)}")
print()
# Sanity-check that get_object_properties returns a real dataclass
# for one zone, one unit, one area, one thermostat, one button.
for type_byte, name_dict, label in [
(ObjectType.ZONE, zone_names, "Zone"),
(ObjectType.UNIT, unit_names, "Unit"),
(ObjectType.AREA, area_names, "Area"),
(ObjectType.THERMOSTAT, tstat_names, "Thermostat"),
(ObjectType.BUTTON, button_names, "Button"),
]:
if not name_dict:
print(f" {label}: no entries, skipping property synth")
continue
idx = min(name_dict)
props = await c.get_object_properties(type_byte, idx)
print(f" {label} #{idx}: {props}")
print()
# ---- 3. Polling: bulk status for each type, plus area derivation ----
print("=== Polling (bulk status) ===")
if zone_names:
zone_end = max(zone_names)
zones = await c.get_extended_status(ObjectType.ZONE, 1, zone_end)
open_zones = [z for z in zones if getattr(z, "is_open", False)]
print(f" ZoneStatus[1..{zone_end}]: {len(zones)} records, "
f"{len(open_zones)} currently open")
if unit_names:
unit_end = max(unit_names)
units = await c.get_extended_status(ObjectType.UNIT, 1, unit_end)
on_units = [u for u in units if getattr(u, "is_on", False)]
print(f" UnitStatus[1..{unit_end}]: {len(units)} records, "
f"{len(on_units)} currently on")
if tstat_names:
tstat_end = max(tstat_names)
tstats = await c.get_extended_status(
ObjectType.THERMOSTAT, 1, tstat_end
)
print(f" ThermostatStatus[1..{tstat_end}]: {len(tstats)} records")
# Areas: derived from SystemStatus
if area_names:
area_end = max(area_names)
areas = await c.get_object_status(ObjectType.AREA, 1, area_end)
modes = [a.mode for a in areas]
print(f" AreaStatus[1..{area_end}]: {len(areas)} records, "
f"modes={modes}")
print()
# ---- 4. SystemStatus ----
status = await c.get_system_status()
print(f"=== SystemStatus ===\n"
f" panel_time={status.panel_time} "
f"battery=0x{status.battery_reading:02x}\n"
f" sunrise={status.sunrise_hour:02d}:{status.sunrise_minute:02d} "
f"sunset={status.sunset_hour:02d}:{status.sunset_minute:02d}\n")
print("[coord probe] ✓ full discovery + poll cycle worked over v1+UDP")
return 0
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--host", default="192.168.1.9")
parser.add_argument("--port", type=int, default=4369)
parser.add_argument("--key", help="32 hex chars; overrides env/.omni_key")
parser.add_argument("--debug", action="store_true")
args = parser.parse_args()
logging.basicConfig(
level=logging.DEBUG if args.debug else logging.WARNING,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
return asyncio.run(amain(args))
if __name__ == "__main__":
sys.exit(main())

149
dev/probe_v1_recon.py Normal file
View File

@ -0,0 +1,149 @@
#!/usr/bin/env python3
"""Phase-2 reconnaissance: fetch v1 status replies from the real panel.
Doesn't parse — just dumps the raw payload bytes for each known v1 opcode
so we can match them against the C# message classes before writing
parsers. Builds the picture of what your panel actually has configured.
Run:
cd /home/kdm/home-auto/omni-pca
uv run python dev/probe_v1_recon.py [--debug]
"""
from __future__ import annotations
import argparse
import asyncio
import logging
import sys
from pathlib import Path
# Reuse the key loader from probe_v1.
sys.path.insert(0, str(Path(__file__).parent))
from probe_v1 import _load_key # type: ignore # noqa: E402
from omni_pca.opcodes import OmniLinkMessageType
from omni_pca.v1.connection import OmniConnectionV1, RequestTimeoutError
async def _request_or_warn(
conn: OmniConnectionV1,
label: str,
opcode: OmniLinkMessageType,
payload: bytes = b"",
expected_opcode: int | None = None,
) -> None:
print(f"--- {label} (req opcode {int(opcode)}, payload {payload.hex() or '<empty>'}) ---")
try:
reply = await conn.request(opcode, payload, timeout=4.0)
except RequestTimeoutError as exc:
print(f" TIMEOUT: {exc}")
return
except Exception as exc:
print(f" ERROR: {type(exc).__name__}: {exc}")
return
print(f" reply opcode = {reply.opcode}")
print(f" payload ({len(reply.payload)} B) = {reply.payload.hex()}")
if expected_opcode is not None and reply.opcode != expected_opcode:
print(f" NOTE: expected opcode {expected_opcode}, got {reply.opcode}")
async def amain(args: argparse.Namespace) -> int:
key = _load_key(args.key)
print(f"[recon] target {args.host}:{args.port}\n")
async with OmniConnectionV1(
host=args.host,
port=args.port,
controller_key=key,
timeout=4.0,
retry_count=1,
) as conn:
print(f"handshake OK state={conn.state.name}\n")
# --- panel-wide ---
await _request_or_warn(
conn, "SystemInformation", OmniLinkMessageType.RequestSystemInformation,
expected_opcode=int(OmniLinkMessageType.SystemInformation),
)
await _request_or_warn(
conn, "SystemStatus", OmniLinkMessageType.RequestSystemStatus,
expected_opcode=int(OmniLinkMessageType.SystemStatus),
)
await _request_or_warn(
conn, "StatusSummary", OmniLinkMessageType.RequestStatusSummary,
expected_opcode=int(OmniLinkMessageType.StatusSummary),
)
# --- bulk status, small ranges so we can read the bytes ---
await _request_or_warn(
conn, "ZoneStatus[1..8]", OmniLinkMessageType.RequestZoneStatus,
payload=bytes([1, 8]),
expected_opcode=int(OmniLinkMessageType.ZoneStatus),
)
await _request_or_warn(
conn, "ZoneExtendedStatus[1..8]", OmniLinkMessageType.RequestZoneExtendedStatus,
payload=bytes([1, 8]),
expected_opcode=int(OmniLinkMessageType.ZoneExtendedStatus),
)
await _request_or_warn(
conn, "UnitStatus[1..8]", OmniLinkMessageType.RequestUnitStatus,
payload=bytes([1, 8]),
expected_opcode=int(OmniLinkMessageType.UnitStatus),
)
await _request_or_warn(
conn, "UnitExtendedStatus[1..8]", OmniLinkMessageType.RequestUnitExtendedStatus,
payload=bytes([1, 8]),
expected_opcode=int(OmniLinkMessageType.UnitExtendedStatus),
)
await _request_or_warn(
conn, "ThermostatStatus[1..4]", OmniLinkMessageType.RequestThermostatStatus,
payload=bytes([1, 4]),
expected_opcode=int(OmniLinkMessageType.ThermostatStatus),
)
await _request_or_warn(
conn, "ThermostatExtendedStatus[1..4]", OmniLinkMessageType.RequestThermostatExtendedStatus,
payload=bytes([1, 4]),
expected_opcode=int(OmniLinkMessageType.ThermostatExtendedStatus),
)
await _request_or_warn(
conn, "AuxiliaryStatus[1..8]", OmniLinkMessageType.RequestAuxiliaryStatus,
payload=bytes([1, 8]),
expected_opcode=int(OmniLinkMessageType.AuxiliaryStatus),
)
# --- discovery: UploadNames is the READ request; DownloadNames is the
# WRITE direction (panel <- client). Reply payload is NameData with the
# next defined object's number + name.
# Per clsOL2MsgUploadNames: [type, num_hi, num_lo, relative_direction].
# type: 1=Zone 2=Unit 3=Button 4=Code 5=Thermostat 6=Area 7=Message
# relative_direction: +1=next after num, -1=prev before num, 0=exact
for type_byte, type_name in [(1, "Zone"), (2, "Unit"), (5, "Thermostat"), (6, "Area")]:
await _request_or_warn(
conn,
f"UploadNames[type={type_name}, after=0, dir=+1]",
OmniLinkMessageType.UploadNames,
payload=bytes([type_byte, 0, 0, 1]),
expected_opcode=int(OmniLinkMessageType.NameData),
)
print("\n--- recon complete, session closed cleanly ---")
return 0
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--host", default="192.168.1.9")
parser.add_argument("--port", type=int, default=4369)
parser.add_argument("--key", help="32 hex chars; overrides env/.omni_key")
parser.add_argument("--debug", action="store_true")
args = parser.parse_args()
logging.basicConfig(
level=logging.DEBUG if args.debug else logging.WARNING,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
return asyncio.run(amain(args))
if __name__ == "__main__":
sys.exit(main())

118
dev/probe_v1_stream.py Normal file
View File

@ -0,0 +1,118 @@
#!/usr/bin/env python3
"""Probe the v1 UploadNames streaming flow.
Sends UploadNames (no payload), then a series of Acknowledge messages,
dumping each reply until we get an EOD or 30 records (whichever comes
first). Confirms the lock-step pattern PC Access uses for bulk reads.
"""
from __future__ import annotations
import argparse
import asyncio
import logging
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from probe_v1 import _load_key # type: ignore # noqa: E402
from omni_pca.opcodes import OmniLinkMessageType
from omni_pca.v1 import OmniConnectionV1
_NAME_TYPE_LABELS = {
1: "Zone", 2: "Unit", 3: "Button", 4: "Code",
5: "Thermostat", 6: "Area", 7: "Message",
}
def _decode_namedata(payload: bytes) -> str:
"""Best-effort decode of a NameData payload for display."""
if len(payload) < 3:
return f"<short payload: {payload.hex()}>"
name_type = payload[0]
# Heuristic: zones/messages are 15-char names, others 12. With one-byte
# NameNumber, payload length = 1 (type) + 1 (num) + L (name) + 1 (term).
# With two-byte NameNumber: 1 + 2 + L + 1.
L_15 = 15 + 3 # one-byte form, 15-char name
L_12 = 12 + 3 # one-byte form, 12-char name
if len(payload) == L_15 or len(payload) == L_15 + 1:
# 15-char name (Zone or Message), one-byte num.
num = payload[1]
name = payload[2:2 + 15].rstrip(b"\x00").decode("utf-8", errors="replace")
elif len(payload) == L_12 or len(payload) == L_12 + 1:
# 12-char name, one-byte num.
num = payload[1]
name = payload[2:2 + 12].rstrip(b"\x00").decode("utf-8", errors="replace")
else:
# Two-byte num form (NameNumber > 255): payload[1..2] = BE u16, then name.
num = (payload[1] << 8) | payload[2]
name = payload[3:].rstrip(b"\x00").decode("utf-8", errors="replace")
label = _NAME_TYPE_LABELS.get(name_type, f"type{name_type}")
return f"{label} #{num}: {name!r}"
async def amain(args: argparse.Namespace) -> int:
key = _load_key(args.key)
print(f"[stream probe] target {args.host}:{args.port}\n")
async with OmniConnectionV1(
host=args.host, port=args.port, controller_key=key, timeout=4.0
) as conn:
from omni_pca.message import Message, START_CHAR_V1_UNADDRESSED
# Step 1: bare UploadNames.
upload = Message(
start_char=START_CHAR_V1_UNADDRESSED,
data=bytes([int(OmniLinkMessageType.UploadNames)]),
)
seq, fut = conn._send_encrypted(upload)
reply = conn._decode_inner(await fut)
print(f"reply 1 (seq={seq}): opcode={reply.opcode} {_decode_namedata(reply.payload) if reply.opcode == int(OmniLinkMessageType.NameData) else f'(payload={reply.payload.hex()!r})'}")
if reply.opcode != int(OmniLinkMessageType.NameData):
print("panel didn't reply with NameData — streaming flow may not apply here")
return 0
# Step 2..N: Acknowledge → next NameData (or EOD).
for i in range(2, args.max + 1):
ack = Message(
start_char=START_CHAR_V1_UNADDRESSED,
data=bytes([int(OmniLinkMessageType.Ack)]),
)
seq, fut = conn._send_encrypted(ack)
reply = conn._decode_inner(await fut)
if reply.opcode == int(OmniLinkMessageType.EOD):
print(f"reply {i} (seq={seq}): EOD — end of stream after {i-1} records")
return 0
if reply.opcode == int(OmniLinkMessageType.NameData):
print(f"reply {i} (seq={seq}): {_decode_namedata(reply.payload)}")
else:
print(f"reply {i} (seq={seq}): unexpected opcode {reply.opcode}, "
f"payload={reply.payload.hex()}")
return 1
print(f"\nstopped after {args.max} replies (no EOD seen)")
return 0
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--host", default="192.168.1.9")
parser.add_argument("--port", type=int, default=4369)
parser.add_argument("--key", help="32 hex chars; overrides env/.omni_key")
parser.add_argument("--max", type=int, default=20, help="stop after N replies")
parser.add_argument("--debug", action="store_true")
args = parser.parse_args()
logging.basicConfig(
level=logging.DEBUG if args.debug else logging.WARNING,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
return asyncio.run(amain(args))
if __name__ == "__main__":
sys.exit(main())

93
dev/probe_v1_write.py Normal file
View File

@ -0,0 +1,93 @@
#!/usr/bin/env python3
"""Phase-2c live write smoke test: round-trip a no-op unit command.
Reads the current state of one unit, sends a command that should yield
the same observable result, then re-reads to confirm. Proves that
:meth:`OmniClientV1.execute_command` actually flows through the v1
Command opcode against the real panel without changing anything visible.
Run:
cd /home/kdm/home-auto/omni-pca
uv run python dev/probe_v1_write.py [--index N]
Default target is unit #4 ('STAIRS' per current panel config).
"""
from __future__ import annotations
import argparse
import asyncio
import logging
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from probe_v1 import _load_key # type: ignore # noqa: E402
from omni_pca.v1 import OmniClientV1
async def amain(args: argparse.Namespace) -> int:
key = _load_key(args.key)
print(f"[write probe] target {args.host}:{args.port} unit #{args.index}\n")
async with OmniClientV1(
host=args.host, port=args.port, controller_key=key, timeout=4.0
) as c:
before = (await c.get_unit_status(args.index, args.index))[args.index]
print(f"BEFORE: state=0x{before.state:02x} "
f"brightness={before.brightness!r} "
f"time_remaining={before.time_remaining_secs}s")
# Pick the safest no-op command for the unit's current state:
# - state == 0 → send UNIT_OFF (already off, panel acks)
# - state == 1 → send UNIT_ON (already on, panel acks)
# - 100 <= state <= 200 → set_unit_level(percent) at the current level
# - otherwise (scene/dim/etc.) → fall back to UNIT_ON which is harmless
if before.state == 0:
print("ACTION: turn_unit_off (already off, expecting Ack)")
await c.turn_unit_off(args.index)
elif before.state == 1:
print("ACTION: turn_unit_on (already on, expecting Ack)")
await c.turn_unit_on(args.index)
elif 100 <= before.state <= 200:
level = before.state - 100
print(f"ACTION: set_unit_level({level}%) (already at this level)")
await c.set_unit_level(args.index, level)
else:
print(f"ACTION: turn_unit_on (state=0x{before.state:02x} is exotic; safe ack expected)")
await c.turn_unit_on(args.index)
# Give the panel ~250ms to settle if it does pulse anything.
await asyncio.sleep(0.25)
after = (await c.get_unit_status(args.index, args.index))[args.index]
print(f"AFTER: state=0x{after.state:02x} "
f"brightness={after.brightness!r} "
f"time_remaining={after.time_remaining_secs}s")
if after.state == before.state:
print("\n✓ panel acked the Command, state unchanged — wire path verified")
else:
print(f"\n⚠ state changed (0x{before.state:02x} → 0x{after.state:02x}). "
"Probably harmless but worth investigating.")
return 0
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--host", default="192.168.1.9")
parser.add_argument("--port", type=int, default=4369)
parser.add_argument("--key", help="32 hex chars; overrides env/.omni_key")
parser.add_argument("--index", type=int, default=4)
parser.add_argument("--debug", action="store_true")
args = parser.parse_args()
logging.basicConfig(
level=logging.DEBUG if args.debug else logging.WARNING,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
return asyncio.run(amain(args))
if __name__ == "__main__":
sys.exit(main())

View File

@ -32,7 +32,17 @@ async def _complete_onboarding(
) -> None:
"""POST every remaining onboarding step in turn so HA stops greeting us."""
r = await client.get("/api/onboarding")
pending = [s["step"] for s in r.json() if not s.get("done")]
if r.status_code != 200:
# Endpoint disappears once onboarding is fully complete — nothing
# to do, the user is already past the welcome wizard.
print(" onboarding endpoint 404 — already complete")
return
try:
steps = r.json()
except Exception:
print(" onboarding endpoint returned non-JSON — assuming complete")
return
pending = [s["step"] for s in steps if not s.get("done")]
print(f" pending onboarding: {pending}")
if "core_config" in pending:
@ -73,7 +83,15 @@ async def _onboard(ha_url: str) -> str:
"""
async with httpx.AsyncClient(base_url=ha_url, timeout=30.0) as client:
r = await client.get("/api/onboarding")
steps = r.json()
# Once onboarding is fully complete the endpoint 404s with a
# plain-text body instead of a JSON step list — skip straight to
# the subsequent-run login path in that case.
steps: list[dict] = []
if r.status_code == 200:
try:
steps = r.json()
except Exception:
steps = []
user_step = next((s for s in steps if s["step"] == "user"), None)
if user_step and not user_step.get("done"):
@ -199,7 +217,8 @@ async def _take_screenshots(ha_url: str, token: str, outdir: Path) -> list[Path]
viewport={"width": 1440, "height": 900},
device_scale_factor=2,
)
# Inject auth so we skip the login screen.
# Inject auth so we skip the login screen + force HA's dark theme
# so screenshots match the docs site's default theme.
await context.add_init_script(
f"""window.localStorage.setItem('hassTokens', JSON.stringify({{
access_token: '{token}',
@ -211,6 +230,15 @@ async def _take_screenshots(ha_url: str, token: str, outdir: Path) -> list[Path]
refresh_token: 'placeholder',
}}));
window.localStorage.setItem('selectedLanguage', '"en"');
// Force dark theme HA reads selectedTheme from localStorage
// before the user-settings panel loads. The empty 'theme' object
// tells HA "use the default dark theme, not a custom one".
window.localStorage.setItem('selectedTheme', JSON.stringify({{
theme: 'default',
dark: true,
primaryColor: null,
accentColor: null,
}}));
"""
)
page = await context.new_page()

View File

@ -1,6 +1,6 @@
[project]
name = "omni-pca"
version = "2026.5.10"
version = "2026.5.11"
description = "Async Python client for HAI/Leviton Omni-Link II home automation panels (Omni Pro II, Omni IIe, Omni LTe, Lumina)."
readme = "README.md"
license = { text = "MIT" }

View File

@ -127,17 +127,22 @@ class UpbLinkAction(IntEnum):
# --------------------------------------------------------------------------
def _ensure_system_events(message: Message) -> bytes:
"""Validate that ``message`` is a v2 SystemEvents reply, return its
payload bytes (everything after the opcode).
def _ensure_system_events(
message: Message,
expected_opcode: int = int(OmniLink2MessageType.SystemEvents),
) -> bytes:
"""Validate that ``message`` is a SystemEvents reply, return payload bytes.
Reference: clsOLMsgSystemEvents.cs (entire file) the message body
is just ``[opcode][word1_hi][word1_lo][word2_hi][word2_lo]``.
The v1 and v2 SystemEvents inner-message bodies are byte-identical
(clsOLMsgSystemEvents.cs vs clsOL2MsgSystemEvents.cs both yield
``[opcode][word1_hi][word1_lo][word2_hi][word2_lo]``); only the
opcode byte differs (35 vs 55). Pass ``expected_opcode`` to dispatch
the v1 path from :class:`omni_pca.v1.adapter.OmniClientV1Adapter`.
"""
if message.opcode != int(OmniLink2MessageType.SystemEvents):
if message.opcode != expected_opcode:
raise ValueError(
"not a SystemEvents message: opcode "
f"{message.opcode} (expected {int(OmniLink2MessageType.SystemEvents)})"
f"not a SystemEvents message: opcode {message.opcode} "
f"(expected {expected_opcode})"
)
payload = message.payload
if len(payload) % 2 != 0:
@ -700,18 +705,23 @@ def _classify(word: int) -> SystemEvent:
# --------------------------------------------------------------------------
def parse_events(message: Message) -> list[SystemEvent]:
"""Decode a v2 ``SystemEvents`` (opcode 55) message into typed events.
def parse_events(
message: Message,
expected_opcode: int = int(OmniLink2MessageType.SystemEvents),
) -> list[SystemEvent]:
"""Decode a ``SystemEvents`` message into typed events.
The panel batches multiple state changes into a single message, so
the return type is always a list even for messages that carry just
one event. Empty SystemEvents messages return an empty list rather
than raising.
Reference: clsOLMsgSystemEvents.cs:10-18 (SystemEventsCount + per-
word accessor).
``expected_opcode`` defaults to v2 (55); pass v1's value (35) when
decoding from a ``v1.OmniConnectionV1`` push stream.
Reference: clsOLMsgSystemEvents.cs / clsOL2MsgSystemEvents.cs.
"""
payload = _ensure_system_events(message)
payload = _ensure_system_events(message, expected_opcode)
return [_classify(w) for w in _iter_event_words(payload)]
@ -790,6 +800,7 @@ class EventStream:
"""
source: object # OmniConnection or duck-typed equivalent
expected_opcode: int = int(OmniLink2MessageType.SystemEvents)
_buffer: list[SystemEvent] = field(default_factory=list)
def __post_init__(self) -> None:
@ -817,10 +828,10 @@ class EventStream:
raise
except asyncio.CancelledError:
raise
if msg.opcode != int(OmniLink2MessageType.SystemEvents):
if msg.opcode != self.expected_opcode:
# Non-event message (Status, Ack, …) — silently ignore.
continue
self._buffer = parse_events(msg)
self._buffer = parse_events(msg, self.expected_opcode)
return self._buffer.pop(0)

View File

@ -3,7 +3,7 @@
Wire layout (non-addressable):
``[start_char][length][...data...][crc_lo][crc_hi]``
For v1 addressable messages (StartChar=0x5A) a single SerialAddress byte
For v1 addressable messages (StartChar=0x41) a single SerialAddress byte
is interleaved between start_char and length.
CRC is CRC-16/MODBUS (poly 0xA001, init 0, reflected) computed over the
@ -13,6 +13,8 @@ on the wire (CRC1 = low byte, CRC2 = high byte).
References:
clsOmniLinkMessage.cs (lines 9, 164-186, 273-289) frame + CRC
clsOmniLink2Message.cs (lines 17-23) v2 StartChar = 0x21
enuOmniLinkMessageFormat.cs Addressable=0x41, NonAddressable=0x5A,
OmniLink2=0x21
clsOL2MsgLogin.cs / clsOLMsgLogin.cs example payloads
"""
@ -23,8 +25,8 @@ from dataclasses import dataclass, field
from .opcodes import OmniLink2MessageType, OmniLinkMessageType
START_CHAR_V2 = 0x21
START_CHAR_V1_UNADDRESSED = 0x41
START_CHAR_V1_ADDRESSABLE = 0x5A
START_CHAR_V1_ADDRESSABLE = 0x41
START_CHAR_V1_UNADDRESSED = 0x5A
_CRC_POLY_REFLECTED = 0xA001

View File

@ -0,0 +1,52 @@
"""V1 (legacy) Omni-Link protocol over UDP.
The v2 path in :mod:`omni_pca` (TCP, OmniLink2Message, StartChar 0x21,
parameterised RequestProperties / RequestExtendedStatus) is what most
modern firmware speaks. This subpackage exists because some panels are
configured at the network module to listen on **UDP only**, in which case
PC Access falls back to the v1 wire protocol (typed RequestZoneStatus,
RequestUnitStatus, etc., StartChar 0x5A, OmniLinkMessage outer = 0x10).
Reference: clsOmniLinkConnection.cs:353-360 (ConnectionProtocol() returns
V1 for Modem/UDP/Serial, V2 only for TCP).
"""
from __future__ import annotations
from .adapter import OmniClientV1Adapter
from .client import OmniClientV1, OmniNakError, OmniProtocolError
from .connection import (
HandshakeError,
InvalidEncryptionKeyError,
OmniConnectionV1,
RequestTimeoutError,
)
from .messages import (
NameRecord,
NameType,
parse_v1_aux_status,
parse_v1_namedata,
parse_v1_system_status,
parse_v1_thermostat_status,
parse_v1_unit_status,
parse_v1_zone_status,
)
__all__ = [
"HandshakeError",
"InvalidEncryptionKeyError",
"NameRecord",
"NameType",
"OmniClientV1",
"OmniClientV1Adapter",
"OmniConnectionV1",
"OmniNakError",
"OmniProtocolError",
"RequestTimeoutError",
"parse_v1_aux_status",
"parse_v1_namedata",
"parse_v1_system_status",
"parse_v1_thermostat_status",
"parse_v1_unit_status",
"parse_v1_zone_status",
]

424
src/omni_pca/v1/adapter.py Normal file
View File

@ -0,0 +1,424 @@
"""V2-shape adapter over :class:`OmniClientV1`.
The Home Assistant coordinator was written against :class:`omni_pca.client.OmniClient`
(the v2 API). When the user configures ``transport=udp`` we need a client
that *looks* like ``OmniClient`` but speaks v1-over-UDP underneath.
This adapter exposes only the methods the coordinator and entity
platforms actually call. Where v1 lacks a v2 opcode (Properties for
zones/units/areas, AcknowledgeAlerts), we synthesize a sensible
fallback rather than raise HA users shouldn't have to care that their
panel is on a different wire protocol.
What the adapter does:
* **Discovery (``list_*_names``)**: delegates to ``OmniClientV1`` (which
drives the streaming ``UploadNames`` flow once per call).
* **Properties (``get_object_properties``)**: synthesizes a minimal
``*Properties`` dataclass from the name alone. v1 has no Properties
opcode, so we can't fetch zone_type / unit_type / area_alarms / etc.
Defaults are zero entity platforms read mostly the name + the live
``*Status`` snapshot, so this works for the common case.
* **Bulk status (``get_extended_status``)**: routes Zone/Unit/Thermostat/
AuxSensor through the v1 typed ``get_*_status`` calls and returns the
resulting dataclass list (same shape v2 produces).
* **Area status (``get_object_status(AREA, )``)**: derives ``AreaStatus``
records from the per-area mode bytes in v1 ``SystemStatus`` v1 has
no per-area status opcode and the modes are the only thing the panel
reports on UDP.
* **Events (``events()``)**: returns an :class:`EventStream` filtered on
v1's SystemEvents opcode (35) instead of v2's (55). Word format is
identical, so the existing typed-event decoder works unchanged.
* **Writes**: pass-through to the underlying ``OmniClientV1`` methods,
whose Command / ExecuteSecurityCommand payloads are byte-identical
to v2 only the opcode differs.
"""
from __future__ import annotations
from collections.abc import AsyncIterator, Awaitable, Callable
from typing import Self
from ..commands import Command
from ..events import EventStream, SystemEvent
from ..models import (
AreaProperties,
AreaStatus,
AuxSensorStatus,
ButtonProperties,
ObjectType,
SecurityMode,
SystemInformation,
SystemStatus,
ThermostatProperties,
ThermostatStatus,
UnitProperties,
UnitStatus,
ZoneProperties,
ZoneStatus,
)
from ..opcodes import OmniLinkMessageType
from .client import OmniClientV1
from .connection import OmniConnectionV1
# Type used by coordinator for object_type arg (the IntEnum in
# omni_pca.client is just a re-export of models.ObjectType).
_ObjectType = ObjectType
_DEFAULT_PORT = 4369
class OmniClientV1Adapter:
"""V2-shaped facade over :class:`OmniClientV1`.
Construct with the same kwargs as :class:`OmniClient`; the
coordinator does not need to know which one it has.
"""
def __init__(
self,
host: str,
port: int = _DEFAULT_PORT,
controller_key: bytes = b"",
timeout: float = 5.0,
retry_count: int = 3,
**_ignored,
) -> None:
# `transport=` and similar kwargs are accepted-and-ignored so the
# coordinator's construction call stays identical across v1/v2.
self._client = OmniClientV1(
host=host,
port=port,
controller_key=controller_key,
timeout=timeout,
retry_count=retry_count,
)
# ---- lifecycle ------------------------------------------------------
async def __aenter__(self) -> Self:
await self._client.__aenter__()
return self
async def __aexit__(self, exc_type, exc, tb) -> None:
await self._client.__aexit__(exc_type, exc, tb)
@property
def connection(self) -> OmniConnectionV1:
"""Underlying :class:`OmniConnectionV1` — used by the coordinator's
low-level walks. v1's connection has the same ``unsolicited()`` /
``request()`` surface as v2's, just a different wire dialect.
"""
return self._client.connection
# ---- panel-wide reads ----------------------------------------------
async def get_system_information(self) -> SystemInformation:
return await self._client.get_system_information()
async def get_system_status(self) -> SystemStatus:
return await self._client.get_system_status()
# ---- discovery (cached once per coordinator setup) -----------------
#
# The coordinator calls list_*_names() once per object type. Each
# call drives a fresh UploadNames stream, which on this panel takes
# ~250ms per ~100 names. We cache the full bucketed dict on first
# call so the four list_*_names() calls + several synthesize-
# properties calls all share one network roundtrip.
async def _ensure_names(self) -> dict[int, dict[int, str]]:
cached = getattr(self, "_name_cache", None)
if cached is None:
cached = await self._client.list_all_names()
self._name_cache = cached
return cached
def _invalidate_names(self) -> None:
"""Force the next discovery call to re-stream UploadNames."""
self._name_cache = None # type: ignore[assignment]
async def list_zone_names(self) -> dict[int, str]:
return (await self._ensure_names()).get(1, {}) # NameType.ZONE
async def list_unit_names(self) -> dict[int, str]:
return (await self._ensure_names()).get(2, {}) # NameType.UNIT
async def list_area_names(self) -> dict[int, str]:
"""Return area names, falling back to "Area N" when stream is empty.
Most v1 panels don't expose user-assigned area names — the slots
exist (8 for Omni Pro II) but the .pca file leaves them zero-
filled. HA needs *something* to label each area entity, so we
synthesize "Area 1".."Area 8" as a fixed-size fallback. The 8
is the Omni Pro II cap; we cap here even when ``SystemStatus``
reports more mode bytes because the long-form SystemStatus
payload mixes in EE-expansion telemetry past byte 22.
"""
named = (await self._ensure_names()).get(5, {}) # NameType.AREA
if named:
return named
return {i: f"Area {i}" for i in range(1, 9)}
async def list_thermostat_names(self) -> dict[int, str]:
return (await self._ensure_names()).get(6, {}) # NameType.THERMOSTAT
async def list_button_names(self) -> dict[int, str]:
return (await self._ensure_names()).get(3, {}) # NameType.BUTTON
async def list_code_names(self) -> dict[int, str]:
return (await self._ensure_names()).get(4, {}) # NameType.CODE
async def list_message_names(self) -> dict[int, str]:
return (await self._ensure_names()).get(7, {}) # NameType.MESSAGE
# ---- properties synthesis ------------------------------------------
async def get_object_properties(
self, object_type: ObjectType, index: int
) -> ZoneProperties | UnitProperties | AreaProperties | ThermostatProperties | None:
"""Synthesize a Properties dataclass from the name alone.
v1 has no ``RequestProperties`` opcode; the rich fields v2 carries
(zone_type, unit areas bitfield, exit/entry delays, ) simply
aren't reachable on UDP. We return a minimal dataclass with just
``index`` + ``name`` populated and everything else defaulted to
0/False so entity setup doesn't need a transport branch.
Returns ``None`` if the object isn't defined (no name and not in
the default area-fallback range), which mirrors v2's behavior
when ``RequestProperties`` walks past the last defined object.
"""
names = await self._ensure_names()
if object_type == ObjectType.ZONE:
name = names.get(1, {}).get(index)
if not name:
return None
return ZoneProperties(
index=index, name=name, zone_type=0, area=1,
options=0, status=0, loop=0,
)
if object_type == ObjectType.UNIT:
name = names.get(2, {}).get(index)
if not name:
return None
return UnitProperties(
index=index, name=name, unit_type=0,
status=0, time=0, areas=0,
)
if object_type == ObjectType.AREA:
# Use the same fallback logic as list_area_names so HA always
# gets at least the 8 default-area entries.
label = (await self.list_area_names()).get(index)
if label is None:
return None
return AreaProperties(
index=index, name=label, mode=0, alarms=0,
enabled=True, entry_delay=0, exit_delay=0,
)
if object_type == ObjectType.THERMOSTAT:
name = names.get(6, {}).get(index)
if not name:
return None
return ThermostatProperties(
index=index, name=name, thermostat_type=0,
communicating=True,
)
if object_type == ObjectType.BUTTON:
name = names.get(3, {}).get(index)
if not name:
return None
return ButtonProperties(index=index, name=name)
return None
# ---- bulk status ---------------------------------------------------
# Per-type max records per chunk. Empirically firmware 2.12 caps unit
# responses around 62 records regardless of the MessageLength byte
# limit; other types follow similar conservative caps. We chunk well
# under those thresholds to leave headroom for any per-firmware
# variance and the AES zero-padding the wire frames add.
_CHUNK_SIZES: dict[int, int] = {
ObjectType.ZONE: 80, # 2 B/rec, panel caps high enough
ObjectType.UNIT: 40, # firmware 2.12 NAKs at 63+ records
ObjectType.THERMOSTAT: 30,
ObjectType.AUXILIARY: 60,
}
async def get_extended_status(
self,
object_type: ObjectType,
start: int,
end: int | None = None,
) -> list:
"""Route v2 ``get_extended_status`` to the matching v1 typed call.
v1 panels (Omni Pro II) can have 511 units across a sparse
address space. We chunk wide ranges into per-type-sized batches
and concatenate the records same effect for the caller, only
the wire transcript is different.
"""
last = end if end is not None else start
if object_type == ObjectType.ZONE:
fetch = self._client.get_zone_status
elif object_type == ObjectType.UNIT:
fetch = self._client.get_unit_status
elif object_type == ObjectType.THERMOSTAT:
fetch = self._client.get_thermostat_status
elif object_type == ObjectType.AUXILIARY:
fetch = self._client.get_aux_status
else:
raise ValueError(
f"v1 has no bulk extended-status opcode for {object_type.name}"
)
chunk = self._CHUNK_SIZES.get(int(object_type), 40)
out: dict[int, object] = {}
cur = start
while cur <= last:
chunk_end = min(cur + chunk - 1, last)
records = await fetch(cur, chunk_end)
out.update(records)
cur = chunk_end + 1
return [out[i] for i in sorted(out)]
async def get_object_status(
self,
object_type: ObjectType,
start: int,
end: int | None = None,
) -> list:
"""Synthesize AreaStatus from SystemStatus's per-area mode bytes.
v1 has no per-area status opcode but the SystemStatus payload
carries one ``Mode`` byte per area (single-area panels see one
byte at offset 15, multi-area panels see N consecutive bytes).
We promote each into an :class:`AreaStatus` with just ``index``
and ``mode`` populated; entry/exit timers and alarms are zero
because the protocol doesn't expose them at this level.
For non-area object types we fall back to extended-status, which
on v1 maps to the basic typed-status opcodes (which is what the
v2 coordinator actually wants anyway since v2's basic and
extended status are interchangeable in shape).
"""
if object_type != ObjectType.AREA:
return await self.get_extended_status(object_type, start, end)
last = end if end is not None else start
status = await self._client.get_system_status()
# First N bytes of area_alarms are valid area modes; the rest are
# EE-expansion data on long SystemStatus payloads (firmware 2.12
# length=39 form). We can't reliably tell where modes stop, so
# match against the list_area_names() count from the same
# SystemStatus.
area_count = max(1, min(8, len(status.area_alarms)))
out: list[AreaStatus] = []
for idx in range(start, min(last, area_count) + 1):
mode_pair = (
status.area_alarms[idx - 1] if idx - 1 < len(status.area_alarms)
else (0, 0)
)
out.append(
AreaStatus(
index=idx,
mode=mode_pair[0],
last_user=0,
entry_timer_secs=0,
exit_timer_secs=0,
alarms=mode_pair[1],
)
)
return out
# ---- events --------------------------------------------------------
def events(self) -> AsyncIterator[SystemEvent]:
"""v1-aware EventStream — filters on v1 SystemEvents opcode (35)."""
return EventStream(
self._client.connection,
expected_opcode=int(OmniLinkMessageType.SystemEvents),
).__aiter__()
async def subscribe(
self, callback: Callable[[object], Awaitable[None]]
) -> None:
"""Not used by the coordinator (which prefers ``events()``); kept
for API parity with :class:`OmniClient`. Raises ``NotImplementedError``
to flag accidental use when we need it, copy the v2 implementation.
"""
raise NotImplementedError(
"OmniClientV1Adapter.subscribe is not implemented; "
"use events() instead"
)
# ---- writes (pure pass-through) ------------------------------------
async def execute_command(
self, command: Command, parameter1: int = 0, parameter2: int = 0
) -> None:
await self._client.execute_command(command, parameter1, parameter2)
async def execute_security_command(
self, area: int, mode: SecurityMode, code: int
) -> None:
await self._client.execute_security_command(area, mode, code)
async def acknowledge_alerts(self) -> None:
await self._client.acknowledge_alerts()
async def turn_unit_on(self, index: int) -> None:
await self._client.turn_unit_on(index)
async def turn_unit_off(self, index: int) -> None:
await self._client.turn_unit_off(index)
async def set_unit_level(self, index: int, percent: int) -> None:
await self._client.set_unit_level(index, percent)
async def bypass_zone(self, index: int, code: int = 0) -> None:
await self._client.bypass_zone(index, code)
async def restore_zone(self, index: int, code: int = 0) -> None:
await self._client.restore_zone(index, code)
async def execute_button(self, index: int) -> None:
await self._client.execute_button(index)
async def execute_program(self, index: int) -> None:
"""Run a panel program by index.
v1 ``enuUnitCommand.Execute`` (raw byte not aliased in our enum)
and v2 both use a generic Command. The Command enum's
``EXECUTE_PROGRAM`` value works on both because the on-the-wire
Command body is byte-identical.
"""
await self.execute_command(Command.EXECUTE_PROGRAM, parameter2=index)
async def show_message(self, index: int, beep: bool = True) -> None:
await self.execute_command(
Command.SHOW_MESSAGE_WITH_BEEP if beep else Command.SHOW_MESSAGE_NO_BEEP,
parameter2=index,
)
async def clear_message(self, index: int) -> None:
await self.execute_command(Command.CLEAR_MESSAGE, parameter2=index)
async def set_thermostat_system_mode(self, index: int, mode_value: int) -> None:
await self._client.set_thermostat_system_mode(index, mode_value)
async def set_thermostat_fan_mode(self, index: int, mode_value: int) -> None:
await self._client.set_thermostat_fan_mode(index, mode_value)
async def set_thermostat_hold_mode(self, index: int, mode_value: int) -> None:
await self._client.set_thermostat_hold_mode(index, mode_value)
async def set_thermostat_heat_setpoint_raw(
self, index: int, raw_temp: int
) -> None:
await self._client.set_thermostat_heat_setpoint_raw(index, raw_temp)
async def set_thermostat_cool_setpoint_raw(
self, index: int, raw_temp: int
) -> None:
await self._client.set_thermostat_cool_setpoint_raw(index, raw_temp)

463
src/omni_pca/v1/client.py Normal file
View File

@ -0,0 +1,463 @@
"""High-level read-only client for v1-over-UDP Omni-Link panels.
Mirrors the v2 :class:`omni_pca.client.OmniClient` API where the v1 wire
protocol can satisfy the same call. Methods that require v2-only opcodes
(e.g. ``RequestProperties``, ``AcknowledgeAlerts``) are intentionally
absent until Phase 2b/2c add their v1 equivalents (streaming
``UploadNames``, no-op or alternate dispatch).
API parity goals (this module):
get_system_information() same dataclass as v2
get_system_status() same dataclass as v2
get_zone_status(start, end) -> dict uses v1 ZoneStatus
get_unit_status(start, end) -> dict uses v1 UnitStatus
get_thermostat_status(start, end) -> dict uses v1 ThermostatStatus
get_aux_status(start, end) -> dict uses v1 AuxiliaryStatus
"""
from __future__ import annotations
import struct
from collections.abc import AsyncIterator, Callable
from typing import Self
from ..commands import Command, CommandFailedError, SecurityCommandResponse
from ..models import (
AuxSensorStatus,
SecurityMode,
SystemInformation,
SystemStatus,
ThermostatStatus,
UnitStatus,
ZoneStatus,
)
from ..opcodes import OmniLinkMessageType
from .connection import OmniConnectionV1
from .messages import (
NameRecord,
NameType,
parse_v1_aux_status,
parse_v1_namedata,
parse_v1_system_status,
parse_v1_thermostat_status,
parse_v1_unit_status,
parse_v1_zone_status,
)
_DEFAULT_PORT = 4369
class OmniClientV1:
"""Read-only v1-over-UDP Omni-Link client.
.. code-block:: python
async with OmniClientV1("192.168.1.9", controller_key=key) as c:
info = await c.get_system_information()
zones = await c.get_zone_status(1, 16)
"""
def __init__(
self,
host: str,
port: int = _DEFAULT_PORT,
controller_key: bytes = b"",
timeout: float = 5.0,
retry_count: int = 3,
) -> None:
self._conn = OmniConnectionV1(
host=host,
port=port,
controller_key=controller_key,
timeout=timeout,
retry_count=retry_count,
)
@property
def connection(self) -> OmniConnectionV1:
return self._conn
async def __aenter__(self) -> Self:
await self._conn.connect()
return self
async def __aexit__(self, exc_type, exc, tb) -> None:
await self._conn.close()
# ---- panel-wide ----------------------------------------------------
async def get_system_information(self) -> SystemInformation:
"""Fetch model + firmware + dialer phone number.
Wire format identical to v2 (verified per
clsOLMsgSystemInformation.cs vs clsOL2MsgSystemInformation.cs);
we reuse the existing dataclass parser unchanged.
"""
reply = await self._conn.request(
OmniLinkMessageType.RequestSystemInformation
)
self._expect(reply.opcode, OmniLinkMessageType.SystemInformation)
return SystemInformation.parse(reply.payload)
async def get_system_status(self) -> SystemStatus:
"""Fetch panel time, sunrise/sunset, battery reading, area modes."""
reply = await self._conn.request(
OmniLinkMessageType.RequestSystemStatus
)
self._expect(reply.opcode, OmniLinkMessageType.SystemStatus)
return parse_v1_system_status(reply.payload)
# ---- bulk per-object status ----------------------------------------
async def get_zone_status(
self, start: int, end: int
) -> dict[int, ZoneStatus]:
return await self._range_status(
OmniLinkMessageType.RequestZoneStatus,
OmniLinkMessageType.ZoneStatus,
start,
end,
parse_v1_zone_status,
)
async def get_unit_status(
self, start: int, end: int
) -> dict[int, UnitStatus]:
return await self._range_status(
OmniLinkMessageType.RequestUnitStatus,
OmniLinkMessageType.UnitStatus,
start,
end,
parse_v1_unit_status,
)
async def get_thermostat_status(
self, start: int, end: int
) -> dict[int, ThermostatStatus]:
return await self._range_status(
OmniLinkMessageType.RequestThermostatStatus,
OmniLinkMessageType.ThermostatStatus,
start,
end,
parse_v1_thermostat_status,
)
async def get_aux_status(
self, start: int, end: int
) -> dict[int, AuxSensorStatus]:
return await self._range_status(
OmniLinkMessageType.RequestAuxiliaryStatus,
OmniLinkMessageType.AuxiliaryStatus,
start,
end,
parse_v1_aux_status,
)
# ---- discovery (streaming UploadNames) ------------------------------
async def iter_names(self) -> AsyncIterator[NameRecord]:
"""Stream every defined name from the panel.
v1 has no per-type name request a bare ``UploadNames`` triggers
the panel to dump *all* defined names of *all* types in a fixed
order (Zone, Unit, Button, Code, Area, Thermostat, Message, ),
each as a separate ``NameData`` reply that the client must
``Acknowledge`` to advance. This iterator handles the lock-step
protocol and yields each record as it arrives.
Reference: clsHAC.cs:4418 (sends bare UploadNames),
OL1ReadConfigHandleResponse (loops over NameData/EOD).
"""
async for reply in self._conn.iter_streaming(
OmniLinkMessageType.UploadNames
):
if reply.opcode != int(OmniLinkMessageType.NameData):
# Defensive — iter_streaming normally only yields
# non-EOD/NAK replies, so this is a wire-format fault.
raise OmniProtocolError(
f"unexpected opcode {reply.opcode} during UploadNames stream "
f"(expected {int(OmniLinkMessageType.NameData)})"
)
yield parse_v1_namedata(reply.payload)
async def list_all_names(self) -> dict[int, dict[int, str]]:
"""Bucket every defined name by ``NameType``.
Returns ``{name_type: {object_number: name}}``. Useful when HA
needs all four (zones+units+areas+thermostats) in one pass
cheaper than four separate streams since the panel only supports
one streaming session at a time anyway.
"""
out: dict[int, dict[int, str]] = {}
async for rec in self.iter_names():
out.setdefault(rec.name_type, {})[rec.number] = rec.name
return out
async def list_zone_names(self) -> dict[int, str]:
return (await self.list_all_names()).get(int(NameType.ZONE), {})
async def list_unit_names(self) -> dict[int, str]:
return (await self.list_all_names()).get(int(NameType.UNIT), {})
async def list_area_names(self) -> dict[int, str]:
return (await self.list_all_names()).get(int(NameType.AREA), {})
async def list_thermostat_names(self) -> dict[int, str]:
return (await self.list_all_names()).get(int(NameType.THERMOSTAT), {})
async def list_button_names(self) -> dict[int, str]:
return (await self.list_all_names()).get(int(NameType.BUTTON), {})
# ---- write methods (Command + ExecuteSecurityCommand) ----------------
#
# The Command and ExecuteSecurityCommand payloads are byte-identical
# between v1 and v2 — only the outer opcode differs (15 vs 20 for
# Command, 102 vs 74 for ExecuteSecurityCommand). So these methods are
# near-duplicates of OmniClient's, just routed through the v1 opcodes.
# Reference: clsOLMsgCommand.cs, clsOLMsgExecuteSecurityCommand.cs.
async def execute_command(
self,
command: Command,
parameter1: int = 0,
parameter2: int = 0,
) -> None:
"""Send a generic Command (v1 opcode 15).
Wire payload (4 bytes, identical to v2 form):
[0] command byte (enuUnitCommand value)
[1] parameter1 (single byte; brightness, mode, code index, ...)
[2] parameter2 high byte (BE u16)
[3] parameter2 low byte (object number for nearly every command)
Panel acks with v1 Ack (opcode 5) on success, Nak (6) on failure.
"""
if not 0 <= parameter1 <= 0xFF:
raise ValueError(f"parameter1 must fit in a byte: {parameter1}")
if not 0 <= parameter2 <= 0xFFFF:
raise ValueError(f"parameter2 must fit in u16: {parameter2}")
payload = struct.pack(
">BBH", int(command), parameter1 & 0xFF, parameter2 & 0xFFFF
)
reply = await self._conn.request(OmniLinkMessageType.Command, payload)
if reply.opcode == int(OmniLinkMessageType.Nak):
raise CommandFailedError(
f"panel NAK'd Command {command.name} "
f"(p1={parameter1}, p2={parameter2})"
)
if reply.opcode != int(OmniLinkMessageType.Ack):
raise CommandFailedError(
f"unexpected reply to Command {command.name}: opcode={reply.opcode}"
)
async def execute_security_command(
self,
area: int,
mode: SecurityMode,
code: int,
) -> None:
"""Arm or disarm a security area (v1 opcode 102).
Wire payload (6 bytes, identical to v2 form clsOLMsgExecuteSecurityCommand.cs):
[0] area number (1-based)
[1] security mode byte (raw enuSecurityMode 0..7)
[2] code digit 1 (thousands)
[3] code digit 2 (hundreds)
[4] code digit 3 (tens)
[5] code digit 4 (ones)
Panel responds with:
* ``ExecuteSecurityCommandResponse`` (103) carrying a status byte
(0 = success, see :class:`SecurityCommandResponse` for others), or
* ``Ack`` (5) on success without structured response, or
* ``Nak`` (6) on flat-out refusal.
Raises:
ValueError: ``area`` not 1..255 or ``code`` not 0..9999.
CommandFailedError: panel Nak'd OR response status was non-zero;
``failure_code`` carries the raw status byte when present.
"""
if not 1 <= area <= 0xFF:
raise ValueError(f"area out of range: {area}")
if not 0 <= code <= 9999:
raise ValueError(f"code out of range (0000-9999): {code}")
d1 = (code // 1000) % 10
d2 = (code // 100) % 10
d3 = (code // 10) % 10
d4 = code % 10
payload = bytes([area & 0xFF, int(mode) & 0xFF, d1, d2, d3, d4])
reply = await self._conn.request(
OmniLinkMessageType.ExecuteSecurityCommand, payload
)
if reply.opcode == int(OmniLinkMessageType.Nak):
raise CommandFailedError(
f"panel NAK'd ExecuteSecurityCommand "
f"(area={area}, mode={mode.name})"
)
if reply.opcode == int(OmniLinkMessageType.ExecuteSecurityCommandResponse):
if not reply.payload:
raise CommandFailedError(
"ExecuteSecurityCommandResponse with empty payload"
)
status = reply.payload[0]
if status != int(SecurityCommandResponse.SUCCESS):
try:
label = SecurityCommandResponse(status).name
except ValueError:
label = f"unknown({status})"
raise CommandFailedError(
f"ExecuteSecurityCommand failed: {label}",
failure_code=status,
)
return
if reply.opcode == int(OmniLinkMessageType.Ack):
return
raise CommandFailedError(
f"unexpected reply to ExecuteSecurityCommand: opcode={reply.opcode}"
)
async def acknowledge_alerts(self) -> None:
"""V1 has no AcknowledgeAlerts opcode — silently no-op.
v2 introduced :attr:`OmniLink2MessageType.AcknowledgeAlerts` (60)
as a dedicated panel-wide ack; v1 panels expect alerts to be
cleared by per-area arming or by user action at the keypad. To
keep the v1v2 method shape parallel, this method is a no-op so
HA service callers don't need a per-transport branch.
"""
return
# ---- thin command wrappers (one-liner conveniences) ------------------
async def turn_unit_on(self, index: int) -> None:
await self.execute_command(Command.UNIT_ON, parameter2=index)
async def turn_unit_off(self, index: int) -> None:
await self.execute_command(Command.UNIT_OFF, parameter2=index)
async def set_unit_level(self, index: int, percent: int) -> None:
if not 0 <= percent <= 100:
raise ValueError(f"percent must be 0..100: {percent}")
await self.execute_command(
Command.UNIT_LEVEL, parameter1=percent, parameter2=index
)
async def bypass_zone(self, index: int, code: int = 0) -> None:
await self.execute_command(
Command.BYPASS_ZONE, parameter1=code, parameter2=index
)
async def restore_zone(self, index: int, code: int = 0) -> None:
await self.execute_command(
Command.RESTORE_ZONE, parameter1=code, parameter2=index
)
async def execute_button(self, index: int) -> None:
await self.execute_command(Command.EXECUTE_BUTTON, parameter2=index)
async def set_thermostat_system_mode(self, index: int, mode_value: int) -> None:
if not 0 <= mode_value <= 0xFF:
raise ValueError(f"mode value must fit in a byte: {mode_value}")
await self.execute_command(
Command.SET_THERMOSTAT_SYSTEM_MODE,
parameter1=mode_value,
parameter2=index,
)
async def set_thermostat_fan_mode(self, index: int, mode_value: int) -> None:
await self.execute_command(
Command.SET_THERMOSTAT_FAN_MODE,
parameter1=mode_value,
parameter2=index,
)
async def set_thermostat_hold_mode(self, index: int, mode_value: int) -> None:
await self.execute_command(
Command.SET_THERMOSTAT_HOLD_MODE,
parameter1=mode_value,
parameter2=index,
)
async def set_thermostat_heat_setpoint_raw(
self, index: int, raw_temp: int
) -> None:
"""Set the heat setpoint by raw byte value (Omni temperature scale).
Use the same :func:`omni_temp_to_celsius` family of helpers from
:mod:`omni_pca.models` to convert from °C/°F if needed.
"""
if not 0 <= raw_temp <= 0xFF:
raise ValueError(f"raw_temp must fit in a byte: {raw_temp}")
await self.execute_command(
Command.SET_THERMOSTAT_HEAT_SETPOINT,
parameter1=raw_temp,
parameter2=index,
)
async def set_thermostat_cool_setpoint_raw(
self, index: int, raw_temp: int
) -> None:
if not 0 <= raw_temp <= 0xFF:
raise ValueError(f"raw_temp must fit in a byte: {raw_temp}")
await self.execute_command(
Command.SET_THERMOSTAT_COOL_SETPOINT,
parameter1=raw_temp,
parameter2=index,
)
# ---- helpers --------------------------------------------------------
async def _range_status[T](
self,
request_op: OmniLinkMessageType,
reply_op: OmniLinkMessageType,
start: int,
end: int,
parser: Callable[[bytes, int], list[T]],
) -> dict[int, T]:
if not 1 <= start <= end <= 0xFFFF:
raise ValueError(
f"invalid range: start={start}, end={end} "
f"(must be 1..65535 with start<=end)"
)
# v1 has two payload forms (clsOLMsgRequestUnitStatus.cs:18-31):
# short (3-byte msg with 1-byte start+end) when both ≤ 255, long
# (5-byte msg with BE u16 start+end) otherwise. The panel picks
# the right reply format based on what it received.
if start <= 0xFF and end <= 0xFF:
payload = bytes([start, end])
else:
payload = bytes(
[(start >> 8) & 0xFF, start & 0xFF,
(end >> 8) & 0xFF, end & 0xFF]
)
reply = await self._conn.request(request_op, payload)
self._expect(reply.opcode, reply_op)
records = parser(reply.payload, start)
return {r.index: r for r in records} # type: ignore[attr-defined]
@staticmethod
def _expect(actual: int, expected: OmniLinkMessageType) -> None:
if actual == int(OmniLinkMessageType.Nak):
raise OmniNakError(
f"panel NAK'd request expecting opcode {int(expected)} "
f"({expected.name})"
)
if actual != int(expected):
raise OmniProtocolError(
f"unexpected reply opcode {actual}, want {int(expected)} "
f"({expected.name})"
)
class OmniNakError(RuntimeError):
"""Panel returned the v1 Nak opcode (6) instead of the expected reply.
Thrown when a feature the panel doesn't support is requested — e.g.
``RequestZoneExtendedStatus`` on firmware 2.12 NAKs because only the
non-extended ``RequestZoneStatus`` is supported.
"""
class OmniProtocolError(RuntimeError):
"""Panel returned a reply opcode neither matching nor a NAK."""

View File

@ -0,0 +1,522 @@
"""Async UDP connection to an Omni-Link controller speaking the v1 wire protocol.
Differs from :class:`omni_pca.connection.OmniConnection` in three ways:
1. **Transport**: UDP only. Each datagram carries exactly one outer Packet.
2. **Outer packet type for messages**: ``OmniLinkMessage`` (0x10), not
``OmniLink2Message`` (0x20). The 4-step handshake packets are identical.
3. **Inner message format**: v1 ``Message`` with ``StartChar = 0x5A``
(NonAddressable) carrying a v1 opcode, not the v2 ``StartChar = 0x21``
carrying a v2 opcode.
The handshake itself (ClientRequestNewSession ControllerAckNewSession
ClientRequestSecureSession ControllerAckSecureSession) and the AES-128
session key derivation are protocol-agnostic and we reuse the same crypto
primitives.
Reference: clsOmniLinkConnection.cs (UDP path):
udpConnect lines 1239-1295 open + queue ClientRequestNewSession
udpListen lines 1298-1399 receive loop, dispatches replies
udpHandleRequestNewSession lines 1401-1459 step 2 step 3
udpHandleRequestSecureSession lines 1461-1487 step 4 OnlineSecure
udpSend lines 1514-1560 outer PacketType = OmniLinkMessage (16)
EncryptPacket lines 372-401 same crypto as TCP
"""
from __future__ import annotations
import asyncio
import contextlib
import logging
from collections.abc import AsyncIterator
from enum import IntEnum
from types import TracebackType
from ..crypto import (
BLOCK_SIZE,
decrypt_message_payload,
derive_session_key,
encrypt_message_payload,
)
from ..message import (
START_CHAR_V1_UNADDRESSED,
Message,
MessageCrcError,
)
from ..opcodes import OmniLinkMessageType, PacketType
from ..packet import Packet
_log = logging.getLogger(__name__)
_DEFAULT_PORT = 4369
_SESSION_ID_LEN = 5
_PROTO_VERSION = (0x00, 0x01)
_MAX_SEQ = 0xFFFF
class ConnectionState(IntEnum):
DISCONNECTED = 0
CONNECTING = 1
NEW_SESSION = 2
SECURE = 3
ONLINE = 4
class ConnectionError(OSError): # noqa: A001 - intentional shadow at module scope
pass
class HandshakeError(ConnectionError):
pass
class InvalidEncryptionKeyError(HandshakeError):
"""Controller answered ``ControllerSessionTerminated`` during handshake."""
class ProtocolError(ValueError):
pass
class RequestTimeoutError(TimeoutError):
pass
class OmniConnectionV1:
"""UDP + v1-wire-format connection to an Omni-Link controller."""
def __init__(
self,
host: str,
port: int = _DEFAULT_PORT,
controller_key: bytes = b"",
timeout: float = 5.0,
retry_count: int = 3,
) -> None:
if len(controller_key) != 16:
raise ValueError(
f"controller_key must be 16 bytes, got {len(controller_key)}"
)
self._host = host
self._port = port
self._controller_key = bytes(controller_key)
self._default_timeout = timeout
self._retry_count = max(0, retry_count)
self._udp_transport: asyncio.DatagramTransport | None = None
self._udp_protocol: _OmniDatagramProtocol | None = None
self._state = ConnectionState.DISCONNECTED
self._session_id: bytes | None = None
self._session_key: bytes | None = None
# First wire packet uses seq=1; wraparound skips 0 (reserved for
# unsolicited inbound). See clsOmniLinkConnection.cs:1251 (UDP
# init pktSequence=1, then udpSend pre-increments).
self._next_seq: int = 1
self._pending: dict[int, asyncio.Future[Packet]] = {}
self._unsolicited_queue: asyncio.Queue[Message] = asyncio.Queue()
self._handshake_event: asyncio.Event = asyncio.Event()
self._handshake_packet: Packet | None = None
self._handshake_error: Exception | None = None
self._closed = False
@property
def state(self) -> ConnectionState:
return self._state
@property
def session_key(self) -> bytes | None:
return self._session_key
async def __aenter__(self) -> OmniConnectionV1:
await self.connect()
return self
async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc: BaseException | None,
tb: TracebackType | None,
) -> None:
await self.close()
async def connect(self) -> None:
if self._state is not ConnectionState.DISCONNECTED:
raise ConnectionError(
f"already connecting/connected (state={self._state})"
)
self._state = ConnectionState.CONNECTING
try:
loop = asyncio.get_running_loop()
self._udp_transport, self._udp_protocol = (
await loop.create_datagram_endpoint(
lambda: _OmniDatagramProtocol(self),
remote_addr=(self._host, self._port),
)
)
except (TimeoutError, OSError) as exc:
self._state = ConnectionState.DISCONNECTED
raise ConnectionError(f"failed to open UDP socket: {exc}") from exc
try:
await self._do_handshake()
except BaseException:
await self.close()
raise
async def close(self) -> None:
"""Tear down. Politely terminate the panel session first.
Without ClientSessionTerminated the panel keeps our slot allocated
until its idle timeout and rejects subsequent connect attempts
with ControllerCannotStartNewSession (0x07).
"""
if self._closed:
return
self._closed = True
previous_state = self._state
self._state = ConnectionState.DISCONNECTED
if previous_state in (
ConnectionState.NEW_SESSION,
ConnectionState.SECURE,
ConnectionState.ONLINE,
):
try:
term = Packet(
seq=self._claim_seq(),
type=PacketType.ClientSessionTerminated,
data=b"",
)
self._write_packet(term)
except Exception as exc: # noqa: BLE001 - close() must be idempotent
_log.debug("close: failed to send ClientSessionTerminated: %s", exc)
for fut in self._pending.values():
if not fut.done():
fut.set_exception(ConnectionError("connection closed"))
self._pending.clear()
if self._udp_transport is not None:
with contextlib.suppress(OSError):
self._udp_transport.close()
self._udp_transport = None
self._udp_protocol = None
# ---- public request API ---------------------------------------------
async def request(
self,
opcode: OmniLinkMessageType | int,
payload: bytes = b"",
timeout: float | None = None,
) -> Message:
"""Send a v1 request, await the matching reply, return the inner Message."""
if self._state is not ConnectionState.ONLINE:
raise ConnectionError(
f"cannot send request, connection state={self._state.name}"
)
message = Message(
start_char=START_CHAR_V1_UNADDRESSED,
data=bytes([int(opcode)]) + payload,
)
per_attempt_timeout = timeout if timeout is not None else self._default_timeout
max_attempts = 1 + self._retry_count
last_exc: Exception | None = None
for attempt in range(1, max_attempts + 1):
seq, fut = self._send_encrypted(message)
try:
reply_packet = await asyncio.wait_for(fut, per_attempt_timeout)
except TimeoutError as exc:
last_exc = exc
self._pending.pop(seq, None)
if attempt < max_attempts:
_log.debug(
"udp v1 retry %d/%d on opcode=%d seq=%d",
attempt, max_attempts, int(opcode), seq,
)
continue
raise RequestTimeoutError(
f"no v1 reply for opcode={int(opcode)} "
f"after {max_attempts} attempt(s)"
) from last_exc
return self._decode_inner(reply_packet)
raise RequestTimeoutError(
f"request loop exited without reply for opcode={int(opcode)}"
)
async def iter_streaming(
self,
initial_op: OmniLinkMessageType | int,
*,
ack_op: OmniLinkMessageType | int = OmniLinkMessageType.Ack,
end_op: OmniLinkMessageType | int = OmniLinkMessageType.EOD,
nak_op: OmniLinkMessageType | int = OmniLinkMessageType.Nak,
timeout: float | None = None,
) -> AsyncIterator[Message]:
"""Drive a v1 lock-step streaming download (UploadNames / UploadSetup / etc).
Sends ``initial_op`` (no payload), yields each ``ack_op``-elicited
reply, and stops when the panel sends ``end_op``. ``nak_op`` is
treated as an immediate end-of-stream no exception (some
firmwares use NAK to signal "no records to upload").
Unlike :meth:`request` we don't retry on timeout — losing a
reply mid-stream desynchronises the conversation, so the right
answer is to surface the timeout and let the caller restart.
"""
if self._state is not ConnectionState.ONLINE:
raise ConnectionError(
f"cannot stream, connection state={self._state.name}"
)
per_reply_timeout = timeout if timeout is not None else self._default_timeout
# Step 1: send the initial bare-opcode request, wait for first reply.
first_msg = Message(
start_char=START_CHAR_V1_UNADDRESSED,
data=bytes([int(initial_op)]),
)
seq, fut = self._send_encrypted(first_msg)
try:
reply_pkt = await asyncio.wait_for(fut, per_reply_timeout)
except TimeoutError as exc:
self._pending.pop(seq, None)
raise RequestTimeoutError(
f"no first reply to streaming opcode={int(initial_op)}"
) from exc
reply = self._decode_inner(reply_pkt)
# Step 2..N: ack-and-receive until end_op or nak_op.
while True:
if reply.opcode == int(end_op) or reply.opcode == int(nak_op):
return
yield reply
ack_msg = Message(
start_char=START_CHAR_V1_UNADDRESSED,
data=bytes([int(ack_op)]),
)
seq, fut = self._send_encrypted(ack_msg)
try:
reply_pkt = await asyncio.wait_for(fut, per_reply_timeout)
except TimeoutError as exc:
self._pending.pop(seq, None)
raise RequestTimeoutError(
f"no reply after streaming Ack (seq={seq})"
) from exc
reply = self._decode_inner(reply_pkt)
def unsolicited(self) -> AsyncIterator[Message]:
queue = self._unsolicited_queue
async def _gen() -> AsyncIterator[Message]:
while True:
yield await queue.get()
return _gen()
# ---- handshake -------------------------------------------------------
async def _do_handshake(self) -> None:
# Step 1: empty ClientRequestNewSession.
self._state = ConnectionState.NEW_SESSION
step1 = Packet(
seq=self._claim_seq(),
type=PacketType.ClientRequestNewSession,
data=b"",
)
self._write_packet(step1)
# Step 2: ControllerAckNewSession (carries protocol version + SessionID).
ack1 = await self._await_handshake_packet()
if ack1.type is PacketType.ControllerCannotStartNewSession:
raise HandshakeError("controller cannot start new session (busy?)")
if ack1.type is not PacketType.ControllerAckNewSession:
raise HandshakeError(f"unexpected step-2 packet type {ack1.type.name}")
if len(ack1.data) < 7:
raise HandshakeError(
f"ControllerAckNewSession payload too short: {len(ack1.data)} bytes"
)
if (ack1.data[0], ack1.data[1]) != _PROTO_VERSION:
raise HandshakeError(
f"unsupported protocol version {ack1.data[0]:#04x}{ack1.data[1]:02x}"
)
self._session_id = bytes(ack1.data[2 : 2 + _SESSION_ID_LEN])
self._session_key = derive_session_key(self._controller_key, self._session_id)
# Step 3: encrypted ClientRequestSecureSession echoing SessionID.
self._state = ConnectionState.SECURE
step3_seq = self._claim_seq()
step3_ct = encrypt_message_payload(
self._session_id, step3_seq, self._session_key
)
step3 = Packet(
seq=step3_seq,
type=PacketType.ClientRequestSecureSession,
data=step3_ct,
)
self._write_packet(step3)
# Step 4: ControllerAckSecureSession (or termination).
ack2 = await self._await_handshake_packet()
if ack2.type is PacketType.ControllerSessionTerminated:
raise InvalidEncryptionKeyError(
"controller terminated session during handshake (wrong ControllerKey?)"
)
if ack2.type is not PacketType.ControllerAckSecureSession:
raise HandshakeError(
f"unexpected step-4 packet type {ack2.type.name}"
)
self._state = ConnectionState.ONLINE
async def _await_handshake_packet(self) -> Packet:
try:
await asyncio.wait_for(
self._handshake_event.wait(), self._default_timeout
)
except TimeoutError as exc:
raise HandshakeError(
"timeout waiting for controller handshake reply"
) from exc
if self._handshake_error is not None:
err = self._handshake_error
self._handshake_error = None
raise err
pkt = self._handshake_packet
self._handshake_packet = None
self._handshake_event.clear()
if pkt is None:
raise HandshakeError("handshake event fired with no packet")
return pkt
# ---- send / receive helpers -----------------------------------------
def _claim_seq(self) -> int:
seq = self._next_seq
nxt = seq + 1
if nxt > _MAX_SEQ or nxt == 0:
nxt = 1
self._next_seq = nxt
return seq
def _send_encrypted(
self, inner: Message
) -> tuple[int, asyncio.Future[Packet]]:
if self._session_key is None:
raise ConnectionError("no session key (handshake not complete)")
seq = self._claim_seq()
plaintext = inner.encode()
ciphertext = encrypt_message_payload(plaintext, seq, self._session_key)
# KEY DIFFERENCE FROM V2: outer type is OmniLinkMessage (0x10),
# not OmniLink2Message (0x20). See clsOmniLinkConnection.cs:1536.
pkt = Packet(seq=seq, type=PacketType.OmniLinkMessage, data=ciphertext)
loop = asyncio.get_running_loop()
fut: asyncio.Future[Packet] = loop.create_future()
self._pending[seq] = fut
self._write_packet(pkt)
return seq, fut
def _write_packet(self, pkt: Packet) -> None:
if self._udp_transport is None:
raise ConnectionError("transport not open")
wire = pkt.encode()
_log.debug(
"TX seq=%d type=%s len=%d", pkt.seq, pkt.type.name, len(pkt.data)
)
self._udp_transport.sendto(wire)
def _decode_inner(self, pkt: Packet) -> Message:
if self._session_key is None:
raise ConnectionError("no session key")
if not pkt.data:
raise ProtocolError("empty packet data")
plaintext = decrypt_message_payload(pkt.data, pkt.seq, self._session_key)
try:
return Message.decode(plaintext)
except MessageCrcError as exc:
raise ProtocolError(f"inner v1 message CRC mismatch: {exc}") from exc
# ---- inbound dispatch (called from the datagram protocol) -----------
def _dispatch(self, pkt: Packet) -> None:
if pkt.data is None:
pkt = Packet(seq=pkt.seq, type=pkt.type, data=b"")
if self._state in (ConnectionState.NEW_SESSION, ConnectionState.SECURE):
handshake_types = {
PacketType.ControllerAckNewSession,
PacketType.ControllerAckSecureSession,
PacketType.ControllerSessionTerminated,
PacketType.ControllerCannotStartNewSession,
}
if pkt.type in handshake_types:
self._handshake_packet = pkt
self._handshake_event.set()
return
if pkt.seq == 0:
if pkt.type is PacketType.OmniLinkMessage:
try:
msg = self._decode_inner(pkt)
except (ProtocolError, ConnectionError) as exc:
_log.warning(
"dropping malformed unsolicited v1 packet: %s", exc
)
return
try:
self._unsolicited_queue.put_nowait(msg)
except asyncio.QueueFull: # pragma: no cover - unbounded queue
_log.warning("unsolicited queue full; dropping message")
return
fut = self._pending.pop(pkt.seq, None)
if fut is None:
_log.debug(
"no waiter for seq=%d type=%s; dropping",
pkt.seq, pkt.type.name,
)
return
if pkt.type is PacketType.ControllerSessionTerminated:
fut.set_exception(ConnectionError("controller terminated session"))
return
if not fut.done():
fut.set_result(pkt)
class _OmniDatagramProtocol(asyncio.DatagramProtocol):
"""asyncio.DatagramProtocol bound to a single OmniConnectionV1.
Each datagram is one complete Packet. We decode it and hand it to the
connection's dispatcher; the dispatcher already knows how to sort
handshake / solicited / unsolicited paths.
"""
def __init__(self, conn: OmniConnectionV1) -> None:
self._conn = conn
def connection_made(self, transport: asyncio.BaseTransport) -> None:
pass
def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None:
try:
pkt = Packet.decode(data)
except Exception as exc:
_log.warning("dropping malformed UDP datagram: %s", exc)
return
try:
self._conn._dispatch(pkt)
except Exception:
_log.exception("UDP v1 dispatch crashed for seq=%d", pkt.seq)
def error_received(self, exc: Exception) -> None:
_log.warning("UDP v1 socket error: %s", exc)
def connection_lost(self, exc: Exception | None) -> None:
if exc is not None:
_log.warning("UDP v1 transport lost: %s", exc)

310
src/omni_pca/v1/messages.py Normal file
View File

@ -0,0 +1,310 @@
"""V1 status-reply and name parsers.
The v1 wire protocol's typed status messages (ZoneStatus, UnitStatus,
ThermostatStatus, AuxiliaryStatus) carry one record per object in the
range the client requested but, unlike v2's ExtendedStatus, the records
do **not** include the object number. The starting index is implicit
from the request payload, and each record is at a fixed offset.
This module supplies "block" parsers that take both the reply payload
and the starting index, and produce a list of the existing top-level
dataclasses (:class:`omni_pca.models.ZoneStatus` etc) so HA entity code
doesn't need a v1-specific schema. The :func:`parse_v1_namedata` helper
decodes the bulk-name-download replies streamed by ``UploadNames``.
Per-record byte counts (verified against firmware 2.12 over UDP):
ZoneStatus 2 bytes per zone (status, analog_loop)
UnitStatus 3 bytes per unit (status, time_hi, time_lo)
ThermostatStatus 7 bytes per tstat (status, current_t, heat_sp,
cool_sp, sys_mode, fan_mode,
hold_mode)
AuxiliaryStatus 4 bytes per aux (relay, current, low_sp,
high_sp)
References:
clsOLMsgZoneStatus.cs / clsOLMsgRequestZoneStatus.cs
clsOLMsgUnitStatus.cs / clsOLMsgRequestUnitStatus.cs
clsOLMsgThermostatStatus.cs / clsOLMsgRequestThermostatStatus.cs
clsOLMsgAuxiliaryStatus.cs / clsOLMsgRequestAuxiliaryStatus.cs
clsOLMsgSystemStatus.cs v1 byte 14 = battery, then per-area Mode
clsOLMsgNameData.cs bulk name download record format
enuNameType.cs Zone=1 Unit=2 Button=3 Code=4 Area=5
Tstat=6 Message=7 UserSetting=8
AccessControlReader=9
"""
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
from enum import IntEnum
from ..models import (
AuxSensorStatus,
SystemStatus,
ThermostatStatus,
UnitStatus,
ZoneStatus,
)
_ZONE_RECORD_BYTES = 2
_UNIT_RECORD_BYTES = 3
_THERMOSTAT_RECORD_BYTES = 7
_AUX_RECORD_BYTES = 4
def parse_v1_zone_status(payload: bytes, start_index: int) -> list[ZoneStatus]:
"""Parse a v1 ZoneStatus reply payload into per-zone dataclasses.
``payload`` is the inner Message ``payload`` (data minus opcode byte);
its length must be a multiple of ``_ZONE_RECORD_BYTES``.
"""
if len(payload) % _ZONE_RECORD_BYTES != 0:
raise ValueError(
f"v1 ZoneStatus payload length {len(payload)} not a multiple of "
f"{_ZONE_RECORD_BYTES}"
)
out: list[ZoneStatus] = []
for i, off in enumerate(range(0, len(payload), _ZONE_RECORD_BYTES)):
out.append(
ZoneStatus(
index=start_index + i,
raw_status=payload[off],
loop=payload[off + 1],
)
)
return out
def parse_v1_unit_status(payload: bytes, start_index: int) -> list[UnitStatus]:
"""Parse a v1 UnitStatus reply payload into per-unit dataclasses."""
if len(payload) % _UNIT_RECORD_BYTES != 0:
raise ValueError(
f"v1 UnitStatus payload length {len(payload)} not a multiple of "
f"{_UNIT_RECORD_BYTES}"
)
out: list[UnitStatus] = []
for i, off in enumerate(range(0, len(payload), _UNIT_RECORD_BYTES)):
out.append(
UnitStatus(
index=start_index + i,
state=payload[off],
time_remaining_secs=(payload[off + 1] << 8) | payload[off + 2],
)
)
return out
def parse_v1_thermostat_status(
payload: bytes, start_index: int
) -> list[ThermostatStatus]:
"""Parse a v1 ThermostatStatus reply payload into per-tstat dataclasses.
The v1 record only carries 7 fields; the v2 dataclass has 4 more
(humidity, humidify_setpoint, dehumidify_setpoint, outdoor_temp,
horc_status). We zero-fill those HA's climate platform doesn't
require them and an explicit 0 is more honest than a fake value.
"""
if len(payload) % _THERMOSTAT_RECORD_BYTES != 0:
raise ValueError(
f"v1 ThermostatStatus payload length {len(payload)} not a multiple "
f"of {_THERMOSTAT_RECORD_BYTES}"
)
out: list[ThermostatStatus] = []
for i, off in enumerate(range(0, len(payload), _THERMOSTAT_RECORD_BYTES)):
out.append(
ThermostatStatus(
index=start_index + i,
status=payload[off],
temperature_raw=payload[off + 1],
heat_setpoint_raw=payload[off + 2],
cool_setpoint_raw=payload[off + 3],
system_mode=payload[off + 4],
fan_mode=payload[off + 5],
hold_mode=payload[off + 6],
humidity_raw=0,
humidify_setpoint_raw=0,
dehumidify_setpoint_raw=0,
outdoor_temperature_raw=0,
horc_status=0,
)
)
return out
def parse_v1_aux_status(payload: bytes, start_index: int) -> list[AuxSensorStatus]:
"""Parse a v1 AuxiliaryStatus reply payload into per-aux dataclasses."""
if len(payload) % _AUX_RECORD_BYTES != 0:
raise ValueError(
f"v1 AuxiliaryStatus payload length {len(payload)} not a multiple "
f"of {_AUX_RECORD_BYTES}"
)
out: list[AuxSensorStatus] = []
for i, off in enumerate(range(0, len(payload), _AUX_RECORD_BYTES)):
out.append(
AuxSensorStatus(
index=start_index + i,
output=payload[off],
value_raw=payload[off + 1],
low_raw=payload[off + 2],
high_raw=payload[off + 3],
)
)
return out
def parse_v1_system_status(payload: bytes) -> SystemStatus:
"""Parse a v1 SystemStatus reply.
Bytes 0..13 are byte-identical to v2 (time/date + sunrise/sunset +
battery). After byte 13 v1 carries per-area Mode bytes (1 byte each)
while v2 carries 2-byte alarm-flag pairs. We translate to the v2
dataclass's ``area_alarms`` shape by promoting each v1 mode byte to
a ``(mode, 0)`` tuple that way HA code that already consumes
:class:`SystemStatus` keeps working without a v1-specific branch.
"""
if len(payload) < 14:
raise ValueError(
f"v1 SystemStatus payload too short: {len(payload)} bytes"
)
time_valid = payload[0] != 0
year = payload[1]
month = payload[2]
day = payload[3]
# day_of_week = payload[4]
hour = payload[5]
minute = payload[6]
second = payload[7]
# daylight = payload[8]
sunrise_h = payload[9]
sunrise_m = payload[10]
sunset_h = payload[11]
sunset_m = payload[12]
battery = payload[13]
panel_time: datetime | None = None
if time_valid:
try:
panel_time = datetime(
year=2000 + year,
month=month,
day=day,
hour=hour,
minute=minute,
second=second,
)
except ValueError:
panel_time = None
# Promote each v1 per-area mode byte to a (mode, 0) pair so the v2
# area_alarms tuple shape carries the same information without a
# second dataclass.
mode_bytes = payload[14:]
area_alarms = tuple((b, 0) for b in mode_bytes)
return SystemStatus(
time_valid=time_valid,
panel_time=panel_time,
sunrise_hour=sunrise_h,
sunrise_minute=sunrise_m,
sunset_hour=sunset_h,
sunset_minute=sunset_m,
battery_reading=battery,
area_alarms=area_alarms,
)
# ---- NameData --------------------------------------------------------------
class NameType(IntEnum):
"""Categories of named objects panels can stream over UploadNames.
Reference: enuNameType.cs.
"""
ZONE = 1
UNIT = 2
BUTTON = 3
CODE = 4
AREA = 5
THERMOSTAT = 6
MESSAGE = 7
USER_SETTING = 8
ACCESS_CONTROL_READER = 9
# Per-type max name length (clsCapOMNI_PRO_II.cs lines 55-71).
# Other Omni models share these numbers — the few exceptions are
# documented but not relevant for the panels we know speak v1+UDP.
_NAME_TYPE_LENGTH: dict[int, int] = {
NameType.ZONE: 15,
NameType.UNIT: 12,
NameType.BUTTON: 12,
NameType.CODE: 12,
NameType.AREA: 12,
NameType.THERMOSTAT: 12,
NameType.MESSAGE: 15,
NameType.USER_SETTING: 15,
NameType.ACCESS_CONTROL_READER: 15,
}
@dataclass(frozen=True, slots=True)
class NameRecord:
"""One name record from a v1 ``NameData`` reply (opcode 11)."""
name_type: int
number: int
name: str
@property
def name_type_label(self) -> str:
try:
return NameType(self.name_type).name
except ValueError:
return f"Unknown({self.name_type})"
def parse_v1_namedata(payload: bytes) -> NameRecord:
"""Decode a v1 ``NameData`` payload (opcode 11) into a :class:`NameRecord`.
Wire layout (per clsOLMsgNameData.cs, MessageLength is the
full Data byte count including the opcode):
* One-byte form (NameNumber 255), MessageLength = 4 + NameTypeLen:
``[opcode][type][num][name×L][\\0]`` one trailing reserved byte.
* Two-byte form (NameNumber > 255), MessageLength = 5 + NameTypeLen:
``[opcode][type][num_hi][num_lo][name×L][\\0]``.
``payload`` here is the *inner* :attr:`Message.payload` (data minus
the leading opcode), so the lengths to compare against are L+3 and
L+4 respectively.
"""
if len(payload) < 3:
raise ValueError(f"NameData payload too short: {len(payload)} bytes")
name_type = payload[0]
name_len = _NAME_TYPE_LENGTH.get(name_type)
if name_len is not None:
# Disambiguate by payload length against the expected forms.
one_byte_len = name_len + 3 # type + num + name + 1 trailing
two_byte_len = name_len + 4 # type + num_hi + num_lo + name + 1 trailing
if len(payload) >= two_byte_len:
number = (payload[1] << 8) | payload[2]
name_bytes = payload[3 : 3 + name_len]
elif len(payload) >= one_byte_len:
number = payload[1]
name_bytes = payload[2 : 2 + name_len]
else:
# Short payload — best-effort one-byte decode of whatever is left.
number = payload[1]
name_bytes = payload[2:]
else:
# Unknown type — can't tell the form. Assume one-byte and consume
# the rest; HA filters by known type anyway.
number = payload[1]
name_bytes = payload[2:]
name = name_bytes.split(b"\x00", 1)[0].decode("utf-8", errors="replace")
return NameRecord(name_type=name_type, number=number, name=name)

View File

@ -0,0 +1,290 @@
"""Unit tests for the OmniClientV1 write methods.
These exercise wire-payload construction by monkey-patching the
connection's ``request`` method so we never have to open a UDP socket.
The contract under test:
* :meth:`OmniClientV1.execute_command` packs ``[cmd][p1][p2_hi][p2_lo]``.
* :meth:`OmniClientV1.execute_security_command` packs
``[area][mode][d1][d2][d3][d4]`` with the C# digit-by-digit form.
* Convenience wrappers (``turn_unit_on`` etc) route through
:meth:`execute_command` with the right Command enum values.
* Replies are interpreted: Ack return, Nak CommandFailedError,
non-zero SecurityCommandResponse CommandFailedError with code.
"""
from __future__ import annotations
import struct
import pytest
from omni_pca.commands import (
Command,
CommandFailedError,
SecurityCommandResponse,
)
from omni_pca.message import START_CHAR_V1_UNADDRESSED, Message
from omni_pca.models import SecurityMode
from omni_pca.opcodes import OmniLinkMessageType
from omni_pca.v1.client import OmniClientV1
class _FakeConn:
"""Records each request, returns a canned reply.
Tests construct one with a list of (opcode, payload_bytes) replies in
order; each call to :meth:`request` consumes one.
"""
def __init__(
self,
replies: list[tuple[int, bytes]] | None = None,
) -> None:
self.replies = replies or []
self.calls: list[tuple[int, bytes]] = []
async def request(
self,
opcode: int,
payload: bytes = b"",
timeout: float | None = None,
) -> Message:
self.calls.append((int(opcode), bytes(payload)))
if not self.replies:
# Default: panel ack — works for the boring success path.
return Message(
start_char=START_CHAR_V1_UNADDRESSED,
data=bytes([int(OmniLinkMessageType.Ack)]),
)
reply_op, reply_payload = self.replies.pop(0)
return Message(
start_char=START_CHAR_V1_UNADDRESSED,
data=bytes([reply_op]) + reply_payload,
)
def _make_client(replies: list[tuple[int, bytes]] | None = None) -> tuple[OmniClientV1, _FakeConn]:
client = OmniClientV1(
host="127.0.0.1",
controller_key=b"\x00" * 16,
)
fake = _FakeConn(replies)
# Swap out the real connection with our recorder.
client._conn = fake # type: ignore[assignment]
return client, fake
# ---- execute_command ---------------------------------------------------
@pytest.mark.asyncio
async def test_execute_command_packs_payload_be() -> None:
client, fake = _make_client()
await client.execute_command(Command.UNIT_LEVEL, parameter1=42, parameter2=0x1234)
assert len(fake.calls) == 1
opcode, payload = fake.calls[0]
assert opcode == int(OmniLinkMessageType.Command)
# [cmd][p1][p2_hi][p2_lo]
assert payload == struct.pack(">BBH", int(Command.UNIT_LEVEL), 42, 0x1234)
@pytest.mark.asyncio
async def test_execute_command_rejects_oversized_parameters() -> None:
client, _ = _make_client()
with pytest.raises(ValueError, match="parameter1"):
await client.execute_command(Command.UNIT_LEVEL, parameter1=256, parameter2=1)
with pytest.raises(ValueError, match="parameter2"):
await client.execute_command(Command.UNIT_LEVEL, parameter1=0, parameter2=0x10000)
@pytest.mark.asyncio
async def test_execute_command_nak_raises_command_failed() -> None:
client, _ = _make_client([(int(OmniLinkMessageType.Nak), b"")])
with pytest.raises(CommandFailedError, match="NAK"):
await client.execute_command(Command.UNIT_ON, parameter2=5)
@pytest.mark.asyncio
async def test_execute_command_unexpected_reply_raises() -> None:
# Panel returns SystemInformation reply to a Command request — that's bogus.
client, _ = _make_client(
[(int(OmniLinkMessageType.SystemInformation), b"\x00")]
)
with pytest.raises(CommandFailedError, match="unexpected reply"):
await client.execute_command(Command.UNIT_ON, parameter2=5)
# ---- thin wrappers -----------------------------------------------------
@pytest.mark.asyncio
async def test_turn_unit_on_sends_unit_on_command() -> None:
client, fake = _make_client()
await client.turn_unit_on(7)
opcode, payload = fake.calls[0]
assert opcode == int(OmniLinkMessageType.Command)
assert payload[0] == int(Command.UNIT_ON)
assert (payload[2] << 8) | payload[3] == 7
@pytest.mark.asyncio
async def test_turn_unit_off_sends_unit_off_command() -> None:
client, fake = _make_client()
await client.turn_unit_off(255)
payload = fake.calls[0][1]
assert payload[0] == int(Command.UNIT_OFF)
assert (payload[2] << 8) | payload[3] == 255
@pytest.mark.asyncio
async def test_set_unit_level_packs_percent_as_p1() -> None:
client, fake = _make_client()
await client.set_unit_level(3, 75)
payload = fake.calls[0][1]
assert payload[0] == int(Command.UNIT_LEVEL)
assert payload[1] == 75
assert (payload[2] << 8) | payload[3] == 3
@pytest.mark.asyncio
async def test_set_unit_level_rejects_out_of_range_percent() -> None:
client, _ = _make_client()
with pytest.raises(ValueError, match="0..100"):
await client.set_unit_level(1, 101)
with pytest.raises(ValueError, match="0..100"):
await client.set_unit_level(1, -1)
@pytest.mark.asyncio
async def test_bypass_zone_packs_code_as_p1_and_zone_as_p2() -> None:
client, fake = _make_client()
await client.bypass_zone(12, code=5)
payload = fake.calls[0][1]
assert payload[0] == int(Command.BYPASS_ZONE)
assert payload[1] == 5
assert (payload[2] << 8) | payload[3] == 12
@pytest.mark.asyncio
async def test_restore_zone_packs_code_and_zone() -> None:
client, fake = _make_client()
await client.restore_zone(99, code=3)
payload = fake.calls[0][1]
assert payload[0] == int(Command.RESTORE_ZONE)
assert payload[1] == 3
assert (payload[2] << 8) | payload[3] == 99
@pytest.mark.asyncio
async def test_execute_button() -> None:
client, fake = _make_client()
await client.execute_button(15)
payload = fake.calls[0][1]
assert payload[0] == int(Command.EXECUTE_BUTTON)
assert (payload[2] << 8) | payload[3] == 15
@pytest.mark.asyncio
async def test_set_thermostat_modes_route_through_command() -> None:
client, fake = _make_client()
await client.set_thermostat_system_mode(2, 1) # 1 = Heat
await client.set_thermostat_fan_mode(2, 2) # 2 = On
await client.set_thermostat_hold_mode(2, 1) # 1 = Hold
cmds = [p[1][0] for p in fake.calls]
assert cmds == [
int(Command.SET_THERMOSTAT_SYSTEM_MODE),
int(Command.SET_THERMOSTAT_FAN_MODE),
int(Command.SET_THERMOSTAT_HOLD_MODE),
]
@pytest.mark.asyncio
async def test_set_thermostat_setpoint_raw_validates_byte_range() -> None:
client, _ = _make_client()
with pytest.raises(ValueError, match="raw_temp"):
await client.set_thermostat_heat_setpoint_raw(1, 256)
with pytest.raises(ValueError, match="raw_temp"):
await client.set_thermostat_cool_setpoint_raw(1, -1)
# ---- execute_security_command ------------------------------------------
@pytest.mark.asyncio
async def test_execute_security_command_digit_packing() -> None:
# Code 1234 → digits 1, 2, 3, 4.
client, fake = _make_client([(int(OmniLinkMessageType.Ack), b"")])
await client.execute_security_command(area=1, mode=SecurityMode.OFF, code=1234)
opcode, payload = fake.calls[0]
assert opcode == int(OmniLinkMessageType.ExecuteSecurityCommand)
assert payload == bytes([1, int(SecurityMode.OFF), 1, 2, 3, 4])
@pytest.mark.asyncio
async def test_execute_security_command_pads_short_codes() -> None:
# Code 7 → digits 0, 0, 0, 7.
client, fake = _make_client([(int(OmniLinkMessageType.Ack), b"")])
await client.execute_security_command(area=8, mode=SecurityMode.AWAY, code=7)
payload = fake.calls[0][1]
assert payload == bytes([8, int(SecurityMode.AWAY), 0, 0, 0, 7])
@pytest.mark.asyncio
async def test_execute_security_command_response_success_returns() -> None:
# Panel returns ExecuteSecurityCommandResponse with status=0 (success).
client, _ = _make_client(
[(
int(OmniLinkMessageType.ExecuteSecurityCommandResponse),
bytes([int(SecurityCommandResponse.SUCCESS)]),
)]
)
await client.execute_security_command(area=1, mode=SecurityMode.OFF, code=0)
@pytest.mark.asyncio
async def test_execute_security_command_response_failure_raises() -> None:
# Panel returns ExecuteSecurityCommandResponse with status=
# SecureSystem (1) — wrong code or area not enabled for this code.
client, _ = _make_client(
[(
int(OmniLinkMessageType.ExecuteSecurityCommandResponse),
bytes([int(SecurityCommandResponse.INVALID_CODE)]),
)]
)
with pytest.raises(CommandFailedError) as ei:
await client.execute_security_command(
area=1, mode=SecurityMode.AWAY, code=9999
)
assert ei.value.failure_code == int(SecurityCommandResponse.INVALID_CODE)
@pytest.mark.asyncio
async def test_execute_security_command_nak_raises() -> None:
client, _ = _make_client([(int(OmniLinkMessageType.Nak), b"")])
with pytest.raises(CommandFailedError, match="NAK"):
await client.execute_security_command(
area=1, mode=SecurityMode.OFF, code=0
)
@pytest.mark.asyncio
async def test_execute_security_command_rejects_bad_inputs() -> None:
client, _ = _make_client()
with pytest.raises(ValueError, match="area"):
await client.execute_security_command(area=0, mode=SecurityMode.OFF, code=0)
with pytest.raises(ValueError, match="code"):
await client.execute_security_command(
area=1, mode=SecurityMode.OFF, code=10000
)
# ---- acknowledge_alerts -------------------------------------------------
@pytest.mark.asyncio
async def test_acknowledge_alerts_is_noop_on_v1() -> None:
"""v1 has no AcknowledgeAlerts opcode — method should not call request."""
client, fake = _make_client()
await client.acknowledge_alerts()
assert fake.calls == []

276
tests/test_v1_messages.py Normal file
View File

@ -0,0 +1,276 @@
"""Unit tests for omni_pca.v1.messages parsers.
Test vectors are real wire payloads captured from a firmware-2.12 Omni
Pro II panel via dev/probe_v1_recon.py see the comment above each
test for the inputs that produced it.
"""
from __future__ import annotations
import pytest
from omni_pca.v1.messages import (
parse_v1_aux_status,
parse_v1_system_status,
parse_v1_thermostat_status,
parse_v1_unit_status,
parse_v1_zone_status,
)
# ---- ZoneStatus ---------------------------------------------------------
def test_v1_zone_status_secure_and_open() -> None:
# Captured: RequestZoneStatus(1, 8) → 16-byte payload, 8 zones × 2 bytes.
# zone 6 raw_status=0x01 (open), all others 0x00.
payload = bytes.fromhex("0080007f007f0080008001fd00810080")
zones = parse_v1_zone_status(payload, start_index=1)
assert len(zones) == 8
assert {z.index for z in zones} == set(range(1, 9))
assert zones[0].is_secure # zone 1
assert zones[5].is_open # zone 6
assert zones[5].raw_status == 0x01
assert zones[5].loop == 0xFD
def test_v1_zone_status_indexes_offset_by_start() -> None:
# If we requested zones 17..24, the same 16-byte payload should
# produce indexes 17..24.
payload = bytes.fromhex("0080007f007f0080008001fd00810080")
zones = parse_v1_zone_status(payload, start_index=17)
assert {z.index for z in zones} == set(range(17, 25))
def test_v1_zone_status_invalid_length() -> None:
with pytest.raises(ValueError, match="multiple of 2"):
parse_v1_zone_status(b"\x00\x00\x00", start_index=1)
# ---- UnitStatus ---------------------------------------------------------
def test_v1_unit_status_dimmer_levels() -> None:
# Captured: RequestUnitStatus(1, 8) → 24-byte payload, 8 units × 3 bytes.
# state bytes: 01, 01, 69, 96, 69, 00, 73, 00 → 100%, 100%, 5%, 50%, 5%, 0%, 15%, 0%
payload = bytes.fromhex("010000010000690000960000690000000000730000000000")
units = parse_v1_unit_status(payload, start_index=1)
assert len(units) == 8
assert units[0].is_on and units[0].brightness == 100 # state=0x01
assert units[2].brightness == 5 # state=0x69 = 105 → -100 = 5%
assert units[3].brightness == 50 # state=0x96 = 150 → -100 = 50%
assert not units[5].is_on # state=0x00
assert units[6].brightness == 15 # state=0x73 = 115 → -100 = 15%
def test_v1_unit_status_time_remaining_be_u16() -> None:
# Single record with remaining=0x1234.
payload = bytes([0x01, 0x12, 0x34])
units = parse_v1_unit_status(payload, start_index=42)
assert len(units) == 1
assert units[0].index == 42
assert units[0].time_remaining_secs == 0x1234
def test_v1_unit_status_invalid_length() -> None:
with pytest.raises(ValueError, match="multiple of 3"):
parse_v1_unit_status(b"\x00\x00", start_index=1)
# ---- ThermostatStatus ---------------------------------------------------
def test_v1_thermostat_status_unconfigured() -> None:
# Captured: RequestThermostatStatus(1, 4) → 28 B, all values 0/0/0/0/0/0/0
# except byte 0 of records 0-1 which is 0x01 (status). The "raw=0" temps
# decode to -40°C / -40°F per omni_temp_to_fahrenheit.
payload = bytes.fromhex(
"01000000000000010000000000000000000000000000000000000000"
)
tstats = parse_v1_thermostat_status(payload, start_index=1)
assert len(tstats) == 4
assert tstats[0].status == 0x01
assert tstats[2].status == 0x00
assert tstats[0].humidity_raw == 0 # zero-filled (v1 doesn't carry it)
assert tstats[0].outdoor_temperature_raw == 0
assert tstats[0].horc_status == 0
def test_v1_thermostat_full_record() -> None:
# Hand-constructed: status=0x01, temp=170 (=45°F), heat=140 (30°F),
# cool=200 (60°F), mode=1, fan=2, hold=3.
payload = bytes([0x01, 170, 140, 200, 1, 2, 3])
tstats = parse_v1_thermostat_status(payload, start_index=5)
assert len(tstats) == 1
t = tstats[0]
assert t.index == 5
assert t.status == 0x01
assert t.temperature_raw == 170
assert t.heat_setpoint_raw == 140
assert t.cool_setpoint_raw == 200
assert t.system_mode == 1
assert t.fan_mode == 2
assert t.hold_mode == 3
def test_v1_thermostat_invalid_length() -> None:
with pytest.raises(ValueError, match="multiple of 7"):
parse_v1_thermostat_status(b"\x00" * 6, start_index=1)
# ---- AuxiliaryStatus ----------------------------------------------------
def test_v1_aux_status_all_zero() -> None:
# Captured: RequestAuxiliaryStatus(1, 8) → 32 B all zeros.
payload = bytes(32)
auxes = parse_v1_aux_status(payload, start_index=1)
assert len(auxes) == 8
assert all(a.output == 0 and a.value_raw == 0 for a in auxes)
def test_v1_aux_status_record_field_order() -> None:
# Single record: output=11, value=22, low=33, high=44
payload = bytes([11, 22, 33, 44])
auxes = parse_v1_aux_status(payload, start_index=99)
assert len(auxes) == 1
a = auxes[0]
assert a.index == 99
assert a.output == 11
assert a.value_raw == 22
assert a.low_raw == 33
assert a.high_raw == 44
def test_v1_aux_invalid_length() -> None:
with pytest.raises(ValueError, match="multiple of 4"):
parse_v1_aux_status(b"\x00\x00\x00", start_index=1)
# ---- SystemStatus -------------------------------------------------------
def test_v1_system_status_full_payload() -> None:
# Captured: RequestSystemStatus → 38 B payload from firmware 2.12.
# Bytes: 011a050a07163b1c01061c150003 + 24 area-mode bytes
# decode: time_valid=1, year=26 (=2026), month=05, day=10,
# dow=07, hour=22, min=59, sec=28, dst=01, sun_h=06, sun_m=28,
# sun_h2=21, sun_m2=21, battery=0x00, then area modes.
# Note: the 14th byte (0x03) is the BATTERY reading = 3, not 0.
payload = bytes.fromhex(
"011a050a07163b1c01061c150003000000000000000002090000000000000000000000000000"
)
s = parse_v1_system_status(payload)
assert s.time_valid is True
assert s.panel_time is not None
assert s.panel_time.year == 2000 + 0x1A # 2026
assert s.panel_time.month == 0x05
assert s.panel_time.day == 0x0A
assert s.sunrise_hour == 0x06
assert s.sunrise_minute == 0x1C # 28
assert s.sunset_hour == 0x15 # 21
assert s.sunset_minute == 0x00
assert s.battery_reading == 0x03
# 24 trailing bytes promoted to area_alarms tuples (mode_byte, 0).
assert len(s.area_alarms) == 24
assert s.area_alarms[0] == (0, 0)
# Area 9 in this capture had mode=2.
assert s.area_alarms[8] == (2, 0)
def test_v1_system_status_minimum_payload() -> None:
# Just the 14 header bytes, no area modes.
payload = bytes(14)
s = parse_v1_system_status(payload)
assert s.time_valid is False
assert s.panel_time is None
assert s.battery_reading == 0
assert s.area_alarms == ()
def test_v1_system_status_too_short_raises() -> None:
with pytest.raises(ValueError, match="too short"):
parse_v1_system_status(b"\x00" * 13)
# ---- NameData -----------------------------------------------------------
from omni_pca.v1.messages import NameType, parse_v1_namedata # noqa: E402
def test_v1_namedata_zone_one_byte_form() -> None:
# Captured: UploadNames stream → first reply = Zone #1 'GARAGE ENTRY'.
# Payload 18 B = type(1) + num(1) + name(15) + reserved(1).
payload = bytes.fromhex("010147415241474520454e54525900000000")
rec = parse_v1_namedata(payload)
assert rec.name_type == int(NameType.ZONE)
assert rec.name_type_label == "ZONE"
assert rec.number == 1
assert rec.name == "GARAGE ENTRY"
def test_v1_namedata_unit_one_byte_form() -> None:
# Hand-crafted: Unit #5 = "GARAGE ENTRY" (12-char name slot, no padding need).
name = "GARAGE ENTRY"
payload = (
bytes([int(NameType.UNIT), 5])
+ name.encode("ascii").ljust(12, b"\x00")
+ b"\x00" # reserved trailing byte
)
rec = parse_v1_namedata(payload)
assert rec.name_type == int(NameType.UNIT)
assert rec.number == 5
assert rec.name == name
def test_v1_namedata_unit_two_byte_form() -> None:
# Unit #257 = 'Z1-LANDSCAPE' — captured from the real panel after the
# numbered units rolled over 256.
payload = (
bytes([int(NameType.UNIT), 0x01, 0x01]) # type, num_hi=1, num_lo=1
+ b"Z1-LANDSCAPE".ljust(12, b"\x00") # 12-char name
+ b"\x00"
)
rec = parse_v1_namedata(payload)
assert rec.name_type == int(NameType.UNIT)
assert rec.number == 257
assert rec.name == "Z1-LANDSCAPE"
def test_v1_namedata_thermostat() -> None:
payload = (
bytes([int(NameType.THERMOSTAT), 1])
+ b"DOWNSTAIRS".ljust(12, b"\x00")
+ b"\x00"
)
rec = parse_v1_namedata(payload)
assert rec.name_type == int(NameType.THERMOSTAT)
assert rec.number == 1
assert rec.name == "DOWNSTAIRS"
def test_v1_namedata_strips_trailing_nulls() -> None:
payload = (
bytes([int(NameType.ZONE), 9])
+ b"HALL MOTION".ljust(15, b"\x00")
+ b"\x00"
)
rec = parse_v1_namedata(payload)
assert rec.name == "HALL MOTION" # no embedded nulls in result
def test_v1_namedata_unknown_type_falls_through() -> None:
# Unknown name type — parser should still return SOMETHING by
# consuming the rest as the name. HA filters by NameType anyway.
payload = bytes([99, 7]) + b"WHATEVER\x00\x00"
rec = parse_v1_namedata(payload)
assert rec.name_type == 99
assert rec.name_type_label == "Unknown(99)"
assert rec.number == 7
assert rec.name == "WHATEVER"
def test_v1_namedata_short_payload_raises() -> None:
with pytest.raises(ValueError, match="too short"):
parse_v1_namedata(b"\x01\x00")