HA integration: wire v1+UDP into the coordinator + config flow

OmniClientV1Adapter (src/omni_pca/v1/adapter.py)
  V2-shape facade over OmniClientV1. Exposes the OmniClient surface the
  HA coordinator was written against — get_system_information,
  list_*_names, get_object_properties (synthesized from streamed names),
  get_extended_status (chunked, routed to v1 typed status opcodes),
  get_object_status(AREA, ...) (derived from SystemStatus.area_alarms),
  events() (EventStream on v1 SystemEvents opcode 35), plus all the
  write-method shims.

  Chunks unit/zone/thermostat/aux polls per-type because firmware 2.12
  NAKs Request*Status with >~62 records in one shot (verified live).
  Falls back to "Area 1".."Area 8" when the UploadNames stream returns
  zero areas — common on panels where the installer didn't name them.

custom_components/omni_pca/coordinator.py
  _ensure_connected picks OmniClientV1Adapter for transport=udp. New
  _walk_properties_v1 replaces the v2 RequestProperties walk with a
  name-stream + synthesized-Properties pass.

custom_components/omni_pca/config_flow.py
  _probe routes to OmniClientV1Adapter for transport=udp instead of
  trying to drive v2 OmniClient over UDP (which silently dropped after
  handshake, per the earlier diagnosis).

src/omni_pca/events.py
  parse_events / _ensure_system_events / EventStream now take an
  expected_opcode arg (default v2 SystemEvents=55, v1 callers pass 35).
  Word format is byte-identical between v1 and v2, so the typed-event
  decoder is unchanged.

src/omni_pca/v1/client.py
  _range_status supports the long-form RequestUnitStatus (BE u16
  start/end) so panels with unit indices > 255 (sprinklers, flags) work.

Verified end-to-end against firmware 2.12 panel at 192.168.1.9:
  config entries:
    state=loaded  Omni Pro II (host.docker.internal)  (mock)
    state=loaded  Omni Pro II (192.168.1.9)           (real, v1+UDP)
  real-panel entities created in HA: 96 (30 binary_sensor, 26 light,
  15 switch, 13 button, 9 sensor, 3 climate)
  cross-check: light.omni_pro_ii_front_porch_2 = on  (matches live
  probe: unit #2 'FRONT PORCH' state=0x01 brightness=100)

dev/probe_v1_coordinator.py
  Coordinator-shaped end-to-end smoke test against the real panel
  without HA — drives the full discovery + poll cycle through the
  adapter. Useful for regression-checking the v1 wire path.

dev/add_real_panel.py
  Programmatically adds the real-panel config entry to the dev HA
  stack via the REST config-flow endpoints. Idempotent.
This commit is contained in:
Ryan Malloy 2026-05-11 01:30:49 -06:00
parent 92c8b695b4
commit 30b482a8cb
8 changed files with 839 additions and 32 deletions

View File

@ -168,15 +168,40 @@ class OmniConfigFlow(ConfigFlow, domain=DOMAIN):
key: bytes,
transport: str = DEFAULT_TRANSPORT,
) -> tuple[str | None, str | None]:
"""Try to connect once. Returns (title, error_code)."""
"""Try to connect once. Returns (title, error_code).
TCP uses :class:`OmniClient` (v2 wire protocol). UDP uses the v1
adapter UDP-listening panels speak the legacy wire protocol,
not OmniLink2 see :mod:`omni_pca.v1.adapter` for the bridge.
"""
try:
async with OmniClient(
host,
port=port,
controller_key=key,
transport=transport, # type: ignore[arg-type]
) as client:
info = await client.get_system_information()
if transport == TRANSPORT_UDP:
from omni_pca.v1 import (
HandshakeError as V1HandshakeError,
)
from omni_pca.v1 import (
InvalidEncryptionKeyError as V1InvalidEncryptionKeyError,
)
from omni_pca.v1 import OmniClientV1Adapter
from omni_pca.v1.connection import (
ConnectionError as V1ConnectionError,
)
try:
async with OmniClientV1Adapter(
host, port=port, controller_key=key,
) as client:
info = await client.get_system_information()
except (V1HandshakeError, V1InvalidEncryptionKeyError):
return None, "invalid_auth"
except (V1ConnectionError, OSError, TimeoutError) as err:
LOGGER.debug("v1 probe failed: %s", err)
return None, "cannot_connect"
else:
async with OmniClient(
host, port=port, controller_key=key, transport=transport, # type: ignore[arg-type]
) as client:
info = await client.get_system_information()
except (HandshakeError, InvalidEncryptionKeyError):
return None, "invalid_auth"
except (OmniConnectionError, OSError, TimeoutError) as err:

View File

@ -234,12 +234,25 @@ class OmniDataUpdateCoordinator(DataUpdateCoordinator[OmniData]):
async def _ensure_connected(self) -> OmniClient:
if self._client is not None:
return self._client
client = OmniClient(
self._host,
port=self._port,
controller_key=self._controller_key,
transport=self._transport, # type: ignore[arg-type]
)
if self._transport == "udp":
# Panels listening UDP-only speak the v1 wire protocol, not
# v2. The adapter exposes the OmniClient API surface this
# coordinator was written against, but underneath it drives
# an OmniConnectionV1 + the typed v1 status/command opcodes.
from omni_pca.v1 import OmniClientV1Adapter
client: OmniClient = OmniClientV1Adapter( # type: ignore[assignment]
self._host,
port=self._port,
controller_key=self._controller_key,
)
else:
client = OmniClient(
self._host,
port=self._port,
controller_key=self._controller_key,
transport=self._transport, # type: ignore[arg-type]
)
# Drive __aenter__ manually so the client survives across update
# cycles; we close it explicitly on shutdown / failure.
await client.__aenter__()
@ -392,9 +405,15 @@ class OmniDataUpdateCoordinator(DataUpdateCoordinator[OmniData]):
client's internal parser table only covers zones/units/areas in
v1.0). We drive ``RequestProperties`` directly on the connection
so we don't have to monkey-patch the library.
On UDP/v1 panels there is no ``RequestProperties`` opcode at all,
so we fall back to the v1 adapter's name-stream-based discovery
(each object's ``Properties`` is synthesized from its name).
"""
if parser is None or OBJECT_TYPE_TO_PROPERTIES.get(int(object_type)) is None:
return {}
if self._transport == "udp":
return await self._walk_properties_v1(client, object_type)
out: dict[int, object] = {}
cursor = 0
conn = client.connection
@ -443,6 +462,42 @@ class OmniDataUpdateCoordinator(DataUpdateCoordinator[OmniData]):
break
return out
async def _walk_properties_v1(
self, client: OmniClient, object_type: ObjectType
) -> dict[int, object]:
"""V1 fallback for :meth:`_walk_properties`.
v1 has no RequestProperties opcode names come from streaming
UploadNames and the rest of the Properties fields can't be
recovered from the wire. We delegate to the adapter's
``get_object_properties`` (which synthesizes a minimal record
from the cached name list) and skip anything it returns ``None``
for.
"""
# Pick the right per-type name lister. The adapter caches the
# UploadNames stream output so these are nearly free after the
# first call this discovery pass.
if object_type == ObjectType.THERMOSTAT:
names = await client.list_thermostat_names() # type: ignore[attr-defined]
elif object_type == ObjectType.BUTTON:
names = await client.list_button_names() # type: ignore[attr-defined]
else:
# Programs / Messages / etc — nothing to walk.
return {}
out: dict[int, object] = {}
for idx in sorted(names):
try:
props = await client.get_object_properties(object_type, idx)
except Exception:
LOGGER.debug(
"v1 properties synth failed for %s #%d",
object_type.name, idx, exc_info=True,
)
continue
if props is not None:
out[idx] = props
return out
@staticmethod
async def _best_effort(coro_fn, *, default):
"""Call ``coro_fn()`` and swallow non-transport errors, returning ``default``.

150
dev/add_real_panel.py Normal file
View File

@ -0,0 +1,150 @@
#!/usr/bin/env python3
"""Add a *second* omni_pca config entry pointing at the real panel.
The dev stack already has one entry pointing at the mock panel
(``host.docker.internal:14369``). This script adds another entry for
the real panel at ``192.168.1.9:4369`` using ``transport=udp`` and the
controller key from the bundled .pca fixture.
Run inside the project venv:
cd /home/kdm/home-auto/omni-pca
uv run python dev/add_real_panel.py
"""
from __future__ import annotations
import argparse
import asyncio
import sys
from pathlib import Path
import httpx
sys.path.insert(0, str(Path(__file__).parent))
from probe_v1 import _load_key # type: ignore # noqa: E402
DEFAULT_HA_URL = "http://localhost:8123"
PANEL_HOST = "192.168.1.9"
PANEL_PORT = 4369
DEFAULT_USERNAME = "demo"
DEFAULT_PASSWORD = "demo-password-1234"
async def _get_token(ha_url: str) -> str:
"""Re-use the cached access token; otherwise log in via /auth/login_flow."""
token_file = (
Path(__file__).parent / "ha-config" / ".storage" / ".demo_access_token"
)
if token_file.exists():
return token_file.read_text().strip()
async with httpx.AsyncClient(base_url=ha_url, timeout=15.0) as client:
r = await client.post(
"/auth/login_flow",
json={
"client_id": ha_url,
"handler": ["homeassistant", None],
"redirect_uri": ha_url,
},
)
r.raise_for_status()
flow_id = r.json()["flow_id"]
r = await client.post(
f"/auth/login_flow/{flow_id}",
json={
"client_id": ha_url,
"username": DEFAULT_USERNAME,
"password": DEFAULT_PASSWORD,
},
)
r.raise_for_status()
auth_code = r.json()["result"]
r = await client.post(
"/auth/token",
data={
"client_id": ha_url,
"grant_type": "authorization_code",
"code": auth_code,
},
)
r.raise_for_status()
token = r.json()["access_token"]
# Cache for next run.
try:
token_file.write_text(token)
except Exception:
pass
return token
async def amain(args: argparse.Namespace) -> int:
key_bytes = _load_key(None)
key_hex = key_bytes.hex()
print(f"[add-real-panel] target HA: {args.ha_url}")
print(f"[add-real-panel] panel: {PANEL_HOST}:{PANEL_PORT} (UDP)")
print(f"[add-real-panel] key: ...{key_hex[-4:]} (16 bytes)\n")
token = await _get_token(args.ha_url)
headers = {"Authorization": f"Bearer {token}"}
async with httpx.AsyncClient(base_url=args.ha_url, timeout=30.0) as client:
# ---- check if an entry already exists for this host ----
r = await client.get(
"/api/config/config_entries/entry", headers=headers
)
r.raise_for_status()
for entry in r.json():
if entry.get("domain") != "omni_pca":
continue
data = entry.get("data", {})
if data.get("host") == PANEL_HOST and data.get("port") == PANEL_PORT:
print(f" already configured: {entry['title']} ({entry['entry_id']})")
return 0
# ---- start the config flow ----
r = await client.post(
"/api/config/config_entries/flow",
headers=headers,
json={"handler": "omni_pca", "show_advanced_options": False},
)
r.raise_for_status()
flow = r.json()
flow_id = flow["flow_id"]
print(f" flow opened: {flow_id} (step={flow.get('step_id')})")
# ---- submit the form for the real panel ----
r = await client.post(
f"/api/config/config_entries/flow/{flow_id}",
headers=headers,
json={
"host": PANEL_HOST,
"port": PANEL_PORT,
"controller_key": key_hex,
"transport": "udp",
},
timeout=60.0, # the probe round-trip can take a few seconds
)
r.raise_for_status()
result = r.json()
if result.get("type") == "create_entry":
print(f" ✓ entry created: {result.get('title')}")
print(f" entry_id: {result.get('result')}")
elif result.get("type") == "form":
print(f" form re-shown — errors: {result.get('errors')}")
return 1
else:
print(f" unexpected outcome: {result}")
return 1
return 0
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--ha-url", default=DEFAULT_HA_URL)
args = parser.parse_args()
return asyncio.run(amain(args))
if __name__ == "__main__":
sys.exit(main())

129
dev/probe_v1_coordinator.py Normal file
View File

@ -0,0 +1,129 @@
#!/usr/bin/env python3
"""Phase-3 smoke test: drive OmniClientV1Adapter through the same
sequence the HA coordinator runs in async_config_entry_first_refresh.
Doesn't pull in HA; just executes the discovery + initial poll pattern
against the real panel and prints what an OmniData snapshot would look
like. If this works, the actual HA coordinator should work too.
Run:
cd /home/kdm/home-auto/omni-pca
uv run python dev/probe_v1_coordinator.py
"""
from __future__ import annotations
import argparse
import asyncio
import logging
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from probe_v1 import _load_key # type: ignore # noqa: E402
from omni_pca.models import ObjectType
from omni_pca.v1 import OmniClientV1Adapter
async def amain(args: argparse.Namespace) -> int:
key = _load_key(args.key)
print(f"[coord probe] target {args.host}:{args.port}\n")
async with OmniClientV1Adapter(
host=args.host, port=args.port, controller_key=key, timeout=10.0
) as c:
# ---- 1. SystemInformation ----
info = await c.get_system_information()
print(f"=== SystemInformation ===\n"
f" model={info.model_name} fw={info.firmware_version}\n")
# ---- 2. Discovery: per-type names + synthesized properties ----
print("=== Discovery (UploadNames stream + synth Properties) ===")
zone_names = await c.list_zone_names()
unit_names = await c.list_unit_names()
area_names = await c.list_area_names()
tstat_names = await c.list_thermostat_names()
button_names = await c.list_button_names()
print(f" zones: {len(zone_names)}")
print(f" units: {len(unit_names)}")
print(f" areas: {len(area_names)} (fallback if 0 streamed)")
print(f" thermostats: {len(tstat_names)}")
print(f" buttons: {len(button_names)}")
print()
# Sanity-check that get_object_properties returns a real dataclass
# for one zone, one unit, one area, one thermostat, one button.
for type_byte, name_dict, label in [
(ObjectType.ZONE, zone_names, "Zone"),
(ObjectType.UNIT, unit_names, "Unit"),
(ObjectType.AREA, area_names, "Area"),
(ObjectType.THERMOSTAT, tstat_names, "Thermostat"),
(ObjectType.BUTTON, button_names, "Button"),
]:
if not name_dict:
print(f" {label}: no entries, skipping property synth")
continue
idx = min(name_dict)
props = await c.get_object_properties(type_byte, idx)
print(f" {label} #{idx}: {props}")
print()
# ---- 3. Polling: bulk status for each type, plus area derivation ----
print("=== Polling (bulk status) ===")
if zone_names:
zone_end = max(zone_names)
zones = await c.get_extended_status(ObjectType.ZONE, 1, zone_end)
open_zones = [z for z in zones if getattr(z, "is_open", False)]
print(f" ZoneStatus[1..{zone_end}]: {len(zones)} records, "
f"{len(open_zones)} currently open")
if unit_names:
unit_end = max(unit_names)
units = await c.get_extended_status(ObjectType.UNIT, 1, unit_end)
on_units = [u for u in units if getattr(u, "is_on", False)]
print(f" UnitStatus[1..{unit_end}]: {len(units)} records, "
f"{len(on_units)} currently on")
if tstat_names:
tstat_end = max(tstat_names)
tstats = await c.get_extended_status(
ObjectType.THERMOSTAT, 1, tstat_end
)
print(f" ThermostatStatus[1..{tstat_end}]: {len(tstats)} records")
# Areas: derived from SystemStatus
if area_names:
area_end = max(area_names)
areas = await c.get_object_status(ObjectType.AREA, 1, area_end)
modes = [a.mode for a in areas]
print(f" AreaStatus[1..{area_end}]: {len(areas)} records, "
f"modes={modes}")
print()
# ---- 4. SystemStatus ----
status = await c.get_system_status()
print(f"=== SystemStatus ===\n"
f" panel_time={status.panel_time} "
f"battery=0x{status.battery_reading:02x}\n"
f" sunrise={status.sunrise_hour:02d}:{status.sunrise_minute:02d} "
f"sunset={status.sunset_hour:02d}:{status.sunset_minute:02d}\n")
print("[coord probe] ✓ full discovery + poll cycle worked over v1+UDP")
return 0
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--host", default="192.168.1.9")
parser.add_argument("--port", type=int, default=4369)
parser.add_argument("--key", help="32 hex chars; overrides env/.omni_key")
parser.add_argument("--debug", action="store_true")
args = parser.parse_args()
logging.basicConfig(
level=logging.DEBUG if args.debug else logging.WARNING,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
return asyncio.run(amain(args))
if __name__ == "__main__":
sys.exit(main())

View File

@ -127,17 +127,22 @@ class UpbLinkAction(IntEnum):
# --------------------------------------------------------------------------
def _ensure_system_events(message: Message) -> bytes:
"""Validate that ``message`` is a v2 SystemEvents reply, return its
payload bytes (everything after the opcode).
def _ensure_system_events(
message: Message,
expected_opcode: int = int(OmniLink2MessageType.SystemEvents),
) -> bytes:
"""Validate that ``message`` is a SystemEvents reply, return payload bytes.
Reference: clsOLMsgSystemEvents.cs (entire file) the message body
is just ``[opcode][word1_hi][word1_lo][word2_hi][word2_lo]``.
The v1 and v2 SystemEvents inner-message bodies are byte-identical
(clsOLMsgSystemEvents.cs vs clsOL2MsgSystemEvents.cs both yield
``[opcode][word1_hi][word1_lo][word2_hi][word2_lo]``); only the
opcode byte differs (35 vs 55). Pass ``expected_opcode`` to dispatch
the v1 path from :class:`omni_pca.v1.adapter.OmniClientV1Adapter`.
"""
if message.opcode != int(OmniLink2MessageType.SystemEvents):
if message.opcode != expected_opcode:
raise ValueError(
"not a SystemEvents message: opcode "
f"{message.opcode} (expected {int(OmniLink2MessageType.SystemEvents)})"
f"not a SystemEvents message: opcode {message.opcode} "
f"(expected {expected_opcode})"
)
payload = message.payload
if len(payload) % 2 != 0:
@ -700,18 +705,23 @@ def _classify(word: int) -> SystemEvent:
# --------------------------------------------------------------------------
def parse_events(message: Message) -> list[SystemEvent]:
"""Decode a v2 ``SystemEvents`` (opcode 55) message into typed events.
def parse_events(
message: Message,
expected_opcode: int = int(OmniLink2MessageType.SystemEvents),
) -> list[SystemEvent]:
"""Decode a ``SystemEvents`` message into typed events.
The panel batches multiple state changes into a single message, so
the return type is always a list even for messages that carry just
one event. Empty SystemEvents messages return an empty list rather
than raising.
Reference: clsOLMsgSystemEvents.cs:10-18 (SystemEventsCount + per-
word accessor).
``expected_opcode`` defaults to v2 (55); pass v1's value (35) when
decoding from a ``v1.OmniConnectionV1`` push stream.
Reference: clsOLMsgSystemEvents.cs / clsOL2MsgSystemEvents.cs.
"""
payload = _ensure_system_events(message)
payload = _ensure_system_events(message, expected_opcode)
return [_classify(w) for w in _iter_event_words(payload)]
@ -790,6 +800,7 @@ class EventStream:
"""
source: object # OmniConnection or duck-typed equivalent
expected_opcode: int = int(OmniLink2MessageType.SystemEvents)
_buffer: list[SystemEvent] = field(default_factory=list)
def __post_init__(self) -> None:
@ -817,10 +828,10 @@ class EventStream:
raise
except asyncio.CancelledError:
raise
if msg.opcode != int(OmniLink2MessageType.SystemEvents):
if msg.opcode != self.expected_opcode:
# Non-event message (Status, Ack, …) — silently ignore.
continue
self._buffer = parse_events(msg)
self._buffer = parse_events(msg, self.expected_opcode)
return self._buffer.pop(0)

View File

@ -13,6 +13,7 @@ V1 for Modem/UDP/Serial, V2 only for TCP).
from __future__ import annotations
from .adapter import OmniClientV1Adapter
from .client import OmniClientV1, OmniNakError, OmniProtocolError
from .connection import (
HandshakeError,
@ -37,6 +38,7 @@ __all__ = [
"NameRecord",
"NameType",
"OmniClientV1",
"OmniClientV1Adapter",
"OmniConnectionV1",
"OmniNakError",
"OmniProtocolError",

424
src/omni_pca/v1/adapter.py Normal file
View File

@ -0,0 +1,424 @@
"""V2-shape adapter over :class:`OmniClientV1`.
The Home Assistant coordinator was written against :class:`omni_pca.client.OmniClient`
(the v2 API). When the user configures ``transport=udp`` we need a client
that *looks* like ``OmniClient`` but speaks v1-over-UDP underneath.
This adapter exposes only the methods the coordinator and entity
platforms actually call. Where v1 lacks a v2 opcode (Properties for
zones/units/areas, AcknowledgeAlerts), we synthesize a sensible
fallback rather than raise HA users shouldn't have to care that their
panel is on a different wire protocol.
What the adapter does:
* **Discovery (``list_*_names``)**: delegates to ``OmniClientV1`` (which
drives the streaming ``UploadNames`` flow once per call).
* **Properties (``get_object_properties``)**: synthesizes a minimal
``*Properties`` dataclass from the name alone. v1 has no Properties
opcode, so we can't fetch zone_type / unit_type / area_alarms / etc.
Defaults are zero entity platforms read mostly the name + the live
``*Status`` snapshot, so this works for the common case.
* **Bulk status (``get_extended_status``)**: routes Zone/Unit/Thermostat/
AuxSensor through the v1 typed ``get_*_status`` calls and returns the
resulting dataclass list (same shape v2 produces).
* **Area status (``get_object_status(AREA, )``)**: derives ``AreaStatus``
records from the per-area mode bytes in v1 ``SystemStatus`` v1 has
no per-area status opcode and the modes are the only thing the panel
reports on UDP.
* **Events (``events()``)**: returns an :class:`EventStream` filtered on
v1's SystemEvents opcode (35) instead of v2's (55). Word format is
identical, so the existing typed-event decoder works unchanged.
* **Writes**: pass-through to the underlying ``OmniClientV1`` methods,
whose Command / ExecuteSecurityCommand payloads are byte-identical
to v2 only the opcode differs.
"""
from __future__ import annotations
from collections.abc import AsyncIterator, Awaitable, Callable
from typing import Self
from ..commands import Command
from ..events import EventStream, SystemEvent
from ..models import (
AreaProperties,
AreaStatus,
AuxSensorStatus,
ButtonProperties,
ObjectType,
SecurityMode,
SystemInformation,
SystemStatus,
ThermostatProperties,
ThermostatStatus,
UnitProperties,
UnitStatus,
ZoneProperties,
ZoneStatus,
)
from ..opcodes import OmniLinkMessageType
from .client import OmniClientV1
from .connection import OmniConnectionV1
# Type used by coordinator for object_type arg (the IntEnum in
# omni_pca.client is just a re-export of models.ObjectType).
_ObjectType = ObjectType
_DEFAULT_PORT = 4369
class OmniClientV1Adapter:
"""V2-shaped facade over :class:`OmniClientV1`.
Construct with the same kwargs as :class:`OmniClient`; the
coordinator does not need to know which one it has.
"""
def __init__(
self,
host: str,
port: int = _DEFAULT_PORT,
controller_key: bytes = b"",
timeout: float = 5.0,
retry_count: int = 3,
**_ignored,
) -> None:
# `transport=` and similar kwargs are accepted-and-ignored so the
# coordinator's construction call stays identical across v1/v2.
self._client = OmniClientV1(
host=host,
port=port,
controller_key=controller_key,
timeout=timeout,
retry_count=retry_count,
)
# ---- lifecycle ------------------------------------------------------
async def __aenter__(self) -> Self:
await self._client.__aenter__()
return self
async def __aexit__(self, exc_type, exc, tb) -> None:
await self._client.__aexit__(exc_type, exc, tb)
@property
def connection(self) -> OmniConnectionV1:
"""Underlying :class:`OmniConnectionV1` — used by the coordinator's
low-level walks. v1's connection has the same ``unsolicited()`` /
``request()`` surface as v2's, just a different wire dialect.
"""
return self._client.connection
# ---- panel-wide reads ----------------------------------------------
async def get_system_information(self) -> SystemInformation:
return await self._client.get_system_information()
async def get_system_status(self) -> SystemStatus:
return await self._client.get_system_status()
# ---- discovery (cached once per coordinator setup) -----------------
#
# The coordinator calls list_*_names() once per object type. Each
# call drives a fresh UploadNames stream, which on this panel takes
# ~250ms per ~100 names. We cache the full bucketed dict on first
# call so the four list_*_names() calls + several synthesize-
# properties calls all share one network roundtrip.
async def _ensure_names(self) -> dict[int, dict[int, str]]:
cached = getattr(self, "_name_cache", None)
if cached is None:
cached = await self._client.list_all_names()
self._name_cache = cached
return cached
def _invalidate_names(self) -> None:
"""Force the next discovery call to re-stream UploadNames."""
self._name_cache = None # type: ignore[assignment]
async def list_zone_names(self) -> dict[int, str]:
return (await self._ensure_names()).get(1, {}) # NameType.ZONE
async def list_unit_names(self) -> dict[int, str]:
return (await self._ensure_names()).get(2, {}) # NameType.UNIT
async def list_area_names(self) -> dict[int, str]:
"""Return area names, falling back to "Area N" when stream is empty.
Most v1 panels don't expose user-assigned area names — the slots
exist (8 for Omni Pro II) but the .pca file leaves them zero-
filled. HA needs *something* to label each area entity, so we
synthesize "Area 1".."Area 8" as a fixed-size fallback. The 8
is the Omni Pro II cap; we cap here even when ``SystemStatus``
reports more mode bytes because the long-form SystemStatus
payload mixes in EE-expansion telemetry past byte 22.
"""
named = (await self._ensure_names()).get(5, {}) # NameType.AREA
if named:
return named
return {i: f"Area {i}" for i in range(1, 9)}
async def list_thermostat_names(self) -> dict[int, str]:
return (await self._ensure_names()).get(6, {}) # NameType.THERMOSTAT
async def list_button_names(self) -> dict[int, str]:
return (await self._ensure_names()).get(3, {}) # NameType.BUTTON
async def list_code_names(self) -> dict[int, str]:
return (await self._ensure_names()).get(4, {}) # NameType.CODE
async def list_message_names(self) -> dict[int, str]:
return (await self._ensure_names()).get(7, {}) # NameType.MESSAGE
# ---- properties synthesis ------------------------------------------
async def get_object_properties(
self, object_type: ObjectType, index: int
) -> ZoneProperties | UnitProperties | AreaProperties | ThermostatProperties | None:
"""Synthesize a Properties dataclass from the name alone.
v1 has no ``RequestProperties`` opcode; the rich fields v2 carries
(zone_type, unit areas bitfield, exit/entry delays, ) simply
aren't reachable on UDP. We return a minimal dataclass with just
``index`` + ``name`` populated and everything else defaulted to
0/False so entity setup doesn't need a transport branch.
Returns ``None`` if the object isn't defined (no name and not in
the default area-fallback range), which mirrors v2's behavior
when ``RequestProperties`` walks past the last defined object.
"""
names = await self._ensure_names()
if object_type == ObjectType.ZONE:
name = names.get(1, {}).get(index)
if not name:
return None
return ZoneProperties(
index=index, name=name, zone_type=0, area=1,
options=0, status=0, loop=0,
)
if object_type == ObjectType.UNIT:
name = names.get(2, {}).get(index)
if not name:
return None
return UnitProperties(
index=index, name=name, unit_type=0,
status=0, time=0, areas=0,
)
if object_type == ObjectType.AREA:
# Use the same fallback logic as list_area_names so HA always
# gets at least the 8 default-area entries.
label = (await self.list_area_names()).get(index)
if label is None:
return None
return AreaProperties(
index=index, name=label, mode=0, alarms=0,
enabled=True, entry_delay=0, exit_delay=0,
)
if object_type == ObjectType.THERMOSTAT:
name = names.get(6, {}).get(index)
if not name:
return None
return ThermostatProperties(
index=index, name=name, thermostat_type=0,
communicating=True,
)
if object_type == ObjectType.BUTTON:
name = names.get(3, {}).get(index)
if not name:
return None
return ButtonProperties(index=index, name=name)
return None
# ---- bulk status ---------------------------------------------------
# Per-type max records per chunk. Empirically firmware 2.12 caps unit
# responses around 62 records regardless of the MessageLength byte
# limit; other types follow similar conservative caps. We chunk well
# under those thresholds to leave headroom for any per-firmware
# variance and the AES zero-padding the wire frames add.
_CHUNK_SIZES: dict[int, int] = {
ObjectType.ZONE: 80, # 2 B/rec, panel caps high enough
ObjectType.UNIT: 40, # firmware 2.12 NAKs at 63+ records
ObjectType.THERMOSTAT: 30,
ObjectType.AUXILIARY: 60,
}
async def get_extended_status(
self,
object_type: ObjectType,
start: int,
end: int | None = None,
) -> list:
"""Route v2 ``get_extended_status`` to the matching v1 typed call.
v1 panels (Omni Pro II) can have 511 units across a sparse
address space. We chunk wide ranges into per-type-sized batches
and concatenate the records same effect for the caller, only
the wire transcript is different.
"""
last = end if end is not None else start
if object_type == ObjectType.ZONE:
fetch = self._client.get_zone_status
elif object_type == ObjectType.UNIT:
fetch = self._client.get_unit_status
elif object_type == ObjectType.THERMOSTAT:
fetch = self._client.get_thermostat_status
elif object_type == ObjectType.AUXILIARY:
fetch = self._client.get_aux_status
else:
raise ValueError(
f"v1 has no bulk extended-status opcode for {object_type.name}"
)
chunk = self._CHUNK_SIZES.get(int(object_type), 40)
out: dict[int, object] = {}
cur = start
while cur <= last:
chunk_end = min(cur + chunk - 1, last)
records = await fetch(cur, chunk_end)
out.update(records)
cur = chunk_end + 1
return [out[i] for i in sorted(out)]
async def get_object_status(
self,
object_type: ObjectType,
start: int,
end: int | None = None,
) -> list:
"""Synthesize AreaStatus from SystemStatus's per-area mode bytes.
v1 has no per-area status opcode but the SystemStatus payload
carries one ``Mode`` byte per area (single-area panels see one
byte at offset 15, multi-area panels see N consecutive bytes).
We promote each into an :class:`AreaStatus` with just ``index``
and ``mode`` populated; entry/exit timers and alarms are zero
because the protocol doesn't expose them at this level.
For non-area object types we fall back to extended-status, which
on v1 maps to the basic typed-status opcodes (which is what the
v2 coordinator actually wants anyway since v2's basic and
extended status are interchangeable in shape).
"""
if object_type != ObjectType.AREA:
return await self.get_extended_status(object_type, start, end)
last = end if end is not None else start
status = await self._client.get_system_status()
# First N bytes of area_alarms are valid area modes; the rest are
# EE-expansion data on long SystemStatus payloads (firmware 2.12
# length=39 form). We can't reliably tell where modes stop, so
# match against the list_area_names() count from the same
# SystemStatus.
area_count = max(1, min(8, len(status.area_alarms)))
out: list[AreaStatus] = []
for idx in range(start, min(last, area_count) + 1):
mode_pair = (
status.area_alarms[idx - 1] if idx - 1 < len(status.area_alarms)
else (0, 0)
)
out.append(
AreaStatus(
index=idx,
mode=mode_pair[0],
last_user=0,
entry_timer_secs=0,
exit_timer_secs=0,
alarms=mode_pair[1],
)
)
return out
# ---- events --------------------------------------------------------
def events(self) -> AsyncIterator[SystemEvent]:
"""v1-aware EventStream — filters on v1 SystemEvents opcode (35)."""
return EventStream(
self._client.connection,
expected_opcode=int(OmniLinkMessageType.SystemEvents),
).__aiter__()
async def subscribe(
self, callback: Callable[[object], Awaitable[None]]
) -> None:
"""Not used by the coordinator (which prefers ``events()``); kept
for API parity with :class:`OmniClient`. Raises ``NotImplementedError``
to flag accidental use when we need it, copy the v2 implementation.
"""
raise NotImplementedError(
"OmniClientV1Adapter.subscribe is not implemented; "
"use events() instead"
)
# ---- writes (pure pass-through) ------------------------------------
async def execute_command(
self, command: Command, parameter1: int = 0, parameter2: int = 0
) -> None:
await self._client.execute_command(command, parameter1, parameter2)
async def execute_security_command(
self, area: int, mode: SecurityMode, code: int
) -> None:
await self._client.execute_security_command(area, mode, code)
async def acknowledge_alerts(self) -> None:
await self._client.acknowledge_alerts()
async def turn_unit_on(self, index: int) -> None:
await self._client.turn_unit_on(index)
async def turn_unit_off(self, index: int) -> None:
await self._client.turn_unit_off(index)
async def set_unit_level(self, index: int, percent: int) -> None:
await self._client.set_unit_level(index, percent)
async def bypass_zone(self, index: int, code: int = 0) -> None:
await self._client.bypass_zone(index, code)
async def restore_zone(self, index: int, code: int = 0) -> None:
await self._client.restore_zone(index, code)
async def execute_button(self, index: int) -> None:
await self._client.execute_button(index)
async def execute_program(self, index: int) -> None:
"""Run a panel program by index.
v1 ``enuUnitCommand.Execute`` (raw byte not aliased in our enum)
and v2 both use a generic Command. The Command enum's
``EXECUTE_PROGRAM`` value works on both because the on-the-wire
Command body is byte-identical.
"""
await self.execute_command(Command.EXECUTE_PROGRAM, parameter2=index)
async def show_message(self, index: int, beep: bool = True) -> None:
await self.execute_command(
Command.SHOW_MESSAGE_WITH_BEEP if beep else Command.SHOW_MESSAGE_NO_BEEP,
parameter2=index,
)
async def clear_message(self, index: int) -> None:
await self.execute_command(Command.CLEAR_MESSAGE, parameter2=index)
async def set_thermostat_system_mode(self, index: int, mode_value: int) -> None:
await self._client.set_thermostat_system_mode(index, mode_value)
async def set_thermostat_fan_mode(self, index: int, mode_value: int) -> None:
await self._client.set_thermostat_fan_mode(index, mode_value)
async def set_thermostat_hold_mode(self, index: int, mode_value: int) -> None:
await self._client.set_thermostat_hold_mode(index, mode_value)
async def set_thermostat_heat_setpoint_raw(
self, index: int, raw_temp: int
) -> None:
await self._client.set_thermostat_heat_setpoint_raw(index, raw_temp)
async def set_thermostat_cool_setpoint_raw(
self, index: int, raw_temp: int
) -> None:
await self._client.set_thermostat_cool_setpoint_raw(index, raw_temp)

View File

@ -415,11 +415,22 @@ class OmniClientV1:
end: int,
parser: Callable[[bytes, int], list[T]],
) -> dict[int, T]:
if not 1 <= start <= end <= 0xFF:
if not 1 <= start <= end <= 0xFFFF:
raise ValueError(
f"invalid range: start={start}, end={end} (must be 1..255 with start<=end)"
f"invalid range: start={start}, end={end} "
f"(must be 1..65535 with start<=end)"
)
# v1 has two payload forms (clsOLMsgRequestUnitStatus.cs:18-31):
# short (3-byte msg with 1-byte start+end) when both ≤ 255, long
# (5-byte msg with BE u16 start+end) otherwise. The panel picks
# the right reply format based on what it received.
if start <= 0xFF and end <= 0xFF:
payload = bytes([start, end])
else:
payload = bytes(
[(start >> 8) & 0xFF, start & 0xFF,
(end >> 8) & 0xFF, end & 0xFF]
)
payload = bytes([start, end])
reply = await self._conn.request(request_op, payload)
self._expect(reply.opcode, reply_op)
records = parser(reply.payload, start)