diff --git a/tui/src/birdcage_tui/bridge.py b/tui/src/birdcage_tui/bridge.py index cb126b8..6998403 100644 --- a/tui/src/birdcage_tui/bridge.py +++ b/tui/src/birdcage_tui/bridge.py @@ -158,6 +158,18 @@ class SerialBridge: self._connected = True self._menu = self._detect_menu() + def cancel_operation(self) -> None: + """Signal any in-progress long-running operation to abort. + + Safe to call from any thread. The cancel event is checked every + ~2 seconds by ``send_with_timeout``. + """ + self._cancel.set() + + def clear_cancel(self) -> None: + """Reset the cancel event so future operations proceed normally.""" + self._cancel.clear() + def disconnect(self) -> None: """Close the serial connection. @@ -166,13 +178,26 @@ class SerialBridge: the port cleanly. """ self._cancel.set() - with self._lock: + if not self._lock.acquire(timeout=5): + # Lock held by dead/stuck worker — force-close the port + # so the blocked serial read raises an exception. + logger.warning("Lock acquisition timed out, force-closing port") + with contextlib.suppress(Exception): + self._proto.disconnect() + self._connected = False + self._menu = Menu.UNKNOWN + self._cancel.clear() + return + + try: with contextlib.suppress(Exception): self._go_to_root() self._proto.disconnect() self._connected = False self._menu = Menu.UNKNOWN - self._cancel.clear() # reset for potential reconnect + self._cancel.clear() + finally: + self._lock.release() @property def is_connected(self) -> bool: diff --git a/tui/src/birdcage_tui/demo.py b/tui/src/birdcage_tui/demo.py index 62f6e60..eb56673 100644 --- a/tui/src/birdcage_tui/demo.py +++ b/tui/src/birdcage_tui/demo.py @@ -261,6 +261,12 @@ class DemoDevice: self._connected = True self._menu = _DemoMenu.ROOT + def cancel_operation(self) -> None: + pass # Demo operations are instant, nothing to cancel. + + def clear_cancel(self) -> None: + pass # No cancel state in demo mode. + def disconnect(self) -> None: self._connected = False self._menu = _DemoMenu.ROOT diff --git a/tui/src/birdcage_tui/screens/signal.py b/tui/src/birdcage_tui/screens/signal.py index 591db54..a0d1d3a 100644 --- a/tui/src/birdcage_tui/screens/signal.py +++ b/tui/src/birdcage_tui/screens/signal.py @@ -442,11 +442,18 @@ class SignalScreen(Container): try: self._do_sweep_firmware(device) return + except InterruptedError: + # User cancelled via Stop or app shutdown — don't retry. + log.info("Firmware sweep cancelled") + return except Exception: log.warning( "Firmware sweep failed, falling back to software", exc_info=True, ) + # Only fall through to software if still active. + if not self._sweeping: + return with contextlib.suppress(Exception): self.app.call_from_thread( self._set_sweep_status, @@ -456,6 +463,9 @@ class SignalScreen(Container): self._do_sweep_software(device) finally: self._sweeping = False + # Clear cancel so the next sweep doesn't immediately abort. + if self._device and hasattr(self._device, "clear_cancel"): + self._device.clear_cancel() with contextlib.suppress(Exception): self.app.call_from_thread(self._reset_sweep_buttons) @@ -641,6 +651,9 @@ class SignalScreen(Container): def _handle_sweep_stop(self) -> None: self._sweeping = False + # Cancel any blocked serial operation (firmware sweep). + if self._device and hasattr(self._device, "cancel_operation"): + self._device.cancel_operation() self._set_sweep_status("Stopping...") self._reset_sweep_buttons() @@ -676,6 +689,8 @@ class SignalScreen(Container): self._do_scan_inner(device) finally: self._scanning = False + if self._device and hasattr(self._device, "clear_cancel"): + self._device.clear_cancel() with contextlib.suppress(Exception): self.app.call_from_thread(self._reset_scan_buttons) @@ -875,6 +890,8 @@ class SignalScreen(Container): def _handle_scan_stop(self) -> None: self._scanning = False + if self._device and hasattr(self._device, "cancel_operation"): + self._device.cancel_operation() self._set_scan_status("Stopping...") self._reset_scan_buttons() diff --git a/tui/tests/__init__.py b/tui/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tui/tests/test_sweep_stop.py b/tui/tests/test_sweep_stop.py new file mode 100644 index 0000000..479207a --- /dev/null +++ b/tui/tests/test_sweep_stop.py @@ -0,0 +1,154 @@ +"""Test that the sweep Stop button actually stops a sweep. + +Uses Textual's run_test() and pilot to drive the TUI in demo mode, +start a sweep, click Stop, and verify the sweep terminates. +""" + +import asyncio + +import pytest +from textual.widgets import Button + +from birdcage_tui.app import BirdcageApp + + +@pytest.mark.asyncio +async def test_sweep_stop_firmware(): + """Stop button should abort a firmware sweep and reset UI state.""" + app = BirdcageApp() + app.demo_mode = True + + async with app.run_test(size=(120, 40)) as pilot: + # Wait for app to fully mount. + await pilot.pause() + + # Navigate to Signal screen (F3). + await pilot.press("f3") + await pilot.pause() + + # Switch to Sweep mode via the mode bar. + signal = app.query_one("SignalScreen") + signal.switch_mode("sweep") + await pilot.pause() + + # Click Start Sweep. + start_btn = app.query_one("#btn-start-sweep", Button) + await pilot.click(start_btn) + + # Give the firmware sweep worker time to start and complete + # (demo mode takes ~0.5s). + await asyncio.sleep(1.0) + + # Click Stop. + stop_btn = app.query_one("#btn-stop-sweep", Button) + await pilot.click(stop_btn) + await pilot.pause() + + # Verify: _sweeping should be False. + assert not signal._sweeping, "_sweeping should be False after Stop" + + # Verify: Start button should have primary variant (ready for new sweep). + assert start_btn.variant == "primary", ( + f"Start button variant should be 'primary', got {start_btn.variant!r}" + ) + + # Take a screenshot for visual inspection. + app.save_screenshot("/tmp/birdcage_sweep_stop_test.svg") + + +@pytest.mark.asyncio +async def test_sweep_stop_during_software(): + """Stop button should abort a software sweep mid-execution.""" + app = BirdcageApp() + app.demo_mode = True + + async with app.run_test(size=(120, 40)) as pilot: + await pilot.pause() + await pilot.press("f3") + await pilot.pause() + + signal = app.query_one("SignalScreen") + signal.switch_mode("sweep") + await pilot.pause() + + # Check the "Software mode" checkbox to force slow path. + from textual.widgets import Checkbox + + sw_checkbox = app.query_one("#sweep-software-mode", Checkbox) + sw_checkbox.value = True + await pilot.pause() + + # Start sweep (software mode — takes longer per point). + start_btn = app.query_one("#btn-start-sweep", Button) + await pilot.click(start_btn) + + # Brief pause to let the worker start, but not finish. + await asyncio.sleep(0.3) + + # Click Stop while the software sweep is in progress. + stop_btn = app.query_one("#btn-stop-sweep", Button) + await pilot.click(stop_btn) + + # Wait for the worker to notice the stop flag. + await asyncio.sleep(1.0) + + # Verify: sweep stopped. + assert not signal._sweeping, "_sweeping should be False after Stop" + assert start_btn.variant == "primary" + + app.save_screenshot("/tmp/birdcage_sweep_stop_sw_test.svg") + + +@pytest.mark.asyncio +async def test_sweep_restart_after_stop(): + """After stopping, a new sweep should start without errors.""" + app = BirdcageApp() + app.demo_mode = True + + async with app.run_test(size=(120, 40)) as pilot: + await pilot.pause() + await pilot.press("f3") + await pilot.pause() + + signal = app.query_one("SignalScreen") + signal.switch_mode("sweep") + await pilot.pause() + + # Verify device is available. + assert signal._device is not None, "Device should be set after mount" + + # First sweep -- click Start and wait for completion. + start_btn = app.query_one("#btn-start-sweep", Button) + await pilot.click(start_btn) + + # Give the worker time to start. + await asyncio.sleep(0.5) + + # Poll until sweep completes. + for _ in range(40): + await asyncio.sleep(0.2) + if not signal._sweeping: + # Give one more beat for call_from_thread callbacks. + await asyncio.sleep(0.3) + break + + assert not signal._sweeping, "Sweep still running after polling" + assert len(signal._sweep_data) > 0, ( + f"First sweep should have data; " + f"device type={type(signal._device).__name__}" + ) + + # Second sweep. + await pilot.click(start_btn) + await asyncio.sleep(0.5) + + for _ in range(40): + await asyncio.sleep(0.2) + if not signal._sweeping: + await asyncio.sleep(0.3) + break + + assert not signal._sweeping + assert len(signal._sweep_data) > 0, "Second sweep should have data" + + app.save_screenshot("/tmp/birdcage_sweep_restart_test.svg")