Compare commits

..

No commits in common. "d4c4e530f60c6fee60cb5be71c263971d7e507df" and "cc32081caf65f6fc33cea0f29e18d589afb0bafb" have entirely different histories.

20 changed files with 25 additions and 762 deletions

View File

@ -1,26 +0,0 @@
name: Validate
on:
push:
branches: [main]
pull_request:
schedule:
- cron: "0 4 * * 1" # weekly Monday 04:00 UTC
workflow_dispatch:
jobs:
hacs:
name: HACS validation
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: hacs/action@main
with:
category: integration
hassfest:
name: Hassfest
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: home-assistant/actions/hassfest@master

View File

@ -82,4 +82,4 @@ First release. Working library + Home Assistant custom component, validated end-
- **PyPI publish**: `omni-pca` not yet on PyPI; HA `manifest.json` requirements line will only resolve once it is. For now users either install the wheel manually or pip-install from a Git URL.
- **HACS submission**: pending live-panel validation.
[2026.5.10]: https://github.com/rsp2k/omni-pca/releases/tag/v2026.5.10
[2026.5.10]: https://git.supported.systems/warehack.ing/omni-pca/releases/tag/v2026.5.10

View File

@ -4,7 +4,7 @@ Async Python client for HAI/Leviton Omni-Link II home automation panels — Omni
Includes a Home Assistant custom component (`custom_components/omni_pca/`).
**Project home:** <https://github.com/rsp2k/omni-pca>
**Project home:** <https://git.supported.systems/warehack.ing/omni-pca>
**Documentation:** <https://hai-omni-pro-ii.warehack.ing/>
## Status
@ -18,14 +18,20 @@ The full byte-level protocol spec lives at <https://hai-omni-pro-ii.warehack.ing
## Install
The library isn't on PyPI yet (pending), so install directly from the Gitea release:
```bash
pip install omni-pca
# Pinned to a specific release (recommended)
pip install "omni-pca @ git+https://git.supported.systems/warehack.ing/omni-pca.git@v2026.5.10"
# Or the wheel from the release page
pip install https://git.supported.systems/warehack.ing/omni-pca/releases/download/v2026.5.10/omni_pca-2026.5.10-py3-none-any.whl
# Or with uv
uv add omni-pca
uv add "omni-pca @ git+https://git.supported.systems/warehack.ing/omni-pca.git@v2026.5.10"
```
For Home Assistant users, install the integration through HACS — see the [HA install how-to](https://hai-omni-pro-ii.warehack.ing/how-to/install-in-home-assistant/).
Once published to PyPI, the canonical install will be `pip install omni-pca`.
## Quick start (library)
@ -73,7 +79,7 @@ The HA integration picks the right client automatically based on the **Transport
cd /path/to/your/homeassistant/config/
mkdir -p custom_components
cd custom_components
git clone https://github.com/rsp2k/omni-pca tmp-omni
git clone https://git.supported.systems/warehack.ing/omni-pca tmp-omni
cp -r tmp-omni/custom_components/omni_pca .
rm -rf tmp-omni
```

View File

@ -6,18 +6,16 @@ opens an encrypted session straight to the panel and listens for unsolicited
push messages.
This integration is the HA-facing wrapper around the
[`omni-pca`](https://github.com/rsp2k/omni-pca) Python library; the library
[`omni-pca`](https://git.supported.systems/warehack.ing/omni-pca) Python library; the library
handles the wire protocol, this component surfaces it as HA entities.
## Install
### HACS
### HACS (recommended once published)
1. HACS → Integrations → search **HAI / Leviton Omni Panel**.
2. Install, then restart Home Assistant.
(If not yet in the HACS default catalog: HACS → Integrations → custom
repository → add `https://github.com/rsp2k/omni-pca`, category **Integration**.)
1. HACS → Integrations → custom repository → add
`https://git.supported.systems/warehack.ing/omni-pca`, category **Integration**.
2. Install **HAI / Leviton Omni Panel**, then restart Home Assistant.
### Manual
@ -123,6 +121,6 @@ hashed) — useful for bug reports.
- **No entities for X**: only objects with a name configured on the panel
are discovered. PC Access's "Names" page is where they live.
See the [parent README](https://github.com/rsp2k/omni-pca) for protocol /
See the [parent README](https://git.supported.systems/warehack.ing/omni-pca) for protocol /
library details. Detailed reverse-engineering notes are in
[`docs/JOURNEY.md`](https://github.com/rsp2k/omni-pca/blob/main/docs/JOURNEY.md).
[`docs/JOURNEY.md`](https://git.supported.systems/warehack.ing/omni-pca/blob/main/docs/JOURNEY.md).

View File

@ -7,7 +7,7 @@
"dependencies": [],
"codeowners": ["@rsp2k"],
"requirements": ["omni-pca==2026.5.11"],
"documentation": "https://github.com/rsp2k/omni-pca",
"issue_tracker": "https://github.com/rsp2k/omni-pca/issues",
"documentation": "https://git.supported.systems/warehack.ing/omni-pca",
"issue_tracker": "https://git.supported.systems/warehack.ing/omni-pca/issues",
"integration_type": "hub"
}

Binary file not shown.

Before

Width:  |  Height:  |  Size: 108 KiB

After

Width:  |  Height:  |  Size: 107 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 128 KiB

After

Width:  |  Height:  |  Size: 130 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 107 KiB

After

Width:  |  Height:  |  Size: 106 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 221 KiB

After

Width:  |  Height:  |  Size: 220 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 86 KiB

After

Width:  |  Height:  |  Size: 85 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 190 KiB

After

Width:  |  Height:  |  Size: 189 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 102 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 128 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 122 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 287 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 86 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 190 KiB

View File

@ -34,10 +34,7 @@ engine = ["astral>=2.2,<3"]
omni-pca = "omni_pca.__main__:main"
[project.urls]
Repository = "https://github.com/rsp2k/omni-pca"
Issues = "https://github.com/rsp2k/omni-pca/issues"
Changelog = "https://github.com/rsp2k/omni-pca/blob/main/CHANGELOG.md"
Documentation = "https://hai-omni-pro-ii.warehack.ing/"
Repository = "https://git.supported.systems/warehack.ing/omni-pca"
[build-system]
requires = ["uv_build>=0.11.8,<0.12.0"]

View File

@ -48,16 +48,7 @@ from dataclasses import dataclass, field
from datetime import date, datetime, time, timedelta, timezone
from typing import TYPE_CHECKING
from .programs import (
CondArgType,
CondOP,
Days,
MiscConditional,
Program,
ProgramCond,
ProgramType,
TimeKind,
)
from .programs import Days, Program, ProgramType, TimeKind
_log = logging.getLogger(__name__)
@ -614,357 +605,6 @@ def evaluate_conditions(
return False
# --------------------------------------------------------------------------
# State-aware AND/OR evaluator
# --------------------------------------------------------------------------
class _UnsupportedCondition(Exception):
"""Raised internally when an AND/OR record encodes a check we don't
yet evaluate. The caller (StateEvaluator) treats this as False
"we can't prove this condition", so the chain stays guarded and
logs once so the next semantic-decode pass has a punch list.
"""
class StateEvaluator:
"""Decode AND/OR records against a :class:`MockState` snapshot.
Each AND/OR :class:`Program` record encodes either:
* **Traditional** (``and_op == 0``): a compact bit-packed condition
in the ``cond`` u16 selecting a family (ProgramCond: ZONE / CTRL /
TIME / SEC / OTHER) with an inline operand. Decoded per
``clsText.GetConditionalText`` (clsText.cs:2224-2274).
* **Structured** (``and_op > 0``): a ``Arg1 OP Arg2`` triple where
Arg1 and Arg2 are typed references (Zone / Unit / Thermostat /
TimeDate / Constant / ) plus per-type field selectors. The
operator is one of :class:`CondOP` (EQ / NE / LT / GT / ).
Coverage in this initial cut:
Traditional:
* ZONE family zone secure / not-ready against MockZoneState
* CTRL family unit on / off against MockUnitState
* SEC family area in security-mode against MockAreaState
* OTHER family: ``NEVER`` (always false), ``LIGHT``/``DARK``
approximated via the engine's sun events when location is set,
``AC_POWER_OFF``/``ON`` (no model return False),
``BATTERY_LOW``/``OK`` (no model return False)
* TIME family (time-clock enabled/disabled) no MockState
slot for time-clock toggles, returns False
Structured:
* Zone.CurrentState / ArmingState (== const)
* Unit.CurrentState / Level (== / >= / <= const)
* Thermostat.CurrentTemp / Humidity / setpoints (numeric compare)
* TimeDate.Month / Day / DayOfWeek / Hour / Minute (numeric compare)
Anything else returns ``False`` and logs at DEBUG once per
condition class keeps a chain *guarded* rather than fired-too-
eagerly when we can't yet decode the predicate.
Time-related comparisons need a :class:`Clock`; pass one to honor
Hour/Minute/DayOfWeek predicates correctly. Without a clock those
return False.
"""
def __init__(
self,
state,
*,
clock: Clock | None = None,
location: PanelLocation | None = None,
) -> None:
self._state = state
self._clock = clock
self._location = location
def __call__(self, condition: Program) -> bool:
"""Evaluate one AND/OR record against the bound MockState.
Treated as a plain predicate so callers can pass this instance
directly to ``ProgramEngine.set_condition_evaluator``.
"""
try:
if condition.and_op == CondOP.ARG1_TRADITIONAL:
return self._eval_traditional(condition)
return self._eval_structured(condition)
except _UnsupportedCondition as err:
_log.debug("evaluator: unsupported condition: %s", err)
return False
except Exception:
_log.exception("evaluator: condition evaluation crashed")
return False
# ---- Traditional ------------------------------------------------------
def _eval_traditional(self, c: Program) -> bool:
"""Decode a Traditional AND record.
Per ``clsConditionLine.Cond`` (clsConditionLine.cs:17-33), the
compact-form cond is split across two AND-record slots:
* disk byte 1 (= ``and_family``) carries the compact's high byte
(the family + selector encoding from GetConditionalText)
* disk bytes 3-4 (= ``and_arg1_ix``, ``and_instance`` derived
from ``cond2 >> 8``) carry the compact's low byte (the
object index, shifted into the high half)
Family decoding mirrors clsText.GetConditionalText (clsText.cs:
2224-2274):
family & 0xFC == 0x00 OTHER (low 4 bits = MiscConditional)
family & 0xFC == 0x04 ZONE (bit 0x02 = NOT_READY, else SECURE)
family & 0xFC == 0x08 CTRL (bit 0x02 = ON, else OFF)
family & 0xFC == 0x0C TIME (bit 0x02 = ENABLED, else DIS.)
family >= 0x10 SEC (high nibble = mode, low = area)
"""
family = c.and_family
instance = c.and_instance
family_major = family & 0xFC
secondary = bool(family & 0x02) # selector bit within the family
if family_major == 0x00:
return self._eval_other(family & 0x0F)
if family_major == ProgramCond.ZONE:
return self._eval_traditional_zone(instance, want_not_ready=secondary)
if family_major == ProgramCond.CTRL:
return self._eval_traditional_ctrl(instance, want_on=secondary)
if family_major == ProgramCond.TIME:
# Time-clock enabled / disabled — MockState doesn't model
# the enable bit, so we conservatively report disabled.
return False
# 0x10 and above: SEC family — high nibble = mode, low nibble = area.
return self._eval_traditional_sec(
area=family & 0x0F, mode=(family >> 4) & 0x07,
)
def _eval_traditional_zone(self, zone_num: int, *, want_not_ready: bool) -> bool:
"""SECURE matches ``current_state == 0``; NOT_READY matches any
nonzero current_state (per the panel's display semantics)."""
zone = self._state.zones.get(zone_num)
if zone is None:
# Undefined zone reads as SECURE (matches real-panel behaviour
# when a programmed zone slot doesn't exist).
return not want_not_ready
if want_not_ready:
return zone.current_state != 0
return zone.current_state == 0
def _eval_traditional_ctrl(self, unit_num: int, *, want_on: bool) -> bool:
"""ON matches any nonzero ``state``; OFF matches ``state == 0``.
MockUnitState.state encodes 0=off, 1=on, 100..200=dim level
all nonzero values count as "on" for this predicate, which
matches the panel's binary on/off display.
"""
unit = self._state.units.get(unit_num)
if unit is None:
return not want_on # missing unit reads as OFF
on = unit.state != 0
return on == want_on
def _eval_traditional_sec(self, *, area: int, mode: int) -> bool:
"""Area in security-mode N. ``area == 0`` means "any area in
this mode" per GetConditionalText:2262 — without a multi-area
model we approximate by checking area 1."""
if area == 0:
area = 1
a = self._state.areas.get(area)
if a is None:
return False
return a.mode == mode
def _eval_other(self, misc_code: int) -> bool:
"""OTHER family: low 4 bits = enuMiscConditional."""
try:
cat = MiscConditional(misc_code)
except ValueError:
raise _UnsupportedCondition(f"unknown misc condition {misc_code}")
if cat == MiscConditional.NONE:
return True
if cat == MiscConditional.NEVER:
return False
if cat in (MiscConditional.LIGHT, MiscConditional.DARK):
light = self._is_light_outside()
if light is None:
# Can't determine — be conservative, don't fire either way.
return False
return light if cat == MiscConditional.LIGHT else not light
# PHONE_*, AC_POWER_*, BATTERY_*, ENERGY_COST_* — no MockState
# model for any of these yet. Conservatively False.
return False
def _is_light_outside(self) -> bool | None:
"""Approximate "is the sun up" against the engine's PanelLocation
+ clock. Returns None when clock or location is missing
(caller decides what to do with the indeterminate result)."""
if self._clock is None or self._location is None:
return None
try:
now = self._clock.now()
sunrise, sunset = _sun_events(self._location, now.date())
except Exception:
return None
return sunrise <= now <= sunset
# ---- Structured -------------------------------------------------------
def _eval_structured(self, c: Program) -> bool:
"""Evaluate ``Arg1 OP Arg2`` style conditions.
Resolves Arg1 and Arg2 to numeric values via :meth:`_resolve_arg`
then compares with the operator. Comparison operators are
straightforward integer math; AND/OR/XOR at this layer are
treated as bitwise reductions on the resolved values (matches
the C# operator semantics for those uncommon op codes).
"""
op = c.and_op
arg1 = self._resolve_arg(
c.and_arg1_argtype, c.and_arg1_ix, c.and_arg1_field,
)
arg2 = self._resolve_arg(
c.and_arg2_argtype, c.and_arg2_ix, c.and_arg2_field,
)
if arg1 is None or arg2 is None:
return False
if op == CondOP.ARG1_EQ_ARG2:
return arg1 == arg2
if op == CondOP.ARG1_NE_ARG2:
return arg1 != arg2
if op == CondOP.ARG1_LT_ARG2:
return arg1 < arg2
if op == CondOP.ARG1_GT_ARG2:
return arg1 > arg2
if op == CondOP.ARG1_ODD:
return (arg1 & 1) == 1
if op == CondOP.ARG1_EVEN:
return (arg1 & 1) == 0
# MULTIPLE / IN / NOT_IN are bitfield checks the panel uses for
# day-of-week and area-set tests. arg2 is the bitmask.
if op == CondOP.ARG1_MULTIPLE_ARG2:
return arg2 != 0 and (arg1 % arg2) == 0
if op == CondOP.ARG1_IN_ARG2:
return bool(arg1 & arg2)
if op == CondOP.ARG1_NOT_IN_ARG2:
return not bool(arg1 & arg2)
raise _UnsupportedCondition(f"unknown structured op {op}")
def _resolve_arg(
self, argtype: int, ix: int, field: int,
) -> int | None:
"""Return the numeric value of one Arg side, or None if it can't
be resolved (unknown type, missing object, missing clock).
"""
if argtype == CondArgType.CONSTANT:
return ix
if argtype == CondArgType.ZONE:
return self._resolve_zone_field(ix, field)
if argtype == CondArgType.UNIT:
return self._resolve_unit_field(ix, field)
if argtype == CondArgType.THERMOSTAT:
return self._resolve_thermostat_field(ix, field)
if argtype == CondArgType.AREA:
return self._resolve_area_field(ix, field)
if argtype == CondArgType.TIME_DATE:
return self._resolve_timedate_field(field)
# USER_SETTING, AUXILLARY, AUDIO, ACCESS_CONTROL, MESSAGE,
# SYSTEM — no MockState models for these. Treat as unresolved
# so the comparison returns False.
raise _UnsupportedCondition(f"unsupported argtype {argtype}")
def _resolve_zone_field(self, ix: int, field: int) -> int | None:
zone = self._state.zones.get(ix)
if zone is None:
return None
# enuZoneField: LoopReading=1, CurrentState=2, ArmingState=3, AlarmState=4
if field == 1:
return zone.loop
if field == 2:
return zone.current_state
if field == 3:
return zone.arming_state
if field == 4:
return zone.latched_state
raise _UnsupportedCondition(f"zone field {field}")
def _resolve_unit_field(self, ix: int, field: int) -> int | None:
unit = self._state.units.get(ix)
if unit is None:
return None
# enuUnitField: CurrentState=1, PreviousState=2, Timer=3, Level=4
if field == 1:
# 0 = off, 1 = on, 100..200 = dim. The panel treats anything
# non-zero as "on" at this granularity.
return 1 if unit.state != 0 else 0
if field == 3:
return unit.time_remaining
if field == 4:
# Level: panel returns 0..100% — derive from state byte.
if unit.state >= 100:
return unit.state - 100
return 100 if unit.state == 1 else 0
raise _UnsupportedCondition(f"unit field {field}")
def _resolve_thermostat_field(self, ix: int, field: int) -> int | None:
t = self._state.thermostats.get(ix)
if t is None:
return None
# enuThermostatField map (subset MockState has data for):
# CurrentTemp=1, HeatSetpt=2, CoolSetpt=3, SystemMode=4, FanMode=5,
# HoldMode=6, Humidity=9, HumidifySetpoint=10, DehumidifySetpoint=11,
# OutdoorTemperature=12
return {
1: t.temperature_raw,
2: t.heat_setpoint_raw,
3: t.cool_setpoint_raw,
4: t.system_mode,
5: t.fan_mode,
6: t.hold_mode,
9: t.humidity_raw,
10: t.humidify_setpoint_raw,
11: t.dehumidify_setpoint_raw,
12: t.outdoor_temperature_raw,
}.get(field, None)
def _resolve_area_field(self, ix: int, field: int) -> int | None:
area = self._state.areas.get(ix)
if area is None:
return None
# No enuAreaField source — be permissive: field 1 = mode.
if field == 1:
return area.mode
raise _UnsupportedCondition(f"area field {field}")
def _resolve_timedate_field(self, field: int) -> int | None:
if self._clock is None:
return None
now = self._clock.now()
# enuTimeDateField: Date=1, Year=2, Month=3, Day=4, DayOfWeek=5,
# Time=6, DST_Flag=7, Hour=8, Minute=9, SunriseSunset=10.
if field == 2:
return now.year
if field == 3:
return now.month
if field == 4:
return now.day
if field == 5:
# DayOfWeek: panel uses 1=Mon..7=Sun per clsHAC. Python
# weekday() returns Mon=0..Sun=6 — add 1.
return now.weekday() + 1
if field == 6:
# Time-of-day encoded as (hour * 60 + minute) — minutes since
# midnight. Matches the C# packing in GetComplexConditionText.
return now.hour * 60 + now.minute
if field == 8:
return now.hour
if field == 9:
return now.minute
# Date / DST_Flag / SunriseSunset — not modelled here.
raise _UnsupportedCondition(f"timedate field {field}")
# --------------------------------------------------------------------------
# Engine
# --------------------------------------------------------------------------
@ -1047,27 +687,10 @@ class ProgramEngine:
bool. The default returns True for every AND, False for every
OR (a degenerate evaluator that means "all chains' first AND
groups always pass" — useful as a smoke-test default, not for
real automation).
For real automation, call :meth:`use_state_evaluator` instead
(or build your own :class:`StateEvaluator` and pass it here).
real automation). Real callers supply a state-aware evaluator.
"""
self._condition_evaluator = fn
def use_state_evaluator(self) -> "StateEvaluator":
"""Install a :class:`StateEvaluator` bound to this engine's
``MockState``, clock, and location. Returns the new evaluator
so the caller can introspect it.
Equivalent to ``engine.set_condition_evaluator(
StateEvaluator(panel.state, clock=clock, location=loc))``.
"""
evaluator = StateEvaluator(
self._panel.state, clock=self._clock, location=self._location,
)
self._condition_evaluator = evaluator
return evaluator
@staticmethod
def _default_condition_evaluator(condition: Program) -> bool:
"""Stub evaluator — caller should override via set_condition_evaluator."""

View File

@ -19,13 +19,7 @@ from omni_pca.program_engine import (
RealClock,
classify,
)
from omni_pca.programs import (
CondArgType,
CondOP,
Days,
Program,
ProgramType,
)
from omni_pca.programs import Days, Program, ProgramType
CONTROLLER_KEY = bytes(range(16))
@ -1023,332 +1017,3 @@ async def test_engine_every_chain_fires_on_interval() -> None:
await clock.advance_to(t0 + timedelta(seconds=60 * tick + 1))
await asyncio.sleep(0)
assert engine.metrics.clausal_fired == 3
# ---- Phase 6: detailed AND/OR semantics (StateEvaluator) ----------------
from omni_pca.mock_panel import ( # noqa: E402
MockAreaState,
MockThermostatState,
MockUnitState,
MockZoneState,
)
from omni_pca.program_engine import StateEvaluator # noqa: E402
def _and_traditional(
slot: int, family: int, instance: int = 0,
) -> Program:
"""Build a Traditional (OP=0) AND record.
Per clsConditionLine.Cond synthesis (clsConditionLine.cs:17-33),
the family byte lives in disk byte 1 = ``and_family`` (Python's
``cond & 0xFF``), and the object instance lives in disk byte 3
= ``and_instance`` (Python's ``(cond2 >> 8) & 0xFF``).
"""
return Program(
slot=slot, prog_type=int(ProgramType.AND),
cond=family & 0xFF, # byte 1 = family; byte 2 (OP) = 0
cond2=(instance & 0xFF) << 8, # byte 3 = instance; byte 4 = 0
)
def _and_structured(
slot: int,
op: int,
arg1_type: int, arg1_ix: int, arg1_field: int,
arg2_type: int, arg2_ix: int, arg2_field: int = 0,
) -> Program:
"""Build a Structured (OP>0) AND record.
Field layout per programs.py decoders:
cond high byte (>>8 & 0xFF) = op (and_op)
cond low byte (& 0xFF) = arg1_argtype (and_arg1_argtype)
cond2 = arg1_ix (and_arg1_ix)
cmd = arg1_field (and_arg1_field)
par = arg2_argtype (and_arg2_argtype)
pr2 = arg2_ix (and_arg2_ix)
month = arg2_field (and_arg2_field)
"""
return Program(
slot=slot, prog_type=int(ProgramType.AND),
cond=(op << 8) | arg1_type,
cond2=arg1_ix,
cmd=arg1_field,
par=arg2_type,
pr2=arg2_ix,
month=arg2_field,
)
def _state_with(**kwargs) -> MockState:
return MockState(**kwargs)
# ---- Traditional ZONE family -------------------------------------------
def test_state_evaluator_zone_secure_passes_when_state_zero() -> None:
state = _state_with(zones={
7: MockZoneState(name="FRONT DOOR", current_state=0),
})
ev = StateEvaluator(state)
# ZONE family = 0x04 (secure variant); instance = 7
cond = _and_traditional(1, family=0x04, instance=7)
assert ev(cond) is True
def test_state_evaluator_zone_secure_fails_when_tripped() -> None:
state = _state_with(zones={
7: MockZoneState(name="FRONT DOOR", current_state=1), # not-ready
})
ev = StateEvaluator(state)
cond = _and_traditional(1, family=0x04, instance=7)
assert ev(cond) is False
def test_state_evaluator_zone_not_ready_passes_when_tripped() -> None:
state = _state_with(zones={
7: MockZoneState(name="FRONT DOOR", current_state=1),
})
ev = StateEvaluator(state)
# family 0x06 = ZONE + NOT_READY selector bit
cond = _and_traditional(1, family=0x06, instance=7)
assert ev(cond) is True
def test_state_evaluator_zone_undefined_is_secure() -> None:
"""Undefined zone reads as SECURE — matches real-panel behaviour
when a programmed zone slot doesn't exist."""
ev = StateEvaluator(_state_with()) # no zones
secure_check = _and_traditional(1, family=0x04, instance=99)
not_ready_check = _and_traditional(2, family=0x06, instance=99)
assert ev(secure_check) is True
assert ev(not_ready_check) is False
# ---- Traditional CTRL family --------------------------------------------
def test_state_evaluator_unit_on_passes_when_state_one() -> None:
state = _state_with(units={5: MockUnitState(name="LAMP", state=1)})
ev = StateEvaluator(state)
# CTRL family + ON selector = 0x0A
cond = _and_traditional(1, family=0x0A, instance=5)
assert ev(cond) is True
def test_state_evaluator_unit_off_passes_when_state_zero() -> None:
state = _state_with(units={5: MockUnitState(name="LAMP", state=0)})
ev = StateEvaluator(state)
# CTRL family + OFF selector = 0x08
cond = _and_traditional(1, family=0x08, instance=5)
assert ev(cond) is True
def test_state_evaluator_unit_on_for_dimmed_unit() -> None:
"""Dim level 50% → state=150; ON predicate should pass."""
state = _state_with(units={5: MockUnitState(state=150)})
ev = StateEvaluator(state)
cond = _and_traditional(1, family=0x0A, instance=5)
assert ev(cond) is True
# ---- Traditional SEC family ---------------------------------------------
def test_state_evaluator_security_mode_match() -> None:
"""SEC family: family byte = (mode << 4) | area. Area 1 in mode 2."""
state = _state_with(areas={1: MockAreaState(name="MAIN", mode=2)})
ev = StateEvaluator(state)
cond = _and_traditional(1, family=(2 << 4) | 1) # mode 2, area 1 = 0x21
assert ev(cond) is True
# Area in different mode → fails.
cond_wrong = _and_traditional(1, family=(3 << 4) | 1) # mode 3
assert ev(cond_wrong) is False
# ---- Traditional OTHER family -------------------------------------------
def test_state_evaluator_never_is_always_false() -> None:
"""MiscConditional.NEVER (= 1) → always False, regardless of state."""
ev = StateEvaluator(_state_with())
# OTHER family + NEVER misc = 0x01
cond = _and_traditional(1, family=0x01)
assert ev(cond) is False
def test_state_evaluator_dark_without_location_is_false() -> None:
ev = StateEvaluator(_state_with()) # no location
# OTHER family + DARK misc = 0x03
cond = _and_traditional(1, family=0x03)
assert ev(cond) is False
# ---- Structured: Zone fields --------------------------------------------
def test_state_evaluator_structured_zone_current_state_eq() -> None:
"""Zone 5 CurrentState == 1 (not-ready) — structured form."""
state = _state_with(zones={5: MockZoneState(current_state=1)})
ev = StateEvaluator(state)
# EQ, Arg1=ZONE.CurrentState(2), Arg1IX=5, Arg2=CONSTANT, Arg2IX=1
cond = _and_structured(
slot=1, op=int(CondOP.ARG1_EQ_ARG2),
arg1_type=int(CondArgType.ZONE), arg1_ix=5, arg1_field=2,
arg2_type=int(CondArgType.CONSTANT), arg2_ix=1,
)
assert ev(cond) is True
def test_state_evaluator_structured_zone_state_ne() -> None:
state = _state_with(zones={5: MockZoneState(current_state=0)})
ev = StateEvaluator(state)
cond = _and_structured(
slot=1, op=int(CondOP.ARG1_NE_ARG2),
arg1_type=int(CondArgType.ZONE), arg1_ix=5, arg1_field=2,
arg2_type=int(CondArgType.CONSTANT), arg2_ix=0,
)
assert ev(cond) is False # state IS 0
# ---- Structured: Thermostat fields --------------------------------------
def test_state_evaluator_structured_thermostat_temp_gt() -> None:
"""TEMPERATURE > 75 — structured comparison."""
# Thermostat raw temperature 168 ~ 76°F on the Omni linear scale,
# but we compare raw bytes here. Use a raw temp clearly above the
# constant.
state = _state_with(thermostats={
1: MockThermostatState(temperature_raw=80),
})
ev = StateEvaluator(state)
cond = _and_structured(
slot=1, op=int(CondOP.ARG1_GT_ARG2),
arg1_type=int(CondArgType.THERMOSTAT), arg1_ix=1, arg1_field=1,
arg2_type=int(CondArgType.CONSTANT), arg2_ix=75,
)
assert ev(cond) is True
# And < should fail.
cond_lt = _and_structured(
slot=2, op=int(CondOP.ARG1_LT_ARG2),
arg1_type=int(CondArgType.THERMOSTAT), arg1_ix=1, arg1_field=1,
arg2_type=int(CondArgType.CONSTANT), arg2_ix=75,
)
assert ev(cond_lt) is False
# ---- Structured: TimeDate fields ----------------------------------------
def test_state_evaluator_structured_timedate_hour_compare() -> None:
"""Current hour == 22 — uses the clock."""
clock = FakeClock(datetime(2026, 5, 14, 22, 30, tzinfo=timezone.utc))
ev = StateEvaluator(_state_with(), clock=clock)
cond = _and_structured(
slot=1, op=int(CondOP.ARG1_EQ_ARG2),
arg1_type=int(CondArgType.TIME_DATE), arg1_ix=0, arg1_field=8, # Hour
arg2_type=int(CondArgType.CONSTANT), arg2_ix=22,
)
assert ev(cond) is True
def test_state_evaluator_structured_timedate_dayofweek() -> None:
"""DayOfWeek == 4 (Thursday)."""
clock = FakeClock(datetime(2026, 5, 14, 12, 0, tzinfo=timezone.utc)) # Thu
ev = StateEvaluator(_state_with(), clock=clock)
cond = _and_structured(
slot=1, op=int(CondOP.ARG1_EQ_ARG2),
arg1_type=int(CondArgType.TIME_DATE), arg1_ix=0, arg1_field=5, # DOW
arg2_type=int(CondArgType.CONSTANT), arg2_ix=4,
)
assert ev(cond) is True
def test_state_evaluator_structured_timedate_without_clock_is_false() -> None:
"""No clock → TimeDate predicates resolve to None → comparison False."""
ev = StateEvaluator(_state_with()) # no clock
cond = _and_structured(
slot=1, op=int(CondOP.ARG1_EQ_ARG2),
arg1_type=int(CondArgType.TIME_DATE), arg1_ix=0, arg1_field=8,
arg2_type=int(CondArgType.CONSTANT), arg2_ix=22,
)
assert ev(cond) is False
# ---- Engine integration --------------------------------------------------
@pytest.mark.asyncio
async def test_engine_use_state_evaluator_gates_real_conditions() -> None:
"""End-to-end: WHEN + AND IF UNIT 3 ON + THEN UNIT 9 ON.
Chain fires only when unit 3 is on at the time the event arrives."""
button_evt = event_id_user_macro_button(5)
# WHEN button 5; AND IF unit 3 ON; THEN unit 9 ON.
when = _when(1, button_evt)
and_cond = _and_traditional(2, family=0x0A, instance=3) # CTRL + ON, unit 3
then = _then_action(3, int(Command.UNIT_ON), 9)
# Start with unit 3 OFF.
panel = MockPanel(
controller_key=CONTROLLER_KEY,
state=MockState(
programs={
1: when.encode_wire_bytes(),
2: and_cond.encode_wire_bytes(),
3: then.encode_wire_bytes(),
},
units={3: MockUnitState(state=0)},
),
)
async with ProgramEngine(panel, clock=FakeClock(
datetime(2026, 5, 14, 0, 0, tzinfo=timezone.utc)
)) as engine:
engine.use_state_evaluator()
# Unit 3 is OFF — chain blocked.
fired = await engine.emit_user_macro_button(5)
assert fired == 0
assert 9 not in panel.state.units or panel.state.units[9].state == 0
# Turn unit 3 ON, re-emit — chain fires.
panel.state.units[3].state = 1
fired = await engine.emit_user_macro_button(5)
assert fired == 1
assert panel.state.units[9].state == 1
@pytest.mark.asyncio
async def test_engine_state_evaluator_or_alternative() -> None:
"""WHEN + AND IF zone 5 secure + OR + AND IF zone 6 secure + THEN.
Fires if either zone is secure."""
button_evt = event_id_user_macro_button(5)
when = _when(1, button_evt)
and_z5 = _and_traditional(2, family=0x04, instance=5) # ZONE 5 secure
or_break = _or_cond(3) # OR boundary
and_z6 = _and_traditional(4, family=0x04, instance=6) # ZONE 6 secure
then = _then_action(5, int(Command.UNIT_ON), 10)
# Zone 5 tripped, Zone 6 secure → group A fails, group B passes.
panel = MockPanel(
controller_key=CONTROLLER_KEY,
state=MockState(
programs={
p.slot: p.encode_wire_bytes()
for p in (when, and_z5, or_break, and_z6, then)
},
zones={
5: MockZoneState(current_state=1),
6: MockZoneState(current_state=0),
},
),
)
async with ProgramEngine(panel, clock=FakeClock(
datetime(2026, 5, 14, 0, 0, tzinfo=timezone.utc)
)) as engine:
engine.use_state_evaluator()
fired = await engine.emit_user_macro_button(5)
assert fired == 1 # OR-alternative passed
assert panel.state.units[10].state == 1