mock_panel: v1 UploadPrograms streaming + program-echo tests

MockPanel only handled the v2 (single-slot, request/reply)
UploadProgram path. v1 panels use a streaming variant:
client sends UploadPrograms (bare), panel emits one ProgramData
per defined slot, ack-walked by the client, terminated by EOD.

Wire layout is byte-identical to v2 — only the envelope opcode
and stream pattern differ (clsHAC.OL1ReadConfig at clsHAC.cs:4403,
4538-4540, 4642-4651). The mock now mirrors the UploadNames
streaming pattern with its own cursor.

Tests cover both the populated-state stream-then-EOD case and
the empty-state immediate-EOD case, alongside the existing v2
single-slot round-trip tests.
This commit is contained in:
Ryan Malloy 2026-05-12 18:21:05 -06:00
parent 290ba5a78d
commit 933d326dd3
2 changed files with 139 additions and 9 deletions

View File

@ -344,6 +344,9 @@ class MockPanel:
# v1 UploadNames cursor: index into self._v1_name_stream() while a # v1 UploadNames cursor: index into self._v1_name_stream() while a
# streaming download is in flight, ``None`` when no stream active. # streaming download is in flight, ``None`` when no stream active.
self._upload_names_cursor: int | None = None self._upload_names_cursor: int | None = None
# v1 UploadPrograms cursor: index into self._v1_program_stream() while
# a streaming download is in flight, ``None`` when no stream active.
self._upload_programs_cursor: int | None = None
# -------- public observables (handy in tests) -------- # -------- public observables (handy in tests) --------
@ -1236,10 +1239,14 @@ class MockPanel:
return self._v1_reply_auxiliary_status(payload), () return self._v1_reply_auxiliary_status(payload), ()
if opcode == OmniLinkMessageType.UploadNames: if opcode == OmniLinkMessageType.UploadNames:
return self._v1_start_upload_names_stream(), () return self._v1_start_upload_names_stream(), ()
if opcode == OmniLinkMessageType.UploadPrograms:
return self._v1_start_upload_programs_stream(), ()
if opcode == OmniLinkMessageType.Ack: if opcode == OmniLinkMessageType.Ack:
# During an active UploadNames stream, each client Ack # During an active stream, each client Ack advances the
# advances the cursor. With no active stream, drop silently # appropriate cursor. With no active stream, Ack as a request
# (Ack as a request opcode is only meaningful mid-stream). # opcode is only meaningful mid-stream — NAK it.
if self._upload_programs_cursor is not None:
return self._v1_advance_upload_programs_stream(), ()
if self._upload_names_cursor is not None: if self._upload_names_cursor is not None:
return self._v1_advance_upload_names_stream(), () return self._v1_advance_upload_names_stream(), ()
return _build_v1_nak(opcode), () return _build_v1_nak(opcode), ()
@ -1450,6 +1457,46 @@ class MockPanel:
t, n, name = names[self._upload_names_cursor] t, n, name = names[self._upload_names_cursor]
return self._v1_namedata_msg(t, n, name) return self._v1_namedata_msg(t, n, name)
# ---- UploadPrograms streaming ----
#
# Wire flow per clsHAC.OL1ReadConfig (clsHAC.cs:4403, 4538-4540, 4642-4651):
# client → UploadPrograms (bare)
# panel → ProgramData (slot N body)
# client → Ack
# panel → ProgramData (slot N+1 body) ...
# panel → EOD
#
# ProgramData body layout matches v2 exactly (clsOLMsgProgramData
# mirrors clsOL2MsgProgramData byte-for-byte) — both prepend a 2-byte
# BE ProgramNumber to the 14-byte wire body. Only the outer envelope
# opcode differs (v1 vs v2).
def _v1_program_stream(self) -> list[int]:
"""Sorted list of defined program slot numbers."""
return sorted(self.state.programs)
def _v1_programdata_msg(self, slot: int) -> Message:
body = self.state.programs.get(slot, b"\x00" * 14)
payload = bytes([(slot >> 8) & 0xFF, slot & 0xFF]) + body
return encode_v1(OmniLinkMessageType.ProgramData, payload)
def _v1_start_upload_programs_stream(self) -> Message:
slots = self._v1_program_stream()
if not slots:
self._upload_programs_cursor = None
return _build_v1_eod()
self._upload_programs_cursor = 0
return self._v1_programdata_msg(slots[0])
def _v1_advance_upload_programs_stream(self) -> Message:
slots = self._v1_program_stream()
assert self._upload_programs_cursor is not None
self._upload_programs_cursor += 1
if self._upload_programs_cursor >= len(slots):
self._upload_programs_cursor = None
return _build_v1_eod()
return self._v1_programdata_msg(slots[self._upload_programs_cursor])
# ---- v1 Command / ExecuteSecurityCommand wrappers ---- # ---- v1 Command / ExecuteSecurityCommand wrappers ----
# The wire payload format is byte-identical to v2 (clsOLMsgCommand.cs # The wire payload format is byte-identical to v2 (clsOLMsgCommand.cs
# vs clsOL2MsgCommand.cs); only the outer opcode and the reply Ack # vs clsOL2MsgCommand.cs); only the outer opcode and the reply Ack

View File

@ -1,10 +1,16 @@
"""End-to-end wire round-trip: client → MockPanel → program decoded. """End-to-end wire round-trip: client → MockPanel → program decoded.
Seeds the MockPanel with a known :class:`Program`, drives the v2 Seeds the MockPanel with known :class:`Program` records, exercises
``UploadProgram`` opcode through a real TCP socket, and asserts the both wire dialects, and asserts the decoded result equals what was
decoded round-trip equals the seeded Program. Proves the on-the-wire seeded.
framing (2-byte BE ProgramNumber header + 14-byte body wrapped in a
``ProgramData`` reply) lines up with our decoder. * v2 (TCP, request/response per slot): drives ``UploadProgram`` once
per slot. Proves the per-program framing (2-byte BE ProgramNumber +
14-byte body wrapped in a ``ProgramData`` reply).
* v1 (UDP, streaming): drives bare ``UploadPrograms``, ack-walks the
streamed ``ProgramData`` replies to ``EOD``. Proves the streaming
lock-step matches the panel's behaviour described in
``clsHAC.OL1ReadConfig`` (clsHAC.cs:4403, 4538-4540, 4642-4651).
""" """
from __future__ import annotations from __future__ import annotations
@ -15,8 +21,9 @@ import pytest
from omni_pca.connection import OmniConnection from omni_pca.connection import OmniConnection
from omni_pca.mock_panel import MockPanel, MockState from omni_pca.mock_panel import MockPanel, MockState
from omni_pca.opcodes import OmniLink2MessageType from omni_pca.opcodes import OmniLink2MessageType, OmniLinkMessageType
from omni_pca.programs import Days, Program, ProgramType from omni_pca.programs import Days, Program, ProgramType
from omni_pca.v1 import OmniClientV1
CONTROLLER_KEY = bytes.fromhex("000102030405060708090a0b0c0d0e0f") CONTROLLER_KEY = bytes.fromhex("000102030405060708090a0b0c0d0e0f")
@ -134,3 +141,79 @@ async def test_v2_upload_program_event_type_no_swap_on_wire() -> None:
decoded = Program.from_wire_bytes(body, slot=7) decoded = Program.from_wire_bytes(body, slot=7)
assert decoded.month == 5 assert decoded.month == 5
assert decoded.day == 12 assert decoded.day == 12
# ---- v1 streaming -------------------------------------------------------
def _decode_v1_programdata(payload: bytes) -> tuple[int, Program]:
"""Strip the BE ProgramNumber prefix from a v1 ``ProgramData`` payload,
decode the 14-byte body. Mirrors the v2 helper inline above."""
assert len(payload) >= 2 + 14
slot = (payload[0] << 8) | payload[1]
return slot, Program.from_wire_bytes(payload[2 : 2 + 14], slot=slot)
@pytest.mark.asyncio
async def test_v1_upload_programs_streams_all_seeded_slots() -> None:
"""The v1 ``UploadPrograms`` opcode is bare; the panel streams one
``ProgramData`` reply per defined slot, each followed by a client Ack,
terminated by ``EOD``. Order is by ascending slot index which is
what we feed back from ``sorted(state.programs)``."""
seeded = {
12: Program(slot=12, prog_type=int(ProgramType.TIMED), cmd=3, hour=6, minute=0,
days=int(Days.MONDAY | Days.FRIDAY)),
42: Program(slot=42, prog_type=int(ProgramType.TIMED), cond=0x8D09, cond2=0x9B09,
cmd=0x44, par=3, pr2=0x0100, month=8, day=12,
days=int(Days.MONDAY), hour=7, minute=15),
99: Program(slot=99, prog_type=int(ProgramType.EVENT), cmd=5, month=5, day=12),
}
panel = MockPanel(
controller_key=CONTROLLER_KEY,
state=MockState(programs={s: p.encode_wire_bytes() for s, p in seeded.items()}),
)
async with panel.serve(transport="udp") as (host, port):
async with OmniClientV1(
host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0
) as c:
received: dict[int, Program] = {}
async for reply in c.connection.iter_streaming(
OmniLinkMessageType.UploadPrograms
):
assert reply.opcode == int(OmniLinkMessageType.ProgramData)
slot, prog = _decode_v1_programdata(reply.payload)
received[slot] = prog
assert set(received) == set(seeded)
for slot, want in seeded.items():
got = received[slot]
# Field-by-field — same checks as the v2 test, plus a slot equality.
assert got.slot == slot
assert got.prog_type == want.prog_type
assert got.cond == want.cond
assert got.cond2 == want.cond2
assert got.cmd == want.cmd
assert got.par == want.par
assert got.pr2 == want.pr2
assert got.month == want.month
assert got.day == want.day
assert got.days == want.days
assert got.hour == want.hour
assert got.minute == want.minute
@pytest.mark.asyncio
async def test_v1_upload_programs_empty_state_yields_immediate_eod() -> None:
"""No programs defined → the streaming iterator terminates without
yielding anything (the panel jumps straight to EOD)."""
panel = MockPanel(controller_key=CONTROLLER_KEY, state=MockState())
async with panel.serve(transport="udp") as (host, port):
async with OmniClientV1(
host=host, port=port, controller_key=CONTROLLER_KEY, timeout=2.0
) as c:
replies = [
r async for r in c.connection.iter_streaming(
OmniLinkMessageType.UploadPrograms
)
]
assert replies == []