Compare commits
3 Commits
cc32081caf
...
d4c4e530f6
| Author | SHA1 | Date | |
|---|---|---|---|
| d4c4e530f6 | |||
| 16655da34c | |||
| 116591be90 |
26
.github/workflows/validate.yml
vendored
Normal file
@ -0,0 +1,26 @@
|
||||
name: Validate
|
||||
|
||||
on:
|
||||
push:
|
||||
branches: [main]
|
||||
pull_request:
|
||||
schedule:
|
||||
- cron: "0 4 * * 1" # weekly Monday 04:00 UTC
|
||||
workflow_dispatch:
|
||||
|
||||
jobs:
|
||||
hacs:
|
||||
name: HACS validation
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: hacs/action@main
|
||||
with:
|
||||
category: integration
|
||||
|
||||
hassfest:
|
||||
name: Hassfest
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: home-assistant/actions/hassfest@master
|
||||
@ -82,4 +82,4 @@ First release. Working library + Home Assistant custom component, validated end-
|
||||
- **PyPI publish**: `omni-pca` not yet on PyPI; HA `manifest.json` requirements line will only resolve once it is. For now users either install the wheel manually or pip-install from a Git URL.
|
||||
- **HACS submission**: pending live-panel validation.
|
||||
|
||||
[2026.5.10]: https://git.supported.systems/warehack.ing/omni-pca/releases/tag/v2026.5.10
|
||||
[2026.5.10]: https://github.com/rsp2k/omni-pca/releases/tag/v2026.5.10
|
||||
|
||||
16
README.md
@ -4,7 +4,7 @@ Async Python client for HAI/Leviton Omni-Link II home automation panels — Omni
|
||||
|
||||
Includes a Home Assistant custom component (`custom_components/omni_pca/`).
|
||||
|
||||
**Project home:** <https://git.supported.systems/warehack.ing/omni-pca>
|
||||
**Project home:** <https://github.com/rsp2k/omni-pca>
|
||||
**Documentation:** <https://hai-omni-pro-ii.warehack.ing/>
|
||||
|
||||
## Status
|
||||
@ -18,20 +18,14 @@ The full byte-level protocol spec lives at <https://hai-omni-pro-ii.warehack.ing
|
||||
|
||||
## Install
|
||||
|
||||
The library isn't on PyPI yet (pending), so install directly from the Gitea release:
|
||||
|
||||
```bash
|
||||
# Pinned to a specific release (recommended)
|
||||
pip install "omni-pca @ git+https://git.supported.systems/warehack.ing/omni-pca.git@v2026.5.10"
|
||||
|
||||
# Or the wheel from the release page
|
||||
pip install https://git.supported.systems/warehack.ing/omni-pca/releases/download/v2026.5.10/omni_pca-2026.5.10-py3-none-any.whl
|
||||
pip install omni-pca
|
||||
|
||||
# Or with uv
|
||||
uv add "omni-pca @ git+https://git.supported.systems/warehack.ing/omni-pca.git@v2026.5.10"
|
||||
uv add omni-pca
|
||||
```
|
||||
|
||||
Once published to PyPI, the canonical install will be `pip install omni-pca`.
|
||||
For Home Assistant users, install the integration through HACS — see the [HA install how-to](https://hai-omni-pro-ii.warehack.ing/how-to/install-in-home-assistant/).
|
||||
|
||||
## Quick start (library)
|
||||
|
||||
@ -79,7 +73,7 @@ The HA integration picks the right client automatically based on the **Transport
|
||||
cd /path/to/your/homeassistant/config/
|
||||
mkdir -p custom_components
|
||||
cd custom_components
|
||||
git clone https://git.supported.systems/warehack.ing/omni-pca tmp-omni
|
||||
git clone https://github.com/rsp2k/omni-pca tmp-omni
|
||||
cp -r tmp-omni/custom_components/omni_pca .
|
||||
rm -rf tmp-omni
|
||||
```
|
||||
|
||||
@ -6,16 +6,18 @@ opens an encrypted session straight to the panel and listens for unsolicited
|
||||
push messages.
|
||||
|
||||
This integration is the HA-facing wrapper around the
|
||||
[`omni-pca`](https://git.supported.systems/warehack.ing/omni-pca) Python library; the library
|
||||
[`omni-pca`](https://github.com/rsp2k/omni-pca) Python library; the library
|
||||
handles the wire protocol, this component surfaces it as HA entities.
|
||||
|
||||
## Install
|
||||
|
||||
### HACS (recommended once published)
|
||||
### HACS
|
||||
|
||||
1. HACS → Integrations → custom repository → add
|
||||
`https://git.supported.systems/warehack.ing/omni-pca`, category **Integration**.
|
||||
2. Install **HAI / Leviton Omni Panel**, then restart Home Assistant.
|
||||
1. HACS → Integrations → search **HAI / Leviton Omni Panel**.
|
||||
2. Install, then restart Home Assistant.
|
||||
|
||||
(If not yet in the HACS default catalog: HACS → Integrations → custom
|
||||
repository → add `https://github.com/rsp2k/omni-pca`, category **Integration**.)
|
||||
|
||||
### Manual
|
||||
|
||||
@ -121,6 +123,6 @@ hashed) — useful for bug reports.
|
||||
- **No entities for X**: only objects with a name configured on the panel
|
||||
are discovered. PC Access's "Names" page is where they live.
|
||||
|
||||
See the [parent README](https://git.supported.systems/warehack.ing/omni-pca) for protocol /
|
||||
See the [parent README](https://github.com/rsp2k/omni-pca) for protocol /
|
||||
library details. Detailed reverse-engineering notes are in
|
||||
[`docs/JOURNEY.md`](https://git.supported.systems/warehack.ing/omni-pca/blob/main/docs/JOURNEY.md).
|
||||
[`docs/JOURNEY.md`](https://github.com/rsp2k/omni-pca/blob/main/docs/JOURNEY.md).
|
||||
|
||||
@ -7,7 +7,7 @@
|
||||
"dependencies": [],
|
||||
"codeowners": ["@rsp2k"],
|
||||
"requirements": ["omni-pca==2026.5.11"],
|
||||
"documentation": "https://git.supported.systems/warehack.ing/omni-pca",
|
||||
"issue_tracker": "https://git.supported.systems/warehack.ing/omni-pca/issues",
|
||||
"documentation": "https://github.com/rsp2k/omni-pca",
|
||||
"issue_tracker": "https://github.com/rsp2k/omni-pca/issues",
|
||||
"integration_type": "hub"
|
||||
}
|
||||
|
||||
|
Before Width: | Height: | Size: 107 KiB After Width: | Height: | Size: 108 KiB |
|
Before Width: | Height: | Size: 130 KiB After Width: | Height: | Size: 128 KiB |
|
Before Width: | Height: | Size: 106 KiB After Width: | Height: | Size: 107 KiB |
|
Before Width: | Height: | Size: 220 KiB After Width: | Height: | Size: 221 KiB |
|
Before Width: | Height: | Size: 85 KiB After Width: | Height: | Size: 86 KiB |
|
Before Width: | Height: | Size: 189 KiB After Width: | Height: | Size: 190 KiB |
BIN
dev/artifacts/screenshots/2026-05-11/01-overview.png
Normal file
|
After Width: | Height: | Size: 102 KiB |
BIN
dev/artifacts/screenshots/2026-05-11/02-integrations-list.png
Normal file
|
After Width: | Height: | Size: 128 KiB |
BIN
dev/artifacts/screenshots/2026-05-11/03-omni-pca-config.png
Normal file
|
After Width: | Height: | Size: 122 KiB |
BIN
dev/artifacts/screenshots/2026-05-11/04-panel-device.png
Normal file
|
After Width: | Height: | Size: 287 KiB |
BIN
dev/artifacts/screenshots/2026-05-11/05-entities-omni.png
Normal file
|
After Width: | Height: | Size: 86 KiB |
BIN
dev/artifacts/screenshots/2026-05-11/06-developer-states.png
Normal file
|
After Width: | Height: | Size: 190 KiB |
@ -34,7 +34,10 @@ engine = ["astral>=2.2,<3"]
|
||||
omni-pca = "omni_pca.__main__:main"
|
||||
|
||||
[project.urls]
|
||||
Repository = "https://git.supported.systems/warehack.ing/omni-pca"
|
||||
Repository = "https://github.com/rsp2k/omni-pca"
|
||||
Issues = "https://github.com/rsp2k/omni-pca/issues"
|
||||
Changelog = "https://github.com/rsp2k/omni-pca/blob/main/CHANGELOG.md"
|
||||
Documentation = "https://hai-omni-pro-ii.warehack.ing/"
|
||||
|
||||
[build-system]
|
||||
requires = ["uv_build>=0.11.8,<0.12.0"]
|
||||
|
||||
@ -48,7 +48,16 @@ from dataclasses import dataclass, field
|
||||
from datetime import date, datetime, time, timedelta, timezone
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from .programs import Days, Program, ProgramType, TimeKind
|
||||
from .programs import (
|
||||
CondArgType,
|
||||
CondOP,
|
||||
Days,
|
||||
MiscConditional,
|
||||
Program,
|
||||
ProgramCond,
|
||||
ProgramType,
|
||||
TimeKind,
|
||||
)
|
||||
|
||||
_log = logging.getLogger(__name__)
|
||||
|
||||
@ -605,6 +614,357 @@ def evaluate_conditions(
|
||||
return False
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# State-aware AND/OR evaluator
|
||||
# --------------------------------------------------------------------------
|
||||
|
||||
|
||||
class _UnsupportedCondition(Exception):
|
||||
"""Raised internally when an AND/OR record encodes a check we don't
|
||||
yet evaluate. The caller (StateEvaluator) treats this as False —
|
||||
"we can't prove this condition", so the chain stays guarded — and
|
||||
logs once so the next semantic-decode pass has a punch list.
|
||||
"""
|
||||
|
||||
|
||||
class StateEvaluator:
|
||||
"""Decode AND/OR records against a :class:`MockState` snapshot.
|
||||
|
||||
Each AND/OR :class:`Program` record encodes either:
|
||||
|
||||
* **Traditional** (``and_op == 0``): a compact bit-packed condition
|
||||
in the ``cond`` u16 selecting a family (ProgramCond: ZONE / CTRL /
|
||||
TIME / SEC / OTHER) with an inline operand. Decoded per
|
||||
``clsText.GetConditionalText`` (clsText.cs:2224-2274).
|
||||
|
||||
* **Structured** (``and_op > 0``): a ``Arg1 OP Arg2`` triple where
|
||||
Arg1 and Arg2 are typed references (Zone / Unit / Thermostat /
|
||||
TimeDate / Constant / …) plus per-type field selectors. The
|
||||
operator is one of :class:`CondOP` (EQ / NE / LT / GT / …).
|
||||
|
||||
Coverage in this initial cut:
|
||||
|
||||
Traditional:
|
||||
* ZONE family — zone secure / not-ready against MockZoneState
|
||||
* CTRL family — unit on / off against MockUnitState
|
||||
* SEC family — area in security-mode against MockAreaState
|
||||
* OTHER family: ``NEVER`` (always false), ``LIGHT``/``DARK``
|
||||
approximated via the engine's sun events when location is set,
|
||||
``AC_POWER_OFF``/``ON`` (no model — return False),
|
||||
``BATTERY_LOW``/``OK`` (no model — return False)
|
||||
* TIME family (time-clock enabled/disabled) — no MockState
|
||||
slot for time-clock toggles, returns False
|
||||
|
||||
Structured:
|
||||
* Zone.CurrentState / ArmingState (== const)
|
||||
* Unit.CurrentState / Level (== / >= / <= const)
|
||||
* Thermostat.CurrentTemp / Humidity / setpoints (numeric compare)
|
||||
* TimeDate.Month / Day / DayOfWeek / Hour / Minute (numeric compare)
|
||||
|
||||
Anything else returns ``False`` and logs at DEBUG once per
|
||||
condition class — keeps a chain *guarded* rather than fired-too-
|
||||
eagerly when we can't yet decode the predicate.
|
||||
|
||||
Time-related comparisons need a :class:`Clock`; pass one to honor
|
||||
Hour/Minute/DayOfWeek predicates correctly. Without a clock those
|
||||
return False.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
state,
|
||||
*,
|
||||
clock: Clock | None = None,
|
||||
location: PanelLocation | None = None,
|
||||
) -> None:
|
||||
self._state = state
|
||||
self._clock = clock
|
||||
self._location = location
|
||||
|
||||
def __call__(self, condition: Program) -> bool:
|
||||
"""Evaluate one AND/OR record against the bound MockState.
|
||||
|
||||
Treated as a plain predicate so callers can pass this instance
|
||||
directly to ``ProgramEngine.set_condition_evaluator``.
|
||||
"""
|
||||
try:
|
||||
if condition.and_op == CondOP.ARG1_TRADITIONAL:
|
||||
return self._eval_traditional(condition)
|
||||
return self._eval_structured(condition)
|
||||
except _UnsupportedCondition as err:
|
||||
_log.debug("evaluator: unsupported condition: %s", err)
|
||||
return False
|
||||
except Exception:
|
||||
_log.exception("evaluator: condition evaluation crashed")
|
||||
return False
|
||||
|
||||
# ---- Traditional ------------------------------------------------------
|
||||
|
||||
def _eval_traditional(self, c: Program) -> bool:
|
||||
"""Decode a Traditional AND record.
|
||||
|
||||
Per ``clsConditionLine.Cond`` (clsConditionLine.cs:17-33), the
|
||||
compact-form cond is split across two AND-record slots:
|
||||
|
||||
* disk byte 1 (= ``and_family``) carries the compact's high byte
|
||||
(the family + selector encoding from GetConditionalText)
|
||||
* disk bytes 3-4 (= ``and_arg1_ix``, ``and_instance`` derived
|
||||
from ``cond2 >> 8``) carry the compact's low byte (the
|
||||
object index, shifted into the high half)
|
||||
|
||||
Family decoding mirrors clsText.GetConditionalText (clsText.cs:
|
||||
2224-2274):
|
||||
|
||||
family & 0xFC == 0x00 → OTHER (low 4 bits = MiscConditional)
|
||||
family & 0xFC == 0x04 → ZONE (bit 0x02 = NOT_READY, else SECURE)
|
||||
family & 0xFC == 0x08 → CTRL (bit 0x02 = ON, else OFF)
|
||||
family & 0xFC == 0x0C → TIME (bit 0x02 = ENABLED, else DIS.)
|
||||
family >= 0x10 → SEC (high nibble = mode, low = area)
|
||||
"""
|
||||
family = c.and_family
|
||||
instance = c.and_instance
|
||||
family_major = family & 0xFC
|
||||
secondary = bool(family & 0x02) # selector bit within the family
|
||||
if family_major == 0x00:
|
||||
return self._eval_other(family & 0x0F)
|
||||
if family_major == ProgramCond.ZONE:
|
||||
return self._eval_traditional_zone(instance, want_not_ready=secondary)
|
||||
if family_major == ProgramCond.CTRL:
|
||||
return self._eval_traditional_ctrl(instance, want_on=secondary)
|
||||
if family_major == ProgramCond.TIME:
|
||||
# Time-clock enabled / disabled — MockState doesn't model
|
||||
# the enable bit, so we conservatively report disabled.
|
||||
return False
|
||||
# 0x10 and above: SEC family — high nibble = mode, low nibble = area.
|
||||
return self._eval_traditional_sec(
|
||||
area=family & 0x0F, mode=(family >> 4) & 0x07,
|
||||
)
|
||||
|
||||
def _eval_traditional_zone(self, zone_num: int, *, want_not_ready: bool) -> bool:
|
||||
"""SECURE matches ``current_state == 0``; NOT_READY matches any
|
||||
nonzero current_state (per the panel's display semantics)."""
|
||||
zone = self._state.zones.get(zone_num)
|
||||
if zone is None:
|
||||
# Undefined zone reads as SECURE (matches real-panel behaviour
|
||||
# when a programmed zone slot doesn't exist).
|
||||
return not want_not_ready
|
||||
if want_not_ready:
|
||||
return zone.current_state != 0
|
||||
return zone.current_state == 0
|
||||
|
||||
def _eval_traditional_ctrl(self, unit_num: int, *, want_on: bool) -> bool:
|
||||
"""ON matches any nonzero ``state``; OFF matches ``state == 0``.
|
||||
|
||||
MockUnitState.state encodes 0=off, 1=on, 100..200=dim level —
|
||||
all nonzero values count as "on" for this predicate, which
|
||||
matches the panel's binary on/off display.
|
||||
"""
|
||||
unit = self._state.units.get(unit_num)
|
||||
if unit is None:
|
||||
return not want_on # missing unit reads as OFF
|
||||
on = unit.state != 0
|
||||
return on == want_on
|
||||
|
||||
def _eval_traditional_sec(self, *, area: int, mode: int) -> bool:
|
||||
"""Area in security-mode N. ``area == 0`` means "any area in
|
||||
this mode" per GetConditionalText:2262 — without a multi-area
|
||||
model we approximate by checking area 1."""
|
||||
if area == 0:
|
||||
area = 1
|
||||
a = self._state.areas.get(area)
|
||||
if a is None:
|
||||
return False
|
||||
return a.mode == mode
|
||||
|
||||
def _eval_other(self, misc_code: int) -> bool:
|
||||
"""OTHER family: low 4 bits = enuMiscConditional."""
|
||||
try:
|
||||
cat = MiscConditional(misc_code)
|
||||
except ValueError:
|
||||
raise _UnsupportedCondition(f"unknown misc condition {misc_code}")
|
||||
if cat == MiscConditional.NONE:
|
||||
return True
|
||||
if cat == MiscConditional.NEVER:
|
||||
return False
|
||||
if cat in (MiscConditional.LIGHT, MiscConditional.DARK):
|
||||
light = self._is_light_outside()
|
||||
if light is None:
|
||||
# Can't determine — be conservative, don't fire either way.
|
||||
return False
|
||||
return light if cat == MiscConditional.LIGHT else not light
|
||||
# PHONE_*, AC_POWER_*, BATTERY_*, ENERGY_COST_* — no MockState
|
||||
# model for any of these yet. Conservatively False.
|
||||
return False
|
||||
|
||||
def _is_light_outside(self) -> bool | None:
|
||||
"""Approximate "is the sun up" against the engine's PanelLocation
|
||||
+ clock. Returns None when clock or location is missing
|
||||
(caller decides what to do with the indeterminate result)."""
|
||||
if self._clock is None or self._location is None:
|
||||
return None
|
||||
try:
|
||||
now = self._clock.now()
|
||||
sunrise, sunset = _sun_events(self._location, now.date())
|
||||
except Exception:
|
||||
return None
|
||||
return sunrise <= now <= sunset
|
||||
|
||||
# ---- Structured -------------------------------------------------------
|
||||
|
||||
def _eval_structured(self, c: Program) -> bool:
|
||||
"""Evaluate ``Arg1 OP Arg2`` style conditions.
|
||||
|
||||
Resolves Arg1 and Arg2 to numeric values via :meth:`_resolve_arg`
|
||||
then compares with the operator. Comparison operators are
|
||||
straightforward integer math; AND/OR/XOR at this layer are
|
||||
treated as bitwise reductions on the resolved values (matches
|
||||
the C# operator semantics for those uncommon op codes).
|
||||
"""
|
||||
op = c.and_op
|
||||
arg1 = self._resolve_arg(
|
||||
c.and_arg1_argtype, c.and_arg1_ix, c.and_arg1_field,
|
||||
)
|
||||
arg2 = self._resolve_arg(
|
||||
c.and_arg2_argtype, c.and_arg2_ix, c.and_arg2_field,
|
||||
)
|
||||
if arg1 is None or arg2 is None:
|
||||
return False
|
||||
if op == CondOP.ARG1_EQ_ARG2:
|
||||
return arg1 == arg2
|
||||
if op == CondOP.ARG1_NE_ARG2:
|
||||
return arg1 != arg2
|
||||
if op == CondOP.ARG1_LT_ARG2:
|
||||
return arg1 < arg2
|
||||
if op == CondOP.ARG1_GT_ARG2:
|
||||
return arg1 > arg2
|
||||
if op == CondOP.ARG1_ODD:
|
||||
return (arg1 & 1) == 1
|
||||
if op == CondOP.ARG1_EVEN:
|
||||
return (arg1 & 1) == 0
|
||||
# MULTIPLE / IN / NOT_IN are bitfield checks the panel uses for
|
||||
# day-of-week and area-set tests. arg2 is the bitmask.
|
||||
if op == CondOP.ARG1_MULTIPLE_ARG2:
|
||||
return arg2 != 0 and (arg1 % arg2) == 0
|
||||
if op == CondOP.ARG1_IN_ARG2:
|
||||
return bool(arg1 & arg2)
|
||||
if op == CondOP.ARG1_NOT_IN_ARG2:
|
||||
return not bool(arg1 & arg2)
|
||||
raise _UnsupportedCondition(f"unknown structured op {op}")
|
||||
|
||||
def _resolve_arg(
|
||||
self, argtype: int, ix: int, field: int,
|
||||
) -> int | None:
|
||||
"""Return the numeric value of one Arg side, or None if it can't
|
||||
be resolved (unknown type, missing object, missing clock).
|
||||
"""
|
||||
if argtype == CondArgType.CONSTANT:
|
||||
return ix
|
||||
if argtype == CondArgType.ZONE:
|
||||
return self._resolve_zone_field(ix, field)
|
||||
if argtype == CondArgType.UNIT:
|
||||
return self._resolve_unit_field(ix, field)
|
||||
if argtype == CondArgType.THERMOSTAT:
|
||||
return self._resolve_thermostat_field(ix, field)
|
||||
if argtype == CondArgType.AREA:
|
||||
return self._resolve_area_field(ix, field)
|
||||
if argtype == CondArgType.TIME_DATE:
|
||||
return self._resolve_timedate_field(field)
|
||||
# USER_SETTING, AUXILLARY, AUDIO, ACCESS_CONTROL, MESSAGE,
|
||||
# SYSTEM — no MockState models for these. Treat as unresolved
|
||||
# so the comparison returns False.
|
||||
raise _UnsupportedCondition(f"unsupported argtype {argtype}")
|
||||
|
||||
def _resolve_zone_field(self, ix: int, field: int) -> int | None:
|
||||
zone = self._state.zones.get(ix)
|
||||
if zone is None:
|
||||
return None
|
||||
# enuZoneField: LoopReading=1, CurrentState=2, ArmingState=3, AlarmState=4
|
||||
if field == 1:
|
||||
return zone.loop
|
||||
if field == 2:
|
||||
return zone.current_state
|
||||
if field == 3:
|
||||
return zone.arming_state
|
||||
if field == 4:
|
||||
return zone.latched_state
|
||||
raise _UnsupportedCondition(f"zone field {field}")
|
||||
|
||||
def _resolve_unit_field(self, ix: int, field: int) -> int | None:
|
||||
unit = self._state.units.get(ix)
|
||||
if unit is None:
|
||||
return None
|
||||
# enuUnitField: CurrentState=1, PreviousState=2, Timer=3, Level=4
|
||||
if field == 1:
|
||||
# 0 = off, 1 = on, 100..200 = dim. The panel treats anything
|
||||
# non-zero as "on" at this granularity.
|
||||
return 1 if unit.state != 0 else 0
|
||||
if field == 3:
|
||||
return unit.time_remaining
|
||||
if field == 4:
|
||||
# Level: panel returns 0..100% — derive from state byte.
|
||||
if unit.state >= 100:
|
||||
return unit.state - 100
|
||||
return 100 if unit.state == 1 else 0
|
||||
raise _UnsupportedCondition(f"unit field {field}")
|
||||
|
||||
def _resolve_thermostat_field(self, ix: int, field: int) -> int | None:
|
||||
t = self._state.thermostats.get(ix)
|
||||
if t is None:
|
||||
return None
|
||||
# enuThermostatField map (subset MockState has data for):
|
||||
# CurrentTemp=1, HeatSetpt=2, CoolSetpt=3, SystemMode=4, FanMode=5,
|
||||
# HoldMode=6, Humidity=9, HumidifySetpoint=10, DehumidifySetpoint=11,
|
||||
# OutdoorTemperature=12
|
||||
return {
|
||||
1: t.temperature_raw,
|
||||
2: t.heat_setpoint_raw,
|
||||
3: t.cool_setpoint_raw,
|
||||
4: t.system_mode,
|
||||
5: t.fan_mode,
|
||||
6: t.hold_mode,
|
||||
9: t.humidity_raw,
|
||||
10: t.humidify_setpoint_raw,
|
||||
11: t.dehumidify_setpoint_raw,
|
||||
12: t.outdoor_temperature_raw,
|
||||
}.get(field, None)
|
||||
|
||||
def _resolve_area_field(self, ix: int, field: int) -> int | None:
|
||||
area = self._state.areas.get(ix)
|
||||
if area is None:
|
||||
return None
|
||||
# No enuAreaField source — be permissive: field 1 = mode.
|
||||
if field == 1:
|
||||
return area.mode
|
||||
raise _UnsupportedCondition(f"area field {field}")
|
||||
|
||||
def _resolve_timedate_field(self, field: int) -> int | None:
|
||||
if self._clock is None:
|
||||
return None
|
||||
now = self._clock.now()
|
||||
# enuTimeDateField: Date=1, Year=2, Month=3, Day=4, DayOfWeek=5,
|
||||
# Time=6, DST_Flag=7, Hour=8, Minute=9, SunriseSunset=10.
|
||||
if field == 2:
|
||||
return now.year
|
||||
if field == 3:
|
||||
return now.month
|
||||
if field == 4:
|
||||
return now.day
|
||||
if field == 5:
|
||||
# DayOfWeek: panel uses 1=Mon..7=Sun per clsHAC. Python
|
||||
# weekday() returns Mon=0..Sun=6 — add 1.
|
||||
return now.weekday() + 1
|
||||
if field == 6:
|
||||
# Time-of-day encoded as (hour * 60 + minute) — minutes since
|
||||
# midnight. Matches the C# packing in GetComplexConditionText.
|
||||
return now.hour * 60 + now.minute
|
||||
if field == 8:
|
||||
return now.hour
|
||||
if field == 9:
|
||||
return now.minute
|
||||
# Date / DST_Flag / SunriseSunset — not modelled here.
|
||||
raise _UnsupportedCondition(f"timedate field {field}")
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# Engine
|
||||
# --------------------------------------------------------------------------
|
||||
@ -687,10 +1047,27 @@ class ProgramEngine:
|
||||
bool. The default returns True for every AND, False for every
|
||||
OR (a degenerate evaluator that means "all chains' first AND
|
||||
groups always pass" — useful as a smoke-test default, not for
|
||||
real automation). Real callers supply a state-aware evaluator.
|
||||
real automation).
|
||||
|
||||
For real automation, call :meth:`use_state_evaluator` instead
|
||||
(or build your own :class:`StateEvaluator` and pass it here).
|
||||
"""
|
||||
self._condition_evaluator = fn
|
||||
|
||||
def use_state_evaluator(self) -> "StateEvaluator":
|
||||
"""Install a :class:`StateEvaluator` bound to this engine's
|
||||
``MockState``, clock, and location. Returns the new evaluator
|
||||
so the caller can introspect it.
|
||||
|
||||
Equivalent to ``engine.set_condition_evaluator(
|
||||
StateEvaluator(panel.state, clock=clock, location=loc))``.
|
||||
"""
|
||||
evaluator = StateEvaluator(
|
||||
self._panel.state, clock=self._clock, location=self._location,
|
||||
)
|
||||
self._condition_evaluator = evaluator
|
||||
return evaluator
|
||||
|
||||
@staticmethod
|
||||
def _default_condition_evaluator(condition: Program) -> bool:
|
||||
"""Stub evaluator — caller should override via set_condition_evaluator."""
|
||||
|
||||
@ -19,7 +19,13 @@ from omni_pca.program_engine import (
|
||||
RealClock,
|
||||
classify,
|
||||
)
|
||||
from omni_pca.programs import Days, Program, ProgramType
|
||||
from omni_pca.programs import (
|
||||
CondArgType,
|
||||
CondOP,
|
||||
Days,
|
||||
Program,
|
||||
ProgramType,
|
||||
)
|
||||
|
||||
CONTROLLER_KEY = bytes(range(16))
|
||||
|
||||
@ -1017,3 +1023,332 @@ async def test_engine_every_chain_fires_on_interval() -> None:
|
||||
await clock.advance_to(t0 + timedelta(seconds=60 * tick + 1))
|
||||
await asyncio.sleep(0)
|
||||
assert engine.metrics.clausal_fired == 3
|
||||
|
||||
|
||||
# ---- Phase 6: detailed AND/OR semantics (StateEvaluator) ----------------
|
||||
|
||||
|
||||
from omni_pca.mock_panel import ( # noqa: E402
|
||||
MockAreaState,
|
||||
MockThermostatState,
|
||||
MockUnitState,
|
||||
MockZoneState,
|
||||
)
|
||||
from omni_pca.program_engine import StateEvaluator # noqa: E402
|
||||
|
||||
|
||||
def _and_traditional(
|
||||
slot: int, family: int, instance: int = 0,
|
||||
) -> Program:
|
||||
"""Build a Traditional (OP=0) AND record.
|
||||
|
||||
Per clsConditionLine.Cond synthesis (clsConditionLine.cs:17-33),
|
||||
the family byte lives in disk byte 1 = ``and_family`` (Python's
|
||||
``cond & 0xFF``), and the object instance lives in disk byte 3
|
||||
= ``and_instance`` (Python's ``(cond2 >> 8) & 0xFF``).
|
||||
"""
|
||||
return Program(
|
||||
slot=slot, prog_type=int(ProgramType.AND),
|
||||
cond=family & 0xFF, # byte 1 = family; byte 2 (OP) = 0
|
||||
cond2=(instance & 0xFF) << 8, # byte 3 = instance; byte 4 = 0
|
||||
)
|
||||
|
||||
|
||||
def _and_structured(
|
||||
slot: int,
|
||||
op: int,
|
||||
arg1_type: int, arg1_ix: int, arg1_field: int,
|
||||
arg2_type: int, arg2_ix: int, arg2_field: int = 0,
|
||||
) -> Program:
|
||||
"""Build a Structured (OP>0) AND record.
|
||||
|
||||
Field layout per programs.py decoders:
|
||||
cond high byte (>>8 & 0xFF) = op (and_op)
|
||||
cond low byte (& 0xFF) = arg1_argtype (and_arg1_argtype)
|
||||
cond2 = arg1_ix (and_arg1_ix)
|
||||
cmd = arg1_field (and_arg1_field)
|
||||
par = arg2_argtype (and_arg2_argtype)
|
||||
pr2 = arg2_ix (and_arg2_ix)
|
||||
month = arg2_field (and_arg2_field)
|
||||
"""
|
||||
return Program(
|
||||
slot=slot, prog_type=int(ProgramType.AND),
|
||||
cond=(op << 8) | arg1_type,
|
||||
cond2=arg1_ix,
|
||||
cmd=arg1_field,
|
||||
par=arg2_type,
|
||||
pr2=arg2_ix,
|
||||
month=arg2_field,
|
||||
)
|
||||
|
||||
|
||||
def _state_with(**kwargs) -> MockState:
|
||||
return MockState(**kwargs)
|
||||
|
||||
|
||||
# ---- Traditional ZONE family -------------------------------------------
|
||||
|
||||
|
||||
def test_state_evaluator_zone_secure_passes_when_state_zero() -> None:
|
||||
state = _state_with(zones={
|
||||
7: MockZoneState(name="FRONT DOOR", current_state=0),
|
||||
})
|
||||
ev = StateEvaluator(state)
|
||||
# ZONE family = 0x04 (secure variant); instance = 7
|
||||
cond = _and_traditional(1, family=0x04, instance=7)
|
||||
assert ev(cond) is True
|
||||
|
||||
|
||||
def test_state_evaluator_zone_secure_fails_when_tripped() -> None:
|
||||
state = _state_with(zones={
|
||||
7: MockZoneState(name="FRONT DOOR", current_state=1), # not-ready
|
||||
})
|
||||
ev = StateEvaluator(state)
|
||||
cond = _and_traditional(1, family=0x04, instance=7)
|
||||
assert ev(cond) is False
|
||||
|
||||
|
||||
def test_state_evaluator_zone_not_ready_passes_when_tripped() -> None:
|
||||
state = _state_with(zones={
|
||||
7: MockZoneState(name="FRONT DOOR", current_state=1),
|
||||
})
|
||||
ev = StateEvaluator(state)
|
||||
# family 0x06 = ZONE + NOT_READY selector bit
|
||||
cond = _and_traditional(1, family=0x06, instance=7)
|
||||
assert ev(cond) is True
|
||||
|
||||
|
||||
def test_state_evaluator_zone_undefined_is_secure() -> None:
|
||||
"""Undefined zone reads as SECURE — matches real-panel behaviour
|
||||
when a programmed zone slot doesn't exist."""
|
||||
ev = StateEvaluator(_state_with()) # no zones
|
||||
secure_check = _and_traditional(1, family=0x04, instance=99)
|
||||
not_ready_check = _and_traditional(2, family=0x06, instance=99)
|
||||
assert ev(secure_check) is True
|
||||
assert ev(not_ready_check) is False
|
||||
|
||||
|
||||
# ---- Traditional CTRL family --------------------------------------------
|
||||
|
||||
|
||||
def test_state_evaluator_unit_on_passes_when_state_one() -> None:
|
||||
state = _state_with(units={5: MockUnitState(name="LAMP", state=1)})
|
||||
ev = StateEvaluator(state)
|
||||
# CTRL family + ON selector = 0x0A
|
||||
cond = _and_traditional(1, family=0x0A, instance=5)
|
||||
assert ev(cond) is True
|
||||
|
||||
|
||||
def test_state_evaluator_unit_off_passes_when_state_zero() -> None:
|
||||
state = _state_with(units={5: MockUnitState(name="LAMP", state=0)})
|
||||
ev = StateEvaluator(state)
|
||||
# CTRL family + OFF selector = 0x08
|
||||
cond = _and_traditional(1, family=0x08, instance=5)
|
||||
assert ev(cond) is True
|
||||
|
||||
|
||||
def test_state_evaluator_unit_on_for_dimmed_unit() -> None:
|
||||
"""Dim level 50% → state=150; ON predicate should pass."""
|
||||
state = _state_with(units={5: MockUnitState(state=150)})
|
||||
ev = StateEvaluator(state)
|
||||
cond = _and_traditional(1, family=0x0A, instance=5)
|
||||
assert ev(cond) is True
|
||||
|
||||
|
||||
# ---- Traditional SEC family ---------------------------------------------
|
||||
|
||||
|
||||
def test_state_evaluator_security_mode_match() -> None:
|
||||
"""SEC family: family byte = (mode << 4) | area. Area 1 in mode 2."""
|
||||
state = _state_with(areas={1: MockAreaState(name="MAIN", mode=2)})
|
||||
ev = StateEvaluator(state)
|
||||
cond = _and_traditional(1, family=(2 << 4) | 1) # mode 2, area 1 = 0x21
|
||||
assert ev(cond) is True
|
||||
# Area in different mode → fails.
|
||||
cond_wrong = _and_traditional(1, family=(3 << 4) | 1) # mode 3
|
||||
assert ev(cond_wrong) is False
|
||||
|
||||
|
||||
# ---- Traditional OTHER family -------------------------------------------
|
||||
|
||||
|
||||
def test_state_evaluator_never_is_always_false() -> None:
|
||||
"""MiscConditional.NEVER (= 1) → always False, regardless of state."""
|
||||
ev = StateEvaluator(_state_with())
|
||||
# OTHER family + NEVER misc = 0x01
|
||||
cond = _and_traditional(1, family=0x01)
|
||||
assert ev(cond) is False
|
||||
|
||||
|
||||
def test_state_evaluator_dark_without_location_is_false() -> None:
|
||||
ev = StateEvaluator(_state_with()) # no location
|
||||
# OTHER family + DARK misc = 0x03
|
||||
cond = _and_traditional(1, family=0x03)
|
||||
assert ev(cond) is False
|
||||
|
||||
|
||||
# ---- Structured: Zone fields --------------------------------------------
|
||||
|
||||
|
||||
def test_state_evaluator_structured_zone_current_state_eq() -> None:
|
||||
"""Zone 5 CurrentState == 1 (not-ready) — structured form."""
|
||||
state = _state_with(zones={5: MockZoneState(current_state=1)})
|
||||
ev = StateEvaluator(state)
|
||||
# EQ, Arg1=ZONE.CurrentState(2), Arg1IX=5, Arg2=CONSTANT, Arg2IX=1
|
||||
cond = _and_structured(
|
||||
slot=1, op=int(CondOP.ARG1_EQ_ARG2),
|
||||
arg1_type=int(CondArgType.ZONE), arg1_ix=5, arg1_field=2,
|
||||
arg2_type=int(CondArgType.CONSTANT), arg2_ix=1,
|
||||
)
|
||||
assert ev(cond) is True
|
||||
|
||||
|
||||
def test_state_evaluator_structured_zone_state_ne() -> None:
|
||||
state = _state_with(zones={5: MockZoneState(current_state=0)})
|
||||
ev = StateEvaluator(state)
|
||||
cond = _and_structured(
|
||||
slot=1, op=int(CondOP.ARG1_NE_ARG2),
|
||||
arg1_type=int(CondArgType.ZONE), arg1_ix=5, arg1_field=2,
|
||||
arg2_type=int(CondArgType.CONSTANT), arg2_ix=0,
|
||||
)
|
||||
assert ev(cond) is False # state IS 0
|
||||
|
||||
|
||||
# ---- Structured: Thermostat fields --------------------------------------
|
||||
|
||||
|
||||
def test_state_evaluator_structured_thermostat_temp_gt() -> None:
|
||||
"""TEMPERATURE > 75 — structured comparison."""
|
||||
# Thermostat raw temperature 168 ~ 76°F on the Omni linear scale,
|
||||
# but we compare raw bytes here. Use a raw temp clearly above the
|
||||
# constant.
|
||||
state = _state_with(thermostats={
|
||||
1: MockThermostatState(temperature_raw=80),
|
||||
})
|
||||
ev = StateEvaluator(state)
|
||||
cond = _and_structured(
|
||||
slot=1, op=int(CondOP.ARG1_GT_ARG2),
|
||||
arg1_type=int(CondArgType.THERMOSTAT), arg1_ix=1, arg1_field=1,
|
||||
arg2_type=int(CondArgType.CONSTANT), arg2_ix=75,
|
||||
)
|
||||
assert ev(cond) is True
|
||||
# And < should fail.
|
||||
cond_lt = _and_structured(
|
||||
slot=2, op=int(CondOP.ARG1_LT_ARG2),
|
||||
arg1_type=int(CondArgType.THERMOSTAT), arg1_ix=1, arg1_field=1,
|
||||
arg2_type=int(CondArgType.CONSTANT), arg2_ix=75,
|
||||
)
|
||||
assert ev(cond_lt) is False
|
||||
|
||||
|
||||
# ---- Structured: TimeDate fields ----------------------------------------
|
||||
|
||||
|
||||
def test_state_evaluator_structured_timedate_hour_compare() -> None:
|
||||
"""Current hour == 22 — uses the clock."""
|
||||
clock = FakeClock(datetime(2026, 5, 14, 22, 30, tzinfo=timezone.utc))
|
||||
ev = StateEvaluator(_state_with(), clock=clock)
|
||||
cond = _and_structured(
|
||||
slot=1, op=int(CondOP.ARG1_EQ_ARG2),
|
||||
arg1_type=int(CondArgType.TIME_DATE), arg1_ix=0, arg1_field=8, # Hour
|
||||
arg2_type=int(CondArgType.CONSTANT), arg2_ix=22,
|
||||
)
|
||||
assert ev(cond) is True
|
||||
|
||||
|
||||
def test_state_evaluator_structured_timedate_dayofweek() -> None:
|
||||
"""DayOfWeek == 4 (Thursday)."""
|
||||
clock = FakeClock(datetime(2026, 5, 14, 12, 0, tzinfo=timezone.utc)) # Thu
|
||||
ev = StateEvaluator(_state_with(), clock=clock)
|
||||
cond = _and_structured(
|
||||
slot=1, op=int(CondOP.ARG1_EQ_ARG2),
|
||||
arg1_type=int(CondArgType.TIME_DATE), arg1_ix=0, arg1_field=5, # DOW
|
||||
arg2_type=int(CondArgType.CONSTANT), arg2_ix=4,
|
||||
)
|
||||
assert ev(cond) is True
|
||||
|
||||
|
||||
def test_state_evaluator_structured_timedate_without_clock_is_false() -> None:
|
||||
"""No clock → TimeDate predicates resolve to None → comparison False."""
|
||||
ev = StateEvaluator(_state_with()) # no clock
|
||||
cond = _and_structured(
|
||||
slot=1, op=int(CondOP.ARG1_EQ_ARG2),
|
||||
arg1_type=int(CondArgType.TIME_DATE), arg1_ix=0, arg1_field=8,
|
||||
arg2_type=int(CondArgType.CONSTANT), arg2_ix=22,
|
||||
)
|
||||
assert ev(cond) is False
|
||||
|
||||
|
||||
# ---- Engine integration --------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_engine_use_state_evaluator_gates_real_conditions() -> None:
|
||||
"""End-to-end: WHEN + AND IF UNIT 3 ON + THEN UNIT 9 ON.
|
||||
Chain fires only when unit 3 is on at the time the event arrives."""
|
||||
button_evt = event_id_user_macro_button(5)
|
||||
# WHEN button 5; AND IF unit 3 ON; THEN unit 9 ON.
|
||||
when = _when(1, button_evt)
|
||||
and_cond = _and_traditional(2, family=0x0A, instance=3) # CTRL + ON, unit 3
|
||||
then = _then_action(3, int(Command.UNIT_ON), 9)
|
||||
|
||||
# Start with unit 3 OFF.
|
||||
panel = MockPanel(
|
||||
controller_key=CONTROLLER_KEY,
|
||||
state=MockState(
|
||||
programs={
|
||||
1: when.encode_wire_bytes(),
|
||||
2: and_cond.encode_wire_bytes(),
|
||||
3: then.encode_wire_bytes(),
|
||||
},
|
||||
units={3: MockUnitState(state=0)},
|
||||
),
|
||||
)
|
||||
async with ProgramEngine(panel, clock=FakeClock(
|
||||
datetime(2026, 5, 14, 0, 0, tzinfo=timezone.utc)
|
||||
)) as engine:
|
||||
engine.use_state_evaluator()
|
||||
# Unit 3 is OFF — chain blocked.
|
||||
fired = await engine.emit_user_macro_button(5)
|
||||
assert fired == 0
|
||||
assert 9 not in panel.state.units or panel.state.units[9].state == 0
|
||||
|
||||
# Turn unit 3 ON, re-emit — chain fires.
|
||||
panel.state.units[3].state = 1
|
||||
fired = await engine.emit_user_macro_button(5)
|
||||
assert fired == 1
|
||||
assert panel.state.units[9].state == 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_engine_state_evaluator_or_alternative() -> None:
|
||||
"""WHEN + AND IF zone 5 secure + OR + AND IF zone 6 secure + THEN.
|
||||
Fires if either zone is secure."""
|
||||
button_evt = event_id_user_macro_button(5)
|
||||
when = _when(1, button_evt)
|
||||
and_z5 = _and_traditional(2, family=0x04, instance=5) # ZONE 5 secure
|
||||
or_break = _or_cond(3) # OR boundary
|
||||
and_z6 = _and_traditional(4, family=0x04, instance=6) # ZONE 6 secure
|
||||
then = _then_action(5, int(Command.UNIT_ON), 10)
|
||||
|
||||
# Zone 5 tripped, Zone 6 secure → group A fails, group B passes.
|
||||
panel = MockPanel(
|
||||
controller_key=CONTROLLER_KEY,
|
||||
state=MockState(
|
||||
programs={
|
||||
p.slot: p.encode_wire_bytes()
|
||||
for p in (when, and_z5, or_break, and_z6, then)
|
||||
},
|
||||
zones={
|
||||
5: MockZoneState(current_state=1),
|
||||
6: MockZoneState(current_state=0),
|
||||
},
|
||||
),
|
||||
)
|
||||
async with ProgramEngine(panel, clock=FakeClock(
|
||||
datetime(2026, 5, 14, 0, 0, tzinfo=timezone.utc)
|
||||
)) as engine:
|
||||
engine.use_state_evaluator()
|
||||
fired = await engine.emit_user_macro_button(5)
|
||||
assert fired == 1 # OR-alternative passed
|
||||
assert panel.state.units[10].state == 1
|
||||
|
||||