Ryan Malloy e57fbc41e3 HA: optional .pca file as alternate source for panel programs
Adds CONF_PCA_PATH + CONF_PCA_KEY config-flow fields. When set, the
coordinator parses programs from the .pca file at that path instead
of streaming them over the wire on every entry refresh. Useful for:

* deployments where wire enumeration is slow (1500-slot iteration)
* offline snapshots when the panel is unreachable
* deterministic test setups against a known fixture

The config-flow validates the file is readable and decrypts cleanly,
surfacing pca_not_found / pca_decode_failed errors via the strings/
en.json translations.

The .pca path is checked first in _discover_programs; if absent the
wire path runs as before. So existing deployments are unaffected.

Tests cover the success path (live fixture, 330 programs) and the
two validation failures (missing file, garbage bytes).
2026-05-12 19:15:32 -06:00

267 lines
10 KiB
Python

"""Config flow for the HAI/Leviton Omni Panel integration."""
from __future__ import annotations
from collections.abc import Mapping
from typing import Any
import voluptuous as vol
from homeassistant.config_entries import ConfigFlow, ConfigFlowResult
from homeassistant.const import CONF_HOST, CONF_PORT
from omni_pca.client import OmniClient
from omni_pca.connection import (
ConnectionError as OmniConnectionError,
)
from omni_pca.connection import (
HandshakeError,
InvalidEncryptionKeyError,
)
from .const import (
CONF_CONTROLLER_KEY,
CONF_PCA_KEY,
CONF_PCA_PATH,
CONF_TRANSPORT,
CONTROLLER_KEY_HEX_LEN,
DEFAULT_PORT,
DEFAULT_TRANSPORT,
DOMAIN,
LOGGER,
TRANSPORT_TCP,
TRANSPORT_UDP,
)
class InvalidControllerKey(ValueError): # noqa: N818 - public surface, predates rule
"""The supplied controller key is not 32 hex characters."""
def parse_controller_key(raw: str) -> bytes:
"""Validate and decode a 32-char hex controller key into 16 raw bytes.
Pure function so it can be unit-tested without a HA harness.
Whitespace and a leading ``0x`` prefix are tolerated; case-insensitive.
"""
if not isinstance(raw, str):
raise InvalidControllerKey("controller key must be a string")
cleaned = raw.strip().replace(" ", "").replace(":", "").replace("-", "")
if cleaned.lower().startswith("0x"):
cleaned = cleaned[2:]
if len(cleaned) != CONTROLLER_KEY_HEX_LEN:
raise InvalidControllerKey(
f"controller key must be {CONTROLLER_KEY_HEX_LEN} hex characters "
f"(got {len(cleaned)})"
)
try:
return bytes.fromhex(cleaned)
except ValueError as err:
raise InvalidControllerKey(f"controller key is not valid hex: {err}") from err
_USER_SCHEMA = vol.Schema(
{
vol.Required(CONF_HOST): str,
vol.Required(CONF_PORT, default=DEFAULT_PORT): vol.All(
vol.Coerce(int), vol.Range(min=1, max=65535)
),
vol.Required(CONF_CONTROLLER_KEY): str,
# Most modern firmware uses TCP; some installers configure
# Network_UDP. PC Access stores the choice as
# enuPreferredNetworkProtocol in the .pca config.
vol.Required(CONF_TRANSPORT, default=DEFAULT_TRANSPORT): vol.In(
[TRANSPORT_TCP, TRANSPORT_UDP]
),
# Optional: load panel programs from a saved .pca file on the HA
# filesystem (e.g. /config/pca/My_House.pca) instead of streaming
# them from the panel on every restart. Useful when wire
# enumeration is slow or unreliable. CONF_PCA_KEY is the
# per-install key from PCA01.CFG (a uint32, 0 for plain-text).
vol.Optional(CONF_PCA_PATH, default=""): str,
vol.Optional(CONF_PCA_KEY, default=0): vol.All(
vol.Coerce(int), vol.Range(min=0, max=0xFFFFFFFF)
),
}
)
class OmniConfigFlow(ConfigFlow, domain=DOMAIN):
"""Handle a config flow for omni_pca."""
VERSION = 1
def __init__(self) -> None:
self._reauth_entry_data: Mapping[str, Any] | None = None
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
errors: dict[str, str] = {}
if user_input is not None:
host: str = user_input[CONF_HOST].strip()
port: int = user_input[CONF_PORT]
transport: str = user_input.get(CONF_TRANSPORT, DEFAULT_TRANSPORT)
pca_path: str = (user_input.get(CONF_PCA_PATH) or "").strip()
pca_key: int = user_input.get(CONF_PCA_KEY, 0)
unique_id = f"{host}:{port}"
await self.async_set_unique_id(unique_id)
self._abort_if_unique_id_configured()
try:
key = parse_controller_key(user_input[CONF_CONTROLLER_KEY])
except InvalidControllerKey as err:
LOGGER.debug("controller key rejected: %s", err)
errors[CONF_CONTROLLER_KEY] = "invalid_key"
else:
if pca_path and (pca_err := await self._validate_pca(pca_path, pca_key)):
errors[CONF_PCA_PATH] = pca_err
if not errors:
title, error = await self._probe(host, port, key, transport)
if error is not None:
errors["base"] = error
else:
return self.async_create_entry(
title=title or f"Omni Panel ({host})",
data={
CONF_HOST: host,
CONF_PORT: port,
CONF_CONTROLLER_KEY: key.hex(),
CONF_TRANSPORT: transport,
CONF_PCA_PATH: pca_path,
CONF_PCA_KEY: pca_key,
},
)
return self.async_show_form(
step_id="user",
data_schema=_USER_SCHEMA,
errors=errors,
)
async def _validate_pca(self, path: str, key: int) -> str | None:
"""Validate the user-supplied .pca path is readable and decrypts.
Returns ``None`` on success or an error code string on failure
(matches the {code: message} keys in strings.json).
"""
from pathlib import Path
from omni_pca.pca_file import parse_pca_file
p = Path(path)
if not p.is_file():
return "pca_not_found"
try:
data = await self.hass.async_add_executor_job(p.read_bytes)
acct = await self.hass.async_add_executor_job(
lambda: parse_pca_file(data, key=key)
)
except Exception as err:
LOGGER.debug("pca file rejected: %s", err)
return "pca_decode_failed"
# Sanity: programs block decoded cleanly. Empty is allowed
# (legitimate brand-new install with no programs).
if not isinstance(acct.programs, tuple):
return "pca_decode_failed"
return None
async def async_step_reauth(
self, entry_data: Mapping[str, Any]
) -> ConfigFlowResult:
self._reauth_entry_data = entry_data
return await self.async_step_reauth_confirm()
async def async_step_reauth_confirm(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
assert self._reauth_entry_data is not None
host: str = self._reauth_entry_data[CONF_HOST]
port: int = self._reauth_entry_data[CONF_PORT]
transport: str = self._reauth_entry_data.get(
CONF_TRANSPORT, DEFAULT_TRANSPORT
)
errors: dict[str, str] = {}
if user_input is not None:
try:
key = parse_controller_key(user_input[CONF_CONTROLLER_KEY])
except InvalidControllerKey:
errors[CONF_CONTROLLER_KEY] = "invalid_key"
else:
_, error = await self._probe(host, port, key, transport)
if error is not None:
errors["base"] = error
else:
entry = self._get_reauth_entry()
new_data = {**entry.data, CONF_CONTROLLER_KEY: key.hex()}
return self.async_update_reload_and_abort(entry, data=new_data)
return self.async_show_form(
step_id="reauth_confirm",
data_schema=vol.Schema({vol.Required(CONF_CONTROLLER_KEY): str}),
description_placeholders={"host": host, "port": str(port)},
errors=errors,
)
# ---- helpers ---------------------------------------------------------
async def _probe(
self,
host: str,
port: int,
key: bytes,
transport: str = DEFAULT_TRANSPORT,
) -> tuple[str | None, str | None]:
"""Try to connect once. Returns (title, error_code).
TCP uses :class:`OmniClient` (v2 wire protocol). UDP uses the v1
adapter — UDP-listening panels speak the legacy wire protocol,
not OmniLink2 — see :mod:`omni_pca.v1.adapter` for the bridge.
"""
try:
if transport == TRANSPORT_UDP:
from omni_pca.v1 import (
HandshakeError as V1HandshakeError,
)
from omni_pca.v1 import (
InvalidEncryptionKeyError as V1InvalidEncryptionKeyError,
)
from omni_pca.v1 import OmniClientV1Adapter
from omni_pca.v1.connection import (
ConnectionError as V1ConnectionError,
)
try:
async with OmniClientV1Adapter(
host, port=port, controller_key=key,
) as client:
info = await client.get_system_information()
except (V1HandshakeError, V1InvalidEncryptionKeyError):
return None, "invalid_auth"
except (V1ConnectionError, OSError, TimeoutError) as err:
LOGGER.debug("v1 probe failed: %s", err)
return None, "cannot_connect"
else:
async with OmniClient(
host, port=port, controller_key=key, transport=transport, # type: ignore[arg-type]
) as client:
info = await client.get_system_information()
except (HandshakeError, InvalidEncryptionKeyError):
return None, "invalid_auth"
except (OmniConnectionError, OSError, TimeoutError) as err:
LOGGER.debug("probe connect failed: %s", err)
return None, "cannot_connect"
except Exception:
LOGGER.exception("unexpected probe failure")
return None, "unknown"
return f"{info.model_name} ({host})", None
def _get_reauth_entry(self): # type: ignore[no-untyped-def]
"""Resolve the entry being reauthenticated.
Wrapped in a method so tests / older HA versions that lack the
helper can monkeypatch this single accessor.
"""
return self.hass.config_entries.async_get_entry(self.context["entry_id"])