diff --git a/src/birdcage/protocol.py b/src/birdcage/protocol.py index c03885e..0a20d97 100644 --- a/src/birdcage/protocol.py +++ b/src/birdcage/protocol.py @@ -283,6 +283,38 @@ class CarryoutG2Protocol(FirmwareProtocol): time.sleep(0.001) # brief settle before next command return resp_data.decode("utf-8", errors="ignore") + def _probe_prompt(self) -> str: + """Send bare CR and read the prompt string. + + Returns the prompt text (e.g. ``'\\r\\nTRK>'`` or ``'\\r\\nMOT>'``). + Used to detect which submenu the firmware is in without sending + a command that could have side effects. + """ + if not self._serial: + raise RuntimeError("Not connected") + self._serial.write(b"\r") + resp = bytearray() + while True: + byte = self._serial.read(1) + if len(byte) == 0: + break # timeout — return what we have + resp.append(byte[0]) + if byte[0] == self.PROMPT_CHAR: + break + return resp.decode("utf-8", errors="ignore") + + def reset_to_root(self) -> None: + """Return to TRK> root without killing the shell. + + At TRK>, the ``q`` command terminates the UART shell entirely + (requires power cycle). This probes the current prompt first + and only sends ``q`` if we're in a submenu. + """ + prompt = self._probe_prompt() + if "TRK>" in prompt: + return # already at root + self._send("q") + def initialize(self, callback: Callable[[str], None] | None = None) -> None: """Prepare G2 for motor commands. @@ -293,22 +325,18 @@ class CarryoutG2Protocol(FirmwareProtocol): logger.info( "Initializing Carryout G2 (tracker must be pre-disabled via NVS 20)" ) - self._send("q") + self.reset_to_root() self.enter_motor_menu() logger.info("Carryout G2 initialized and ready") def enter_motor_menu(self) -> None: - self._send("q") + self.reset_to_root() self._send(self.MOTOR_COMMAND) def kill_search(self) -> None: """No-op — G2 search is disabled permanently via NVS index 20.""" logger.debug("G2 search kill is a no-op (NVS 20 disables tracker)") - def reset_to_root(self) -> None: - """Return to the firmware root menu.""" - self._send("q") - def get_position(self) -> Position: """Query dish position. @@ -351,7 +379,7 @@ class CarryoutG2Protocol(FirmwareProtocol): def enter_dvb_menu(self) -> None: """Enter DVB signal analysis submenu (must be at root menu).""" - self._send("q") # ensure root + self.reset_to_root() self._send("dvb") def enable_lna(self) -> None: @@ -392,6 +420,21 @@ class CarryoutG2Protocol(FirmwareProtocol): raise ValueError(f"Could not parse RSSI from: {response!r}") + def send_with_timeout(self, cmd: str, timeout: float = 90) -> str: + """Send a command with a custom serial timeout. + + Used for long-running firmware commands (azscanwxp, azscan) that stream + output over tens of seconds before the final prompt. + """ + if not self._serial: + raise RuntimeError("Not connected") + original = self._serial.timeout + self._serial.timeout = timeout + try: + return self._send(cmd) + finally: + self._serial.timeout = original + def send_raw(self, cmd: str) -> str: """Send arbitrary command, return raw prompt-terminated response.""" return self._send(cmd) diff --git a/tui/src/birdcage_tui/bridge.py b/tui/src/birdcage_tui/bridge.py index a90e218..6ab4a4d 100644 --- a/tui/src/birdcage_tui/bridge.py +++ b/tui/src/birdcage_tui/bridge.py @@ -66,6 +66,32 @@ class SerialBridge: self._menu = Menu.UNKNOWN self._connected = False + # ------------------------------------------------------------------ + # Menu prompt → string mapping for status display + # ------------------------------------------------------------------ + + _MENU_PROMPTS: dict[Menu, str] = { + Menu.ROOT: "TRK>", + Menu.MOT: "MOT>", + Menu.DVB: "DVB>", + Menu.NVS: "NVS>", + Menu.A3981: "A3981>", + Menu.ADC: "ADC>", + Menu.OS: "OS>", + Menu.STEP: "STEP>", + Menu.PEAK: "PEAK>", + Menu.EEPROM: "EE>", + Menu.GPIO: "GPIO>", + Menu.LATLON: "LATLON>", + Menu.DIPSWITCH: "DIPSWITCH>", + Menu.UNKNOWN: "???", + } + + @property + def current_menu(self) -> str: + """Current firmware prompt string for status display.""" + return self._MENU_PROMPTS.get(self._menu, "???") + # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ @@ -82,6 +108,21 @@ class SerialBridge: self._proto.reset_to_root() self._menu = Menu.ROOT + def _detect_menu(self) -> Menu: + """Probe firmware prompt and return the corresponding Menu enum. + + Caller must hold ``_lock``. + """ + prompt = self._proto._probe_prompt() + upper = prompt.upper() + # Check against known prompt strings (handles EE> vs EEPROM>, etc.) + for menu, prompt_str in self._MENU_PROMPTS.items(): + if menu == Menu.UNKNOWN: + continue + if prompt_str.upper() in upper: + return menu + return Menu.UNKNOWN + def _ensure_menu(self, target: Menu) -> None: """Navigate to *target* submenu if not already there. @@ -114,7 +155,7 @@ class SerialBridge: with self._lock: self._proto.connect(port, baudrate) self._connected = True - self._menu = Menu.UNKNOWN + self._menu = self._detect_menu() def disconnect(self) -> None: """Close the serial connection.""" @@ -139,7 +180,8 @@ class SerialBridge: with self._lock: if not skip_init: self._proto.initialize() - self._menu = Menu.MOT # initialize() ends in MOT> + self._menu = Menu.MOT # initialize() ends in MOT> + # else: leave _menu as whatever connect() detected # ------------------------------------------------------------------ # Motor (MOT>) @@ -296,6 +338,116 @@ class SerialBridge: return result + def get_pid_gains(self) -> dict[str, dict[str, float]]: + """Read PID gains for both motor axes. + + Firmware returns: ``Kp=600 Kv=60 Ki=1`` per motor. + + Returns: + ``{"az": {"kp": 600, "kv": 60, "ki": 1}, + "el": {"kp": 250, "kv": 50, "ki": 1}}`` + """ + with self._lock: + self._ensure_menu(Menu.MOT) + response = self._send("pid") + + # Parse "Kp=600 Kv=60 Ki=1" patterns. The pid command without args + # shows both motors. We look for two sets of Kp/Kv/Ki values. + kp_matches = re.findall(r"Kp[=:]?\s*(\d+)", response) + kv_matches = re.findall(r"Kv[=:]?\s*(\d+)", response) + ki_matches = re.findall(r"Ki[=:]?\s*(\d+)", response) + + result = { + "az": {"kp": 600.0, "kv": 60.0, "ki": 1.0}, + "el": {"kp": 250.0, "kv": 50.0, "ki": 1.0}, + } + + if len(kp_matches) >= 2: + result["az"]["kp"] = float(kp_matches[0]) + result["el"]["kp"] = float(kp_matches[1]) + elif len(kp_matches) == 1: + result["az"]["kp"] = float(kp_matches[0]) + + if len(kv_matches) >= 2: + result["az"]["kv"] = float(kv_matches[0]) + result["el"]["kv"] = float(kv_matches[1]) + elif len(kv_matches) == 1: + result["az"]["kv"] = float(kv_matches[0]) + + if len(ki_matches) >= 2: + result["az"]["ki"] = float(ki_matches[0]) + result["el"]["ki"] = float(ki_matches[1]) + elif len(ki_matches) == 1: + result["az"]["ki"] = float(ki_matches[0]) + + return result + + def set_pid_gains(self, motor_id: int, kp: float, kv: float, ki: float) -> None: + """Write PID gains for a single motor axis. + + Args: + motor_id: 0 for AZ, 1 for EL. + kp: Proportional gain. + kv: Velocity gain. + ki: Integral gain. + """ + with self._lock: + self._ensure_menu(Menu.MOT) + self._send(f"pid {motor_id} {int(kp)} {int(kv)} {int(ki)}") + + def az_sweep_firmware( + self, + start_az: float, + span: float, + step_cdeg: int, + num_xponders: int, + timeout: float = 120, + ) -> list[dict[str, float]]: + """Execute a firmware-accelerated AZ sweep via azscanwxp. + + Moves to *start_az* first, then runs the firmware sweep command which + handles motor movement and RSSI measurement atomically — no per-point + serial round-trips. + + Args: + start_az: Starting azimuth in degrees. + span: Total sweep width in degrees. + step_cdeg: Step size in centidegrees (100 = 1.00°). + num_xponders: Number of transponders to cycle per position. + timeout: Serial read timeout for the long-running command. + + Returns: + List of dicts with keys: az, rssi, lock, snr. + """ + with self._lock: + self._ensure_menu(Menu.MOT) + # Move to start position and wait for prompt. + self._send(f"a 0 {start_az}") + # Execute firmware sweep with extended timeout. + response = self._proto.send_with_timeout( + f"azscanwxp 0 {span} {step_cdeg} {num_xponders}", + timeout=timeout, + ) + + # Parse streaming output lines. + # Motor: Angle: RSSI: Lock:<0/1> SNR: + results: list[dict[str, float]] = [] + for match in re.finditer( + r"Angle:(-?\d+)\s+RSSI:(\d+)\s+Lock:(\d)\s+SNR:(-?\d+\.?\d*)", + response, + ): + results.append( + { + "az": int(match.group(1)) / 100.0, + "rssi": float(match.group(2)), + "lock": float(match.group(3)), + "snr": float(match.group(4)), + } + ) + + logger.info("Firmware sweep returned %d points", len(results)) + return results + # ------------------------------------------------------------------ # Signal (DVB>) # ------------------------------------------------------------------ diff --git a/tui/src/birdcage_tui/demo.py b/tui/src/birdcage_tui/demo.py index e86ab88..62f6e60 100644 --- a/tui/src/birdcage_tui/demo.py +++ b/tui/src/birdcage_tui/demo.py @@ -333,6 +333,53 @@ class DemoDevice: "el_steps": int(self._el * 24960 / 360), } + def get_pid_gains(self) -> dict[str, dict[str, float]]: + return { + "az": {"kp": 600.0, "kv": 60.0, "ki": 1.0}, + "el": {"kp": 250.0, "kv": 50.0, "ki": 1.0}, + } + + def set_pid_gains(self, motor_id: int, kp: float, kv: float, ki: float) -> None: + pass # No-op in demo mode. + + def az_sweep_firmware( + self, + start_az: float, + span: float, + step_cdeg: int, + num_xponders: int, + timeout: float = 120, + ) -> list[dict[str, float]]: + """Simulate a firmware azscanwxp sweep with Gaussian signal peak.""" + step_deg = step_cdeg / 100.0 + if step_deg <= 0: + step_deg = 1.0 + + results: list[dict[str, float]] = [] + az = start_az + end_az = start_az + span + while az <= end_az + 1e-9: + dist_sq = (az - _SAT_AZ) ** 2 + (self._el - _SAT_EL) ** 2 + signal = _RSSI_PEAK * math.exp(-dist_sq / _RSSI_BEAM_WIDTH) + rssi = _RSSI_NOISE_FLOOR + signal + random.gauss(0.0, 30.0) + locked = 1 if rssi > 1500 else 0 + snr = max(0.0, (rssi - _RSSI_NOISE_FLOOR) / 50.0) + random.gauss(0.0, 0.5) + results.append( + { + "az": round(az, 2), + "rssi": round(rssi), + "lock": float(locked), + "snr": round(max(0.0, snr), 1), + } + ) + az += step_deg + + # Brief delay to simulate firmware execution time. + time.sleep(0.5) + self._target_az = end_az + self._last_move_time = time.monotonic() + return results + # ------------------------------------------------------------------ # Signal (DVB>) # ------------------------------------------------------------------ @@ -530,6 +577,16 @@ class DemoDevice: az_steps = int(self._az * 40000 / 360) el_steps = int(self._el * 24960 / 360) return f"Position[0] = {az_steps} Position[1] = {el_steps}\nMOT>" + if cmd == "pid" or cmd.startswith("pid "): + parts = cmd.split() + if len(parts) == 1: + return ( + "Motor 0: Kp=600 Kv=60 Ki=1\nMotor 1: Kp=250 Kv=50 Ki=1\nMOT>" + ) + elif len(parts) >= 4: + motor_id = parts[1] + return f"PID set for motor {motor_id}\nMOT>" + return "Usage: pid [motor] [Kp] [Kv] [Ki]\nMOT>" return f"Unknown command: {cmd}\nMOT>" def _handle_dvb(self, cmd: str) -> str: diff --git a/tui/src/birdcage_tui/screens/signal.py b/tui/src/birdcage_tui/screens/signal.py index 9a5ac27..44e859e 100644 --- a/tui/src/birdcage_tui/screens/signal.py +++ b/tui/src/birdcage_tui/screens/signal.py @@ -1,83 +1,261 @@ -"""F2 Signal screen -- RSSI monitoring, sparklines, LNB control. +"""F3 Signal screen -- Find and Measure. -Widget container for ContentSwitcher. Provides start/stop signal -monitoring with configurable iteration count and poll rate, dual -sparklines for DVB and ADC RSSI, peak tracking, and LNA toggle. +Three sub-modes via ModeBar + ContentSwitcher: + + Monitor -- RSSI monitoring with dual sparklines, signal gauge, receiver info + Sweep -- 1D azimuth-vs-RSSI bar chart across an AZ range at fixed EL + Sky Map -- 2D AZ x EL grid scan with heatmap visualization and CSV export + +Merges the old Signal + Scan screens into a single tab. """ +import contextlib +import csv import logging import re +from pathlib import Path from textual import work from textual.app import ComposeResult from textual.containers import Container, Horizontal, Vertical -from textual.widgets import Button, Input, Static -from textual.worker import Worker +from textual.widgets import ( + Button, + Checkbox, + ContentSwitcher, + Input, + ProgressBar, + Static, +) +from textual.worker import Worker, get_current_worker +from birdcage_tui.widgets.mode_bar import ModeBar +from birdcage_tui.widgets.receiver_info import ReceiverInfo from birdcage_tui.widgets.signal_gauge import SignalGauge +from birdcage_tui.widgets.sky_heatmap import SkyHeatmap from birdcage_tui.widgets.sparkline_widget import SparklineWidget +from birdcage_tui.widgets.status_strip import StatusStrip +from birdcage_tui.widgets.sweep_plot import SweepPlot log = logging.getLogger(__name__) +# Type alias -- SerialBridge and DemoDevice share the same duck-typed interface. +DeviceLike = object + class SignalScreen(Container): - """F2: Signal monitoring and RSSI display.""" + """F3: Find and Measure -- signal monitoring, sweep, and sky mapping.""" def __init__(self, **kwargs) -> None: super().__init__(**kwargs) - self._device: object = None + self._device: DeviceLike | None = None + + # Monitor state self._monitoring = False self._lna_enabled = False self._peak_rssi = 0 self._total_samples = 0 self._signal_worker: Worker | None = None + self._receiver_loaded = False + + # Sweep state + self._sweeping = False + self._sweep_data: list[tuple[float, float]] = [] + + # Sky Map state + self._scanning = False + self._scan_data: list[tuple[float, float, float]] = [] + + # ------------------------------------------------------------------ + # Compose + # ------------------------------------------------------------------ def compose(self) -> ComposeResult: with Container(classes="screen-container"): - with Vertical(classes="panel"): - yield Static("Signal Strength", classes="panel-title") - yield SignalGauge(id="signal-gauge") - with Vertical(): - yield SparklineWidget( - max_points=80, label="DVB RSSI", color="#00d4aa", id="dvb-spark" - ) - yield SparklineWidget( - max_points=80, label="ADC RSSI", color="#2080d0", id="adc-spark" - ) - with Horizontal(classes="panel"): - yield Static("Samples: 0", id="sample-count", classes="label") - yield Static(" Peak: 0", id="peak-value", classes="label") - yield Static(" LNA: OFF", id="lna-status", classes="label") - yield Static(" Lock: NO", id="lock-status", classes="label") - with Horizontal(classes="bottom-controls"): - yield Static("Iters ", classes="label") - yield Input(value="10", id="iter-input", type="integer") - yield Static(" Rate ", classes="label") - yield Input(value="2", id="rate-input", type="integer") - yield Button("Start", id="btn-start", variant="primary") - yield Button("Stop", id="btn-stop") - yield Button("Enable LNA", id="btn-lna") + yield ModeBar( + modes={ + "monitor": "Monitor", + "sweep": "Sweep", + "skymap": "Sky Map", + }, + initial="monitor", + classes="mode-bar", + ) + with ContentSwitcher(id="signal-modes", initial="monitor"): + # -- Monitor mode ----------------------------------------- + with Container(id="monitor"): + with Horizontal(classes="top-row"): + with Vertical(classes="panel"): + yield Static("Signal Strength", classes="panel-title") + yield SignalGauge(id="signal-gauge") + yield SparklineWidget( + max_points=80, + label="DVB RSSI", + color="#00d4aa", + id="dvb-spark", + ) + yield SparklineWidget( + max_points=80, + label="ADC RSSI", + color="#2080d0", + id="adc-spark", + ) + with Vertical(classes="panel"): + yield Static("Receiver", classes="panel-title") + yield ReceiverInfo(id="receiver-info") + with Horizontal(classes="panel"): + yield Static("Samples: 0", id="sample-count", classes="label") + yield Static(" Peak: 0", id="peak-value", classes="label") + yield Static(" LNA: OFF", id="lna-status", classes="label") + yield Static(" Lock: NO", id="lock-status", classes="label") + with Horizontal(classes="bottom-controls"): + yield Static("Iters ", classes="label") + yield Input(value="10", id="iter-input", type="integer") + yield Static(" Rate ", classes="label") + yield Input(value="2", id="rate-input", type="integer") + yield Static(" Hz", classes="label") + yield Button("Start", id="btn-start", variant="primary") + yield Button("Stop", id="btn-stop") + yield Button("Enable LNA", id="btn-lna") + yield Button("Reset Peak", id="btn-reset-peak") + + # -- Sweep mode ------------------------------------------- + with Container(id="sweep"): + with Vertical(classes="panel"): + yield Static("AZ Sweep", classes="panel-title") + yield SweepPlot(id="sweep-plot") + with Horizontal(classes="scan-status"): + yield Static("Idle", id="sweep-status-text") + yield ProgressBar( + id="sweep-progress", total=100, show_eta=False + ) + with Horizontal(classes="bottom-controls"): + yield Static("AZ Start ", classes="label") + yield Input(value="160", id="sweep-az-start", type="number") + yield Static(" AZ End ", classes="label") + yield Input(value="220", id="sweep-az-end", type="number") + yield Static(" Step ", classes="label") + yield Input(value="1.5", id="sweep-az-step", type="number") + yield Static(" EL (fixed) ", classes="label") + yield Input(value="38.0", id="sweep-el-fixed", type="number") + yield Static(" Iters ", classes="label") + yield Input(value="10", id="sweep-iters", type="integer") + with Horizontal(classes="bottom-controls"): + yield Button( + "Start Sweep", id="btn-start-sweep", variant="primary" + ) + yield Button("Stop", id="btn-stop-sweep") + yield Button("Export CSV", id="btn-export-sweep") + yield Checkbox( + "Software mode", + id="sweep-software-mode", + value=False, + ) + + # -- Sky Map mode ----------------------------------------- + with Container(id="skymap"): + with Vertical(classes="panel"): + yield Static("Sky Scan", classes="panel-title") + yield SkyHeatmap(az_bins=40, el_bins=10, id="heatmap") + yield SparklineWidget( + max_points=80, + label="Sweep RSSI", + color="#00d4aa", + id="scan-spark", + ) + with Horizontal(classes="scan-status"): + yield Static("Idle", id="scan-status-text") + yield ProgressBar(id="scan-progress", total=100, show_eta=False) + with Horizontal(classes="bottom-controls"): + yield Static("AZ ", classes="label") + yield Input(value="160", id="scan-az-start", type="number") + yield Static("-", classes="label") + yield Input(value="220", id="scan-az-end", type="number") + yield Static(" Step ", classes="label") + yield Input(value="1.5", id="scan-az-step", type="number") + yield Static(" EL ", classes="label") + yield Input(value="18", id="scan-el-start", type="number") + yield Static("-", classes="label") + yield Input(value="65", id="scan-el-end", type="number") + yield Static(" Step ", classes="label") + yield Input(value="5.0", id="scan-el-step", type="number") + with Horizontal(classes="bottom-controls"): + yield Static("Transponders ", classes="label") + yield Input(value="3", id="xponder-input", type="integer") + yield Button( + "Start Scan", id="btn-start-scan", variant="primary" + ) + yield Button("Stop", id="btn-stop-scan") + yield Button("Export CSV", id="btn-export-scan") + yield Checkbox( + "Software mode", + id="scan-software-mode", + value=False, + ) # ------------------------------------------------------------------ # Device lifecycle # ------------------------------------------------------------------ - def set_device(self, device: object) -> None: + def set_device(self, device: DeviceLike) -> None: """Store the device reference.""" self._device = device def on_show(self) -> None: - """Called when this screen becomes visible.""" - pass # Monitoring is explicit via Start/Stop buttons. + """Load receiver info on first show (requires device).""" + if not self._receiver_loaded and self._device is not None: + self._load_receiver_info() + + def on_position_update(self, az: float, el: float) -> None: + """Called by app-level position poll. Signal screen doesn't use this.""" def on_unmount(self) -> None: - """Stop monitoring thread on teardown.""" + """Stop all workers on teardown.""" self._monitoring = False + self._sweeping = False + self._scanning = False # ------------------------------------------------------------------ - # Signal poll worker + # ModeBar switching # ------------------------------------------------------------------ + def on_mode_bar_mode_changed(self, event: ModeBar.ModeChanged) -> None: + """Switch ContentSwitcher when ModeBar selection changes.""" + switcher = self.query_one("#signal-modes", ContentSwitcher) + switcher.current = event.mode + + def switch_mode(self, mode_key: str) -> None: + """Programmatic mode switch (called by app.py for QuickActions).""" + switcher = self.query_one("#signal-modes", ContentSwitcher) + switcher.current = mode_key + # Update the ModeBar visual state to match. + mode_bar = self.query_one(ModeBar) + for btn in mode_bar.query(".mode-btn"): + btn.remove_class("active") + with contextlib.suppress(Exception): + mode_bar.query_one(f"#mode-{mode_key}").add_class("active") + + # ------------------------------------------------------------------ + # Input helpers + # ------------------------------------------------------------------ + + def _read_float(self, widget_id: str, fallback: float) -> float: + """Read a float from an Input widget, returning *fallback* on error.""" + try: + return float(self.query_one(f"#{widget_id}", Input).value) + except (ValueError, TypeError): + return fallback + + def _read_int(self, widget_id: str, fallback: int) -> int: + """Read an int from an Input widget, returning *fallback* on error.""" + try: + return int(self.query_one(f"#{widget_id}", Input).value) + except (ValueError, TypeError): + return fallback + + # ================================================================== + # MONITOR MODE + # ================================================================== + @work(thread=True, exclusive=True, group="signal-poll") def _do_signal_poll(self) -> None: """Poll RSSI at the configured rate while monitoring is active.""" @@ -117,7 +295,8 @@ class SignalScreen(Container): self.app.call_from_thread(self._update_gauge, rssi_avg, rssi_cur, reads) self.app.call_from_thread(self._push_dvb_spark, float(rssi_avg)) - self.app.call_from_thread(self._update_stats) + self.app.call_from_thread(self._update_monitor_stats) + self.app.call_from_thread(self._update_status_strip_rssi, rssi_avg) except Exception: log.debug("DVB RSSI poll failed", exc_info=True) @@ -143,9 +322,7 @@ class SignalScreen(Container): shutdown.wait(1.0 / rate) - # ------------------------------------------------------------------ - # Thread-safe widget update callbacks - # ------------------------------------------------------------------ + # -- Monitor thread-safe callbacks -- def _read_input(self, input_id: str) -> str: """Read an Input widget's value (must run on main thread).""" @@ -163,7 +340,7 @@ class SignalScreen(Container): def _push_adc_spark(self, value: float) -> None: self.query_one("#adc-spark", SparklineWidget).push(value) - def _update_stats(self) -> None: + def _update_monitor_stats(self) -> None: self.query_one("#sample-count", Static).update( f"Samples: {self._total_samples}" ) @@ -177,26 +354,17 @@ class SignalScreen(Container): label = "ON" if self._lna_enabled else "OFF" self.query_one("#lna-status", Static).update(f" LNA: {label}") - # ------------------------------------------------------------------ - # Button handlers - # ------------------------------------------------------------------ + def _update_status_strip_rssi(self, rssi_avg: int) -> None: + """Push RSSI to the app-level StatusStrip.""" + with contextlib.suppress(Exception): + self.app.query_one("#status-strip", StatusStrip).rssi = rssi_avg - def on_button_pressed(self, event: Button.Pressed) -> None: - button_id = event.button.id or "" + # -- Monitor button handlers -- - if button_id == "btn-start": - self._handle_start() - elif button_id == "btn-stop": - self._handle_stop() - elif button_id == "btn-lna": - self._handle_lna() - - def _handle_start(self) -> None: - """Start signal monitoring.""" + def _handle_monitor_start(self) -> None: if self._device is None: self.app.notify("No device connected", severity="warning") return - if self._monitoring: return @@ -207,8 +375,7 @@ class SignalScreen(Container): self.query_one("#btn-start", Button).variant = "default" self.query_one("#btn-stop", Button).variant = "warning" - def _handle_stop(self) -> None: - """Stop signal monitoring.""" + def _handle_monitor_stop(self) -> None: self._monitoring = False self.app.notify("Signal monitoring stopped") @@ -216,7 +383,6 @@ class SignalScreen(Container): self.query_one("#btn-stop", Button).variant = "default" def _handle_lna(self) -> None: - """Toggle LNA enable (sends lnbdc odu to set 13V).""" if self._device is None: self.app.notify("No device connected", severity="warning") return @@ -224,7 +390,6 @@ class SignalScreen(Container): @work(thread=True, exclusive=False, group="signal-cmd") def _do_enable_lna(self) -> None: - """Enable LNA in a worker thread (blocks on serial I/O).""" try: self._device.enable_lna() self._lna_enabled = True @@ -235,3 +400,490 @@ class SignalScreen(Container): self.app.call_from_thread( self.app.notify, "LNA enable failed", severity="error" ) + + def _handle_reset_peak(self) -> None: + self._peak_rssi = 0 + self._total_samples = 0 + self._update_monitor_stats() + self.app.notify("Peak and sample counters reset") + + @work(thread=True, exclusive=False, group="signal-cmd") + def _load_receiver_info(self) -> None: + """Fetch channel params and DVB config in a worker thread.""" + if self._device is None: + return + try: + channel = self._device.get_channel_params() + config = self._device.get_dvb_config() + self._receiver_loaded = True + self.app.call_from_thread(self._apply_receiver_info, channel, config) + except Exception: + log.debug("Receiver info load failed", exc_info=True) + + def _apply_receiver_info(self, channel: str, config: str) -> None: + self.query_one("#receiver-info", ReceiverInfo).load_data(channel, config) + + # ================================================================== + # SWEEP MODE (1D AZ scan) + # ================================================================== + + @work(thread=True) + def _do_sweep(self) -> None: + """Execute a 1D AZ sweep -- firmware-accelerated or software fallback.""" + device = self._device + if device is None: + return + + # Check if user forced software mode. + force_software = self.query_one("#sweep-software-mode", Checkbox).value + + if not force_software and hasattr(device, "az_sweep_firmware"): + try: + self._do_sweep_firmware(device) + return + except Exception: + log.warning( + "Firmware sweep failed, falling back to software", + exc_info=True, + ) + self.app.call_from_thread( + self._set_sweep_status, "Firmware sweep failed -- falling back..." + ) + + self._do_sweep_software(device) + + def _do_sweep_firmware(self, device: DeviceLike) -> None: + """Firmware-accelerated sweep via azscanwxp (runs in worker thread).""" + shutdown = self.app.shutdown_event + + az_start = self._read_float("sweep-az-start", 160.0) + az_end = self._read_float("sweep-az-end", 220.0) + az_step = self._read_float("sweep-az-step", 1.5) + el_fixed = self._read_float("sweep-el-fixed", 38.0) + iterations = self._read_int("sweep-iters", 10) + + span = az_end - az_start + if span <= 0 or az_step <= 0: + self.app.call_from_thread( + self._set_sweep_status, "Invalid sweep range -- check parameters" + ) + return + + step_cdeg = max(1, int(az_step * 100)) + + # Set EL once. + self.app.call_from_thread(self._set_sweep_status, "Setting elevation...") + device.move_motor(1, el_fixed) + + # Let EL settle briefly. + shutdown.wait(0.5) + if not self._sweeping or shutdown.is_set(): + return + + self.app.call_from_thread( + self._set_sweep_status, "Firmware scan in progress..." + ) + self.app.call_from_thread( + self._set_sweep_progress, 10, "Firmware scan in progress..." + ) + + results = device.az_sweep_firmware( + start_az=az_start, + span=span, + step_cdeg=step_cdeg, + num_xponders=iterations, + ) + + if not results: + self.app.call_from_thread( + self._set_sweep_status, "Firmware sweep returned no data" + ) + return + + # Populate the plot with all points at once. + sweep_plot = self.query_one("#sweep-plot", SweepPlot) + for pt in results: + az_val = pt["az"] + rssi = pt["rssi"] + self._sweep_data.append((az_val, rssi)) + self.app.call_from_thread(sweep_plot.add_point, az_val, rssi) + + total = len(results) + msg = f"Sweep complete -- {total} points (firmware)" + self.app.call_from_thread(self._set_sweep_progress, 100, msg) + self.app.call_from_thread( + self._set_sweep_status, f"Sweep complete -- {total} points (firmware)" + ) + + def _do_sweep_software(self, device: DeviceLike) -> None: + """Software step-dwell-measure sweep (runs in worker thread).""" + worker = get_current_worker() + shutdown = self.app.shutdown_event + + az_start = self._read_float("sweep-az-start", 160.0) + az_end = self._read_float("sweep-az-end", 220.0) + az_step = self._read_float("sweep-az-step", 1.5) + el_fixed = self._read_float("sweep-el-fixed", 38.0) + iterations = self._read_int("sweep-iters", 10) + + if az_step <= 0: + az_step = 1.0 + + # Build AZ point list. + az_values: list[float] = [] + az = az_start + while az <= az_end + 1e-9: + az_values.append(round(az, 2)) + az += az_step + + total = len(az_values) + if total == 0: + self.app.call_from_thread( + self._set_sweep_status, "No sweep points -- check parameters" + ) + return + + sweep_plot = self.query_one("#sweep-plot", SweepPlot) + + # Set EL once at start, then only move AZ per point. + try: + device.move_motor(1, el_fixed) + except Exception: + log.exception("EL move failed") + + for idx, az_val in enumerate(az_values): + if not self._sweeping or worker.is_cancelled or shutdown.is_set(): + self.app.call_from_thread(self._set_sweep_status, "Sweep stopped") + return + + # Move AZ only (EL already set). + try: + device.move_motor(0, az_val) + except Exception: + log.exception("AZ move failed at %.2f", az_val) + msg = f"Move error at AZ={az_val:.1f}" + self.app.call_from_thread(self._set_sweep_status, msg) + continue + + # Settle time. + shutdown.wait(0.3) + + # Read signal. + try: + rssi_data = device.get_rssi(iterations) + rssi = float(rssi_data.get("average", 0)) + except Exception: + log.exception("get_rssi failed at AZ=%.2f", az_val) + rssi = 0.0 + + self._sweep_data.append((az_val, rssi)) + + # Update widgets. + self.app.call_from_thread(sweep_plot.add_point, az_val, rssi) + self.app.call_from_thread(sweep_plot.set_active, az_val) + + done = idx + 1 + pct = int(done * 100 / total) + status = f"AZ={az_val:.1f} RSSI={rssi:.0f} [{done}/{total}]" + self.app.call_from_thread(self._set_sweep_progress, pct, status) + + self.app.call_from_thread( + self._set_sweep_status, f"Sweep complete -- {total} points" + ) + + # -- Sweep thread-safe callbacks -- + + def _set_sweep_status(self, text: str) -> None: + self.query_one("#sweep-status-text", Static).update(text) + + def _set_sweep_progress(self, pct: int, status_text: str) -> None: + self.query_one("#sweep-progress", ProgressBar).update(progress=pct) + self.query_one("#sweep-status-text", Static).update(status_text) + + # -- Sweep button handlers -- + + def _handle_sweep_start(self) -> None: + if self._device is None: + self.app.notify("No device connected", severity="warning") + return + if self._sweeping: + self.app.notify("Sweep already in progress", severity="warning") + return + + sweep_plot = self.query_one("#sweep-plot", SweepPlot) + sweep_plot.clear() + self._sweep_data.clear() + self.query_one("#sweep-progress", ProgressBar).update(progress=0) + self._set_sweep_status("Starting sweep...") + + self._sweeping = True + self._do_sweep() + + def _handle_sweep_stop(self) -> None: + self._sweeping = False + self._set_sweep_status("Stopping...") + + def _export_sweep_csv(self) -> None: + if not self._sweep_data: + self.app.notify("No sweep data to export", severity="warning") + return + + output = Path("/tmp/birdcage_sweep.csv") + try: + with output.open("w", newline="") as fh: + writer = csv.writer(fh) + writer.writerow(["az", "rssi"]) + for az, rssi in self._sweep_data: + writer.writerow([f"{az:.2f}", f"{rssi:.1f}"]) + self.app.notify(f"Exported {len(self._sweep_data)} points to {output}") + except OSError as exc: + log.exception("Sweep CSV export failed") + self.app.notify(f"Export failed: {exc}", severity="error") + + # ================================================================== + # SKY MAP MODE (2D AZ x EL scan) + # ================================================================== + + @work(thread=True) + def _do_scan(self) -> None: + """Execute the AZ/EL grid scan in a background thread.""" + worker = get_current_worker() + shutdown = self.app.shutdown_event + device = self._device + if device is None: + return + + az_start = self._read_float("scan-az-start", 160.0) + az_end = self._read_float("scan-az-end", 220.0) + az_step = self._read_float("scan-az-step", 1.5) + el_start = self._read_float("scan-el-start", 18.0) + el_end = self._read_float("scan-el-end", 65.0) + el_step = self._read_float("scan-el-step", 5.0) + iterations = self._read_int("xponder-input", 3) + + if az_step <= 0: + az_step = 1.0 + if el_step <= 0: + el_step = 1.0 + + # Build the grid point lists. + el_values: list[float] = [] + el = el_start + while el <= el_end + 1e-9: + el_values.append(round(el, 2)) + el += el_step + + az_values: list[float] = [] + az = az_start + while az <= az_end + 1e-9: + az_values.append(round(az, 2)) + az += az_step + + total_points = len(el_values) * len(az_values) + if total_points == 0: + self.app.call_from_thread( + self._set_scan_status, "No grid points -- check parameters" + ) + return + + heatmap = self.query_one("#heatmap", SkyHeatmap) + spark = self.query_one("#scan-spark", SparklineWidget) + + force_software = self.query_one("#scan-software-mode", Checkbox).value + use_firmware = not force_software and hasattr(device, "az_sweep_firmware") + + done = 0 + az_span_grid = az_end - az_start + 1e-9 + el_span_grid = el_end - el_start + 1e-9 + span_deg = az_end - az_start + step_cdeg = max(1, int(az_step * 100)) + + for _el_idx, el_val in enumerate(el_values): + if not self._scanning or worker.is_cancelled or shutdown.is_set(): + self.app.call_from_thread(self._set_scan_status, "Scan stopped") + return + + # Try firmware sweep for this EL row. + if use_firmware: + try: + device.move_motor(1, el_val) + shutdown.wait(0.3) + + status = f"EL={el_val:.1f} Firmware sweep..." + self.app.call_from_thread(self._set_scan_status, status) + + results = device.az_sweep_firmware( + start_az=az_start, + span=span_deg, + step_cdeg=step_cdeg, + num_xponders=iterations, + ) + + for pt in results: + az_val = pt["az"] + rssi = pt["rssi"] + self._scan_data.append((az_val, el_val, rssi)) + + grid_az = min( + int((az_val - az_start) / az_span_grid * heatmap.az_bins), + heatmap.az_bins - 1, + ) + grid_el = min( + int((el_val - el_start) / el_span_grid * heatmap.el_bins), + heatmap.el_bins - 1, + ) + self.app.call_from_thread( + heatmap.set_point, grid_az, grid_el, rssi + ) + self.app.call_from_thread(spark.push, rssi) + + done += len(az_values) + pct = min(int(done * 100 / total_points), 100) + status = ( + f"EL={el_val:.1f} {len(results)} pts (firmware) " + f"[{done}/{total_points}]" + ) + self.app.call_from_thread(self._set_scan_progress, pct, status) + continue + + except Exception: + log.warning( + "Firmware sweep failed at EL=%.1f, falling back", + el_val, + exc_info=True, + ) + use_firmware = False # Don't retry firmware for remaining rows. + + # Software fallback: point-by-point for this EL row. + try: + device.move_motor(1, el_val) + except Exception: + log.exception("EL move failed at EL=%.2f", el_val) + + for _az_idx, az_val in enumerate(az_values): + if not self._scanning or worker.is_cancelled or shutdown.is_set(): + self.app.call_from_thread(self._set_scan_status, "Scan stopped") + return + + try: + device.move_motor(0, az_val) + except Exception: + log.exception("AZ move failed at AZ=%.2f EL=%.2f", az_val, el_val) + continue + + shutdown.wait(0.3) + + try: + rssi_data = device.get_rssi(iterations) + rssi = float(rssi_data.get("average", 0)) + except Exception: + log.exception("get_rssi failed at AZ=%.2f EL=%.2f", az_val, el_val) + rssi = 0.0 + + self._scan_data.append((az_val, el_val, rssi)) + + grid_az = min( + int((az_val - az_start) / az_span_grid * heatmap.az_bins), + heatmap.az_bins - 1, + ) + grid_el = min( + int((el_val - el_start) / el_span_grid * heatmap.el_bins), + heatmap.el_bins - 1, + ) + + self.app.call_from_thread(heatmap.set_point, grid_az, grid_el, rssi) + self.app.call_from_thread(heatmap.set_active, grid_az, grid_el) + self.app.call_from_thread(spark.push, rssi) + + done += 1 + pct = int(done * 100 / total_points) + status_text = ( + f"AZ={az_val:.1f} EL={el_val:.1f} " + f"RSSI={rssi:.0f} [{done}/{total_points}]" + ) + self.app.call_from_thread(self._set_scan_progress, pct, status_text) + + msg = f"Scan complete -- {total_points} points" + self.app.call_from_thread(self._set_scan_status, msg) + + # -- Scan thread-safe callbacks -- + + def _set_scan_status(self, text: str) -> None: + self.query_one("#scan-status-text", Static).update(text) + + def _set_scan_progress(self, pct: int, status_text: str) -> None: + self.query_one("#scan-progress", ProgressBar).update(progress=pct) + self.query_one("#scan-status-text", Static).update(status_text) + + # -- Scan button handlers -- + + def _handle_scan_start(self) -> None: + if self._device is None: + self.app.notify("No device connected", severity="warning") + return + if self._scanning: + self.app.notify("Scan already in progress", severity="warning") + return + + heatmap = self.query_one("#heatmap", SkyHeatmap) + heatmap.clear() + self._scan_data.clear() + self.query_one("#scan-progress", ProgressBar).update(progress=0) + self._set_scan_status("Starting scan...") + + self._scanning = True + self._do_scan() + + def _handle_scan_stop(self) -> None: + self._scanning = False + self._set_scan_status("Stopping...") + + def _export_scan_csv(self) -> None: + if not self._scan_data: + self.app.notify("No scan data to export", severity="warning") + return + + output = Path("/tmp/birdcage_scan.csv") + try: + with output.open("w", newline="") as fh: + writer = csv.writer(fh) + writer.writerow(["az", "el", "rssi"]) + for az, el, rssi in self._scan_data: + writer.writerow([f"{az:.2f}", f"{el:.2f}", f"{rssi:.1f}"]) + self.app.notify(f"Exported {len(self._scan_data)} points to {output}") + except OSError as exc: + log.exception("Scan CSV export failed") + self.app.notify(f"Export failed: {exc}", severity="error") + + # ================================================================== + # Unified button dispatcher + # ================================================================== + + def on_button_pressed(self, event: Button.Pressed) -> None: + button_id = event.button.id or "" + + # Monitor buttons + if button_id == "btn-start": + self._handle_monitor_start() + elif button_id == "btn-stop": + self._handle_monitor_stop() + elif button_id == "btn-lna": + self._handle_lna() + elif button_id == "btn-reset-peak": + self._handle_reset_peak() + + # Sweep buttons + elif button_id == "btn-start-sweep": + self._handle_sweep_start() + elif button_id == "btn-stop-sweep": + self._handle_sweep_stop() + elif button_id == "btn-export-sweep": + self._export_sweep_csv() + + # Sky Map buttons + elif button_id == "btn-start-scan": + self._handle_scan_start() + elif button_id == "btn-stop-scan": + self._handle_scan_stop() + elif button_id == "btn-export-scan": + self._export_scan_csv()