Compare commits

..

3 Commits

Author SHA1 Message Date
d4c4e530f6 program_engine: real AND/OR condition evaluator
Some checks are pending
Validate / HACS validation (push) Waiting to run
Validate / Hassfest (push) Waiting to run
StateEvaluator decodes AND/OR records against MockState. ProgramEngine
.use_state_evaluator() installs one bound to the engine's panel + clock
+ location. Replaces the stub that always-passes-AND-always-fails-OR.

Traditional (OP=0) decode follows clsConditionLine.Cond synthesis
(clsConditionLine.cs:17-33): disk byte 1 (= and_family) carries the
compact GetConditionalText family byte, disk bytes 3-4 (and_instance
from cond2>>8) carry the object index. Family decoding mirrors
clsText.GetConditionalText (clsText.cs:2224-2274):

  family & 0xFC == 0x00 → OTHER  (low 4 bits = MiscConditional)
  family & 0xFC == 0x04 → ZONE   (bit 0x02 = NOT_READY, else SECURE)
  family & 0xFC == 0x08 → CTRL   (bit 0x02 = ON,         else OFF)
  family & 0xFC == 0x0C → TIME   (no MockState model → False)
  family >= 0x10        → SEC    (high nibble = mode, low = area)

Structured (OP > 0) decode uses Arg1 OP Arg2 with both sides resolved
via _resolve_arg(argtype, ix, field). MockState-backed resolution:

  ZONE        → loop / current_state / arming_state / latched_state
  UNIT        → on/off byte / time_remaining / dim level
  THERMOSTAT  → temp / setpoints / modes / humidity
  AREA        → mode
  TIME_DATE   → (clock-derived) year / month / day / DoW / hour /
                minute / time-of-day-in-minutes

CondOP supported: EQ / NE / LT / GT / ODD / EVEN / MULTIPLE / IN /
NOT_IN. Unknown argtypes or fields raise _UnsupportedCondition
internally — the evaluator swallows it and returns False, keeping the
chain *guarded* rather than firing too eagerly.

LIGHT/DARK MiscConditional uses astral via the engine's PanelLocation
when set. When location is missing, returns False either way (don't
fire if we can't determine).

15 new tests covering each evaluator branch (Traditional ZONE secure/
not-ready/undefined, CTRL on/off/dimmed, SEC mode-match, OTHER NEVER/
DARK; Structured Zone.CurrentState EQ/NE, Thermostat.Temp GT/LT,
TimeDate.Hour/DOW EQ, TimeDate-without-clock) plus end-to-end engine
integration showing use_state_evaluator() gates a real WHEN+AND chain
and the OR-alternative path works against real state.

Full suite: 581 passed, 1 skipped (up from 563, 82 engine tests total).
2026-05-14 02:39:41 -06:00
16655da34c hacs: switch canonical URLs to github.com/rsp2k/omni-pca + add validation CI
- manifest.json documentation/issue_tracker → GitHub (where HACS users land)
- README install instructions → `pip install omni-pca` (now on PyPI)
- pyproject.toml URLs → Repository / Issues / Changelog / Documentation
- custom_components README → HACS default-catalog install flow
- .github/workflows/validate.yml: hacs/action + hassfest on push/PR/weekly

Library remains importable from PyPI; integration tracks the same release tag.
2026-05-14 02:35:21 -06:00
116591be90 dev: refresh integration screenshots (2026-05-10 + 2026-05-11) 2026-05-14 02:32:54 -06:00
20 changed files with 762 additions and 25 deletions

26
.github/workflows/validate.yml vendored Normal file
View File

@ -0,0 +1,26 @@
name: Validate
on:
push:
branches: [main]
pull_request:
schedule:
- cron: "0 4 * * 1" # weekly Monday 04:00 UTC
workflow_dispatch:
jobs:
hacs:
name: HACS validation
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: hacs/action@main
with:
category: integration
hassfest:
name: Hassfest
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: home-assistant/actions/hassfest@master

View File

@ -82,4 +82,4 @@ First release. Working library + Home Assistant custom component, validated end-
- **PyPI publish**: `omni-pca` not yet on PyPI; HA `manifest.json` requirements line will only resolve once it is. For now users either install the wheel manually or pip-install from a Git URL.
- **HACS submission**: pending live-panel validation.
[2026.5.10]: https://git.supported.systems/warehack.ing/omni-pca/releases/tag/v2026.5.10
[2026.5.10]: https://github.com/rsp2k/omni-pca/releases/tag/v2026.5.10

View File

@ -4,7 +4,7 @@ Async Python client for HAI/Leviton Omni-Link II home automation panels — Omni
Includes a Home Assistant custom component (`custom_components/omni_pca/`).
**Project home:** <https://git.supported.systems/warehack.ing/omni-pca>
**Project home:** <https://github.com/rsp2k/omni-pca>
**Documentation:** <https://hai-omni-pro-ii.warehack.ing/>
## Status
@ -18,20 +18,14 @@ The full byte-level protocol spec lives at <https://hai-omni-pro-ii.warehack.ing
## Install
The library isn't on PyPI yet (pending), so install directly from the Gitea release:
```bash
# Pinned to a specific release (recommended)
pip install "omni-pca @ git+https://git.supported.systems/warehack.ing/omni-pca.git@v2026.5.10"
# Or the wheel from the release page
pip install https://git.supported.systems/warehack.ing/omni-pca/releases/download/v2026.5.10/omni_pca-2026.5.10-py3-none-any.whl
pip install omni-pca
# Or with uv
uv add "omni-pca @ git+https://git.supported.systems/warehack.ing/omni-pca.git@v2026.5.10"
uv add omni-pca
```
Once published to PyPI, the canonical install will be `pip install omni-pca`.
For Home Assistant users, install the integration through HACS — see the [HA install how-to](https://hai-omni-pro-ii.warehack.ing/how-to/install-in-home-assistant/).
## Quick start (library)
@ -79,7 +73,7 @@ The HA integration picks the right client automatically based on the **Transport
cd /path/to/your/homeassistant/config/
mkdir -p custom_components
cd custom_components
git clone https://git.supported.systems/warehack.ing/omni-pca tmp-omni
git clone https://github.com/rsp2k/omni-pca tmp-omni
cp -r tmp-omni/custom_components/omni_pca .
rm -rf tmp-omni
```

View File

@ -6,16 +6,18 @@ opens an encrypted session straight to the panel and listens for unsolicited
push messages.
This integration is the HA-facing wrapper around the
[`omni-pca`](https://git.supported.systems/warehack.ing/omni-pca) Python library; the library
[`omni-pca`](https://github.com/rsp2k/omni-pca) Python library; the library
handles the wire protocol, this component surfaces it as HA entities.
## Install
### HACS (recommended once published)
### HACS
1. HACS → Integrations → custom repository → add
`https://git.supported.systems/warehack.ing/omni-pca`, category **Integration**.
2. Install **HAI / Leviton Omni Panel**, then restart Home Assistant.
1. HACS → Integrations → search **HAI / Leviton Omni Panel**.
2. Install, then restart Home Assistant.
(If not yet in the HACS default catalog: HACS → Integrations → custom
repository → add `https://github.com/rsp2k/omni-pca`, category **Integration**.)
### Manual
@ -121,6 +123,6 @@ hashed) — useful for bug reports.
- **No entities for X**: only objects with a name configured on the panel
are discovered. PC Access's "Names" page is where they live.
See the [parent README](https://git.supported.systems/warehack.ing/omni-pca) for protocol /
See the [parent README](https://github.com/rsp2k/omni-pca) for protocol /
library details. Detailed reverse-engineering notes are in
[`docs/JOURNEY.md`](https://git.supported.systems/warehack.ing/omni-pca/blob/main/docs/JOURNEY.md).
[`docs/JOURNEY.md`](https://github.com/rsp2k/omni-pca/blob/main/docs/JOURNEY.md).

View File

@ -7,7 +7,7 @@
"dependencies": [],
"codeowners": ["@rsp2k"],
"requirements": ["omni-pca==2026.5.11"],
"documentation": "https://git.supported.systems/warehack.ing/omni-pca",
"issue_tracker": "https://git.supported.systems/warehack.ing/omni-pca/issues",
"documentation": "https://github.com/rsp2k/omni-pca",
"issue_tracker": "https://github.com/rsp2k/omni-pca/issues",
"integration_type": "hub"
}

Binary file not shown.

Before

Width:  |  Height:  |  Size: 107 KiB

After

Width:  |  Height:  |  Size: 108 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 130 KiB

After

Width:  |  Height:  |  Size: 128 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 106 KiB

After

Width:  |  Height:  |  Size: 107 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 220 KiB

After

Width:  |  Height:  |  Size: 221 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 85 KiB

After

Width:  |  Height:  |  Size: 86 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 189 KiB

After

Width:  |  Height:  |  Size: 190 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 102 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 128 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 122 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 287 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 86 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 190 KiB

View File

@ -34,7 +34,10 @@ engine = ["astral>=2.2,<3"]
omni-pca = "omni_pca.__main__:main"
[project.urls]
Repository = "https://git.supported.systems/warehack.ing/omni-pca"
Repository = "https://github.com/rsp2k/omni-pca"
Issues = "https://github.com/rsp2k/omni-pca/issues"
Changelog = "https://github.com/rsp2k/omni-pca/blob/main/CHANGELOG.md"
Documentation = "https://hai-omni-pro-ii.warehack.ing/"
[build-system]
requires = ["uv_build>=0.11.8,<0.12.0"]

View File

@ -48,7 +48,16 @@ from dataclasses import dataclass, field
from datetime import date, datetime, time, timedelta, timezone
from typing import TYPE_CHECKING
from .programs import Days, Program, ProgramType, TimeKind
from .programs import (
CondArgType,
CondOP,
Days,
MiscConditional,
Program,
ProgramCond,
ProgramType,
TimeKind,
)
_log = logging.getLogger(__name__)
@ -605,6 +614,357 @@ def evaluate_conditions(
return False
# --------------------------------------------------------------------------
# State-aware AND/OR evaluator
# --------------------------------------------------------------------------
class _UnsupportedCondition(Exception):
"""Raised internally when an AND/OR record encodes a check we don't
yet evaluate. The caller (StateEvaluator) treats this as False
"we can't prove this condition", so the chain stays guarded and
logs once so the next semantic-decode pass has a punch list.
"""
class StateEvaluator:
"""Decode AND/OR records against a :class:`MockState` snapshot.
Each AND/OR :class:`Program` record encodes either:
* **Traditional** (``and_op == 0``): a compact bit-packed condition
in the ``cond`` u16 selecting a family (ProgramCond: ZONE / CTRL /
TIME / SEC / OTHER) with an inline operand. Decoded per
``clsText.GetConditionalText`` (clsText.cs:2224-2274).
* **Structured** (``and_op > 0``): a ``Arg1 OP Arg2`` triple where
Arg1 and Arg2 are typed references (Zone / Unit / Thermostat /
TimeDate / Constant / ) plus per-type field selectors. The
operator is one of :class:`CondOP` (EQ / NE / LT / GT / ).
Coverage in this initial cut:
Traditional:
* ZONE family zone secure / not-ready against MockZoneState
* CTRL family unit on / off against MockUnitState
* SEC family area in security-mode against MockAreaState
* OTHER family: ``NEVER`` (always false), ``LIGHT``/``DARK``
approximated via the engine's sun events when location is set,
``AC_POWER_OFF``/``ON`` (no model return False),
``BATTERY_LOW``/``OK`` (no model return False)
* TIME family (time-clock enabled/disabled) no MockState
slot for time-clock toggles, returns False
Structured:
* Zone.CurrentState / ArmingState (== const)
* Unit.CurrentState / Level (== / >= / <= const)
* Thermostat.CurrentTemp / Humidity / setpoints (numeric compare)
* TimeDate.Month / Day / DayOfWeek / Hour / Minute (numeric compare)
Anything else returns ``False`` and logs at DEBUG once per
condition class keeps a chain *guarded* rather than fired-too-
eagerly when we can't yet decode the predicate.
Time-related comparisons need a :class:`Clock`; pass one to honor
Hour/Minute/DayOfWeek predicates correctly. Without a clock those
return False.
"""
def __init__(
self,
state,
*,
clock: Clock | None = None,
location: PanelLocation | None = None,
) -> None:
self._state = state
self._clock = clock
self._location = location
def __call__(self, condition: Program) -> bool:
"""Evaluate one AND/OR record against the bound MockState.
Treated as a plain predicate so callers can pass this instance
directly to ``ProgramEngine.set_condition_evaluator``.
"""
try:
if condition.and_op == CondOP.ARG1_TRADITIONAL:
return self._eval_traditional(condition)
return self._eval_structured(condition)
except _UnsupportedCondition as err:
_log.debug("evaluator: unsupported condition: %s", err)
return False
except Exception:
_log.exception("evaluator: condition evaluation crashed")
return False
# ---- Traditional ------------------------------------------------------
def _eval_traditional(self, c: Program) -> bool:
"""Decode a Traditional AND record.
Per ``clsConditionLine.Cond`` (clsConditionLine.cs:17-33), the
compact-form cond is split across two AND-record slots:
* disk byte 1 (= ``and_family``) carries the compact's high byte
(the family + selector encoding from GetConditionalText)
* disk bytes 3-4 (= ``and_arg1_ix``, ``and_instance`` derived
from ``cond2 >> 8``) carry the compact's low byte (the
object index, shifted into the high half)
Family decoding mirrors clsText.GetConditionalText (clsText.cs:
2224-2274):
family & 0xFC == 0x00 OTHER (low 4 bits = MiscConditional)
family & 0xFC == 0x04 ZONE (bit 0x02 = NOT_READY, else SECURE)
family & 0xFC == 0x08 CTRL (bit 0x02 = ON, else OFF)
family & 0xFC == 0x0C TIME (bit 0x02 = ENABLED, else DIS.)
family >= 0x10 SEC (high nibble = mode, low = area)
"""
family = c.and_family
instance = c.and_instance
family_major = family & 0xFC
secondary = bool(family & 0x02) # selector bit within the family
if family_major == 0x00:
return self._eval_other(family & 0x0F)
if family_major == ProgramCond.ZONE:
return self._eval_traditional_zone(instance, want_not_ready=secondary)
if family_major == ProgramCond.CTRL:
return self._eval_traditional_ctrl(instance, want_on=secondary)
if family_major == ProgramCond.TIME:
# Time-clock enabled / disabled — MockState doesn't model
# the enable bit, so we conservatively report disabled.
return False
# 0x10 and above: SEC family — high nibble = mode, low nibble = area.
return self._eval_traditional_sec(
area=family & 0x0F, mode=(family >> 4) & 0x07,
)
def _eval_traditional_zone(self, zone_num: int, *, want_not_ready: bool) -> bool:
"""SECURE matches ``current_state == 0``; NOT_READY matches any
nonzero current_state (per the panel's display semantics)."""
zone = self._state.zones.get(zone_num)
if zone is None:
# Undefined zone reads as SECURE (matches real-panel behaviour
# when a programmed zone slot doesn't exist).
return not want_not_ready
if want_not_ready:
return zone.current_state != 0
return zone.current_state == 0
def _eval_traditional_ctrl(self, unit_num: int, *, want_on: bool) -> bool:
"""ON matches any nonzero ``state``; OFF matches ``state == 0``.
MockUnitState.state encodes 0=off, 1=on, 100..200=dim level
all nonzero values count as "on" for this predicate, which
matches the panel's binary on/off display.
"""
unit = self._state.units.get(unit_num)
if unit is None:
return not want_on # missing unit reads as OFF
on = unit.state != 0
return on == want_on
def _eval_traditional_sec(self, *, area: int, mode: int) -> bool:
"""Area in security-mode N. ``area == 0`` means "any area in
this mode" per GetConditionalText:2262 — without a multi-area
model we approximate by checking area 1."""
if area == 0:
area = 1
a = self._state.areas.get(area)
if a is None:
return False
return a.mode == mode
def _eval_other(self, misc_code: int) -> bool:
"""OTHER family: low 4 bits = enuMiscConditional."""
try:
cat = MiscConditional(misc_code)
except ValueError:
raise _UnsupportedCondition(f"unknown misc condition {misc_code}")
if cat == MiscConditional.NONE:
return True
if cat == MiscConditional.NEVER:
return False
if cat in (MiscConditional.LIGHT, MiscConditional.DARK):
light = self._is_light_outside()
if light is None:
# Can't determine — be conservative, don't fire either way.
return False
return light if cat == MiscConditional.LIGHT else not light
# PHONE_*, AC_POWER_*, BATTERY_*, ENERGY_COST_* — no MockState
# model for any of these yet. Conservatively False.
return False
def _is_light_outside(self) -> bool | None:
"""Approximate "is the sun up" against the engine's PanelLocation
+ clock. Returns None when clock or location is missing
(caller decides what to do with the indeterminate result)."""
if self._clock is None or self._location is None:
return None
try:
now = self._clock.now()
sunrise, sunset = _sun_events(self._location, now.date())
except Exception:
return None
return sunrise <= now <= sunset
# ---- Structured -------------------------------------------------------
def _eval_structured(self, c: Program) -> bool:
"""Evaluate ``Arg1 OP Arg2`` style conditions.
Resolves Arg1 and Arg2 to numeric values via :meth:`_resolve_arg`
then compares with the operator. Comparison operators are
straightforward integer math; AND/OR/XOR at this layer are
treated as bitwise reductions on the resolved values (matches
the C# operator semantics for those uncommon op codes).
"""
op = c.and_op
arg1 = self._resolve_arg(
c.and_arg1_argtype, c.and_arg1_ix, c.and_arg1_field,
)
arg2 = self._resolve_arg(
c.and_arg2_argtype, c.and_arg2_ix, c.and_arg2_field,
)
if arg1 is None or arg2 is None:
return False
if op == CondOP.ARG1_EQ_ARG2:
return arg1 == arg2
if op == CondOP.ARG1_NE_ARG2:
return arg1 != arg2
if op == CondOP.ARG1_LT_ARG2:
return arg1 < arg2
if op == CondOP.ARG1_GT_ARG2:
return arg1 > arg2
if op == CondOP.ARG1_ODD:
return (arg1 & 1) == 1
if op == CondOP.ARG1_EVEN:
return (arg1 & 1) == 0
# MULTIPLE / IN / NOT_IN are bitfield checks the panel uses for
# day-of-week and area-set tests. arg2 is the bitmask.
if op == CondOP.ARG1_MULTIPLE_ARG2:
return arg2 != 0 and (arg1 % arg2) == 0
if op == CondOP.ARG1_IN_ARG2:
return bool(arg1 & arg2)
if op == CondOP.ARG1_NOT_IN_ARG2:
return not bool(arg1 & arg2)
raise _UnsupportedCondition(f"unknown structured op {op}")
def _resolve_arg(
self, argtype: int, ix: int, field: int,
) -> int | None:
"""Return the numeric value of one Arg side, or None if it can't
be resolved (unknown type, missing object, missing clock).
"""
if argtype == CondArgType.CONSTANT:
return ix
if argtype == CondArgType.ZONE:
return self._resolve_zone_field(ix, field)
if argtype == CondArgType.UNIT:
return self._resolve_unit_field(ix, field)
if argtype == CondArgType.THERMOSTAT:
return self._resolve_thermostat_field(ix, field)
if argtype == CondArgType.AREA:
return self._resolve_area_field(ix, field)
if argtype == CondArgType.TIME_DATE:
return self._resolve_timedate_field(field)
# USER_SETTING, AUXILLARY, AUDIO, ACCESS_CONTROL, MESSAGE,
# SYSTEM — no MockState models for these. Treat as unresolved
# so the comparison returns False.
raise _UnsupportedCondition(f"unsupported argtype {argtype}")
def _resolve_zone_field(self, ix: int, field: int) -> int | None:
zone = self._state.zones.get(ix)
if zone is None:
return None
# enuZoneField: LoopReading=1, CurrentState=2, ArmingState=3, AlarmState=4
if field == 1:
return zone.loop
if field == 2:
return zone.current_state
if field == 3:
return zone.arming_state
if field == 4:
return zone.latched_state
raise _UnsupportedCondition(f"zone field {field}")
def _resolve_unit_field(self, ix: int, field: int) -> int | None:
unit = self._state.units.get(ix)
if unit is None:
return None
# enuUnitField: CurrentState=1, PreviousState=2, Timer=3, Level=4
if field == 1:
# 0 = off, 1 = on, 100..200 = dim. The panel treats anything
# non-zero as "on" at this granularity.
return 1 if unit.state != 0 else 0
if field == 3:
return unit.time_remaining
if field == 4:
# Level: panel returns 0..100% — derive from state byte.
if unit.state >= 100:
return unit.state - 100
return 100 if unit.state == 1 else 0
raise _UnsupportedCondition(f"unit field {field}")
def _resolve_thermostat_field(self, ix: int, field: int) -> int | None:
t = self._state.thermostats.get(ix)
if t is None:
return None
# enuThermostatField map (subset MockState has data for):
# CurrentTemp=1, HeatSetpt=2, CoolSetpt=3, SystemMode=4, FanMode=5,
# HoldMode=6, Humidity=9, HumidifySetpoint=10, DehumidifySetpoint=11,
# OutdoorTemperature=12
return {
1: t.temperature_raw,
2: t.heat_setpoint_raw,
3: t.cool_setpoint_raw,
4: t.system_mode,
5: t.fan_mode,
6: t.hold_mode,
9: t.humidity_raw,
10: t.humidify_setpoint_raw,
11: t.dehumidify_setpoint_raw,
12: t.outdoor_temperature_raw,
}.get(field, None)
def _resolve_area_field(self, ix: int, field: int) -> int | None:
area = self._state.areas.get(ix)
if area is None:
return None
# No enuAreaField source — be permissive: field 1 = mode.
if field == 1:
return area.mode
raise _UnsupportedCondition(f"area field {field}")
def _resolve_timedate_field(self, field: int) -> int | None:
if self._clock is None:
return None
now = self._clock.now()
# enuTimeDateField: Date=1, Year=2, Month=3, Day=4, DayOfWeek=5,
# Time=6, DST_Flag=7, Hour=8, Minute=9, SunriseSunset=10.
if field == 2:
return now.year
if field == 3:
return now.month
if field == 4:
return now.day
if field == 5:
# DayOfWeek: panel uses 1=Mon..7=Sun per clsHAC. Python
# weekday() returns Mon=0..Sun=6 — add 1.
return now.weekday() + 1
if field == 6:
# Time-of-day encoded as (hour * 60 + minute) — minutes since
# midnight. Matches the C# packing in GetComplexConditionText.
return now.hour * 60 + now.minute
if field == 8:
return now.hour
if field == 9:
return now.minute
# Date / DST_Flag / SunriseSunset — not modelled here.
raise _UnsupportedCondition(f"timedate field {field}")
# --------------------------------------------------------------------------
# Engine
# --------------------------------------------------------------------------
@ -687,10 +1047,27 @@ class ProgramEngine:
bool. The default returns True for every AND, False for every
OR (a degenerate evaluator that means "all chains' first AND
groups always pass" — useful as a smoke-test default, not for
real automation). Real callers supply a state-aware evaluator.
real automation).
For real automation, call :meth:`use_state_evaluator` instead
(or build your own :class:`StateEvaluator` and pass it here).
"""
self._condition_evaluator = fn
def use_state_evaluator(self) -> "StateEvaluator":
"""Install a :class:`StateEvaluator` bound to this engine's
``MockState``, clock, and location. Returns the new evaluator
so the caller can introspect it.
Equivalent to ``engine.set_condition_evaluator(
StateEvaluator(panel.state, clock=clock, location=loc))``.
"""
evaluator = StateEvaluator(
self._panel.state, clock=self._clock, location=self._location,
)
self._condition_evaluator = evaluator
return evaluator
@staticmethod
def _default_condition_evaluator(condition: Program) -> bool:
"""Stub evaluator — caller should override via set_condition_evaluator."""

View File

@ -19,7 +19,13 @@ from omni_pca.program_engine import (
RealClock,
classify,
)
from omni_pca.programs import Days, Program, ProgramType
from omni_pca.programs import (
CondArgType,
CondOP,
Days,
Program,
ProgramType,
)
CONTROLLER_KEY = bytes(range(16))
@ -1017,3 +1023,332 @@ async def test_engine_every_chain_fires_on_interval() -> None:
await clock.advance_to(t0 + timedelta(seconds=60 * tick + 1))
await asyncio.sleep(0)
assert engine.metrics.clausal_fired == 3
# ---- Phase 6: detailed AND/OR semantics (StateEvaluator) ----------------
from omni_pca.mock_panel import ( # noqa: E402
MockAreaState,
MockThermostatState,
MockUnitState,
MockZoneState,
)
from omni_pca.program_engine import StateEvaluator # noqa: E402
def _and_traditional(
slot: int, family: int, instance: int = 0,
) -> Program:
"""Build a Traditional (OP=0) AND record.
Per clsConditionLine.Cond synthesis (clsConditionLine.cs:17-33),
the family byte lives in disk byte 1 = ``and_family`` (Python's
``cond & 0xFF``), and the object instance lives in disk byte 3
= ``and_instance`` (Python's ``(cond2 >> 8) & 0xFF``).
"""
return Program(
slot=slot, prog_type=int(ProgramType.AND),
cond=family & 0xFF, # byte 1 = family; byte 2 (OP) = 0
cond2=(instance & 0xFF) << 8, # byte 3 = instance; byte 4 = 0
)
def _and_structured(
slot: int,
op: int,
arg1_type: int, arg1_ix: int, arg1_field: int,
arg2_type: int, arg2_ix: int, arg2_field: int = 0,
) -> Program:
"""Build a Structured (OP>0) AND record.
Field layout per programs.py decoders:
cond high byte (>>8 & 0xFF) = op (and_op)
cond low byte (& 0xFF) = arg1_argtype (and_arg1_argtype)
cond2 = arg1_ix (and_arg1_ix)
cmd = arg1_field (and_arg1_field)
par = arg2_argtype (and_arg2_argtype)
pr2 = arg2_ix (and_arg2_ix)
month = arg2_field (and_arg2_field)
"""
return Program(
slot=slot, prog_type=int(ProgramType.AND),
cond=(op << 8) | arg1_type,
cond2=arg1_ix,
cmd=arg1_field,
par=arg2_type,
pr2=arg2_ix,
month=arg2_field,
)
def _state_with(**kwargs) -> MockState:
return MockState(**kwargs)
# ---- Traditional ZONE family -------------------------------------------
def test_state_evaluator_zone_secure_passes_when_state_zero() -> None:
state = _state_with(zones={
7: MockZoneState(name="FRONT DOOR", current_state=0),
})
ev = StateEvaluator(state)
# ZONE family = 0x04 (secure variant); instance = 7
cond = _and_traditional(1, family=0x04, instance=7)
assert ev(cond) is True
def test_state_evaluator_zone_secure_fails_when_tripped() -> None:
state = _state_with(zones={
7: MockZoneState(name="FRONT DOOR", current_state=1), # not-ready
})
ev = StateEvaluator(state)
cond = _and_traditional(1, family=0x04, instance=7)
assert ev(cond) is False
def test_state_evaluator_zone_not_ready_passes_when_tripped() -> None:
state = _state_with(zones={
7: MockZoneState(name="FRONT DOOR", current_state=1),
})
ev = StateEvaluator(state)
# family 0x06 = ZONE + NOT_READY selector bit
cond = _and_traditional(1, family=0x06, instance=7)
assert ev(cond) is True
def test_state_evaluator_zone_undefined_is_secure() -> None:
"""Undefined zone reads as SECURE — matches real-panel behaviour
when a programmed zone slot doesn't exist."""
ev = StateEvaluator(_state_with()) # no zones
secure_check = _and_traditional(1, family=0x04, instance=99)
not_ready_check = _and_traditional(2, family=0x06, instance=99)
assert ev(secure_check) is True
assert ev(not_ready_check) is False
# ---- Traditional CTRL family --------------------------------------------
def test_state_evaluator_unit_on_passes_when_state_one() -> None:
state = _state_with(units={5: MockUnitState(name="LAMP", state=1)})
ev = StateEvaluator(state)
# CTRL family + ON selector = 0x0A
cond = _and_traditional(1, family=0x0A, instance=5)
assert ev(cond) is True
def test_state_evaluator_unit_off_passes_when_state_zero() -> None:
state = _state_with(units={5: MockUnitState(name="LAMP", state=0)})
ev = StateEvaluator(state)
# CTRL family + OFF selector = 0x08
cond = _and_traditional(1, family=0x08, instance=5)
assert ev(cond) is True
def test_state_evaluator_unit_on_for_dimmed_unit() -> None:
"""Dim level 50% → state=150; ON predicate should pass."""
state = _state_with(units={5: MockUnitState(state=150)})
ev = StateEvaluator(state)
cond = _and_traditional(1, family=0x0A, instance=5)
assert ev(cond) is True
# ---- Traditional SEC family ---------------------------------------------
def test_state_evaluator_security_mode_match() -> None:
"""SEC family: family byte = (mode << 4) | area. Area 1 in mode 2."""
state = _state_with(areas={1: MockAreaState(name="MAIN", mode=2)})
ev = StateEvaluator(state)
cond = _and_traditional(1, family=(2 << 4) | 1) # mode 2, area 1 = 0x21
assert ev(cond) is True
# Area in different mode → fails.
cond_wrong = _and_traditional(1, family=(3 << 4) | 1) # mode 3
assert ev(cond_wrong) is False
# ---- Traditional OTHER family -------------------------------------------
def test_state_evaluator_never_is_always_false() -> None:
"""MiscConditional.NEVER (= 1) → always False, regardless of state."""
ev = StateEvaluator(_state_with())
# OTHER family + NEVER misc = 0x01
cond = _and_traditional(1, family=0x01)
assert ev(cond) is False
def test_state_evaluator_dark_without_location_is_false() -> None:
ev = StateEvaluator(_state_with()) # no location
# OTHER family + DARK misc = 0x03
cond = _and_traditional(1, family=0x03)
assert ev(cond) is False
# ---- Structured: Zone fields --------------------------------------------
def test_state_evaluator_structured_zone_current_state_eq() -> None:
"""Zone 5 CurrentState == 1 (not-ready) — structured form."""
state = _state_with(zones={5: MockZoneState(current_state=1)})
ev = StateEvaluator(state)
# EQ, Arg1=ZONE.CurrentState(2), Arg1IX=5, Arg2=CONSTANT, Arg2IX=1
cond = _and_structured(
slot=1, op=int(CondOP.ARG1_EQ_ARG2),
arg1_type=int(CondArgType.ZONE), arg1_ix=5, arg1_field=2,
arg2_type=int(CondArgType.CONSTANT), arg2_ix=1,
)
assert ev(cond) is True
def test_state_evaluator_structured_zone_state_ne() -> None:
state = _state_with(zones={5: MockZoneState(current_state=0)})
ev = StateEvaluator(state)
cond = _and_structured(
slot=1, op=int(CondOP.ARG1_NE_ARG2),
arg1_type=int(CondArgType.ZONE), arg1_ix=5, arg1_field=2,
arg2_type=int(CondArgType.CONSTANT), arg2_ix=0,
)
assert ev(cond) is False # state IS 0
# ---- Structured: Thermostat fields --------------------------------------
def test_state_evaluator_structured_thermostat_temp_gt() -> None:
"""TEMPERATURE > 75 — structured comparison."""
# Thermostat raw temperature 168 ~ 76°F on the Omni linear scale,
# but we compare raw bytes here. Use a raw temp clearly above the
# constant.
state = _state_with(thermostats={
1: MockThermostatState(temperature_raw=80),
})
ev = StateEvaluator(state)
cond = _and_structured(
slot=1, op=int(CondOP.ARG1_GT_ARG2),
arg1_type=int(CondArgType.THERMOSTAT), arg1_ix=1, arg1_field=1,
arg2_type=int(CondArgType.CONSTANT), arg2_ix=75,
)
assert ev(cond) is True
# And < should fail.
cond_lt = _and_structured(
slot=2, op=int(CondOP.ARG1_LT_ARG2),
arg1_type=int(CondArgType.THERMOSTAT), arg1_ix=1, arg1_field=1,
arg2_type=int(CondArgType.CONSTANT), arg2_ix=75,
)
assert ev(cond_lt) is False
# ---- Structured: TimeDate fields ----------------------------------------
def test_state_evaluator_structured_timedate_hour_compare() -> None:
"""Current hour == 22 — uses the clock."""
clock = FakeClock(datetime(2026, 5, 14, 22, 30, tzinfo=timezone.utc))
ev = StateEvaluator(_state_with(), clock=clock)
cond = _and_structured(
slot=1, op=int(CondOP.ARG1_EQ_ARG2),
arg1_type=int(CondArgType.TIME_DATE), arg1_ix=0, arg1_field=8, # Hour
arg2_type=int(CondArgType.CONSTANT), arg2_ix=22,
)
assert ev(cond) is True
def test_state_evaluator_structured_timedate_dayofweek() -> None:
"""DayOfWeek == 4 (Thursday)."""
clock = FakeClock(datetime(2026, 5, 14, 12, 0, tzinfo=timezone.utc)) # Thu
ev = StateEvaluator(_state_with(), clock=clock)
cond = _and_structured(
slot=1, op=int(CondOP.ARG1_EQ_ARG2),
arg1_type=int(CondArgType.TIME_DATE), arg1_ix=0, arg1_field=5, # DOW
arg2_type=int(CondArgType.CONSTANT), arg2_ix=4,
)
assert ev(cond) is True
def test_state_evaluator_structured_timedate_without_clock_is_false() -> None:
"""No clock → TimeDate predicates resolve to None → comparison False."""
ev = StateEvaluator(_state_with()) # no clock
cond = _and_structured(
slot=1, op=int(CondOP.ARG1_EQ_ARG2),
arg1_type=int(CondArgType.TIME_DATE), arg1_ix=0, arg1_field=8,
arg2_type=int(CondArgType.CONSTANT), arg2_ix=22,
)
assert ev(cond) is False
# ---- Engine integration --------------------------------------------------
@pytest.mark.asyncio
async def test_engine_use_state_evaluator_gates_real_conditions() -> None:
"""End-to-end: WHEN + AND IF UNIT 3 ON + THEN UNIT 9 ON.
Chain fires only when unit 3 is on at the time the event arrives."""
button_evt = event_id_user_macro_button(5)
# WHEN button 5; AND IF unit 3 ON; THEN unit 9 ON.
when = _when(1, button_evt)
and_cond = _and_traditional(2, family=0x0A, instance=3) # CTRL + ON, unit 3
then = _then_action(3, int(Command.UNIT_ON), 9)
# Start with unit 3 OFF.
panel = MockPanel(
controller_key=CONTROLLER_KEY,
state=MockState(
programs={
1: when.encode_wire_bytes(),
2: and_cond.encode_wire_bytes(),
3: then.encode_wire_bytes(),
},
units={3: MockUnitState(state=0)},
),
)
async with ProgramEngine(panel, clock=FakeClock(
datetime(2026, 5, 14, 0, 0, tzinfo=timezone.utc)
)) as engine:
engine.use_state_evaluator()
# Unit 3 is OFF — chain blocked.
fired = await engine.emit_user_macro_button(5)
assert fired == 0
assert 9 not in panel.state.units or panel.state.units[9].state == 0
# Turn unit 3 ON, re-emit — chain fires.
panel.state.units[3].state = 1
fired = await engine.emit_user_macro_button(5)
assert fired == 1
assert panel.state.units[9].state == 1
@pytest.mark.asyncio
async def test_engine_state_evaluator_or_alternative() -> None:
"""WHEN + AND IF zone 5 secure + OR + AND IF zone 6 secure + THEN.
Fires if either zone is secure."""
button_evt = event_id_user_macro_button(5)
when = _when(1, button_evt)
and_z5 = _and_traditional(2, family=0x04, instance=5) # ZONE 5 secure
or_break = _or_cond(3) # OR boundary
and_z6 = _and_traditional(4, family=0x04, instance=6) # ZONE 6 secure
then = _then_action(5, int(Command.UNIT_ON), 10)
# Zone 5 tripped, Zone 6 secure → group A fails, group B passes.
panel = MockPanel(
controller_key=CONTROLLER_KEY,
state=MockState(
programs={
p.slot: p.encode_wire_bytes()
for p in (when, and_z5, or_break, and_z6, then)
},
zones={
5: MockZoneState(current_state=1),
6: MockZoneState(current_state=0),
},
),
)
async with ProgramEngine(panel, clock=FakeClock(
datetime(2026, 5, 14, 0, 0, tzinfo=timezone.utc)
)) as engine:
engine.use_state_evaluator()
fired = await engine.emit_user_macro_button(5)
assert fired == 1 # OR-alternative passed
assert panel.state.units[10].state == 1