Wire Stop button to cancel blocked firmware sweeps

Stop handler now calls cancel_operation() on the device bridge,
which sets a threading.Event that interrupts the 2s-timeout serial
read loop in send_with_timeout(). InterruptedError is caught
separately to prevent falling back to software sweep on cancel.

disconnect() uses acquire(timeout=5) with force-close fallback
instead of blocking lock acquisition — prevents deadlock when a
stuck worker holds the serial lock during shutdown.

Add 3 Textual async tests (pytest-asyncio) to verify Stop behavior:
firmware sweep stop, software sweep stop, and sweep restart.
This commit is contained in:
Ryan Malloy 2026-02-14 17:12:11 -07:00
parent 972c26b22f
commit e7e71c47d7
5 changed files with 204 additions and 2 deletions

View File

@ -158,6 +158,18 @@ class SerialBridge:
self._connected = True
self._menu = self._detect_menu()
def cancel_operation(self) -> None:
"""Signal any in-progress long-running operation to abort.
Safe to call from any thread. The cancel event is checked every
~2 seconds by ``send_with_timeout``.
"""
self._cancel.set()
def clear_cancel(self) -> None:
"""Reset the cancel event so future operations proceed normally."""
self._cancel.clear()
def disconnect(self) -> None:
"""Close the serial connection.
@ -166,13 +178,26 @@ class SerialBridge:
the port cleanly.
"""
self._cancel.set()
with self._lock:
if not self._lock.acquire(timeout=5):
# Lock held by dead/stuck worker — force-close the port
# so the blocked serial read raises an exception.
logger.warning("Lock acquisition timed out, force-closing port")
with contextlib.suppress(Exception):
self._proto.disconnect()
self._connected = False
self._menu = Menu.UNKNOWN
self._cancel.clear()
return
try:
with contextlib.suppress(Exception):
self._go_to_root()
self._proto.disconnect()
self._connected = False
self._menu = Menu.UNKNOWN
self._cancel.clear() # reset for potential reconnect
self._cancel.clear()
finally:
self._lock.release()
@property
def is_connected(self) -> bool:

View File

@ -261,6 +261,12 @@ class DemoDevice:
self._connected = True
self._menu = _DemoMenu.ROOT
def cancel_operation(self) -> None:
pass # Demo operations are instant, nothing to cancel.
def clear_cancel(self) -> None:
pass # No cancel state in demo mode.
def disconnect(self) -> None:
self._connected = False
self._menu = _DemoMenu.ROOT

View File

@ -442,11 +442,18 @@ class SignalScreen(Container):
try:
self._do_sweep_firmware(device)
return
except InterruptedError:
# User cancelled via Stop or app shutdown — don't retry.
log.info("Firmware sweep cancelled")
return
except Exception:
log.warning(
"Firmware sweep failed, falling back to software",
exc_info=True,
)
# Only fall through to software if still active.
if not self._sweeping:
return
with contextlib.suppress(Exception):
self.app.call_from_thread(
self._set_sweep_status,
@ -456,6 +463,9 @@ class SignalScreen(Container):
self._do_sweep_software(device)
finally:
self._sweeping = False
# Clear cancel so the next sweep doesn't immediately abort.
if self._device and hasattr(self._device, "clear_cancel"):
self._device.clear_cancel()
with contextlib.suppress(Exception):
self.app.call_from_thread(self._reset_sweep_buttons)
@ -641,6 +651,9 @@ class SignalScreen(Container):
def _handle_sweep_stop(self) -> None:
self._sweeping = False
# Cancel any blocked serial operation (firmware sweep).
if self._device and hasattr(self._device, "cancel_operation"):
self._device.cancel_operation()
self._set_sweep_status("Stopping...")
self._reset_sweep_buttons()
@ -676,6 +689,8 @@ class SignalScreen(Container):
self._do_scan_inner(device)
finally:
self._scanning = False
if self._device and hasattr(self._device, "clear_cancel"):
self._device.clear_cancel()
with contextlib.suppress(Exception):
self.app.call_from_thread(self._reset_scan_buttons)
@ -875,6 +890,8 @@ class SignalScreen(Container):
def _handle_scan_stop(self) -> None:
self._scanning = False
if self._device and hasattr(self._device, "cancel_operation"):
self._device.cancel_operation()
self._set_scan_status("Stopping...")
self._reset_scan_buttons()

0
tui/tests/__init__.py Normal file
View File

View File

@ -0,0 +1,154 @@
"""Test that the sweep Stop button actually stops a sweep.
Uses Textual's run_test() and pilot to drive the TUI in demo mode,
start a sweep, click Stop, and verify the sweep terminates.
"""
import asyncio
import pytest
from textual.widgets import Button
from birdcage_tui.app import BirdcageApp
@pytest.mark.asyncio
async def test_sweep_stop_firmware():
"""Stop button should abort a firmware sweep and reset UI state."""
app = BirdcageApp()
app.demo_mode = True
async with app.run_test(size=(120, 40)) as pilot:
# Wait for app to fully mount.
await pilot.pause()
# Navigate to Signal screen (F3).
await pilot.press("f3")
await pilot.pause()
# Switch to Sweep mode via the mode bar.
signal = app.query_one("SignalScreen")
signal.switch_mode("sweep")
await pilot.pause()
# Click Start Sweep.
start_btn = app.query_one("#btn-start-sweep", Button)
await pilot.click(start_btn)
# Give the firmware sweep worker time to start and complete
# (demo mode takes ~0.5s).
await asyncio.sleep(1.0)
# Click Stop.
stop_btn = app.query_one("#btn-stop-sweep", Button)
await pilot.click(stop_btn)
await pilot.pause()
# Verify: _sweeping should be False.
assert not signal._sweeping, "_sweeping should be False after Stop"
# Verify: Start button should have primary variant (ready for new sweep).
assert start_btn.variant == "primary", (
f"Start button variant should be 'primary', got {start_btn.variant!r}"
)
# Take a screenshot for visual inspection.
app.save_screenshot("/tmp/birdcage_sweep_stop_test.svg")
@pytest.mark.asyncio
async def test_sweep_stop_during_software():
"""Stop button should abort a software sweep mid-execution."""
app = BirdcageApp()
app.demo_mode = True
async with app.run_test(size=(120, 40)) as pilot:
await pilot.pause()
await pilot.press("f3")
await pilot.pause()
signal = app.query_one("SignalScreen")
signal.switch_mode("sweep")
await pilot.pause()
# Check the "Software mode" checkbox to force slow path.
from textual.widgets import Checkbox
sw_checkbox = app.query_one("#sweep-software-mode", Checkbox)
sw_checkbox.value = True
await pilot.pause()
# Start sweep (software mode — takes longer per point).
start_btn = app.query_one("#btn-start-sweep", Button)
await pilot.click(start_btn)
# Brief pause to let the worker start, but not finish.
await asyncio.sleep(0.3)
# Click Stop while the software sweep is in progress.
stop_btn = app.query_one("#btn-stop-sweep", Button)
await pilot.click(stop_btn)
# Wait for the worker to notice the stop flag.
await asyncio.sleep(1.0)
# Verify: sweep stopped.
assert not signal._sweeping, "_sweeping should be False after Stop"
assert start_btn.variant == "primary"
app.save_screenshot("/tmp/birdcage_sweep_stop_sw_test.svg")
@pytest.mark.asyncio
async def test_sweep_restart_after_stop():
"""After stopping, a new sweep should start without errors."""
app = BirdcageApp()
app.demo_mode = True
async with app.run_test(size=(120, 40)) as pilot:
await pilot.pause()
await pilot.press("f3")
await pilot.pause()
signal = app.query_one("SignalScreen")
signal.switch_mode("sweep")
await pilot.pause()
# Verify device is available.
assert signal._device is not None, "Device should be set after mount"
# First sweep -- click Start and wait for completion.
start_btn = app.query_one("#btn-start-sweep", Button)
await pilot.click(start_btn)
# Give the worker time to start.
await asyncio.sleep(0.5)
# Poll until sweep completes.
for _ in range(40):
await asyncio.sleep(0.2)
if not signal._sweeping:
# Give one more beat for call_from_thread callbacks.
await asyncio.sleep(0.3)
break
assert not signal._sweeping, "Sweep still running after polling"
assert len(signal._sweep_data) > 0, (
f"First sweep should have data; "
f"device type={type(signal._device).__name__}"
)
# Second sweep.
await pilot.click(start_btn)
await asyncio.sleep(0.5)
for _ in range(40):
await asyncio.sleep(0.2)
if not signal._sweeping:
await asyncio.sleep(0.3)
break
assert not signal._sweeping
assert len(signal._sweep_data) > 0, "Second sweep should have data"
app.save_screenshot("/tmp/birdcage_sweep_restart_test.svg")