"""F3 Scan Screen -- AZ sweep heatmap, sky mapping with configurable parameters. Grid-based sky scan: iterates over AZ/EL range, moves the dish to each point, reads RSSI, and paints the result into a 2D heatmap. Supports CSV export of raw (az, el, rssi) data for offline analysis. """ import csv import logging from pathlib import Path from textual import work from textual.containers import Container, Horizontal, Vertical from textual.widgets import Button, Input, ProgressBar, Static from textual.worker import get_current_worker from birdcage_tui.widgets.sky_heatmap import SkyHeatmap from birdcage_tui.widgets.sparkline_widget import SparklineWidget log = logging.getLogger(__name__) # Type alias -- SerialBridge and DemoDevice share the same duck-typed interface. DeviceLike = object class ScanScreen(Container): """F3: Sky scan and RF mapping.""" def __init__(self, **kwargs) -> None: super().__init__(**kwargs) self._device: DeviceLike | None = None self._scanning = False self._scan_data: list[tuple[float, float, float]] = [] def compose(self): with Container(classes="screen-container"): with Vertical(classes="panel"): yield Static("Sky Scan", classes="panel-title") yield SkyHeatmap(az_bins=40, el_bins=10, id="heatmap") yield SparklineWidget( max_points=80, label="Sweep RSSI", color="#00d4aa", id="sweep-spark" ) with Horizontal(classes="scan-status"): yield Static("Idle", id="scan-status-text") yield ProgressBar(id="scan-progress", total=100, show_eta=False) with Horizontal(classes="bottom-controls"): yield Static("AZ ", classes="label") yield Input(value="160", id="az-start", type="number") yield Static("-", classes="label") yield Input(value="220", id="az-end", type="number") yield Static(" Step ", classes="label") yield Input(value="1.5", id="az-step", type="number") yield Static(" EL ", classes="label") yield Input(value="18", id="el-start", type="number") yield Static("-", classes="label") yield Input(value="65", id="el-end", type="number") yield Static(" Step ", classes="label") yield Input(value="5.0", id="el-step", type="number") with Horizontal(classes="bottom-controls"): yield Static("Transponders ", classes="label") yield Input(value="3", id="xponder-input", type="integer") yield Button("Start Scan", id="btn-start-scan", variant="primary") yield Button("Stop", id="btn-stop-scan") yield Button("Export CSV", id="btn-export") # ------------------------------------------------------------------ # Device wiring # ------------------------------------------------------------------ def set_device(self, device: DeviceLike) -> None: """Store the device reference (SerialBridge or DemoDevice).""" self._device = device # ------------------------------------------------------------------ # Input helpers # ------------------------------------------------------------------ def _read_float(self, widget_id: str, fallback: float) -> float: """Read a float from an Input widget, returning *fallback* on parse error.""" try: return float(self.query_one(f"#{widget_id}", Input).value) except (ValueError, TypeError): return fallback def _read_int(self, widget_id: str, fallback: int) -> int: """Read an int from an Input widget, returning *fallback* on parse error.""" try: return int(self.query_one(f"#{widget_id}", Input).value) except (ValueError, TypeError): return fallback # ------------------------------------------------------------------ # Scan worker # ------------------------------------------------------------------ @work(thread=True) def _do_scan(self) -> None: """Execute the AZ/EL grid scan in a background thread.""" worker = get_current_worker() shutdown = self.app.shutdown_event device = self._device if device is None: return # Read scan parameters (widget access must happen via call_from_thread # for Input.value, but Textual Input.value is a reactive that is safe # to read from threads as a string snapshot). az_start = self._read_float("az-start", 160.0) az_end = self._read_float("az-end", 220.0) az_step = self._read_float("az-step", 1.5) el_start = self._read_float("el-start", 18.0) el_end = self._read_float("el-end", 65.0) el_step = self._read_float("el-step", 5.0) iterations = self._read_int("xponder-input", 3) # Clamp step sizes to something sane. if az_step <= 0: az_step = 1.0 if el_step <= 0: el_step = 1.0 # Build the grid point list. el_values: list[float] = [] el = el_start while el <= el_end + 1e-9: el_values.append(round(el, 2)) el += el_step az_values: list[float] = [] az = az_start while az <= az_end + 1e-9: az_values.append(round(az, 2)) az += az_step total_points = len(el_values) * len(az_values) if total_points == 0: self.app.call_from_thread( self._set_status, "No grid points -- check parameters" ) return heatmap = self.query_one("#heatmap", SkyHeatmap) spark = self.query_one("#sweep-spark", SparklineWidget) done = 0 for _el_idx, el_val in enumerate(el_values): for _az_idx, az_val in enumerate(az_values): if not self._scanning or worker.is_cancelled or shutdown.is_set(): self.app.call_from_thread(self._set_status, "Scan stopped") return # Move dish. try: device.move_to(az_val, el_val) except Exception: log.exception("move_to failed at AZ=%.2f EL=%.2f", az_val, el_val) msg = f"Move error at AZ={az_val:.1f} EL={el_val:.1f}" self.app.call_from_thread(self._set_status, msg) continue # Settle time -- let the motor stop and vibrations damp. shutdown.wait(0.3) # Read signal. try: rssi_data = device.get_rssi(iterations) rssi = float(rssi_data.get("average", 0)) except Exception: log.exception("get_rssi failed at AZ=%.2f EL=%.2f", az_val, el_val) rssi = 0.0 # Record raw data. self._scan_data.append((az_val, el_val, rssi)) # Map to heatmap grid indices -- fit into fixed-size bins. az_span = az_end - az_start + 1e-9 el_span = el_end - el_start + 1e-9 grid_az = min( int((az_val - az_start) / az_span * heatmap.az_bins), heatmap.az_bins - 1, ) grid_el = min( int((el_val - el_start) / el_span * heatmap.el_bins), heatmap.el_bins - 1, ) # Update widgets. self.app.call_from_thread(heatmap.set_point, grid_az, grid_el, rssi) self.app.call_from_thread(heatmap.set_active, grid_az, grid_el) self.app.call_from_thread(spark.push, rssi) done += 1 pct = int(done * 100 / total_points) status_text = ( f"Scanning AZ={az_val:.1f} EL={el_val:.1f} " f"RSSI={rssi:.0f} [{done}/{total_points}]" ) self.app.call_from_thread(self._set_progress, pct, status_text) msg = f"Scan complete -- {total_points} points" self.app.call_from_thread(self._set_status, msg) # ------------------------------------------------------------------ # Widget update helpers (called via call_from_thread) # ------------------------------------------------------------------ def _set_status(self, text: str) -> None: """Update the scan status text label.""" self.query_one("#scan-status-text", Static).update(text) def _set_progress(self, pct: int, status_text: str) -> None: """Update both progress bar and status text.""" self.query_one("#scan-progress", ProgressBar).update(progress=pct) self.query_one("#scan-status-text", Static).update(status_text) # ------------------------------------------------------------------ # Button handlers # ------------------------------------------------------------------ def on_button_pressed(self, event: Button.Pressed) -> None: button_id = event.button.id or "" if button_id == "btn-start-scan": self._start_scan() elif button_id == "btn-stop-scan": self._stop_scan() elif button_id == "btn-export": self._export_csv() def _start_scan(self) -> None: """Clear state and kick off the scan worker.""" if self._device is None: self.app.notify("No device connected", severity="warning") return if self._scanning: self.app.notify("Scan already in progress", severity="warning") return # Reset. heatmap = self.query_one("#heatmap", SkyHeatmap) heatmap.clear() self._scan_data.clear() self.query_one("#scan-progress", ProgressBar).update(progress=0) self._set_status("Starting scan...") self._scanning = True self._do_scan() def _stop_scan(self) -> None: """Signal the scan worker to stop.""" self._scanning = False self._set_status("Stopping...") def _export_csv(self) -> None: """Write scan data to /tmp/birdcage_scan.csv.""" if not self._scan_data: self.app.notify("No scan data to export", severity="warning") return output = Path("/tmp/birdcage_scan.csv") try: with output.open("w", newline="") as fh: writer = csv.writer(fh) writer.writerow(["az", "el", "rssi"]) for az, el, rssi in self._scan_data: writer.writerow([f"{az:.2f}", f"{el:.2f}", f"{rssi:.1f}"]) self.app.notify(f"Exported {len(self._scan_data)} points to {output}") except OSError as exc: log.exception("CSV export failed") self.app.notify(f"Export failed: {exc}", severity="error")