Compare commits
3 Commits
cc32081caf
...
d4c4e530f6
| Author | SHA1 | Date | |
|---|---|---|---|
| d4c4e530f6 | |||
| 16655da34c | |||
| 116591be90 |
26
.github/workflows/validate.yml
vendored
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
name: Validate
|
||||||
|
|
||||||
|
on:
|
||||||
|
push:
|
||||||
|
branches: [main]
|
||||||
|
pull_request:
|
||||||
|
schedule:
|
||||||
|
- cron: "0 4 * * 1" # weekly Monday 04:00 UTC
|
||||||
|
workflow_dispatch:
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
hacs:
|
||||||
|
name: HACS validation
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- uses: actions/checkout@v4
|
||||||
|
- uses: hacs/action@main
|
||||||
|
with:
|
||||||
|
category: integration
|
||||||
|
|
||||||
|
hassfest:
|
||||||
|
name: Hassfest
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- uses: actions/checkout@v4
|
||||||
|
- uses: home-assistant/actions/hassfest@master
|
||||||
@ -82,4 +82,4 @@ First release. Working library + Home Assistant custom component, validated end-
|
|||||||
- **PyPI publish**: `omni-pca` not yet on PyPI; HA `manifest.json` requirements line will only resolve once it is. For now users either install the wheel manually or pip-install from a Git URL.
|
- **PyPI publish**: `omni-pca` not yet on PyPI; HA `manifest.json` requirements line will only resolve once it is. For now users either install the wheel manually or pip-install from a Git URL.
|
||||||
- **HACS submission**: pending live-panel validation.
|
- **HACS submission**: pending live-panel validation.
|
||||||
|
|
||||||
[2026.5.10]: https://git.supported.systems/warehack.ing/omni-pca/releases/tag/v2026.5.10
|
[2026.5.10]: https://github.com/rsp2k/omni-pca/releases/tag/v2026.5.10
|
||||||
|
|||||||
16
README.md
@ -4,7 +4,7 @@ Async Python client for HAI/Leviton Omni-Link II home automation panels — Omni
|
|||||||
|
|
||||||
Includes a Home Assistant custom component (`custom_components/omni_pca/`).
|
Includes a Home Assistant custom component (`custom_components/omni_pca/`).
|
||||||
|
|
||||||
**Project home:** <https://git.supported.systems/warehack.ing/omni-pca>
|
**Project home:** <https://github.com/rsp2k/omni-pca>
|
||||||
**Documentation:** <https://hai-omni-pro-ii.warehack.ing/>
|
**Documentation:** <https://hai-omni-pro-ii.warehack.ing/>
|
||||||
|
|
||||||
## Status
|
## Status
|
||||||
@ -18,20 +18,14 @@ The full byte-level protocol spec lives at <https://hai-omni-pro-ii.warehack.ing
|
|||||||
|
|
||||||
## Install
|
## Install
|
||||||
|
|
||||||
The library isn't on PyPI yet (pending), so install directly from the Gitea release:
|
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
# Pinned to a specific release (recommended)
|
pip install omni-pca
|
||||||
pip install "omni-pca @ git+https://git.supported.systems/warehack.ing/omni-pca.git@v2026.5.10"
|
|
||||||
|
|
||||||
# Or the wheel from the release page
|
|
||||||
pip install https://git.supported.systems/warehack.ing/omni-pca/releases/download/v2026.5.10/omni_pca-2026.5.10-py3-none-any.whl
|
|
||||||
|
|
||||||
# Or with uv
|
# Or with uv
|
||||||
uv add "omni-pca @ git+https://git.supported.systems/warehack.ing/omni-pca.git@v2026.5.10"
|
uv add omni-pca
|
||||||
```
|
```
|
||||||
|
|
||||||
Once published to PyPI, the canonical install will be `pip install omni-pca`.
|
For Home Assistant users, install the integration through HACS — see the [HA install how-to](https://hai-omni-pro-ii.warehack.ing/how-to/install-in-home-assistant/).
|
||||||
|
|
||||||
## Quick start (library)
|
## Quick start (library)
|
||||||
|
|
||||||
@ -79,7 +73,7 @@ The HA integration picks the right client automatically based on the **Transport
|
|||||||
cd /path/to/your/homeassistant/config/
|
cd /path/to/your/homeassistant/config/
|
||||||
mkdir -p custom_components
|
mkdir -p custom_components
|
||||||
cd custom_components
|
cd custom_components
|
||||||
git clone https://git.supported.systems/warehack.ing/omni-pca tmp-omni
|
git clone https://github.com/rsp2k/omni-pca tmp-omni
|
||||||
cp -r tmp-omni/custom_components/omni_pca .
|
cp -r tmp-omni/custom_components/omni_pca .
|
||||||
rm -rf tmp-omni
|
rm -rf tmp-omni
|
||||||
```
|
```
|
||||||
|
|||||||
@ -6,16 +6,18 @@ opens an encrypted session straight to the panel and listens for unsolicited
|
|||||||
push messages.
|
push messages.
|
||||||
|
|
||||||
This integration is the HA-facing wrapper around the
|
This integration is the HA-facing wrapper around the
|
||||||
[`omni-pca`](https://git.supported.systems/warehack.ing/omni-pca) Python library; the library
|
[`omni-pca`](https://github.com/rsp2k/omni-pca) Python library; the library
|
||||||
handles the wire protocol, this component surfaces it as HA entities.
|
handles the wire protocol, this component surfaces it as HA entities.
|
||||||
|
|
||||||
## Install
|
## Install
|
||||||
|
|
||||||
### HACS (recommended once published)
|
### HACS
|
||||||
|
|
||||||
1. HACS → Integrations → custom repository → add
|
1. HACS → Integrations → search **HAI / Leviton Omni Panel**.
|
||||||
`https://git.supported.systems/warehack.ing/omni-pca`, category **Integration**.
|
2. Install, then restart Home Assistant.
|
||||||
2. Install **HAI / Leviton Omni Panel**, then restart Home Assistant.
|
|
||||||
|
(If not yet in the HACS default catalog: HACS → Integrations → custom
|
||||||
|
repository → add `https://github.com/rsp2k/omni-pca`, category **Integration**.)
|
||||||
|
|
||||||
### Manual
|
### Manual
|
||||||
|
|
||||||
@ -121,6 +123,6 @@ hashed) — useful for bug reports.
|
|||||||
- **No entities for X**: only objects with a name configured on the panel
|
- **No entities for X**: only objects with a name configured on the panel
|
||||||
are discovered. PC Access's "Names" page is where they live.
|
are discovered. PC Access's "Names" page is where they live.
|
||||||
|
|
||||||
See the [parent README](https://git.supported.systems/warehack.ing/omni-pca) for protocol /
|
See the [parent README](https://github.com/rsp2k/omni-pca) for protocol /
|
||||||
library details. Detailed reverse-engineering notes are in
|
library details. Detailed reverse-engineering notes are in
|
||||||
[`docs/JOURNEY.md`](https://git.supported.systems/warehack.ing/omni-pca/blob/main/docs/JOURNEY.md).
|
[`docs/JOURNEY.md`](https://github.com/rsp2k/omni-pca/blob/main/docs/JOURNEY.md).
|
||||||
|
|||||||
@ -7,7 +7,7 @@
|
|||||||
"dependencies": [],
|
"dependencies": [],
|
||||||
"codeowners": ["@rsp2k"],
|
"codeowners": ["@rsp2k"],
|
||||||
"requirements": ["omni-pca==2026.5.11"],
|
"requirements": ["omni-pca==2026.5.11"],
|
||||||
"documentation": "https://git.supported.systems/warehack.ing/omni-pca",
|
"documentation": "https://github.com/rsp2k/omni-pca",
|
||||||
"issue_tracker": "https://git.supported.systems/warehack.ing/omni-pca/issues",
|
"issue_tracker": "https://github.com/rsp2k/omni-pca/issues",
|
||||||
"integration_type": "hub"
|
"integration_type": "hub"
|
||||||
}
|
}
|
||||||
|
|||||||
|
Before Width: | Height: | Size: 107 KiB After Width: | Height: | Size: 108 KiB |
|
Before Width: | Height: | Size: 130 KiB After Width: | Height: | Size: 128 KiB |
|
Before Width: | Height: | Size: 106 KiB After Width: | Height: | Size: 107 KiB |
|
Before Width: | Height: | Size: 220 KiB After Width: | Height: | Size: 221 KiB |
|
Before Width: | Height: | Size: 85 KiB After Width: | Height: | Size: 86 KiB |
|
Before Width: | Height: | Size: 189 KiB After Width: | Height: | Size: 190 KiB |
BIN
dev/artifacts/screenshots/2026-05-11/01-overview.png
Normal file
|
After Width: | Height: | Size: 102 KiB |
BIN
dev/artifacts/screenshots/2026-05-11/02-integrations-list.png
Normal file
|
After Width: | Height: | Size: 128 KiB |
BIN
dev/artifacts/screenshots/2026-05-11/03-omni-pca-config.png
Normal file
|
After Width: | Height: | Size: 122 KiB |
BIN
dev/artifacts/screenshots/2026-05-11/04-panel-device.png
Normal file
|
After Width: | Height: | Size: 287 KiB |
BIN
dev/artifacts/screenshots/2026-05-11/05-entities-omni.png
Normal file
|
After Width: | Height: | Size: 86 KiB |
BIN
dev/artifacts/screenshots/2026-05-11/06-developer-states.png
Normal file
|
After Width: | Height: | Size: 190 KiB |
@ -34,7 +34,10 @@ engine = ["astral>=2.2,<3"]
|
|||||||
omni-pca = "omni_pca.__main__:main"
|
omni-pca = "omni_pca.__main__:main"
|
||||||
|
|
||||||
[project.urls]
|
[project.urls]
|
||||||
Repository = "https://git.supported.systems/warehack.ing/omni-pca"
|
Repository = "https://github.com/rsp2k/omni-pca"
|
||||||
|
Issues = "https://github.com/rsp2k/omni-pca/issues"
|
||||||
|
Changelog = "https://github.com/rsp2k/omni-pca/blob/main/CHANGELOG.md"
|
||||||
|
Documentation = "https://hai-omni-pro-ii.warehack.ing/"
|
||||||
|
|
||||||
[build-system]
|
[build-system]
|
||||||
requires = ["uv_build>=0.11.8,<0.12.0"]
|
requires = ["uv_build>=0.11.8,<0.12.0"]
|
||||||
|
|||||||
@ -48,7 +48,16 @@ from dataclasses import dataclass, field
|
|||||||
from datetime import date, datetime, time, timedelta, timezone
|
from datetime import date, datetime, time, timedelta, timezone
|
||||||
from typing import TYPE_CHECKING
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
from .programs import Days, Program, ProgramType, TimeKind
|
from .programs import (
|
||||||
|
CondArgType,
|
||||||
|
CondOP,
|
||||||
|
Days,
|
||||||
|
MiscConditional,
|
||||||
|
Program,
|
||||||
|
ProgramCond,
|
||||||
|
ProgramType,
|
||||||
|
TimeKind,
|
||||||
|
)
|
||||||
|
|
||||||
_log = logging.getLogger(__name__)
|
_log = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -605,6 +614,357 @@ def evaluate_conditions(
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
# --------------------------------------------------------------------------
|
||||||
|
# State-aware AND/OR evaluator
|
||||||
|
# --------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class _UnsupportedCondition(Exception):
|
||||||
|
"""Raised internally when an AND/OR record encodes a check we don't
|
||||||
|
yet evaluate. The caller (StateEvaluator) treats this as False —
|
||||||
|
"we can't prove this condition", so the chain stays guarded — and
|
||||||
|
logs once so the next semantic-decode pass has a punch list.
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
class StateEvaluator:
|
||||||
|
"""Decode AND/OR records against a :class:`MockState` snapshot.
|
||||||
|
|
||||||
|
Each AND/OR :class:`Program` record encodes either:
|
||||||
|
|
||||||
|
* **Traditional** (``and_op == 0``): a compact bit-packed condition
|
||||||
|
in the ``cond`` u16 selecting a family (ProgramCond: ZONE / CTRL /
|
||||||
|
TIME / SEC / OTHER) with an inline operand. Decoded per
|
||||||
|
``clsText.GetConditionalText`` (clsText.cs:2224-2274).
|
||||||
|
|
||||||
|
* **Structured** (``and_op > 0``): a ``Arg1 OP Arg2`` triple where
|
||||||
|
Arg1 and Arg2 are typed references (Zone / Unit / Thermostat /
|
||||||
|
TimeDate / Constant / …) plus per-type field selectors. The
|
||||||
|
operator is one of :class:`CondOP` (EQ / NE / LT / GT / …).
|
||||||
|
|
||||||
|
Coverage in this initial cut:
|
||||||
|
|
||||||
|
Traditional:
|
||||||
|
* ZONE family — zone secure / not-ready against MockZoneState
|
||||||
|
* CTRL family — unit on / off against MockUnitState
|
||||||
|
* SEC family — area in security-mode against MockAreaState
|
||||||
|
* OTHER family: ``NEVER`` (always false), ``LIGHT``/``DARK``
|
||||||
|
approximated via the engine's sun events when location is set,
|
||||||
|
``AC_POWER_OFF``/``ON`` (no model — return False),
|
||||||
|
``BATTERY_LOW``/``OK`` (no model — return False)
|
||||||
|
* TIME family (time-clock enabled/disabled) — no MockState
|
||||||
|
slot for time-clock toggles, returns False
|
||||||
|
|
||||||
|
Structured:
|
||||||
|
* Zone.CurrentState / ArmingState (== const)
|
||||||
|
* Unit.CurrentState / Level (== / >= / <= const)
|
||||||
|
* Thermostat.CurrentTemp / Humidity / setpoints (numeric compare)
|
||||||
|
* TimeDate.Month / Day / DayOfWeek / Hour / Minute (numeric compare)
|
||||||
|
|
||||||
|
Anything else returns ``False`` and logs at DEBUG once per
|
||||||
|
condition class — keeps a chain *guarded* rather than fired-too-
|
||||||
|
eagerly when we can't yet decode the predicate.
|
||||||
|
|
||||||
|
Time-related comparisons need a :class:`Clock`; pass one to honor
|
||||||
|
Hour/Minute/DayOfWeek predicates correctly. Without a clock those
|
||||||
|
return False.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
state,
|
||||||
|
*,
|
||||||
|
clock: Clock | None = None,
|
||||||
|
location: PanelLocation | None = None,
|
||||||
|
) -> None:
|
||||||
|
self._state = state
|
||||||
|
self._clock = clock
|
||||||
|
self._location = location
|
||||||
|
|
||||||
|
def __call__(self, condition: Program) -> bool:
|
||||||
|
"""Evaluate one AND/OR record against the bound MockState.
|
||||||
|
|
||||||
|
Treated as a plain predicate so callers can pass this instance
|
||||||
|
directly to ``ProgramEngine.set_condition_evaluator``.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
if condition.and_op == CondOP.ARG1_TRADITIONAL:
|
||||||
|
return self._eval_traditional(condition)
|
||||||
|
return self._eval_structured(condition)
|
||||||
|
except _UnsupportedCondition as err:
|
||||||
|
_log.debug("evaluator: unsupported condition: %s", err)
|
||||||
|
return False
|
||||||
|
except Exception:
|
||||||
|
_log.exception("evaluator: condition evaluation crashed")
|
||||||
|
return False
|
||||||
|
|
||||||
|
# ---- Traditional ------------------------------------------------------
|
||||||
|
|
||||||
|
def _eval_traditional(self, c: Program) -> bool:
|
||||||
|
"""Decode a Traditional AND record.
|
||||||
|
|
||||||
|
Per ``clsConditionLine.Cond`` (clsConditionLine.cs:17-33), the
|
||||||
|
compact-form cond is split across two AND-record slots:
|
||||||
|
|
||||||
|
* disk byte 1 (= ``and_family``) carries the compact's high byte
|
||||||
|
(the family + selector encoding from GetConditionalText)
|
||||||
|
* disk bytes 3-4 (= ``and_arg1_ix``, ``and_instance`` derived
|
||||||
|
from ``cond2 >> 8``) carry the compact's low byte (the
|
||||||
|
object index, shifted into the high half)
|
||||||
|
|
||||||
|
Family decoding mirrors clsText.GetConditionalText (clsText.cs:
|
||||||
|
2224-2274):
|
||||||
|
|
||||||
|
family & 0xFC == 0x00 → OTHER (low 4 bits = MiscConditional)
|
||||||
|
family & 0xFC == 0x04 → ZONE (bit 0x02 = NOT_READY, else SECURE)
|
||||||
|
family & 0xFC == 0x08 → CTRL (bit 0x02 = ON, else OFF)
|
||||||
|
family & 0xFC == 0x0C → TIME (bit 0x02 = ENABLED, else DIS.)
|
||||||
|
family >= 0x10 → SEC (high nibble = mode, low = area)
|
||||||
|
"""
|
||||||
|
family = c.and_family
|
||||||
|
instance = c.and_instance
|
||||||
|
family_major = family & 0xFC
|
||||||
|
secondary = bool(family & 0x02) # selector bit within the family
|
||||||
|
if family_major == 0x00:
|
||||||
|
return self._eval_other(family & 0x0F)
|
||||||
|
if family_major == ProgramCond.ZONE:
|
||||||
|
return self._eval_traditional_zone(instance, want_not_ready=secondary)
|
||||||
|
if family_major == ProgramCond.CTRL:
|
||||||
|
return self._eval_traditional_ctrl(instance, want_on=secondary)
|
||||||
|
if family_major == ProgramCond.TIME:
|
||||||
|
# Time-clock enabled / disabled — MockState doesn't model
|
||||||
|
# the enable bit, so we conservatively report disabled.
|
||||||
|
return False
|
||||||
|
# 0x10 and above: SEC family — high nibble = mode, low nibble = area.
|
||||||
|
return self._eval_traditional_sec(
|
||||||
|
area=family & 0x0F, mode=(family >> 4) & 0x07,
|
||||||
|
)
|
||||||
|
|
||||||
|
def _eval_traditional_zone(self, zone_num: int, *, want_not_ready: bool) -> bool:
|
||||||
|
"""SECURE matches ``current_state == 0``; NOT_READY matches any
|
||||||
|
nonzero current_state (per the panel's display semantics)."""
|
||||||
|
zone = self._state.zones.get(zone_num)
|
||||||
|
if zone is None:
|
||||||
|
# Undefined zone reads as SECURE (matches real-panel behaviour
|
||||||
|
# when a programmed zone slot doesn't exist).
|
||||||
|
return not want_not_ready
|
||||||
|
if want_not_ready:
|
||||||
|
return zone.current_state != 0
|
||||||
|
return zone.current_state == 0
|
||||||
|
|
||||||
|
def _eval_traditional_ctrl(self, unit_num: int, *, want_on: bool) -> bool:
|
||||||
|
"""ON matches any nonzero ``state``; OFF matches ``state == 0``.
|
||||||
|
|
||||||
|
MockUnitState.state encodes 0=off, 1=on, 100..200=dim level —
|
||||||
|
all nonzero values count as "on" for this predicate, which
|
||||||
|
matches the panel's binary on/off display.
|
||||||
|
"""
|
||||||
|
unit = self._state.units.get(unit_num)
|
||||||
|
if unit is None:
|
||||||
|
return not want_on # missing unit reads as OFF
|
||||||
|
on = unit.state != 0
|
||||||
|
return on == want_on
|
||||||
|
|
||||||
|
def _eval_traditional_sec(self, *, area: int, mode: int) -> bool:
|
||||||
|
"""Area in security-mode N. ``area == 0`` means "any area in
|
||||||
|
this mode" per GetConditionalText:2262 — without a multi-area
|
||||||
|
model we approximate by checking area 1."""
|
||||||
|
if area == 0:
|
||||||
|
area = 1
|
||||||
|
a = self._state.areas.get(area)
|
||||||
|
if a is None:
|
||||||
|
return False
|
||||||
|
return a.mode == mode
|
||||||
|
|
||||||
|
def _eval_other(self, misc_code: int) -> bool:
|
||||||
|
"""OTHER family: low 4 bits = enuMiscConditional."""
|
||||||
|
try:
|
||||||
|
cat = MiscConditional(misc_code)
|
||||||
|
except ValueError:
|
||||||
|
raise _UnsupportedCondition(f"unknown misc condition {misc_code}")
|
||||||
|
if cat == MiscConditional.NONE:
|
||||||
|
return True
|
||||||
|
if cat == MiscConditional.NEVER:
|
||||||
|
return False
|
||||||
|
if cat in (MiscConditional.LIGHT, MiscConditional.DARK):
|
||||||
|
light = self._is_light_outside()
|
||||||
|
if light is None:
|
||||||
|
# Can't determine — be conservative, don't fire either way.
|
||||||
|
return False
|
||||||
|
return light if cat == MiscConditional.LIGHT else not light
|
||||||
|
# PHONE_*, AC_POWER_*, BATTERY_*, ENERGY_COST_* — no MockState
|
||||||
|
# model for any of these yet. Conservatively False.
|
||||||
|
return False
|
||||||
|
|
||||||
|
def _is_light_outside(self) -> bool | None:
|
||||||
|
"""Approximate "is the sun up" against the engine's PanelLocation
|
||||||
|
+ clock. Returns None when clock or location is missing
|
||||||
|
(caller decides what to do with the indeterminate result)."""
|
||||||
|
if self._clock is None or self._location is None:
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
now = self._clock.now()
|
||||||
|
sunrise, sunset = _sun_events(self._location, now.date())
|
||||||
|
except Exception:
|
||||||
|
return None
|
||||||
|
return sunrise <= now <= sunset
|
||||||
|
|
||||||
|
# ---- Structured -------------------------------------------------------
|
||||||
|
|
||||||
|
def _eval_structured(self, c: Program) -> bool:
|
||||||
|
"""Evaluate ``Arg1 OP Arg2`` style conditions.
|
||||||
|
|
||||||
|
Resolves Arg1 and Arg2 to numeric values via :meth:`_resolve_arg`
|
||||||
|
then compares with the operator. Comparison operators are
|
||||||
|
straightforward integer math; AND/OR/XOR at this layer are
|
||||||
|
treated as bitwise reductions on the resolved values (matches
|
||||||
|
the C# operator semantics for those uncommon op codes).
|
||||||
|
"""
|
||||||
|
op = c.and_op
|
||||||
|
arg1 = self._resolve_arg(
|
||||||
|
c.and_arg1_argtype, c.and_arg1_ix, c.and_arg1_field,
|
||||||
|
)
|
||||||
|
arg2 = self._resolve_arg(
|
||||||
|
c.and_arg2_argtype, c.and_arg2_ix, c.and_arg2_field,
|
||||||
|
)
|
||||||
|
if arg1 is None or arg2 is None:
|
||||||
|
return False
|
||||||
|
if op == CondOP.ARG1_EQ_ARG2:
|
||||||
|
return arg1 == arg2
|
||||||
|
if op == CondOP.ARG1_NE_ARG2:
|
||||||
|
return arg1 != arg2
|
||||||
|
if op == CondOP.ARG1_LT_ARG2:
|
||||||
|
return arg1 < arg2
|
||||||
|
if op == CondOP.ARG1_GT_ARG2:
|
||||||
|
return arg1 > arg2
|
||||||
|
if op == CondOP.ARG1_ODD:
|
||||||
|
return (arg1 & 1) == 1
|
||||||
|
if op == CondOP.ARG1_EVEN:
|
||||||
|
return (arg1 & 1) == 0
|
||||||
|
# MULTIPLE / IN / NOT_IN are bitfield checks the panel uses for
|
||||||
|
# day-of-week and area-set tests. arg2 is the bitmask.
|
||||||
|
if op == CondOP.ARG1_MULTIPLE_ARG2:
|
||||||
|
return arg2 != 0 and (arg1 % arg2) == 0
|
||||||
|
if op == CondOP.ARG1_IN_ARG2:
|
||||||
|
return bool(arg1 & arg2)
|
||||||
|
if op == CondOP.ARG1_NOT_IN_ARG2:
|
||||||
|
return not bool(arg1 & arg2)
|
||||||
|
raise _UnsupportedCondition(f"unknown structured op {op}")
|
||||||
|
|
||||||
|
def _resolve_arg(
|
||||||
|
self, argtype: int, ix: int, field: int,
|
||||||
|
) -> int | None:
|
||||||
|
"""Return the numeric value of one Arg side, or None if it can't
|
||||||
|
be resolved (unknown type, missing object, missing clock).
|
||||||
|
"""
|
||||||
|
if argtype == CondArgType.CONSTANT:
|
||||||
|
return ix
|
||||||
|
if argtype == CondArgType.ZONE:
|
||||||
|
return self._resolve_zone_field(ix, field)
|
||||||
|
if argtype == CondArgType.UNIT:
|
||||||
|
return self._resolve_unit_field(ix, field)
|
||||||
|
if argtype == CondArgType.THERMOSTAT:
|
||||||
|
return self._resolve_thermostat_field(ix, field)
|
||||||
|
if argtype == CondArgType.AREA:
|
||||||
|
return self._resolve_area_field(ix, field)
|
||||||
|
if argtype == CondArgType.TIME_DATE:
|
||||||
|
return self._resolve_timedate_field(field)
|
||||||
|
# USER_SETTING, AUXILLARY, AUDIO, ACCESS_CONTROL, MESSAGE,
|
||||||
|
# SYSTEM — no MockState models for these. Treat as unresolved
|
||||||
|
# so the comparison returns False.
|
||||||
|
raise _UnsupportedCondition(f"unsupported argtype {argtype}")
|
||||||
|
|
||||||
|
def _resolve_zone_field(self, ix: int, field: int) -> int | None:
|
||||||
|
zone = self._state.zones.get(ix)
|
||||||
|
if zone is None:
|
||||||
|
return None
|
||||||
|
# enuZoneField: LoopReading=1, CurrentState=2, ArmingState=3, AlarmState=4
|
||||||
|
if field == 1:
|
||||||
|
return zone.loop
|
||||||
|
if field == 2:
|
||||||
|
return zone.current_state
|
||||||
|
if field == 3:
|
||||||
|
return zone.arming_state
|
||||||
|
if field == 4:
|
||||||
|
return zone.latched_state
|
||||||
|
raise _UnsupportedCondition(f"zone field {field}")
|
||||||
|
|
||||||
|
def _resolve_unit_field(self, ix: int, field: int) -> int | None:
|
||||||
|
unit = self._state.units.get(ix)
|
||||||
|
if unit is None:
|
||||||
|
return None
|
||||||
|
# enuUnitField: CurrentState=1, PreviousState=2, Timer=3, Level=4
|
||||||
|
if field == 1:
|
||||||
|
# 0 = off, 1 = on, 100..200 = dim. The panel treats anything
|
||||||
|
# non-zero as "on" at this granularity.
|
||||||
|
return 1 if unit.state != 0 else 0
|
||||||
|
if field == 3:
|
||||||
|
return unit.time_remaining
|
||||||
|
if field == 4:
|
||||||
|
# Level: panel returns 0..100% — derive from state byte.
|
||||||
|
if unit.state >= 100:
|
||||||
|
return unit.state - 100
|
||||||
|
return 100 if unit.state == 1 else 0
|
||||||
|
raise _UnsupportedCondition(f"unit field {field}")
|
||||||
|
|
||||||
|
def _resolve_thermostat_field(self, ix: int, field: int) -> int | None:
|
||||||
|
t = self._state.thermostats.get(ix)
|
||||||
|
if t is None:
|
||||||
|
return None
|
||||||
|
# enuThermostatField map (subset MockState has data for):
|
||||||
|
# CurrentTemp=1, HeatSetpt=2, CoolSetpt=3, SystemMode=4, FanMode=5,
|
||||||
|
# HoldMode=6, Humidity=9, HumidifySetpoint=10, DehumidifySetpoint=11,
|
||||||
|
# OutdoorTemperature=12
|
||||||
|
return {
|
||||||
|
1: t.temperature_raw,
|
||||||
|
2: t.heat_setpoint_raw,
|
||||||
|
3: t.cool_setpoint_raw,
|
||||||
|
4: t.system_mode,
|
||||||
|
5: t.fan_mode,
|
||||||
|
6: t.hold_mode,
|
||||||
|
9: t.humidity_raw,
|
||||||
|
10: t.humidify_setpoint_raw,
|
||||||
|
11: t.dehumidify_setpoint_raw,
|
||||||
|
12: t.outdoor_temperature_raw,
|
||||||
|
}.get(field, None)
|
||||||
|
|
||||||
|
def _resolve_area_field(self, ix: int, field: int) -> int | None:
|
||||||
|
area = self._state.areas.get(ix)
|
||||||
|
if area is None:
|
||||||
|
return None
|
||||||
|
# No enuAreaField source — be permissive: field 1 = mode.
|
||||||
|
if field == 1:
|
||||||
|
return area.mode
|
||||||
|
raise _UnsupportedCondition(f"area field {field}")
|
||||||
|
|
||||||
|
def _resolve_timedate_field(self, field: int) -> int | None:
|
||||||
|
if self._clock is None:
|
||||||
|
return None
|
||||||
|
now = self._clock.now()
|
||||||
|
# enuTimeDateField: Date=1, Year=2, Month=3, Day=4, DayOfWeek=5,
|
||||||
|
# Time=6, DST_Flag=7, Hour=8, Minute=9, SunriseSunset=10.
|
||||||
|
if field == 2:
|
||||||
|
return now.year
|
||||||
|
if field == 3:
|
||||||
|
return now.month
|
||||||
|
if field == 4:
|
||||||
|
return now.day
|
||||||
|
if field == 5:
|
||||||
|
# DayOfWeek: panel uses 1=Mon..7=Sun per clsHAC. Python
|
||||||
|
# weekday() returns Mon=0..Sun=6 — add 1.
|
||||||
|
return now.weekday() + 1
|
||||||
|
if field == 6:
|
||||||
|
# Time-of-day encoded as (hour * 60 + minute) — minutes since
|
||||||
|
# midnight. Matches the C# packing in GetComplexConditionText.
|
||||||
|
return now.hour * 60 + now.minute
|
||||||
|
if field == 8:
|
||||||
|
return now.hour
|
||||||
|
if field == 9:
|
||||||
|
return now.minute
|
||||||
|
# Date / DST_Flag / SunriseSunset — not modelled here.
|
||||||
|
raise _UnsupportedCondition(f"timedate field {field}")
|
||||||
|
|
||||||
|
|
||||||
# --------------------------------------------------------------------------
|
# --------------------------------------------------------------------------
|
||||||
# Engine
|
# Engine
|
||||||
# --------------------------------------------------------------------------
|
# --------------------------------------------------------------------------
|
||||||
@ -687,10 +1047,27 @@ class ProgramEngine:
|
|||||||
bool. The default returns True for every AND, False for every
|
bool. The default returns True for every AND, False for every
|
||||||
OR (a degenerate evaluator that means "all chains' first AND
|
OR (a degenerate evaluator that means "all chains' first AND
|
||||||
groups always pass" — useful as a smoke-test default, not for
|
groups always pass" — useful as a smoke-test default, not for
|
||||||
real automation). Real callers supply a state-aware evaluator.
|
real automation).
|
||||||
|
|
||||||
|
For real automation, call :meth:`use_state_evaluator` instead
|
||||||
|
(or build your own :class:`StateEvaluator` and pass it here).
|
||||||
"""
|
"""
|
||||||
self._condition_evaluator = fn
|
self._condition_evaluator = fn
|
||||||
|
|
||||||
|
def use_state_evaluator(self) -> "StateEvaluator":
|
||||||
|
"""Install a :class:`StateEvaluator` bound to this engine's
|
||||||
|
``MockState``, clock, and location. Returns the new evaluator
|
||||||
|
so the caller can introspect it.
|
||||||
|
|
||||||
|
Equivalent to ``engine.set_condition_evaluator(
|
||||||
|
StateEvaluator(panel.state, clock=clock, location=loc))``.
|
||||||
|
"""
|
||||||
|
evaluator = StateEvaluator(
|
||||||
|
self._panel.state, clock=self._clock, location=self._location,
|
||||||
|
)
|
||||||
|
self._condition_evaluator = evaluator
|
||||||
|
return evaluator
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _default_condition_evaluator(condition: Program) -> bool:
|
def _default_condition_evaluator(condition: Program) -> bool:
|
||||||
"""Stub evaluator — caller should override via set_condition_evaluator."""
|
"""Stub evaluator — caller should override via set_condition_evaluator."""
|
||||||
|
|||||||
@ -19,7 +19,13 @@ from omni_pca.program_engine import (
|
|||||||
RealClock,
|
RealClock,
|
||||||
classify,
|
classify,
|
||||||
)
|
)
|
||||||
from omni_pca.programs import Days, Program, ProgramType
|
from omni_pca.programs import (
|
||||||
|
CondArgType,
|
||||||
|
CondOP,
|
||||||
|
Days,
|
||||||
|
Program,
|
||||||
|
ProgramType,
|
||||||
|
)
|
||||||
|
|
||||||
CONTROLLER_KEY = bytes(range(16))
|
CONTROLLER_KEY = bytes(range(16))
|
||||||
|
|
||||||
@ -1017,3 +1023,332 @@ async def test_engine_every_chain_fires_on_interval() -> None:
|
|||||||
await clock.advance_to(t0 + timedelta(seconds=60 * tick + 1))
|
await clock.advance_to(t0 + timedelta(seconds=60 * tick + 1))
|
||||||
await asyncio.sleep(0)
|
await asyncio.sleep(0)
|
||||||
assert engine.metrics.clausal_fired == 3
|
assert engine.metrics.clausal_fired == 3
|
||||||
|
|
||||||
|
|
||||||
|
# ---- Phase 6: detailed AND/OR semantics (StateEvaluator) ----------------
|
||||||
|
|
||||||
|
|
||||||
|
from omni_pca.mock_panel import ( # noqa: E402
|
||||||
|
MockAreaState,
|
||||||
|
MockThermostatState,
|
||||||
|
MockUnitState,
|
||||||
|
MockZoneState,
|
||||||
|
)
|
||||||
|
from omni_pca.program_engine import StateEvaluator # noqa: E402
|
||||||
|
|
||||||
|
|
||||||
|
def _and_traditional(
|
||||||
|
slot: int, family: int, instance: int = 0,
|
||||||
|
) -> Program:
|
||||||
|
"""Build a Traditional (OP=0) AND record.
|
||||||
|
|
||||||
|
Per clsConditionLine.Cond synthesis (clsConditionLine.cs:17-33),
|
||||||
|
the family byte lives in disk byte 1 = ``and_family`` (Python's
|
||||||
|
``cond & 0xFF``), and the object instance lives in disk byte 3
|
||||||
|
= ``and_instance`` (Python's ``(cond2 >> 8) & 0xFF``).
|
||||||
|
"""
|
||||||
|
return Program(
|
||||||
|
slot=slot, prog_type=int(ProgramType.AND),
|
||||||
|
cond=family & 0xFF, # byte 1 = family; byte 2 (OP) = 0
|
||||||
|
cond2=(instance & 0xFF) << 8, # byte 3 = instance; byte 4 = 0
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _and_structured(
|
||||||
|
slot: int,
|
||||||
|
op: int,
|
||||||
|
arg1_type: int, arg1_ix: int, arg1_field: int,
|
||||||
|
arg2_type: int, arg2_ix: int, arg2_field: int = 0,
|
||||||
|
) -> Program:
|
||||||
|
"""Build a Structured (OP>0) AND record.
|
||||||
|
|
||||||
|
Field layout per programs.py decoders:
|
||||||
|
cond high byte (>>8 & 0xFF) = op (and_op)
|
||||||
|
cond low byte (& 0xFF) = arg1_argtype (and_arg1_argtype)
|
||||||
|
cond2 = arg1_ix (and_arg1_ix)
|
||||||
|
cmd = arg1_field (and_arg1_field)
|
||||||
|
par = arg2_argtype (and_arg2_argtype)
|
||||||
|
pr2 = arg2_ix (and_arg2_ix)
|
||||||
|
month = arg2_field (and_arg2_field)
|
||||||
|
"""
|
||||||
|
return Program(
|
||||||
|
slot=slot, prog_type=int(ProgramType.AND),
|
||||||
|
cond=(op << 8) | arg1_type,
|
||||||
|
cond2=arg1_ix,
|
||||||
|
cmd=arg1_field,
|
||||||
|
par=arg2_type,
|
||||||
|
pr2=arg2_ix,
|
||||||
|
month=arg2_field,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _state_with(**kwargs) -> MockState:
|
||||||
|
return MockState(**kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
# ---- Traditional ZONE family -------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def test_state_evaluator_zone_secure_passes_when_state_zero() -> None:
|
||||||
|
state = _state_with(zones={
|
||||||
|
7: MockZoneState(name="FRONT DOOR", current_state=0),
|
||||||
|
})
|
||||||
|
ev = StateEvaluator(state)
|
||||||
|
# ZONE family = 0x04 (secure variant); instance = 7
|
||||||
|
cond = _and_traditional(1, family=0x04, instance=7)
|
||||||
|
assert ev(cond) is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_state_evaluator_zone_secure_fails_when_tripped() -> None:
|
||||||
|
state = _state_with(zones={
|
||||||
|
7: MockZoneState(name="FRONT DOOR", current_state=1), # not-ready
|
||||||
|
})
|
||||||
|
ev = StateEvaluator(state)
|
||||||
|
cond = _and_traditional(1, family=0x04, instance=7)
|
||||||
|
assert ev(cond) is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_state_evaluator_zone_not_ready_passes_when_tripped() -> None:
|
||||||
|
state = _state_with(zones={
|
||||||
|
7: MockZoneState(name="FRONT DOOR", current_state=1),
|
||||||
|
})
|
||||||
|
ev = StateEvaluator(state)
|
||||||
|
# family 0x06 = ZONE + NOT_READY selector bit
|
||||||
|
cond = _and_traditional(1, family=0x06, instance=7)
|
||||||
|
assert ev(cond) is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_state_evaluator_zone_undefined_is_secure() -> None:
|
||||||
|
"""Undefined zone reads as SECURE — matches real-panel behaviour
|
||||||
|
when a programmed zone slot doesn't exist."""
|
||||||
|
ev = StateEvaluator(_state_with()) # no zones
|
||||||
|
secure_check = _and_traditional(1, family=0x04, instance=99)
|
||||||
|
not_ready_check = _and_traditional(2, family=0x06, instance=99)
|
||||||
|
assert ev(secure_check) is True
|
||||||
|
assert ev(not_ready_check) is False
|
||||||
|
|
||||||
|
|
||||||
|
# ---- Traditional CTRL family --------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def test_state_evaluator_unit_on_passes_when_state_one() -> None:
|
||||||
|
state = _state_with(units={5: MockUnitState(name="LAMP", state=1)})
|
||||||
|
ev = StateEvaluator(state)
|
||||||
|
# CTRL family + ON selector = 0x0A
|
||||||
|
cond = _and_traditional(1, family=0x0A, instance=5)
|
||||||
|
assert ev(cond) is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_state_evaluator_unit_off_passes_when_state_zero() -> None:
|
||||||
|
state = _state_with(units={5: MockUnitState(name="LAMP", state=0)})
|
||||||
|
ev = StateEvaluator(state)
|
||||||
|
# CTRL family + OFF selector = 0x08
|
||||||
|
cond = _and_traditional(1, family=0x08, instance=5)
|
||||||
|
assert ev(cond) is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_state_evaluator_unit_on_for_dimmed_unit() -> None:
|
||||||
|
"""Dim level 50% → state=150; ON predicate should pass."""
|
||||||
|
state = _state_with(units={5: MockUnitState(state=150)})
|
||||||
|
ev = StateEvaluator(state)
|
||||||
|
cond = _and_traditional(1, family=0x0A, instance=5)
|
||||||
|
assert ev(cond) is True
|
||||||
|
|
||||||
|
|
||||||
|
# ---- Traditional SEC family ---------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def test_state_evaluator_security_mode_match() -> None:
|
||||||
|
"""SEC family: family byte = (mode << 4) | area. Area 1 in mode 2."""
|
||||||
|
state = _state_with(areas={1: MockAreaState(name="MAIN", mode=2)})
|
||||||
|
ev = StateEvaluator(state)
|
||||||
|
cond = _and_traditional(1, family=(2 << 4) | 1) # mode 2, area 1 = 0x21
|
||||||
|
assert ev(cond) is True
|
||||||
|
# Area in different mode → fails.
|
||||||
|
cond_wrong = _and_traditional(1, family=(3 << 4) | 1) # mode 3
|
||||||
|
assert ev(cond_wrong) is False
|
||||||
|
|
||||||
|
|
||||||
|
# ---- Traditional OTHER family -------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def test_state_evaluator_never_is_always_false() -> None:
|
||||||
|
"""MiscConditional.NEVER (= 1) → always False, regardless of state."""
|
||||||
|
ev = StateEvaluator(_state_with())
|
||||||
|
# OTHER family + NEVER misc = 0x01
|
||||||
|
cond = _and_traditional(1, family=0x01)
|
||||||
|
assert ev(cond) is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_state_evaluator_dark_without_location_is_false() -> None:
|
||||||
|
ev = StateEvaluator(_state_with()) # no location
|
||||||
|
# OTHER family + DARK misc = 0x03
|
||||||
|
cond = _and_traditional(1, family=0x03)
|
||||||
|
assert ev(cond) is False
|
||||||
|
|
||||||
|
|
||||||
|
# ---- Structured: Zone fields --------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def test_state_evaluator_structured_zone_current_state_eq() -> None:
|
||||||
|
"""Zone 5 CurrentState == 1 (not-ready) — structured form."""
|
||||||
|
state = _state_with(zones={5: MockZoneState(current_state=1)})
|
||||||
|
ev = StateEvaluator(state)
|
||||||
|
# EQ, Arg1=ZONE.CurrentState(2), Arg1IX=5, Arg2=CONSTANT, Arg2IX=1
|
||||||
|
cond = _and_structured(
|
||||||
|
slot=1, op=int(CondOP.ARG1_EQ_ARG2),
|
||||||
|
arg1_type=int(CondArgType.ZONE), arg1_ix=5, arg1_field=2,
|
||||||
|
arg2_type=int(CondArgType.CONSTANT), arg2_ix=1,
|
||||||
|
)
|
||||||
|
assert ev(cond) is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_state_evaluator_structured_zone_state_ne() -> None:
|
||||||
|
state = _state_with(zones={5: MockZoneState(current_state=0)})
|
||||||
|
ev = StateEvaluator(state)
|
||||||
|
cond = _and_structured(
|
||||||
|
slot=1, op=int(CondOP.ARG1_NE_ARG2),
|
||||||
|
arg1_type=int(CondArgType.ZONE), arg1_ix=5, arg1_field=2,
|
||||||
|
arg2_type=int(CondArgType.CONSTANT), arg2_ix=0,
|
||||||
|
)
|
||||||
|
assert ev(cond) is False # state IS 0
|
||||||
|
|
||||||
|
|
||||||
|
# ---- Structured: Thermostat fields --------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def test_state_evaluator_structured_thermostat_temp_gt() -> None:
|
||||||
|
"""TEMPERATURE > 75 — structured comparison."""
|
||||||
|
# Thermostat raw temperature 168 ~ 76°F on the Omni linear scale,
|
||||||
|
# but we compare raw bytes here. Use a raw temp clearly above the
|
||||||
|
# constant.
|
||||||
|
state = _state_with(thermostats={
|
||||||
|
1: MockThermostatState(temperature_raw=80),
|
||||||
|
})
|
||||||
|
ev = StateEvaluator(state)
|
||||||
|
cond = _and_structured(
|
||||||
|
slot=1, op=int(CondOP.ARG1_GT_ARG2),
|
||||||
|
arg1_type=int(CondArgType.THERMOSTAT), arg1_ix=1, arg1_field=1,
|
||||||
|
arg2_type=int(CondArgType.CONSTANT), arg2_ix=75,
|
||||||
|
)
|
||||||
|
assert ev(cond) is True
|
||||||
|
# And < should fail.
|
||||||
|
cond_lt = _and_structured(
|
||||||
|
slot=2, op=int(CondOP.ARG1_LT_ARG2),
|
||||||
|
arg1_type=int(CondArgType.THERMOSTAT), arg1_ix=1, arg1_field=1,
|
||||||
|
arg2_type=int(CondArgType.CONSTANT), arg2_ix=75,
|
||||||
|
)
|
||||||
|
assert ev(cond_lt) is False
|
||||||
|
|
||||||
|
|
||||||
|
# ---- Structured: TimeDate fields ----------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def test_state_evaluator_structured_timedate_hour_compare() -> None:
|
||||||
|
"""Current hour == 22 — uses the clock."""
|
||||||
|
clock = FakeClock(datetime(2026, 5, 14, 22, 30, tzinfo=timezone.utc))
|
||||||
|
ev = StateEvaluator(_state_with(), clock=clock)
|
||||||
|
cond = _and_structured(
|
||||||
|
slot=1, op=int(CondOP.ARG1_EQ_ARG2),
|
||||||
|
arg1_type=int(CondArgType.TIME_DATE), arg1_ix=0, arg1_field=8, # Hour
|
||||||
|
arg2_type=int(CondArgType.CONSTANT), arg2_ix=22,
|
||||||
|
)
|
||||||
|
assert ev(cond) is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_state_evaluator_structured_timedate_dayofweek() -> None:
|
||||||
|
"""DayOfWeek == 4 (Thursday)."""
|
||||||
|
clock = FakeClock(datetime(2026, 5, 14, 12, 0, tzinfo=timezone.utc)) # Thu
|
||||||
|
ev = StateEvaluator(_state_with(), clock=clock)
|
||||||
|
cond = _and_structured(
|
||||||
|
slot=1, op=int(CondOP.ARG1_EQ_ARG2),
|
||||||
|
arg1_type=int(CondArgType.TIME_DATE), arg1_ix=0, arg1_field=5, # DOW
|
||||||
|
arg2_type=int(CondArgType.CONSTANT), arg2_ix=4,
|
||||||
|
)
|
||||||
|
assert ev(cond) is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_state_evaluator_structured_timedate_without_clock_is_false() -> None:
|
||||||
|
"""No clock → TimeDate predicates resolve to None → comparison False."""
|
||||||
|
ev = StateEvaluator(_state_with()) # no clock
|
||||||
|
cond = _and_structured(
|
||||||
|
slot=1, op=int(CondOP.ARG1_EQ_ARG2),
|
||||||
|
arg1_type=int(CondArgType.TIME_DATE), arg1_ix=0, arg1_field=8,
|
||||||
|
arg2_type=int(CondArgType.CONSTANT), arg2_ix=22,
|
||||||
|
)
|
||||||
|
assert ev(cond) is False
|
||||||
|
|
||||||
|
|
||||||
|
# ---- Engine integration --------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_engine_use_state_evaluator_gates_real_conditions() -> None:
|
||||||
|
"""End-to-end: WHEN + AND IF UNIT 3 ON + THEN UNIT 9 ON.
|
||||||
|
Chain fires only when unit 3 is on at the time the event arrives."""
|
||||||
|
button_evt = event_id_user_macro_button(5)
|
||||||
|
# WHEN button 5; AND IF unit 3 ON; THEN unit 9 ON.
|
||||||
|
when = _when(1, button_evt)
|
||||||
|
and_cond = _and_traditional(2, family=0x0A, instance=3) # CTRL + ON, unit 3
|
||||||
|
then = _then_action(3, int(Command.UNIT_ON), 9)
|
||||||
|
|
||||||
|
# Start with unit 3 OFF.
|
||||||
|
panel = MockPanel(
|
||||||
|
controller_key=CONTROLLER_KEY,
|
||||||
|
state=MockState(
|
||||||
|
programs={
|
||||||
|
1: when.encode_wire_bytes(),
|
||||||
|
2: and_cond.encode_wire_bytes(),
|
||||||
|
3: then.encode_wire_bytes(),
|
||||||
|
},
|
||||||
|
units={3: MockUnitState(state=0)},
|
||||||
|
),
|
||||||
|
)
|
||||||
|
async with ProgramEngine(panel, clock=FakeClock(
|
||||||
|
datetime(2026, 5, 14, 0, 0, tzinfo=timezone.utc)
|
||||||
|
)) as engine:
|
||||||
|
engine.use_state_evaluator()
|
||||||
|
# Unit 3 is OFF — chain blocked.
|
||||||
|
fired = await engine.emit_user_macro_button(5)
|
||||||
|
assert fired == 0
|
||||||
|
assert 9 not in panel.state.units or panel.state.units[9].state == 0
|
||||||
|
|
||||||
|
# Turn unit 3 ON, re-emit — chain fires.
|
||||||
|
panel.state.units[3].state = 1
|
||||||
|
fired = await engine.emit_user_macro_button(5)
|
||||||
|
assert fired == 1
|
||||||
|
assert panel.state.units[9].state == 1
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_engine_state_evaluator_or_alternative() -> None:
|
||||||
|
"""WHEN + AND IF zone 5 secure + OR + AND IF zone 6 secure + THEN.
|
||||||
|
Fires if either zone is secure."""
|
||||||
|
button_evt = event_id_user_macro_button(5)
|
||||||
|
when = _when(1, button_evt)
|
||||||
|
and_z5 = _and_traditional(2, family=0x04, instance=5) # ZONE 5 secure
|
||||||
|
or_break = _or_cond(3) # OR boundary
|
||||||
|
and_z6 = _and_traditional(4, family=0x04, instance=6) # ZONE 6 secure
|
||||||
|
then = _then_action(5, int(Command.UNIT_ON), 10)
|
||||||
|
|
||||||
|
# Zone 5 tripped, Zone 6 secure → group A fails, group B passes.
|
||||||
|
panel = MockPanel(
|
||||||
|
controller_key=CONTROLLER_KEY,
|
||||||
|
state=MockState(
|
||||||
|
programs={
|
||||||
|
p.slot: p.encode_wire_bytes()
|
||||||
|
for p in (when, and_z5, or_break, and_z6, then)
|
||||||
|
},
|
||||||
|
zones={
|
||||||
|
5: MockZoneState(current_state=1),
|
||||||
|
6: MockZoneState(current_state=0),
|
||||||
|
},
|
||||||
|
),
|
||||||
|
)
|
||||||
|
async with ProgramEngine(panel, clock=FakeClock(
|
||||||
|
datetime(2026, 5, 14, 0, 0, tzinfo=timezone.utc)
|
||||||
|
)) as engine:
|
||||||
|
engine.use_state_evaluator()
|
||||||
|
fired = await engine.emit_user_macro_button(5)
|
||||||
|
assert fired == 1 # OR-alternative passed
|
||||||
|
assert panel.state.units[10].state == 1
|
||||||
|
|||||||