omni-pca/src/omni_pca/v1/connection.py
Ryan Malloy 24eecceff9 docs: second cross-ref pass (HvacMode/FanMode/HoldMode, pca_file, v1/connection)
Follow-up to 0d6465d, sprinkling pca-re/docs/manuals/ citations into
three more files that map to user-visible or installer-visible panel
behavior:

models.HvacMode / FanMode / HoldMode
  Docstrings now explain which values correspond to keypad menu picks
  vs. programmatic-only states, and point at the Owner's Manual
  *Scene Commands* chapter where each menu is laid out.

pca_file.py
  Module docstring adds Cross-references to the Installation Manual's
  *INSTALLER SETUP* chapter (SETUP CONTROL/ZONES/AREAS/MISC/EXPANSION)
  -- those are the keypad screens that produce the very SetupData /
  Names / Programs blocks the parser walks. Also points at APPENDIX C
  (zone and unit mapping) for where the _CAP_OMNI_PRO_II numbers come
  from on the panel side.

v1/connection.py
  Module docstring adds cross-references to the docs-site pages that
  explain (a) the non-public handshake quirks the v1 connection relies
  on for crypto and (b) why subsequent RequestUnitStatus calls need
  the long-form BE u16 payload (Appendix C zone/unit mapping again).

No code changes, doc-only; 387 tests still pass.
2026-05-11 15:33:14 -06:00

531 lines
19 KiB
Python

"""Async UDP connection to an Omni-Link controller speaking the v1 wire protocol.
Differs from :class:`omni_pca.connection.OmniConnection` in three ways:
1. **Transport**: UDP only. Each datagram carries exactly one outer Packet.
2. **Outer packet type for messages**: ``OmniLinkMessage`` (0x10), not
``OmniLink2Message`` (0x20). The 4-step handshake packets are identical.
3. **Inner message format**: v1 ``Message`` with ``StartChar = 0x5A``
(NonAddressable) carrying a v1 opcode, not the v2 ``StartChar = 0x21``
carrying a v2 opcode.
The handshake itself (ClientRequestNewSession → ControllerAckNewSession →
ClientRequestSecureSession → ControllerAckSecureSession) and the AES-128
session key derivation are protocol-agnostic and we reuse the same crypto
primitives.
Reference: clsOmniLinkConnection.cs (UDP path):
udpConnect lines 1239-1295 open + queue ClientRequestNewSession
udpListen lines 1298-1399 receive loop, dispatches replies
udpHandleRequestNewSession lines 1401-1459 step 2 → step 3
udpHandleRequestSecureSession lines 1461-1487 step 4 → OnlineSecure
udpSend lines 1514-1560 outer PacketType = OmniLinkMessage (16)
EncryptPacket lines 372-401 same crypto as TCP
Cross-references:
*Two non-public quirks* — Owner's-Manual-style writeup of the
session-key XOR mix and per-block sequence whitening that this
handshake relies on: https://hai-omni-pro-ii.warehack.ing/explanation/quirks/
*Zone & unit numbering* — explains why subsequent ``RequestUnitStatus``
calls need the long-form (BE u16) payload for unit indices > 255:
https://hai-omni-pro-ii.warehack.ing/explanation/zone-unit-numbering/
"""
from __future__ import annotations
import asyncio
import contextlib
import logging
from collections.abc import AsyncIterator
from enum import IntEnum
from types import TracebackType
from ..crypto import (
BLOCK_SIZE,
decrypt_message_payload,
derive_session_key,
encrypt_message_payload,
)
from ..message import (
START_CHAR_V1_UNADDRESSED,
Message,
MessageCrcError,
)
from ..opcodes import OmniLinkMessageType, PacketType
from ..packet import Packet
_log = logging.getLogger(__name__)
_DEFAULT_PORT = 4369
_SESSION_ID_LEN = 5
_PROTO_VERSION = (0x00, 0x01)
_MAX_SEQ = 0xFFFF
class ConnectionState(IntEnum):
DISCONNECTED = 0
CONNECTING = 1
NEW_SESSION = 2
SECURE = 3
ONLINE = 4
class ConnectionError(OSError): # noqa: A001 - intentional shadow at module scope
pass
class HandshakeError(ConnectionError):
pass
class InvalidEncryptionKeyError(HandshakeError):
"""Controller answered ``ControllerSessionTerminated`` during handshake."""
class ProtocolError(ValueError):
pass
class RequestTimeoutError(TimeoutError):
pass
class OmniConnectionV1:
"""UDP + v1-wire-format connection to an Omni-Link controller."""
def __init__(
self,
host: str,
port: int = _DEFAULT_PORT,
controller_key: bytes = b"",
timeout: float = 5.0,
retry_count: int = 3,
) -> None:
if len(controller_key) != 16:
raise ValueError(
f"controller_key must be 16 bytes, got {len(controller_key)}"
)
self._host = host
self._port = port
self._controller_key = bytes(controller_key)
self._default_timeout = timeout
self._retry_count = max(0, retry_count)
self._udp_transport: asyncio.DatagramTransport | None = None
self._udp_protocol: _OmniDatagramProtocol | None = None
self._state = ConnectionState.DISCONNECTED
self._session_id: bytes | None = None
self._session_key: bytes | None = None
# First wire packet uses seq=1; wraparound skips 0 (reserved for
# unsolicited inbound). See clsOmniLinkConnection.cs:1251 (UDP
# init pktSequence=1, then udpSend pre-increments).
self._next_seq: int = 1
self._pending: dict[int, asyncio.Future[Packet]] = {}
self._unsolicited_queue: asyncio.Queue[Message] = asyncio.Queue()
self._handshake_event: asyncio.Event = asyncio.Event()
self._handshake_packet: Packet | None = None
self._handshake_error: Exception | None = None
self._closed = False
@property
def state(self) -> ConnectionState:
return self._state
@property
def session_key(self) -> bytes | None:
return self._session_key
async def __aenter__(self) -> OmniConnectionV1:
await self.connect()
return self
async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc: BaseException | None,
tb: TracebackType | None,
) -> None:
await self.close()
async def connect(self) -> None:
if self._state is not ConnectionState.DISCONNECTED:
raise ConnectionError(
f"already connecting/connected (state={self._state})"
)
self._state = ConnectionState.CONNECTING
try:
loop = asyncio.get_running_loop()
self._udp_transport, self._udp_protocol = (
await loop.create_datagram_endpoint(
lambda: _OmniDatagramProtocol(self),
remote_addr=(self._host, self._port),
)
)
except (TimeoutError, OSError) as exc:
self._state = ConnectionState.DISCONNECTED
raise ConnectionError(f"failed to open UDP socket: {exc}") from exc
try:
await self._do_handshake()
except BaseException:
await self.close()
raise
async def close(self) -> None:
"""Tear down. Politely terminate the panel session first.
Without ClientSessionTerminated the panel keeps our slot allocated
until its idle timeout — and rejects subsequent connect attempts
with ControllerCannotStartNewSession (0x07).
"""
if self._closed:
return
self._closed = True
previous_state = self._state
self._state = ConnectionState.DISCONNECTED
if previous_state in (
ConnectionState.NEW_SESSION,
ConnectionState.SECURE,
ConnectionState.ONLINE,
):
try:
term = Packet(
seq=self._claim_seq(),
type=PacketType.ClientSessionTerminated,
data=b"",
)
self._write_packet(term)
except Exception as exc: # noqa: BLE001 - close() must be idempotent
_log.debug("close: failed to send ClientSessionTerminated: %s", exc)
for fut in self._pending.values():
if not fut.done():
fut.set_exception(ConnectionError("connection closed"))
self._pending.clear()
if self._udp_transport is not None:
with contextlib.suppress(OSError):
self._udp_transport.close()
self._udp_transport = None
self._udp_protocol = None
# ---- public request API ---------------------------------------------
async def request(
self,
opcode: OmniLinkMessageType | int,
payload: bytes = b"",
timeout: float | None = None,
) -> Message:
"""Send a v1 request, await the matching reply, return the inner Message."""
if self._state is not ConnectionState.ONLINE:
raise ConnectionError(
f"cannot send request, connection state={self._state.name}"
)
message = Message(
start_char=START_CHAR_V1_UNADDRESSED,
data=bytes([int(opcode)]) + payload,
)
per_attempt_timeout = timeout if timeout is not None else self._default_timeout
max_attempts = 1 + self._retry_count
last_exc: Exception | None = None
for attempt in range(1, max_attempts + 1):
seq, fut = self._send_encrypted(message)
try:
reply_packet = await asyncio.wait_for(fut, per_attempt_timeout)
except TimeoutError as exc:
last_exc = exc
self._pending.pop(seq, None)
if attempt < max_attempts:
_log.debug(
"udp v1 retry %d/%d on opcode=%d seq=%d",
attempt, max_attempts, int(opcode), seq,
)
continue
raise RequestTimeoutError(
f"no v1 reply for opcode={int(opcode)} "
f"after {max_attempts} attempt(s)"
) from last_exc
return self._decode_inner(reply_packet)
raise RequestTimeoutError(
f"request loop exited without reply for opcode={int(opcode)}"
)
async def iter_streaming(
self,
initial_op: OmniLinkMessageType | int,
*,
ack_op: OmniLinkMessageType | int = OmniLinkMessageType.Ack,
end_op: OmniLinkMessageType | int = OmniLinkMessageType.EOD,
nak_op: OmniLinkMessageType | int = OmniLinkMessageType.Nak,
timeout: float | None = None,
) -> AsyncIterator[Message]:
"""Drive a v1 lock-step streaming download (UploadNames / UploadSetup / etc).
Sends ``initial_op`` (no payload), yields each ``ack_op``-elicited
reply, and stops when the panel sends ``end_op``. ``nak_op`` is
treated as an immediate end-of-stream — no exception (some
firmwares use NAK to signal "no records to upload").
Unlike :meth:`request` we don't retry on timeout — losing a
reply mid-stream desynchronises the conversation, so the right
answer is to surface the timeout and let the caller restart.
"""
if self._state is not ConnectionState.ONLINE:
raise ConnectionError(
f"cannot stream, connection state={self._state.name}"
)
per_reply_timeout = timeout if timeout is not None else self._default_timeout
# Step 1: send the initial bare-opcode request, wait for first reply.
first_msg = Message(
start_char=START_CHAR_V1_UNADDRESSED,
data=bytes([int(initial_op)]),
)
seq, fut = self._send_encrypted(first_msg)
try:
reply_pkt = await asyncio.wait_for(fut, per_reply_timeout)
except TimeoutError as exc:
self._pending.pop(seq, None)
raise RequestTimeoutError(
f"no first reply to streaming opcode={int(initial_op)}"
) from exc
reply = self._decode_inner(reply_pkt)
# Step 2..N: ack-and-receive until end_op or nak_op.
while True:
if reply.opcode == int(end_op) or reply.opcode == int(nak_op):
return
yield reply
ack_msg = Message(
start_char=START_CHAR_V1_UNADDRESSED,
data=bytes([int(ack_op)]),
)
seq, fut = self._send_encrypted(ack_msg)
try:
reply_pkt = await asyncio.wait_for(fut, per_reply_timeout)
except TimeoutError as exc:
self._pending.pop(seq, None)
raise RequestTimeoutError(
f"no reply after streaming Ack (seq={seq})"
) from exc
reply = self._decode_inner(reply_pkt)
def unsolicited(self) -> AsyncIterator[Message]:
queue = self._unsolicited_queue
async def _gen() -> AsyncIterator[Message]:
while True:
yield await queue.get()
return _gen()
# ---- handshake -------------------------------------------------------
async def _do_handshake(self) -> None:
# Step 1: empty ClientRequestNewSession.
self._state = ConnectionState.NEW_SESSION
step1 = Packet(
seq=self._claim_seq(),
type=PacketType.ClientRequestNewSession,
data=b"",
)
self._write_packet(step1)
# Step 2: ControllerAckNewSession (carries protocol version + SessionID).
ack1 = await self._await_handshake_packet()
if ack1.type is PacketType.ControllerCannotStartNewSession:
raise HandshakeError("controller cannot start new session (busy?)")
if ack1.type is not PacketType.ControllerAckNewSession:
raise HandshakeError(f"unexpected step-2 packet type {ack1.type.name}")
if len(ack1.data) < 7:
raise HandshakeError(
f"ControllerAckNewSession payload too short: {len(ack1.data)} bytes"
)
if (ack1.data[0], ack1.data[1]) != _PROTO_VERSION:
raise HandshakeError(
f"unsupported protocol version {ack1.data[0]:#04x}{ack1.data[1]:02x}"
)
self._session_id = bytes(ack1.data[2 : 2 + _SESSION_ID_LEN])
self._session_key = derive_session_key(self._controller_key, self._session_id)
# Step 3: encrypted ClientRequestSecureSession echoing SessionID.
self._state = ConnectionState.SECURE
step3_seq = self._claim_seq()
step3_ct = encrypt_message_payload(
self._session_id, step3_seq, self._session_key
)
step3 = Packet(
seq=step3_seq,
type=PacketType.ClientRequestSecureSession,
data=step3_ct,
)
self._write_packet(step3)
# Step 4: ControllerAckSecureSession (or termination).
ack2 = await self._await_handshake_packet()
if ack2.type is PacketType.ControllerSessionTerminated:
raise InvalidEncryptionKeyError(
"controller terminated session during handshake (wrong ControllerKey?)"
)
if ack2.type is not PacketType.ControllerAckSecureSession:
raise HandshakeError(
f"unexpected step-4 packet type {ack2.type.name}"
)
self._state = ConnectionState.ONLINE
async def _await_handshake_packet(self) -> Packet:
try:
await asyncio.wait_for(
self._handshake_event.wait(), self._default_timeout
)
except TimeoutError as exc:
raise HandshakeError(
"timeout waiting for controller handshake reply"
) from exc
if self._handshake_error is not None:
err = self._handshake_error
self._handshake_error = None
raise err
pkt = self._handshake_packet
self._handshake_packet = None
self._handshake_event.clear()
if pkt is None:
raise HandshakeError("handshake event fired with no packet")
return pkt
# ---- send / receive helpers -----------------------------------------
def _claim_seq(self) -> int:
seq = self._next_seq
nxt = seq + 1
if nxt > _MAX_SEQ or nxt == 0:
nxt = 1
self._next_seq = nxt
return seq
def _send_encrypted(
self, inner: Message
) -> tuple[int, asyncio.Future[Packet]]:
if self._session_key is None:
raise ConnectionError("no session key (handshake not complete)")
seq = self._claim_seq()
plaintext = inner.encode()
ciphertext = encrypt_message_payload(plaintext, seq, self._session_key)
# KEY DIFFERENCE FROM V2: outer type is OmniLinkMessage (0x10),
# not OmniLink2Message (0x20). See clsOmniLinkConnection.cs:1536.
pkt = Packet(seq=seq, type=PacketType.OmniLinkMessage, data=ciphertext)
loop = asyncio.get_running_loop()
fut: asyncio.Future[Packet] = loop.create_future()
self._pending[seq] = fut
self._write_packet(pkt)
return seq, fut
def _write_packet(self, pkt: Packet) -> None:
if self._udp_transport is None:
raise ConnectionError("transport not open")
wire = pkt.encode()
_log.debug(
"TX seq=%d type=%s len=%d", pkt.seq, pkt.type.name, len(pkt.data)
)
self._udp_transport.sendto(wire)
def _decode_inner(self, pkt: Packet) -> Message:
if self._session_key is None:
raise ConnectionError("no session key")
if not pkt.data:
raise ProtocolError("empty packet data")
plaintext = decrypt_message_payload(pkt.data, pkt.seq, self._session_key)
try:
return Message.decode(plaintext)
except MessageCrcError as exc:
raise ProtocolError(f"inner v1 message CRC mismatch: {exc}") from exc
# ---- inbound dispatch (called from the datagram protocol) -----------
def _dispatch(self, pkt: Packet) -> None:
if pkt.data is None:
pkt = Packet(seq=pkt.seq, type=pkt.type, data=b"")
if self._state in (ConnectionState.NEW_SESSION, ConnectionState.SECURE):
handshake_types = {
PacketType.ControllerAckNewSession,
PacketType.ControllerAckSecureSession,
PacketType.ControllerSessionTerminated,
PacketType.ControllerCannotStartNewSession,
}
if pkt.type in handshake_types:
self._handshake_packet = pkt
self._handshake_event.set()
return
if pkt.seq == 0:
if pkt.type is PacketType.OmniLinkMessage:
try:
msg = self._decode_inner(pkt)
except (ProtocolError, ConnectionError) as exc:
_log.warning(
"dropping malformed unsolicited v1 packet: %s", exc
)
return
try:
self._unsolicited_queue.put_nowait(msg)
except asyncio.QueueFull: # pragma: no cover - unbounded queue
_log.warning("unsolicited queue full; dropping message")
return
fut = self._pending.pop(pkt.seq, None)
if fut is None:
_log.debug(
"no waiter for seq=%d type=%s; dropping",
pkt.seq, pkt.type.name,
)
return
if pkt.type is PacketType.ControllerSessionTerminated:
fut.set_exception(ConnectionError("controller terminated session"))
return
if not fut.done():
fut.set_result(pkt)
class _OmniDatagramProtocol(asyncio.DatagramProtocol):
"""asyncio.DatagramProtocol bound to a single OmniConnectionV1.
Each datagram is one complete Packet. We decode it and hand it to the
connection's dispatcher; the dispatcher already knows how to sort
handshake / solicited / unsolicited paths.
"""
def __init__(self, conn: OmniConnectionV1) -> None:
self._conn = conn
def connection_made(self, transport: asyncio.BaseTransport) -> None:
pass
def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None:
try:
pkt = Packet.decode(data)
except Exception as exc:
_log.warning("dropping malformed UDP datagram: %s", exc)
return
try:
self._conn._dispatch(pkt)
except Exception:
_log.exception("UDP v1 dispatch crashed for seq=%d", pkt.seq)
def error_received(self, exc: Exception) -> None:
_log.warning("UDP v1 socket error: %s", exc)
def connection_lost(self, exc: Exception | None) -> None:
if exc is not None:
_log.warning("UDP v1 transport lost: %s", exc)