Compare commits
No commits in common. "d4c4e530f60c6fee60cb5be71c263971d7e507df" and "cc32081caf65f6fc33cea0f29e18d589afb0bafb" have entirely different histories.
d4c4e530f6
...
cc32081caf
26
.github/workflows/validate.yml
vendored
@ -1,26 +0,0 @@
|
|||||||
name: Validate
|
|
||||||
|
|
||||||
on:
|
|
||||||
push:
|
|
||||||
branches: [main]
|
|
||||||
pull_request:
|
|
||||||
schedule:
|
|
||||||
- cron: "0 4 * * 1" # weekly Monday 04:00 UTC
|
|
||||||
workflow_dispatch:
|
|
||||||
|
|
||||||
jobs:
|
|
||||||
hacs:
|
|
||||||
name: HACS validation
|
|
||||||
runs-on: ubuntu-latest
|
|
||||||
steps:
|
|
||||||
- uses: actions/checkout@v4
|
|
||||||
- uses: hacs/action@main
|
|
||||||
with:
|
|
||||||
category: integration
|
|
||||||
|
|
||||||
hassfest:
|
|
||||||
name: Hassfest
|
|
||||||
runs-on: ubuntu-latest
|
|
||||||
steps:
|
|
||||||
- uses: actions/checkout@v4
|
|
||||||
- uses: home-assistant/actions/hassfest@master
|
|
||||||
@ -82,4 +82,4 @@ First release. Working library + Home Assistant custom component, validated end-
|
|||||||
- **PyPI publish**: `omni-pca` not yet on PyPI; HA `manifest.json` requirements line will only resolve once it is. For now users either install the wheel manually or pip-install from a Git URL.
|
- **PyPI publish**: `omni-pca` not yet on PyPI; HA `manifest.json` requirements line will only resolve once it is. For now users either install the wheel manually or pip-install from a Git URL.
|
||||||
- **HACS submission**: pending live-panel validation.
|
- **HACS submission**: pending live-panel validation.
|
||||||
|
|
||||||
[2026.5.10]: https://github.com/rsp2k/omni-pca/releases/tag/v2026.5.10
|
[2026.5.10]: https://git.supported.systems/warehack.ing/omni-pca/releases/tag/v2026.5.10
|
||||||
|
|||||||
16
README.md
@ -4,7 +4,7 @@ Async Python client for HAI/Leviton Omni-Link II home automation panels — Omni
|
|||||||
|
|
||||||
Includes a Home Assistant custom component (`custom_components/omni_pca/`).
|
Includes a Home Assistant custom component (`custom_components/omni_pca/`).
|
||||||
|
|
||||||
**Project home:** <https://github.com/rsp2k/omni-pca>
|
**Project home:** <https://git.supported.systems/warehack.ing/omni-pca>
|
||||||
**Documentation:** <https://hai-omni-pro-ii.warehack.ing/>
|
**Documentation:** <https://hai-omni-pro-ii.warehack.ing/>
|
||||||
|
|
||||||
## Status
|
## Status
|
||||||
@ -18,14 +18,20 @@ The full byte-level protocol spec lives at <https://hai-omni-pro-ii.warehack.ing
|
|||||||
|
|
||||||
## Install
|
## Install
|
||||||
|
|
||||||
|
The library isn't on PyPI yet (pending), so install directly from the Gitea release:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
pip install omni-pca
|
# Pinned to a specific release (recommended)
|
||||||
|
pip install "omni-pca @ git+https://git.supported.systems/warehack.ing/omni-pca.git@v2026.5.10"
|
||||||
|
|
||||||
|
# Or the wheel from the release page
|
||||||
|
pip install https://git.supported.systems/warehack.ing/omni-pca/releases/download/v2026.5.10/omni_pca-2026.5.10-py3-none-any.whl
|
||||||
|
|
||||||
# Or with uv
|
# Or with uv
|
||||||
uv add omni-pca
|
uv add "omni-pca @ git+https://git.supported.systems/warehack.ing/omni-pca.git@v2026.5.10"
|
||||||
```
|
```
|
||||||
|
|
||||||
For Home Assistant users, install the integration through HACS — see the [HA install how-to](https://hai-omni-pro-ii.warehack.ing/how-to/install-in-home-assistant/).
|
Once published to PyPI, the canonical install will be `pip install omni-pca`.
|
||||||
|
|
||||||
## Quick start (library)
|
## Quick start (library)
|
||||||
|
|
||||||
@ -73,7 +79,7 @@ The HA integration picks the right client automatically based on the **Transport
|
|||||||
cd /path/to/your/homeassistant/config/
|
cd /path/to/your/homeassistant/config/
|
||||||
mkdir -p custom_components
|
mkdir -p custom_components
|
||||||
cd custom_components
|
cd custom_components
|
||||||
git clone https://github.com/rsp2k/omni-pca tmp-omni
|
git clone https://git.supported.systems/warehack.ing/omni-pca tmp-omni
|
||||||
cp -r tmp-omni/custom_components/omni_pca .
|
cp -r tmp-omni/custom_components/omni_pca .
|
||||||
rm -rf tmp-omni
|
rm -rf tmp-omni
|
||||||
```
|
```
|
||||||
|
|||||||
@ -6,18 +6,16 @@ opens an encrypted session straight to the panel and listens for unsolicited
|
|||||||
push messages.
|
push messages.
|
||||||
|
|
||||||
This integration is the HA-facing wrapper around the
|
This integration is the HA-facing wrapper around the
|
||||||
[`omni-pca`](https://github.com/rsp2k/omni-pca) Python library; the library
|
[`omni-pca`](https://git.supported.systems/warehack.ing/omni-pca) Python library; the library
|
||||||
handles the wire protocol, this component surfaces it as HA entities.
|
handles the wire protocol, this component surfaces it as HA entities.
|
||||||
|
|
||||||
## Install
|
## Install
|
||||||
|
|
||||||
### HACS
|
### HACS (recommended once published)
|
||||||
|
|
||||||
1. HACS → Integrations → search **HAI / Leviton Omni Panel**.
|
1. HACS → Integrations → custom repository → add
|
||||||
2. Install, then restart Home Assistant.
|
`https://git.supported.systems/warehack.ing/omni-pca`, category **Integration**.
|
||||||
|
2. Install **HAI / Leviton Omni Panel**, then restart Home Assistant.
|
||||||
(If not yet in the HACS default catalog: HACS → Integrations → custom
|
|
||||||
repository → add `https://github.com/rsp2k/omni-pca`, category **Integration**.)
|
|
||||||
|
|
||||||
### Manual
|
### Manual
|
||||||
|
|
||||||
@ -123,6 +121,6 @@ hashed) — useful for bug reports.
|
|||||||
- **No entities for X**: only objects with a name configured on the panel
|
- **No entities for X**: only objects with a name configured on the panel
|
||||||
are discovered. PC Access's "Names" page is where they live.
|
are discovered. PC Access's "Names" page is where they live.
|
||||||
|
|
||||||
See the [parent README](https://github.com/rsp2k/omni-pca) for protocol /
|
See the [parent README](https://git.supported.systems/warehack.ing/omni-pca) for protocol /
|
||||||
library details. Detailed reverse-engineering notes are in
|
library details. Detailed reverse-engineering notes are in
|
||||||
[`docs/JOURNEY.md`](https://github.com/rsp2k/omni-pca/blob/main/docs/JOURNEY.md).
|
[`docs/JOURNEY.md`](https://git.supported.systems/warehack.ing/omni-pca/blob/main/docs/JOURNEY.md).
|
||||||
|
|||||||
@ -7,7 +7,7 @@
|
|||||||
"dependencies": [],
|
"dependencies": [],
|
||||||
"codeowners": ["@rsp2k"],
|
"codeowners": ["@rsp2k"],
|
||||||
"requirements": ["omni-pca==2026.5.11"],
|
"requirements": ["omni-pca==2026.5.11"],
|
||||||
"documentation": "https://github.com/rsp2k/omni-pca",
|
"documentation": "https://git.supported.systems/warehack.ing/omni-pca",
|
||||||
"issue_tracker": "https://github.com/rsp2k/omni-pca/issues",
|
"issue_tracker": "https://git.supported.systems/warehack.ing/omni-pca/issues",
|
||||||
"integration_type": "hub"
|
"integration_type": "hub"
|
||||||
}
|
}
|
||||||
|
|||||||
|
Before Width: | Height: | Size: 108 KiB After Width: | Height: | Size: 107 KiB |
|
Before Width: | Height: | Size: 128 KiB After Width: | Height: | Size: 130 KiB |
|
Before Width: | Height: | Size: 107 KiB After Width: | Height: | Size: 106 KiB |
|
Before Width: | Height: | Size: 221 KiB After Width: | Height: | Size: 220 KiB |
|
Before Width: | Height: | Size: 86 KiB After Width: | Height: | Size: 85 KiB |
|
Before Width: | Height: | Size: 190 KiB After Width: | Height: | Size: 189 KiB |
|
Before Width: | Height: | Size: 102 KiB |
|
Before Width: | Height: | Size: 128 KiB |
|
Before Width: | Height: | Size: 122 KiB |
|
Before Width: | Height: | Size: 287 KiB |
|
Before Width: | Height: | Size: 86 KiB |
|
Before Width: | Height: | Size: 190 KiB |
@ -34,10 +34,7 @@ engine = ["astral>=2.2,<3"]
|
|||||||
omni-pca = "omni_pca.__main__:main"
|
omni-pca = "omni_pca.__main__:main"
|
||||||
|
|
||||||
[project.urls]
|
[project.urls]
|
||||||
Repository = "https://github.com/rsp2k/omni-pca"
|
Repository = "https://git.supported.systems/warehack.ing/omni-pca"
|
||||||
Issues = "https://github.com/rsp2k/omni-pca/issues"
|
|
||||||
Changelog = "https://github.com/rsp2k/omni-pca/blob/main/CHANGELOG.md"
|
|
||||||
Documentation = "https://hai-omni-pro-ii.warehack.ing/"
|
|
||||||
|
|
||||||
[build-system]
|
[build-system]
|
||||||
requires = ["uv_build>=0.11.8,<0.12.0"]
|
requires = ["uv_build>=0.11.8,<0.12.0"]
|
||||||
|
|||||||
@ -48,16 +48,7 @@ from dataclasses import dataclass, field
|
|||||||
from datetime import date, datetime, time, timedelta, timezone
|
from datetime import date, datetime, time, timedelta, timezone
|
||||||
from typing import TYPE_CHECKING
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
from .programs import (
|
from .programs import Days, Program, ProgramType, TimeKind
|
||||||
CondArgType,
|
|
||||||
CondOP,
|
|
||||||
Days,
|
|
||||||
MiscConditional,
|
|
||||||
Program,
|
|
||||||
ProgramCond,
|
|
||||||
ProgramType,
|
|
||||||
TimeKind,
|
|
||||||
)
|
|
||||||
|
|
||||||
_log = logging.getLogger(__name__)
|
_log = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -614,357 +605,6 @@ def evaluate_conditions(
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
# --------------------------------------------------------------------------
|
|
||||||
# State-aware AND/OR evaluator
|
|
||||||
# --------------------------------------------------------------------------
|
|
||||||
|
|
||||||
|
|
||||||
class _UnsupportedCondition(Exception):
|
|
||||||
"""Raised internally when an AND/OR record encodes a check we don't
|
|
||||||
yet evaluate. The caller (StateEvaluator) treats this as False —
|
|
||||||
"we can't prove this condition", so the chain stays guarded — and
|
|
||||||
logs once so the next semantic-decode pass has a punch list.
|
|
||||||
"""
|
|
||||||
|
|
||||||
|
|
||||||
class StateEvaluator:
|
|
||||||
"""Decode AND/OR records against a :class:`MockState` snapshot.
|
|
||||||
|
|
||||||
Each AND/OR :class:`Program` record encodes either:
|
|
||||||
|
|
||||||
* **Traditional** (``and_op == 0``): a compact bit-packed condition
|
|
||||||
in the ``cond`` u16 selecting a family (ProgramCond: ZONE / CTRL /
|
|
||||||
TIME / SEC / OTHER) with an inline operand. Decoded per
|
|
||||||
``clsText.GetConditionalText`` (clsText.cs:2224-2274).
|
|
||||||
|
|
||||||
* **Structured** (``and_op > 0``): a ``Arg1 OP Arg2`` triple where
|
|
||||||
Arg1 and Arg2 are typed references (Zone / Unit / Thermostat /
|
|
||||||
TimeDate / Constant / …) plus per-type field selectors. The
|
|
||||||
operator is one of :class:`CondOP` (EQ / NE / LT / GT / …).
|
|
||||||
|
|
||||||
Coverage in this initial cut:
|
|
||||||
|
|
||||||
Traditional:
|
|
||||||
* ZONE family — zone secure / not-ready against MockZoneState
|
|
||||||
* CTRL family — unit on / off against MockUnitState
|
|
||||||
* SEC family — area in security-mode against MockAreaState
|
|
||||||
* OTHER family: ``NEVER`` (always false), ``LIGHT``/``DARK``
|
|
||||||
approximated via the engine's sun events when location is set,
|
|
||||||
``AC_POWER_OFF``/``ON`` (no model — return False),
|
|
||||||
``BATTERY_LOW``/``OK`` (no model — return False)
|
|
||||||
* TIME family (time-clock enabled/disabled) — no MockState
|
|
||||||
slot for time-clock toggles, returns False
|
|
||||||
|
|
||||||
Structured:
|
|
||||||
* Zone.CurrentState / ArmingState (== const)
|
|
||||||
* Unit.CurrentState / Level (== / >= / <= const)
|
|
||||||
* Thermostat.CurrentTemp / Humidity / setpoints (numeric compare)
|
|
||||||
* TimeDate.Month / Day / DayOfWeek / Hour / Minute (numeric compare)
|
|
||||||
|
|
||||||
Anything else returns ``False`` and logs at DEBUG once per
|
|
||||||
condition class — keeps a chain *guarded* rather than fired-too-
|
|
||||||
eagerly when we can't yet decode the predicate.
|
|
||||||
|
|
||||||
Time-related comparisons need a :class:`Clock`; pass one to honor
|
|
||||||
Hour/Minute/DayOfWeek predicates correctly. Without a clock those
|
|
||||||
return False.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
state,
|
|
||||||
*,
|
|
||||||
clock: Clock | None = None,
|
|
||||||
location: PanelLocation | None = None,
|
|
||||||
) -> None:
|
|
||||||
self._state = state
|
|
||||||
self._clock = clock
|
|
||||||
self._location = location
|
|
||||||
|
|
||||||
def __call__(self, condition: Program) -> bool:
|
|
||||||
"""Evaluate one AND/OR record against the bound MockState.
|
|
||||||
|
|
||||||
Treated as a plain predicate so callers can pass this instance
|
|
||||||
directly to ``ProgramEngine.set_condition_evaluator``.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
if condition.and_op == CondOP.ARG1_TRADITIONAL:
|
|
||||||
return self._eval_traditional(condition)
|
|
||||||
return self._eval_structured(condition)
|
|
||||||
except _UnsupportedCondition as err:
|
|
||||||
_log.debug("evaluator: unsupported condition: %s", err)
|
|
||||||
return False
|
|
||||||
except Exception:
|
|
||||||
_log.exception("evaluator: condition evaluation crashed")
|
|
||||||
return False
|
|
||||||
|
|
||||||
# ---- Traditional ------------------------------------------------------
|
|
||||||
|
|
||||||
def _eval_traditional(self, c: Program) -> bool:
|
|
||||||
"""Decode a Traditional AND record.
|
|
||||||
|
|
||||||
Per ``clsConditionLine.Cond`` (clsConditionLine.cs:17-33), the
|
|
||||||
compact-form cond is split across two AND-record slots:
|
|
||||||
|
|
||||||
* disk byte 1 (= ``and_family``) carries the compact's high byte
|
|
||||||
(the family + selector encoding from GetConditionalText)
|
|
||||||
* disk bytes 3-4 (= ``and_arg1_ix``, ``and_instance`` derived
|
|
||||||
from ``cond2 >> 8``) carry the compact's low byte (the
|
|
||||||
object index, shifted into the high half)
|
|
||||||
|
|
||||||
Family decoding mirrors clsText.GetConditionalText (clsText.cs:
|
|
||||||
2224-2274):
|
|
||||||
|
|
||||||
family & 0xFC == 0x00 → OTHER (low 4 bits = MiscConditional)
|
|
||||||
family & 0xFC == 0x04 → ZONE (bit 0x02 = NOT_READY, else SECURE)
|
|
||||||
family & 0xFC == 0x08 → CTRL (bit 0x02 = ON, else OFF)
|
|
||||||
family & 0xFC == 0x0C → TIME (bit 0x02 = ENABLED, else DIS.)
|
|
||||||
family >= 0x10 → SEC (high nibble = mode, low = area)
|
|
||||||
"""
|
|
||||||
family = c.and_family
|
|
||||||
instance = c.and_instance
|
|
||||||
family_major = family & 0xFC
|
|
||||||
secondary = bool(family & 0x02) # selector bit within the family
|
|
||||||
if family_major == 0x00:
|
|
||||||
return self._eval_other(family & 0x0F)
|
|
||||||
if family_major == ProgramCond.ZONE:
|
|
||||||
return self._eval_traditional_zone(instance, want_not_ready=secondary)
|
|
||||||
if family_major == ProgramCond.CTRL:
|
|
||||||
return self._eval_traditional_ctrl(instance, want_on=secondary)
|
|
||||||
if family_major == ProgramCond.TIME:
|
|
||||||
# Time-clock enabled / disabled — MockState doesn't model
|
|
||||||
# the enable bit, so we conservatively report disabled.
|
|
||||||
return False
|
|
||||||
# 0x10 and above: SEC family — high nibble = mode, low nibble = area.
|
|
||||||
return self._eval_traditional_sec(
|
|
||||||
area=family & 0x0F, mode=(family >> 4) & 0x07,
|
|
||||||
)
|
|
||||||
|
|
||||||
def _eval_traditional_zone(self, zone_num: int, *, want_not_ready: bool) -> bool:
|
|
||||||
"""SECURE matches ``current_state == 0``; NOT_READY matches any
|
|
||||||
nonzero current_state (per the panel's display semantics)."""
|
|
||||||
zone = self._state.zones.get(zone_num)
|
|
||||||
if zone is None:
|
|
||||||
# Undefined zone reads as SECURE (matches real-panel behaviour
|
|
||||||
# when a programmed zone slot doesn't exist).
|
|
||||||
return not want_not_ready
|
|
||||||
if want_not_ready:
|
|
||||||
return zone.current_state != 0
|
|
||||||
return zone.current_state == 0
|
|
||||||
|
|
||||||
def _eval_traditional_ctrl(self, unit_num: int, *, want_on: bool) -> bool:
|
|
||||||
"""ON matches any nonzero ``state``; OFF matches ``state == 0``.
|
|
||||||
|
|
||||||
MockUnitState.state encodes 0=off, 1=on, 100..200=dim level —
|
|
||||||
all nonzero values count as "on" for this predicate, which
|
|
||||||
matches the panel's binary on/off display.
|
|
||||||
"""
|
|
||||||
unit = self._state.units.get(unit_num)
|
|
||||||
if unit is None:
|
|
||||||
return not want_on # missing unit reads as OFF
|
|
||||||
on = unit.state != 0
|
|
||||||
return on == want_on
|
|
||||||
|
|
||||||
def _eval_traditional_sec(self, *, area: int, mode: int) -> bool:
|
|
||||||
"""Area in security-mode N. ``area == 0`` means "any area in
|
|
||||||
this mode" per GetConditionalText:2262 — without a multi-area
|
|
||||||
model we approximate by checking area 1."""
|
|
||||||
if area == 0:
|
|
||||||
area = 1
|
|
||||||
a = self._state.areas.get(area)
|
|
||||||
if a is None:
|
|
||||||
return False
|
|
||||||
return a.mode == mode
|
|
||||||
|
|
||||||
def _eval_other(self, misc_code: int) -> bool:
|
|
||||||
"""OTHER family: low 4 bits = enuMiscConditional."""
|
|
||||||
try:
|
|
||||||
cat = MiscConditional(misc_code)
|
|
||||||
except ValueError:
|
|
||||||
raise _UnsupportedCondition(f"unknown misc condition {misc_code}")
|
|
||||||
if cat == MiscConditional.NONE:
|
|
||||||
return True
|
|
||||||
if cat == MiscConditional.NEVER:
|
|
||||||
return False
|
|
||||||
if cat in (MiscConditional.LIGHT, MiscConditional.DARK):
|
|
||||||
light = self._is_light_outside()
|
|
||||||
if light is None:
|
|
||||||
# Can't determine — be conservative, don't fire either way.
|
|
||||||
return False
|
|
||||||
return light if cat == MiscConditional.LIGHT else not light
|
|
||||||
# PHONE_*, AC_POWER_*, BATTERY_*, ENERGY_COST_* — no MockState
|
|
||||||
# model for any of these yet. Conservatively False.
|
|
||||||
return False
|
|
||||||
|
|
||||||
def _is_light_outside(self) -> bool | None:
|
|
||||||
"""Approximate "is the sun up" against the engine's PanelLocation
|
|
||||||
+ clock. Returns None when clock or location is missing
|
|
||||||
(caller decides what to do with the indeterminate result)."""
|
|
||||||
if self._clock is None or self._location is None:
|
|
||||||
return None
|
|
||||||
try:
|
|
||||||
now = self._clock.now()
|
|
||||||
sunrise, sunset = _sun_events(self._location, now.date())
|
|
||||||
except Exception:
|
|
||||||
return None
|
|
||||||
return sunrise <= now <= sunset
|
|
||||||
|
|
||||||
# ---- Structured -------------------------------------------------------
|
|
||||||
|
|
||||||
def _eval_structured(self, c: Program) -> bool:
|
|
||||||
"""Evaluate ``Arg1 OP Arg2`` style conditions.
|
|
||||||
|
|
||||||
Resolves Arg1 and Arg2 to numeric values via :meth:`_resolve_arg`
|
|
||||||
then compares with the operator. Comparison operators are
|
|
||||||
straightforward integer math; AND/OR/XOR at this layer are
|
|
||||||
treated as bitwise reductions on the resolved values (matches
|
|
||||||
the C# operator semantics for those uncommon op codes).
|
|
||||||
"""
|
|
||||||
op = c.and_op
|
|
||||||
arg1 = self._resolve_arg(
|
|
||||||
c.and_arg1_argtype, c.and_arg1_ix, c.and_arg1_field,
|
|
||||||
)
|
|
||||||
arg2 = self._resolve_arg(
|
|
||||||
c.and_arg2_argtype, c.and_arg2_ix, c.and_arg2_field,
|
|
||||||
)
|
|
||||||
if arg1 is None or arg2 is None:
|
|
||||||
return False
|
|
||||||
if op == CondOP.ARG1_EQ_ARG2:
|
|
||||||
return arg1 == arg2
|
|
||||||
if op == CondOP.ARG1_NE_ARG2:
|
|
||||||
return arg1 != arg2
|
|
||||||
if op == CondOP.ARG1_LT_ARG2:
|
|
||||||
return arg1 < arg2
|
|
||||||
if op == CondOP.ARG1_GT_ARG2:
|
|
||||||
return arg1 > arg2
|
|
||||||
if op == CondOP.ARG1_ODD:
|
|
||||||
return (arg1 & 1) == 1
|
|
||||||
if op == CondOP.ARG1_EVEN:
|
|
||||||
return (arg1 & 1) == 0
|
|
||||||
# MULTIPLE / IN / NOT_IN are bitfield checks the panel uses for
|
|
||||||
# day-of-week and area-set tests. arg2 is the bitmask.
|
|
||||||
if op == CondOP.ARG1_MULTIPLE_ARG2:
|
|
||||||
return arg2 != 0 and (arg1 % arg2) == 0
|
|
||||||
if op == CondOP.ARG1_IN_ARG2:
|
|
||||||
return bool(arg1 & arg2)
|
|
||||||
if op == CondOP.ARG1_NOT_IN_ARG2:
|
|
||||||
return not bool(arg1 & arg2)
|
|
||||||
raise _UnsupportedCondition(f"unknown structured op {op}")
|
|
||||||
|
|
||||||
def _resolve_arg(
|
|
||||||
self, argtype: int, ix: int, field: int,
|
|
||||||
) -> int | None:
|
|
||||||
"""Return the numeric value of one Arg side, or None if it can't
|
|
||||||
be resolved (unknown type, missing object, missing clock).
|
|
||||||
"""
|
|
||||||
if argtype == CondArgType.CONSTANT:
|
|
||||||
return ix
|
|
||||||
if argtype == CondArgType.ZONE:
|
|
||||||
return self._resolve_zone_field(ix, field)
|
|
||||||
if argtype == CondArgType.UNIT:
|
|
||||||
return self._resolve_unit_field(ix, field)
|
|
||||||
if argtype == CondArgType.THERMOSTAT:
|
|
||||||
return self._resolve_thermostat_field(ix, field)
|
|
||||||
if argtype == CondArgType.AREA:
|
|
||||||
return self._resolve_area_field(ix, field)
|
|
||||||
if argtype == CondArgType.TIME_DATE:
|
|
||||||
return self._resolve_timedate_field(field)
|
|
||||||
# USER_SETTING, AUXILLARY, AUDIO, ACCESS_CONTROL, MESSAGE,
|
|
||||||
# SYSTEM — no MockState models for these. Treat as unresolved
|
|
||||||
# so the comparison returns False.
|
|
||||||
raise _UnsupportedCondition(f"unsupported argtype {argtype}")
|
|
||||||
|
|
||||||
def _resolve_zone_field(self, ix: int, field: int) -> int | None:
|
|
||||||
zone = self._state.zones.get(ix)
|
|
||||||
if zone is None:
|
|
||||||
return None
|
|
||||||
# enuZoneField: LoopReading=1, CurrentState=2, ArmingState=3, AlarmState=4
|
|
||||||
if field == 1:
|
|
||||||
return zone.loop
|
|
||||||
if field == 2:
|
|
||||||
return zone.current_state
|
|
||||||
if field == 3:
|
|
||||||
return zone.arming_state
|
|
||||||
if field == 4:
|
|
||||||
return zone.latched_state
|
|
||||||
raise _UnsupportedCondition(f"zone field {field}")
|
|
||||||
|
|
||||||
def _resolve_unit_field(self, ix: int, field: int) -> int | None:
|
|
||||||
unit = self._state.units.get(ix)
|
|
||||||
if unit is None:
|
|
||||||
return None
|
|
||||||
# enuUnitField: CurrentState=1, PreviousState=2, Timer=3, Level=4
|
|
||||||
if field == 1:
|
|
||||||
# 0 = off, 1 = on, 100..200 = dim. The panel treats anything
|
|
||||||
# non-zero as "on" at this granularity.
|
|
||||||
return 1 if unit.state != 0 else 0
|
|
||||||
if field == 3:
|
|
||||||
return unit.time_remaining
|
|
||||||
if field == 4:
|
|
||||||
# Level: panel returns 0..100% — derive from state byte.
|
|
||||||
if unit.state >= 100:
|
|
||||||
return unit.state - 100
|
|
||||||
return 100 if unit.state == 1 else 0
|
|
||||||
raise _UnsupportedCondition(f"unit field {field}")
|
|
||||||
|
|
||||||
def _resolve_thermostat_field(self, ix: int, field: int) -> int | None:
|
|
||||||
t = self._state.thermostats.get(ix)
|
|
||||||
if t is None:
|
|
||||||
return None
|
|
||||||
# enuThermostatField map (subset MockState has data for):
|
|
||||||
# CurrentTemp=1, HeatSetpt=2, CoolSetpt=3, SystemMode=4, FanMode=5,
|
|
||||||
# HoldMode=6, Humidity=9, HumidifySetpoint=10, DehumidifySetpoint=11,
|
|
||||||
# OutdoorTemperature=12
|
|
||||||
return {
|
|
||||||
1: t.temperature_raw,
|
|
||||||
2: t.heat_setpoint_raw,
|
|
||||||
3: t.cool_setpoint_raw,
|
|
||||||
4: t.system_mode,
|
|
||||||
5: t.fan_mode,
|
|
||||||
6: t.hold_mode,
|
|
||||||
9: t.humidity_raw,
|
|
||||||
10: t.humidify_setpoint_raw,
|
|
||||||
11: t.dehumidify_setpoint_raw,
|
|
||||||
12: t.outdoor_temperature_raw,
|
|
||||||
}.get(field, None)
|
|
||||||
|
|
||||||
def _resolve_area_field(self, ix: int, field: int) -> int | None:
|
|
||||||
area = self._state.areas.get(ix)
|
|
||||||
if area is None:
|
|
||||||
return None
|
|
||||||
# No enuAreaField source — be permissive: field 1 = mode.
|
|
||||||
if field == 1:
|
|
||||||
return area.mode
|
|
||||||
raise _UnsupportedCondition(f"area field {field}")
|
|
||||||
|
|
||||||
def _resolve_timedate_field(self, field: int) -> int | None:
|
|
||||||
if self._clock is None:
|
|
||||||
return None
|
|
||||||
now = self._clock.now()
|
|
||||||
# enuTimeDateField: Date=1, Year=2, Month=3, Day=4, DayOfWeek=5,
|
|
||||||
# Time=6, DST_Flag=7, Hour=8, Minute=9, SunriseSunset=10.
|
|
||||||
if field == 2:
|
|
||||||
return now.year
|
|
||||||
if field == 3:
|
|
||||||
return now.month
|
|
||||||
if field == 4:
|
|
||||||
return now.day
|
|
||||||
if field == 5:
|
|
||||||
# DayOfWeek: panel uses 1=Mon..7=Sun per clsHAC. Python
|
|
||||||
# weekday() returns Mon=0..Sun=6 — add 1.
|
|
||||||
return now.weekday() + 1
|
|
||||||
if field == 6:
|
|
||||||
# Time-of-day encoded as (hour * 60 + minute) — minutes since
|
|
||||||
# midnight. Matches the C# packing in GetComplexConditionText.
|
|
||||||
return now.hour * 60 + now.minute
|
|
||||||
if field == 8:
|
|
||||||
return now.hour
|
|
||||||
if field == 9:
|
|
||||||
return now.minute
|
|
||||||
# Date / DST_Flag / SunriseSunset — not modelled here.
|
|
||||||
raise _UnsupportedCondition(f"timedate field {field}")
|
|
||||||
|
|
||||||
|
|
||||||
# --------------------------------------------------------------------------
|
# --------------------------------------------------------------------------
|
||||||
# Engine
|
# Engine
|
||||||
# --------------------------------------------------------------------------
|
# --------------------------------------------------------------------------
|
||||||
@ -1047,27 +687,10 @@ class ProgramEngine:
|
|||||||
bool. The default returns True for every AND, False for every
|
bool. The default returns True for every AND, False for every
|
||||||
OR (a degenerate evaluator that means "all chains' first AND
|
OR (a degenerate evaluator that means "all chains' first AND
|
||||||
groups always pass" — useful as a smoke-test default, not for
|
groups always pass" — useful as a smoke-test default, not for
|
||||||
real automation).
|
real automation). Real callers supply a state-aware evaluator.
|
||||||
|
|
||||||
For real automation, call :meth:`use_state_evaluator` instead
|
|
||||||
(or build your own :class:`StateEvaluator` and pass it here).
|
|
||||||
"""
|
"""
|
||||||
self._condition_evaluator = fn
|
self._condition_evaluator = fn
|
||||||
|
|
||||||
def use_state_evaluator(self) -> "StateEvaluator":
|
|
||||||
"""Install a :class:`StateEvaluator` bound to this engine's
|
|
||||||
``MockState``, clock, and location. Returns the new evaluator
|
|
||||||
so the caller can introspect it.
|
|
||||||
|
|
||||||
Equivalent to ``engine.set_condition_evaluator(
|
|
||||||
StateEvaluator(panel.state, clock=clock, location=loc))``.
|
|
||||||
"""
|
|
||||||
evaluator = StateEvaluator(
|
|
||||||
self._panel.state, clock=self._clock, location=self._location,
|
|
||||||
)
|
|
||||||
self._condition_evaluator = evaluator
|
|
||||||
return evaluator
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _default_condition_evaluator(condition: Program) -> bool:
|
def _default_condition_evaluator(condition: Program) -> bool:
|
||||||
"""Stub evaluator — caller should override via set_condition_evaluator."""
|
"""Stub evaluator — caller should override via set_condition_evaluator."""
|
||||||
|
|||||||
@ -19,13 +19,7 @@ from omni_pca.program_engine import (
|
|||||||
RealClock,
|
RealClock,
|
||||||
classify,
|
classify,
|
||||||
)
|
)
|
||||||
from omni_pca.programs import (
|
from omni_pca.programs import Days, Program, ProgramType
|
||||||
CondArgType,
|
|
||||||
CondOP,
|
|
||||||
Days,
|
|
||||||
Program,
|
|
||||||
ProgramType,
|
|
||||||
)
|
|
||||||
|
|
||||||
CONTROLLER_KEY = bytes(range(16))
|
CONTROLLER_KEY = bytes(range(16))
|
||||||
|
|
||||||
@ -1023,332 +1017,3 @@ async def test_engine_every_chain_fires_on_interval() -> None:
|
|||||||
await clock.advance_to(t0 + timedelta(seconds=60 * tick + 1))
|
await clock.advance_to(t0 + timedelta(seconds=60 * tick + 1))
|
||||||
await asyncio.sleep(0)
|
await asyncio.sleep(0)
|
||||||
assert engine.metrics.clausal_fired == 3
|
assert engine.metrics.clausal_fired == 3
|
||||||
|
|
||||||
|
|
||||||
# ---- Phase 6: detailed AND/OR semantics (StateEvaluator) ----------------
|
|
||||||
|
|
||||||
|
|
||||||
from omni_pca.mock_panel import ( # noqa: E402
|
|
||||||
MockAreaState,
|
|
||||||
MockThermostatState,
|
|
||||||
MockUnitState,
|
|
||||||
MockZoneState,
|
|
||||||
)
|
|
||||||
from omni_pca.program_engine import StateEvaluator # noqa: E402
|
|
||||||
|
|
||||||
|
|
||||||
def _and_traditional(
|
|
||||||
slot: int, family: int, instance: int = 0,
|
|
||||||
) -> Program:
|
|
||||||
"""Build a Traditional (OP=0) AND record.
|
|
||||||
|
|
||||||
Per clsConditionLine.Cond synthesis (clsConditionLine.cs:17-33),
|
|
||||||
the family byte lives in disk byte 1 = ``and_family`` (Python's
|
|
||||||
``cond & 0xFF``), and the object instance lives in disk byte 3
|
|
||||||
= ``and_instance`` (Python's ``(cond2 >> 8) & 0xFF``).
|
|
||||||
"""
|
|
||||||
return Program(
|
|
||||||
slot=slot, prog_type=int(ProgramType.AND),
|
|
||||||
cond=family & 0xFF, # byte 1 = family; byte 2 (OP) = 0
|
|
||||||
cond2=(instance & 0xFF) << 8, # byte 3 = instance; byte 4 = 0
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def _and_structured(
|
|
||||||
slot: int,
|
|
||||||
op: int,
|
|
||||||
arg1_type: int, arg1_ix: int, arg1_field: int,
|
|
||||||
arg2_type: int, arg2_ix: int, arg2_field: int = 0,
|
|
||||||
) -> Program:
|
|
||||||
"""Build a Structured (OP>0) AND record.
|
|
||||||
|
|
||||||
Field layout per programs.py decoders:
|
|
||||||
cond high byte (>>8 & 0xFF) = op (and_op)
|
|
||||||
cond low byte (& 0xFF) = arg1_argtype (and_arg1_argtype)
|
|
||||||
cond2 = arg1_ix (and_arg1_ix)
|
|
||||||
cmd = arg1_field (and_arg1_field)
|
|
||||||
par = arg2_argtype (and_arg2_argtype)
|
|
||||||
pr2 = arg2_ix (and_arg2_ix)
|
|
||||||
month = arg2_field (and_arg2_field)
|
|
||||||
"""
|
|
||||||
return Program(
|
|
||||||
slot=slot, prog_type=int(ProgramType.AND),
|
|
||||||
cond=(op << 8) | arg1_type,
|
|
||||||
cond2=arg1_ix,
|
|
||||||
cmd=arg1_field,
|
|
||||||
par=arg2_type,
|
|
||||||
pr2=arg2_ix,
|
|
||||||
month=arg2_field,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def _state_with(**kwargs) -> MockState:
|
|
||||||
return MockState(**kwargs)
|
|
||||||
|
|
||||||
|
|
||||||
# ---- Traditional ZONE family -------------------------------------------
|
|
||||||
|
|
||||||
|
|
||||||
def test_state_evaluator_zone_secure_passes_when_state_zero() -> None:
|
|
||||||
state = _state_with(zones={
|
|
||||||
7: MockZoneState(name="FRONT DOOR", current_state=0),
|
|
||||||
})
|
|
||||||
ev = StateEvaluator(state)
|
|
||||||
# ZONE family = 0x04 (secure variant); instance = 7
|
|
||||||
cond = _and_traditional(1, family=0x04, instance=7)
|
|
||||||
assert ev(cond) is True
|
|
||||||
|
|
||||||
|
|
||||||
def test_state_evaluator_zone_secure_fails_when_tripped() -> None:
|
|
||||||
state = _state_with(zones={
|
|
||||||
7: MockZoneState(name="FRONT DOOR", current_state=1), # not-ready
|
|
||||||
})
|
|
||||||
ev = StateEvaluator(state)
|
|
||||||
cond = _and_traditional(1, family=0x04, instance=7)
|
|
||||||
assert ev(cond) is False
|
|
||||||
|
|
||||||
|
|
||||||
def test_state_evaluator_zone_not_ready_passes_when_tripped() -> None:
|
|
||||||
state = _state_with(zones={
|
|
||||||
7: MockZoneState(name="FRONT DOOR", current_state=1),
|
|
||||||
})
|
|
||||||
ev = StateEvaluator(state)
|
|
||||||
# family 0x06 = ZONE + NOT_READY selector bit
|
|
||||||
cond = _and_traditional(1, family=0x06, instance=7)
|
|
||||||
assert ev(cond) is True
|
|
||||||
|
|
||||||
|
|
||||||
def test_state_evaluator_zone_undefined_is_secure() -> None:
|
|
||||||
"""Undefined zone reads as SECURE — matches real-panel behaviour
|
|
||||||
when a programmed zone slot doesn't exist."""
|
|
||||||
ev = StateEvaluator(_state_with()) # no zones
|
|
||||||
secure_check = _and_traditional(1, family=0x04, instance=99)
|
|
||||||
not_ready_check = _and_traditional(2, family=0x06, instance=99)
|
|
||||||
assert ev(secure_check) is True
|
|
||||||
assert ev(not_ready_check) is False
|
|
||||||
|
|
||||||
|
|
||||||
# ---- Traditional CTRL family --------------------------------------------
|
|
||||||
|
|
||||||
|
|
||||||
def test_state_evaluator_unit_on_passes_when_state_one() -> None:
|
|
||||||
state = _state_with(units={5: MockUnitState(name="LAMP", state=1)})
|
|
||||||
ev = StateEvaluator(state)
|
|
||||||
# CTRL family + ON selector = 0x0A
|
|
||||||
cond = _and_traditional(1, family=0x0A, instance=5)
|
|
||||||
assert ev(cond) is True
|
|
||||||
|
|
||||||
|
|
||||||
def test_state_evaluator_unit_off_passes_when_state_zero() -> None:
|
|
||||||
state = _state_with(units={5: MockUnitState(name="LAMP", state=0)})
|
|
||||||
ev = StateEvaluator(state)
|
|
||||||
# CTRL family + OFF selector = 0x08
|
|
||||||
cond = _and_traditional(1, family=0x08, instance=5)
|
|
||||||
assert ev(cond) is True
|
|
||||||
|
|
||||||
|
|
||||||
def test_state_evaluator_unit_on_for_dimmed_unit() -> None:
|
|
||||||
"""Dim level 50% → state=150; ON predicate should pass."""
|
|
||||||
state = _state_with(units={5: MockUnitState(state=150)})
|
|
||||||
ev = StateEvaluator(state)
|
|
||||||
cond = _and_traditional(1, family=0x0A, instance=5)
|
|
||||||
assert ev(cond) is True
|
|
||||||
|
|
||||||
|
|
||||||
# ---- Traditional SEC family ---------------------------------------------
|
|
||||||
|
|
||||||
|
|
||||||
def test_state_evaluator_security_mode_match() -> None:
|
|
||||||
"""SEC family: family byte = (mode << 4) | area. Area 1 in mode 2."""
|
|
||||||
state = _state_with(areas={1: MockAreaState(name="MAIN", mode=2)})
|
|
||||||
ev = StateEvaluator(state)
|
|
||||||
cond = _and_traditional(1, family=(2 << 4) | 1) # mode 2, area 1 = 0x21
|
|
||||||
assert ev(cond) is True
|
|
||||||
# Area in different mode → fails.
|
|
||||||
cond_wrong = _and_traditional(1, family=(3 << 4) | 1) # mode 3
|
|
||||||
assert ev(cond_wrong) is False
|
|
||||||
|
|
||||||
|
|
||||||
# ---- Traditional OTHER family -------------------------------------------
|
|
||||||
|
|
||||||
|
|
||||||
def test_state_evaluator_never_is_always_false() -> None:
|
|
||||||
"""MiscConditional.NEVER (= 1) → always False, regardless of state."""
|
|
||||||
ev = StateEvaluator(_state_with())
|
|
||||||
# OTHER family + NEVER misc = 0x01
|
|
||||||
cond = _and_traditional(1, family=0x01)
|
|
||||||
assert ev(cond) is False
|
|
||||||
|
|
||||||
|
|
||||||
def test_state_evaluator_dark_without_location_is_false() -> None:
|
|
||||||
ev = StateEvaluator(_state_with()) # no location
|
|
||||||
# OTHER family + DARK misc = 0x03
|
|
||||||
cond = _and_traditional(1, family=0x03)
|
|
||||||
assert ev(cond) is False
|
|
||||||
|
|
||||||
|
|
||||||
# ---- Structured: Zone fields --------------------------------------------
|
|
||||||
|
|
||||||
|
|
||||||
def test_state_evaluator_structured_zone_current_state_eq() -> None:
|
|
||||||
"""Zone 5 CurrentState == 1 (not-ready) — structured form."""
|
|
||||||
state = _state_with(zones={5: MockZoneState(current_state=1)})
|
|
||||||
ev = StateEvaluator(state)
|
|
||||||
# EQ, Arg1=ZONE.CurrentState(2), Arg1IX=5, Arg2=CONSTANT, Arg2IX=1
|
|
||||||
cond = _and_structured(
|
|
||||||
slot=1, op=int(CondOP.ARG1_EQ_ARG2),
|
|
||||||
arg1_type=int(CondArgType.ZONE), arg1_ix=5, arg1_field=2,
|
|
||||||
arg2_type=int(CondArgType.CONSTANT), arg2_ix=1,
|
|
||||||
)
|
|
||||||
assert ev(cond) is True
|
|
||||||
|
|
||||||
|
|
||||||
def test_state_evaluator_structured_zone_state_ne() -> None:
|
|
||||||
state = _state_with(zones={5: MockZoneState(current_state=0)})
|
|
||||||
ev = StateEvaluator(state)
|
|
||||||
cond = _and_structured(
|
|
||||||
slot=1, op=int(CondOP.ARG1_NE_ARG2),
|
|
||||||
arg1_type=int(CondArgType.ZONE), arg1_ix=5, arg1_field=2,
|
|
||||||
arg2_type=int(CondArgType.CONSTANT), arg2_ix=0,
|
|
||||||
)
|
|
||||||
assert ev(cond) is False # state IS 0
|
|
||||||
|
|
||||||
|
|
||||||
# ---- Structured: Thermostat fields --------------------------------------
|
|
||||||
|
|
||||||
|
|
||||||
def test_state_evaluator_structured_thermostat_temp_gt() -> None:
|
|
||||||
"""TEMPERATURE > 75 — structured comparison."""
|
|
||||||
# Thermostat raw temperature 168 ~ 76°F on the Omni linear scale,
|
|
||||||
# but we compare raw bytes here. Use a raw temp clearly above the
|
|
||||||
# constant.
|
|
||||||
state = _state_with(thermostats={
|
|
||||||
1: MockThermostatState(temperature_raw=80),
|
|
||||||
})
|
|
||||||
ev = StateEvaluator(state)
|
|
||||||
cond = _and_structured(
|
|
||||||
slot=1, op=int(CondOP.ARG1_GT_ARG2),
|
|
||||||
arg1_type=int(CondArgType.THERMOSTAT), arg1_ix=1, arg1_field=1,
|
|
||||||
arg2_type=int(CondArgType.CONSTANT), arg2_ix=75,
|
|
||||||
)
|
|
||||||
assert ev(cond) is True
|
|
||||||
# And < should fail.
|
|
||||||
cond_lt = _and_structured(
|
|
||||||
slot=2, op=int(CondOP.ARG1_LT_ARG2),
|
|
||||||
arg1_type=int(CondArgType.THERMOSTAT), arg1_ix=1, arg1_field=1,
|
|
||||||
arg2_type=int(CondArgType.CONSTANT), arg2_ix=75,
|
|
||||||
)
|
|
||||||
assert ev(cond_lt) is False
|
|
||||||
|
|
||||||
|
|
||||||
# ---- Structured: TimeDate fields ----------------------------------------
|
|
||||||
|
|
||||||
|
|
||||||
def test_state_evaluator_structured_timedate_hour_compare() -> None:
|
|
||||||
"""Current hour == 22 — uses the clock."""
|
|
||||||
clock = FakeClock(datetime(2026, 5, 14, 22, 30, tzinfo=timezone.utc))
|
|
||||||
ev = StateEvaluator(_state_with(), clock=clock)
|
|
||||||
cond = _and_structured(
|
|
||||||
slot=1, op=int(CondOP.ARG1_EQ_ARG2),
|
|
||||||
arg1_type=int(CondArgType.TIME_DATE), arg1_ix=0, arg1_field=8, # Hour
|
|
||||||
arg2_type=int(CondArgType.CONSTANT), arg2_ix=22,
|
|
||||||
)
|
|
||||||
assert ev(cond) is True
|
|
||||||
|
|
||||||
|
|
||||||
def test_state_evaluator_structured_timedate_dayofweek() -> None:
|
|
||||||
"""DayOfWeek == 4 (Thursday)."""
|
|
||||||
clock = FakeClock(datetime(2026, 5, 14, 12, 0, tzinfo=timezone.utc)) # Thu
|
|
||||||
ev = StateEvaluator(_state_with(), clock=clock)
|
|
||||||
cond = _and_structured(
|
|
||||||
slot=1, op=int(CondOP.ARG1_EQ_ARG2),
|
|
||||||
arg1_type=int(CondArgType.TIME_DATE), arg1_ix=0, arg1_field=5, # DOW
|
|
||||||
arg2_type=int(CondArgType.CONSTANT), arg2_ix=4,
|
|
||||||
)
|
|
||||||
assert ev(cond) is True
|
|
||||||
|
|
||||||
|
|
||||||
def test_state_evaluator_structured_timedate_without_clock_is_false() -> None:
|
|
||||||
"""No clock → TimeDate predicates resolve to None → comparison False."""
|
|
||||||
ev = StateEvaluator(_state_with()) # no clock
|
|
||||||
cond = _and_structured(
|
|
||||||
slot=1, op=int(CondOP.ARG1_EQ_ARG2),
|
|
||||||
arg1_type=int(CondArgType.TIME_DATE), arg1_ix=0, arg1_field=8,
|
|
||||||
arg2_type=int(CondArgType.CONSTANT), arg2_ix=22,
|
|
||||||
)
|
|
||||||
assert ev(cond) is False
|
|
||||||
|
|
||||||
|
|
||||||
# ---- Engine integration --------------------------------------------------
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_engine_use_state_evaluator_gates_real_conditions() -> None:
|
|
||||||
"""End-to-end: WHEN + AND IF UNIT 3 ON + THEN UNIT 9 ON.
|
|
||||||
Chain fires only when unit 3 is on at the time the event arrives."""
|
|
||||||
button_evt = event_id_user_macro_button(5)
|
|
||||||
# WHEN button 5; AND IF unit 3 ON; THEN unit 9 ON.
|
|
||||||
when = _when(1, button_evt)
|
|
||||||
and_cond = _and_traditional(2, family=0x0A, instance=3) # CTRL + ON, unit 3
|
|
||||||
then = _then_action(3, int(Command.UNIT_ON), 9)
|
|
||||||
|
|
||||||
# Start with unit 3 OFF.
|
|
||||||
panel = MockPanel(
|
|
||||||
controller_key=CONTROLLER_KEY,
|
|
||||||
state=MockState(
|
|
||||||
programs={
|
|
||||||
1: when.encode_wire_bytes(),
|
|
||||||
2: and_cond.encode_wire_bytes(),
|
|
||||||
3: then.encode_wire_bytes(),
|
|
||||||
},
|
|
||||||
units={3: MockUnitState(state=0)},
|
|
||||||
),
|
|
||||||
)
|
|
||||||
async with ProgramEngine(panel, clock=FakeClock(
|
|
||||||
datetime(2026, 5, 14, 0, 0, tzinfo=timezone.utc)
|
|
||||||
)) as engine:
|
|
||||||
engine.use_state_evaluator()
|
|
||||||
# Unit 3 is OFF — chain blocked.
|
|
||||||
fired = await engine.emit_user_macro_button(5)
|
|
||||||
assert fired == 0
|
|
||||||
assert 9 not in panel.state.units or panel.state.units[9].state == 0
|
|
||||||
|
|
||||||
# Turn unit 3 ON, re-emit — chain fires.
|
|
||||||
panel.state.units[3].state = 1
|
|
||||||
fired = await engine.emit_user_macro_button(5)
|
|
||||||
assert fired == 1
|
|
||||||
assert panel.state.units[9].state == 1
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_engine_state_evaluator_or_alternative() -> None:
|
|
||||||
"""WHEN + AND IF zone 5 secure + OR + AND IF zone 6 secure + THEN.
|
|
||||||
Fires if either zone is secure."""
|
|
||||||
button_evt = event_id_user_macro_button(5)
|
|
||||||
when = _when(1, button_evt)
|
|
||||||
and_z5 = _and_traditional(2, family=0x04, instance=5) # ZONE 5 secure
|
|
||||||
or_break = _or_cond(3) # OR boundary
|
|
||||||
and_z6 = _and_traditional(4, family=0x04, instance=6) # ZONE 6 secure
|
|
||||||
then = _then_action(5, int(Command.UNIT_ON), 10)
|
|
||||||
|
|
||||||
# Zone 5 tripped, Zone 6 secure → group A fails, group B passes.
|
|
||||||
panel = MockPanel(
|
|
||||||
controller_key=CONTROLLER_KEY,
|
|
||||||
state=MockState(
|
|
||||||
programs={
|
|
||||||
p.slot: p.encode_wire_bytes()
|
|
||||||
for p in (when, and_z5, or_break, and_z6, then)
|
|
||||||
},
|
|
||||||
zones={
|
|
||||||
5: MockZoneState(current_state=1),
|
|
||||||
6: MockZoneState(current_state=0),
|
|
||||||
},
|
|
||||||
),
|
|
||||||
)
|
|
||||||
async with ProgramEngine(panel, clock=FakeClock(
|
|
||||||
datetime(2026, 5, 14, 0, 0, tzinfo=timezone.utc)
|
|
||||||
)) as engine:
|
|
||||||
engine.use_state_evaluator()
|
|
||||||
fired = await engine.emit_user_macro_button(5)
|
|
||||||
assert fired == 1 # OR-alternative passed
|
|
||||||
assert panel.state.units[10].state == 1
|
|
||||||
|
|||||||