Wire firmware-accelerated AZ sweep via azscanwxp

Adds send_with_timeout() to CarryoutG2Protocol for long-running
commands, and az_sweep_firmware() to both SerialBridge and DemoDevice.
Sweep and Sky Map modes now try the firmware path first (single
azscanwxp command, streaming results) and fall back to software
step-dwell-measure on error or when "Software mode" checkbox is
checked. Software sweep fixed to set EL once and move AZ only.
This commit is contained in:
Ryan Malloy 2026-02-14 16:40:53 -07:00
parent ba8859cc31
commit 3cd6424168
4 changed files with 973 additions and 69 deletions

View File

@ -283,6 +283,38 @@ class CarryoutG2Protocol(FirmwareProtocol):
time.sleep(0.001) # brief settle before next command
return resp_data.decode("utf-8", errors="ignore")
def _probe_prompt(self) -> str:
"""Send bare CR and read the prompt string.
Returns the prompt text (e.g. ``'\\r\\nTRK>'`` or ``'\\r\\nMOT>'``).
Used to detect which submenu the firmware is in without sending
a command that could have side effects.
"""
if not self._serial:
raise RuntimeError("Not connected")
self._serial.write(b"\r")
resp = bytearray()
while True:
byte = self._serial.read(1)
if len(byte) == 0:
break # timeout — return what we have
resp.append(byte[0])
if byte[0] == self.PROMPT_CHAR:
break
return resp.decode("utf-8", errors="ignore")
def reset_to_root(self) -> None:
"""Return to TRK> root without killing the shell.
At TRK>, the ``q`` command terminates the UART shell entirely
(requires power cycle). This probes the current prompt first
and only sends ``q`` if we're in a submenu.
"""
prompt = self._probe_prompt()
if "TRK>" in prompt:
return # already at root
self._send("q")
def initialize(self, callback: Callable[[str], None] | None = None) -> None:
"""Prepare G2 for motor commands.
@ -293,22 +325,18 @@ class CarryoutG2Protocol(FirmwareProtocol):
logger.info(
"Initializing Carryout G2 (tracker must be pre-disabled via NVS 20)"
)
self._send("q")
self.reset_to_root()
self.enter_motor_menu()
logger.info("Carryout G2 initialized and ready")
def enter_motor_menu(self) -> None:
self._send("q")
self.reset_to_root()
self._send(self.MOTOR_COMMAND)
def kill_search(self) -> None:
"""No-op — G2 search is disabled permanently via NVS index 20."""
logger.debug("G2 search kill is a no-op (NVS 20 disables tracker)")
def reset_to_root(self) -> None:
"""Return to the firmware root menu."""
self._send("q")
def get_position(self) -> Position:
"""Query dish position.
@ -351,7 +379,7 @@ class CarryoutG2Protocol(FirmwareProtocol):
def enter_dvb_menu(self) -> None:
"""Enter DVB signal analysis submenu (must be at root menu)."""
self._send("q") # ensure root
self.reset_to_root()
self._send("dvb")
def enable_lna(self) -> None:
@ -392,6 +420,21 @@ class CarryoutG2Protocol(FirmwareProtocol):
raise ValueError(f"Could not parse RSSI from: {response!r}")
def send_with_timeout(self, cmd: str, timeout: float = 90) -> str:
"""Send a command with a custom serial timeout.
Used for long-running firmware commands (azscanwxp, azscan) that stream
output over tens of seconds before the final prompt.
"""
if not self._serial:
raise RuntimeError("Not connected")
original = self._serial.timeout
self._serial.timeout = timeout
try:
return self._send(cmd)
finally:
self._serial.timeout = original
def send_raw(self, cmd: str) -> str:
"""Send arbitrary command, return raw prompt-terminated response."""
return self._send(cmd)

View File

@ -66,6 +66,32 @@ class SerialBridge:
self._menu = Menu.UNKNOWN
self._connected = False
# ------------------------------------------------------------------
# Menu prompt → string mapping for status display
# ------------------------------------------------------------------
_MENU_PROMPTS: dict[Menu, str] = {
Menu.ROOT: "TRK>",
Menu.MOT: "MOT>",
Menu.DVB: "DVB>",
Menu.NVS: "NVS>",
Menu.A3981: "A3981>",
Menu.ADC: "ADC>",
Menu.OS: "OS>",
Menu.STEP: "STEP>",
Menu.PEAK: "PEAK>",
Menu.EEPROM: "EE>",
Menu.GPIO: "GPIO>",
Menu.LATLON: "LATLON>",
Menu.DIPSWITCH: "DIPSWITCH>",
Menu.UNKNOWN: "???",
}
@property
def current_menu(self) -> str:
"""Current firmware prompt string for status display."""
return self._MENU_PROMPTS.get(self._menu, "???")
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
@ -82,6 +108,21 @@ class SerialBridge:
self._proto.reset_to_root()
self._menu = Menu.ROOT
def _detect_menu(self) -> Menu:
"""Probe firmware prompt and return the corresponding Menu enum.
Caller must hold ``_lock``.
"""
prompt = self._proto._probe_prompt()
upper = prompt.upper()
# Check against known prompt strings (handles EE> vs EEPROM>, etc.)
for menu, prompt_str in self._MENU_PROMPTS.items():
if menu == Menu.UNKNOWN:
continue
if prompt_str.upper() in upper:
return menu
return Menu.UNKNOWN
def _ensure_menu(self, target: Menu) -> None:
"""Navigate to *target* submenu if not already there.
@ -114,7 +155,7 @@ class SerialBridge:
with self._lock:
self._proto.connect(port, baudrate)
self._connected = True
self._menu = Menu.UNKNOWN
self._menu = self._detect_menu()
def disconnect(self) -> None:
"""Close the serial connection."""
@ -139,7 +180,8 @@ class SerialBridge:
with self._lock:
if not skip_init:
self._proto.initialize()
self._menu = Menu.MOT # initialize() ends in MOT>
self._menu = Menu.MOT # initialize() ends in MOT>
# else: leave _menu as whatever connect() detected
# ------------------------------------------------------------------
# Motor (MOT>)
@ -296,6 +338,116 @@ class SerialBridge:
return result
def get_pid_gains(self) -> dict[str, dict[str, float]]:
"""Read PID gains for both motor axes.
Firmware returns: ``Kp=600 Kv=60 Ki=1`` per motor.
Returns:
``{"az": {"kp": 600, "kv": 60, "ki": 1},
"el": {"kp": 250, "kv": 50, "ki": 1}}``
"""
with self._lock:
self._ensure_menu(Menu.MOT)
response = self._send("pid")
# Parse "Kp=600 Kv=60 Ki=1" patterns. The pid command without args
# shows both motors. We look for two sets of Kp/Kv/Ki values.
kp_matches = re.findall(r"Kp[=:]?\s*(\d+)", response)
kv_matches = re.findall(r"Kv[=:]?\s*(\d+)", response)
ki_matches = re.findall(r"Ki[=:]?\s*(\d+)", response)
result = {
"az": {"kp": 600.0, "kv": 60.0, "ki": 1.0},
"el": {"kp": 250.0, "kv": 50.0, "ki": 1.0},
}
if len(kp_matches) >= 2:
result["az"]["kp"] = float(kp_matches[0])
result["el"]["kp"] = float(kp_matches[1])
elif len(kp_matches) == 1:
result["az"]["kp"] = float(kp_matches[0])
if len(kv_matches) >= 2:
result["az"]["kv"] = float(kv_matches[0])
result["el"]["kv"] = float(kv_matches[1])
elif len(kv_matches) == 1:
result["az"]["kv"] = float(kv_matches[0])
if len(ki_matches) >= 2:
result["az"]["ki"] = float(ki_matches[0])
result["el"]["ki"] = float(ki_matches[1])
elif len(ki_matches) == 1:
result["az"]["ki"] = float(ki_matches[0])
return result
def set_pid_gains(self, motor_id: int, kp: float, kv: float, ki: float) -> None:
"""Write PID gains for a single motor axis.
Args:
motor_id: 0 for AZ, 1 for EL.
kp: Proportional gain.
kv: Velocity gain.
ki: Integral gain.
"""
with self._lock:
self._ensure_menu(Menu.MOT)
self._send(f"pid {motor_id} {int(kp)} {int(kv)} {int(ki)}")
def az_sweep_firmware(
self,
start_az: float,
span: float,
step_cdeg: int,
num_xponders: int,
timeout: float = 120,
) -> list[dict[str, float]]:
"""Execute a firmware-accelerated AZ sweep via azscanwxp.
Moves to *start_az* first, then runs the firmware sweep command which
handles motor movement and RSSI measurement atomically no per-point
serial round-trips.
Args:
start_az: Starting azimuth in degrees.
span: Total sweep width in degrees.
step_cdeg: Step size in centidegrees (100 = 1.00°).
num_xponders: Number of transponders to cycle per position.
timeout: Serial read timeout for the long-running command.
Returns:
List of dicts with keys: az, rssi, lock, snr.
"""
with self._lock:
self._ensure_menu(Menu.MOT)
# Move to start position and wait for prompt.
self._send(f"a 0 {start_az}")
# Execute firmware sweep with extended timeout.
response = self._proto.send_with_timeout(
f"azscanwxp 0 {span} {step_cdeg} {num_xponders}",
timeout=timeout,
)
# Parse streaming output lines.
# Motor:<id> Angle:<cdeg> RSSI:<adc> Lock:<0/1> SNR:<dB>
results: list[dict[str, float]] = []
for match in re.finditer(
r"Angle:(-?\d+)\s+RSSI:(\d+)\s+Lock:(\d)\s+SNR:(-?\d+\.?\d*)",
response,
):
results.append(
{
"az": int(match.group(1)) / 100.0,
"rssi": float(match.group(2)),
"lock": float(match.group(3)),
"snr": float(match.group(4)),
}
)
logger.info("Firmware sweep returned %d points", len(results))
return results
# ------------------------------------------------------------------
# Signal (DVB>)
# ------------------------------------------------------------------

View File

@ -333,6 +333,53 @@ class DemoDevice:
"el_steps": int(self._el * 24960 / 360),
}
def get_pid_gains(self) -> dict[str, dict[str, float]]:
return {
"az": {"kp": 600.0, "kv": 60.0, "ki": 1.0},
"el": {"kp": 250.0, "kv": 50.0, "ki": 1.0},
}
def set_pid_gains(self, motor_id: int, kp: float, kv: float, ki: float) -> None:
pass # No-op in demo mode.
def az_sweep_firmware(
self,
start_az: float,
span: float,
step_cdeg: int,
num_xponders: int,
timeout: float = 120,
) -> list[dict[str, float]]:
"""Simulate a firmware azscanwxp sweep with Gaussian signal peak."""
step_deg = step_cdeg / 100.0
if step_deg <= 0:
step_deg = 1.0
results: list[dict[str, float]] = []
az = start_az
end_az = start_az + span
while az <= end_az + 1e-9:
dist_sq = (az - _SAT_AZ) ** 2 + (self._el - _SAT_EL) ** 2
signal = _RSSI_PEAK * math.exp(-dist_sq / _RSSI_BEAM_WIDTH)
rssi = _RSSI_NOISE_FLOOR + signal + random.gauss(0.0, 30.0)
locked = 1 if rssi > 1500 else 0
snr = max(0.0, (rssi - _RSSI_NOISE_FLOOR) / 50.0) + random.gauss(0.0, 0.5)
results.append(
{
"az": round(az, 2),
"rssi": round(rssi),
"lock": float(locked),
"snr": round(max(0.0, snr), 1),
}
)
az += step_deg
# Brief delay to simulate firmware execution time.
time.sleep(0.5)
self._target_az = end_az
self._last_move_time = time.monotonic()
return results
# ------------------------------------------------------------------
# Signal (DVB>)
# ------------------------------------------------------------------
@ -530,6 +577,16 @@ class DemoDevice:
az_steps = int(self._az * 40000 / 360)
el_steps = int(self._el * 24960 / 360)
return f"Position[0] = {az_steps} Position[1] = {el_steps}\nMOT>"
if cmd == "pid" or cmd.startswith("pid "):
parts = cmd.split()
if len(parts) == 1:
return (
"Motor 0: Kp=600 Kv=60 Ki=1\nMotor 1: Kp=250 Kv=50 Ki=1\nMOT>"
)
elif len(parts) >= 4:
motor_id = parts[1]
return f"PID set for motor {motor_id}\nMOT>"
return "Usage: pid [motor] [Kp] [Kv] [Ki]\nMOT>"
return f"Unknown command: {cmd}\nMOT>"
def _handle_dvb(self, cmd: str) -> str:

View File

@ -1,83 +1,261 @@
"""F2 Signal screen -- RSSI monitoring, sparklines, LNB control.
"""F3 Signal screen -- Find and Measure.
Widget container for ContentSwitcher. Provides start/stop signal
monitoring with configurable iteration count and poll rate, dual
sparklines for DVB and ADC RSSI, peak tracking, and LNA toggle.
Three sub-modes via ModeBar + ContentSwitcher:
Monitor -- RSSI monitoring with dual sparklines, signal gauge, receiver info
Sweep -- 1D azimuth-vs-RSSI bar chart across an AZ range at fixed EL
Sky Map -- 2D AZ x EL grid scan with heatmap visualization and CSV export
Merges the old Signal + Scan screens into a single tab.
"""
import contextlib
import csv
import logging
import re
from pathlib import Path
from textual import work
from textual.app import ComposeResult
from textual.containers import Container, Horizontal, Vertical
from textual.widgets import Button, Input, Static
from textual.worker import Worker
from textual.widgets import (
Button,
Checkbox,
ContentSwitcher,
Input,
ProgressBar,
Static,
)
from textual.worker import Worker, get_current_worker
from birdcage_tui.widgets.mode_bar import ModeBar
from birdcage_tui.widgets.receiver_info import ReceiverInfo
from birdcage_tui.widgets.signal_gauge import SignalGauge
from birdcage_tui.widgets.sky_heatmap import SkyHeatmap
from birdcage_tui.widgets.sparkline_widget import SparklineWidget
from birdcage_tui.widgets.status_strip import StatusStrip
from birdcage_tui.widgets.sweep_plot import SweepPlot
log = logging.getLogger(__name__)
# Type alias -- SerialBridge and DemoDevice share the same duck-typed interface.
DeviceLike = object
class SignalScreen(Container):
"""F2: Signal monitoring and RSSI display."""
"""F3: Find and Measure -- signal monitoring, sweep, and sky mapping."""
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self._device: object = None
self._device: DeviceLike | None = None
# Monitor state
self._monitoring = False
self._lna_enabled = False
self._peak_rssi = 0
self._total_samples = 0
self._signal_worker: Worker | None = None
self._receiver_loaded = False
# Sweep state
self._sweeping = False
self._sweep_data: list[tuple[float, float]] = []
# Sky Map state
self._scanning = False
self._scan_data: list[tuple[float, float, float]] = []
# ------------------------------------------------------------------
# Compose
# ------------------------------------------------------------------
def compose(self) -> ComposeResult:
with Container(classes="screen-container"):
with Vertical(classes="panel"):
yield Static("Signal Strength", classes="panel-title")
yield SignalGauge(id="signal-gauge")
with Vertical():
yield SparklineWidget(
max_points=80, label="DVB RSSI", color="#00d4aa", id="dvb-spark"
)
yield SparklineWidget(
max_points=80, label="ADC RSSI", color="#2080d0", id="adc-spark"
)
with Horizontal(classes="panel"):
yield Static("Samples: 0", id="sample-count", classes="label")
yield Static(" Peak: 0", id="peak-value", classes="label")
yield Static(" LNA: OFF", id="lna-status", classes="label")
yield Static(" Lock: NO", id="lock-status", classes="label")
with Horizontal(classes="bottom-controls"):
yield Static("Iters ", classes="label")
yield Input(value="10", id="iter-input", type="integer")
yield Static(" Rate ", classes="label")
yield Input(value="2", id="rate-input", type="integer")
yield Button("Start", id="btn-start", variant="primary")
yield Button("Stop", id="btn-stop")
yield Button("Enable LNA", id="btn-lna")
yield ModeBar(
modes={
"monitor": "Monitor",
"sweep": "Sweep",
"skymap": "Sky Map",
},
initial="monitor",
classes="mode-bar",
)
with ContentSwitcher(id="signal-modes", initial="monitor"):
# -- Monitor mode -----------------------------------------
with Container(id="monitor"):
with Horizontal(classes="top-row"):
with Vertical(classes="panel"):
yield Static("Signal Strength", classes="panel-title")
yield SignalGauge(id="signal-gauge")
yield SparklineWidget(
max_points=80,
label="DVB RSSI",
color="#00d4aa",
id="dvb-spark",
)
yield SparklineWidget(
max_points=80,
label="ADC RSSI",
color="#2080d0",
id="adc-spark",
)
with Vertical(classes="panel"):
yield Static("Receiver", classes="panel-title")
yield ReceiverInfo(id="receiver-info")
with Horizontal(classes="panel"):
yield Static("Samples: 0", id="sample-count", classes="label")
yield Static(" Peak: 0", id="peak-value", classes="label")
yield Static(" LNA: OFF", id="lna-status", classes="label")
yield Static(" Lock: NO", id="lock-status", classes="label")
with Horizontal(classes="bottom-controls"):
yield Static("Iters ", classes="label")
yield Input(value="10", id="iter-input", type="integer")
yield Static(" Rate ", classes="label")
yield Input(value="2", id="rate-input", type="integer")
yield Static(" Hz", classes="label")
yield Button("Start", id="btn-start", variant="primary")
yield Button("Stop", id="btn-stop")
yield Button("Enable LNA", id="btn-lna")
yield Button("Reset Peak", id="btn-reset-peak")
# -- Sweep mode -------------------------------------------
with Container(id="sweep"):
with Vertical(classes="panel"):
yield Static("AZ Sweep", classes="panel-title")
yield SweepPlot(id="sweep-plot")
with Horizontal(classes="scan-status"):
yield Static("Idle", id="sweep-status-text")
yield ProgressBar(
id="sweep-progress", total=100, show_eta=False
)
with Horizontal(classes="bottom-controls"):
yield Static("AZ Start ", classes="label")
yield Input(value="160", id="sweep-az-start", type="number")
yield Static(" AZ End ", classes="label")
yield Input(value="220", id="sweep-az-end", type="number")
yield Static(" Step ", classes="label")
yield Input(value="1.5", id="sweep-az-step", type="number")
yield Static(" EL (fixed) ", classes="label")
yield Input(value="38.0", id="sweep-el-fixed", type="number")
yield Static(" Iters ", classes="label")
yield Input(value="10", id="sweep-iters", type="integer")
with Horizontal(classes="bottom-controls"):
yield Button(
"Start Sweep", id="btn-start-sweep", variant="primary"
)
yield Button("Stop", id="btn-stop-sweep")
yield Button("Export CSV", id="btn-export-sweep")
yield Checkbox(
"Software mode",
id="sweep-software-mode",
value=False,
)
# -- Sky Map mode -----------------------------------------
with Container(id="skymap"):
with Vertical(classes="panel"):
yield Static("Sky Scan", classes="panel-title")
yield SkyHeatmap(az_bins=40, el_bins=10, id="heatmap")
yield SparklineWidget(
max_points=80,
label="Sweep RSSI",
color="#00d4aa",
id="scan-spark",
)
with Horizontal(classes="scan-status"):
yield Static("Idle", id="scan-status-text")
yield ProgressBar(id="scan-progress", total=100, show_eta=False)
with Horizontal(classes="bottom-controls"):
yield Static("AZ ", classes="label")
yield Input(value="160", id="scan-az-start", type="number")
yield Static("-", classes="label")
yield Input(value="220", id="scan-az-end", type="number")
yield Static(" Step ", classes="label")
yield Input(value="1.5", id="scan-az-step", type="number")
yield Static(" EL ", classes="label")
yield Input(value="18", id="scan-el-start", type="number")
yield Static("-", classes="label")
yield Input(value="65", id="scan-el-end", type="number")
yield Static(" Step ", classes="label")
yield Input(value="5.0", id="scan-el-step", type="number")
with Horizontal(classes="bottom-controls"):
yield Static("Transponders ", classes="label")
yield Input(value="3", id="xponder-input", type="integer")
yield Button(
"Start Scan", id="btn-start-scan", variant="primary"
)
yield Button("Stop", id="btn-stop-scan")
yield Button("Export CSV", id="btn-export-scan")
yield Checkbox(
"Software mode",
id="scan-software-mode",
value=False,
)
# ------------------------------------------------------------------
# Device lifecycle
# ------------------------------------------------------------------
def set_device(self, device: object) -> None:
def set_device(self, device: DeviceLike) -> None:
"""Store the device reference."""
self._device = device
def on_show(self) -> None:
"""Called when this screen becomes visible."""
pass # Monitoring is explicit via Start/Stop buttons.
"""Load receiver info on first show (requires device)."""
if not self._receiver_loaded and self._device is not None:
self._load_receiver_info()
def on_position_update(self, az: float, el: float) -> None:
"""Called by app-level position poll. Signal screen doesn't use this."""
def on_unmount(self) -> None:
"""Stop monitoring thread on teardown."""
"""Stop all workers on teardown."""
self._monitoring = False
self._sweeping = False
self._scanning = False
# ------------------------------------------------------------------
# Signal poll worker
# ModeBar switching
# ------------------------------------------------------------------
def on_mode_bar_mode_changed(self, event: ModeBar.ModeChanged) -> None:
"""Switch ContentSwitcher when ModeBar selection changes."""
switcher = self.query_one("#signal-modes", ContentSwitcher)
switcher.current = event.mode
def switch_mode(self, mode_key: str) -> None:
"""Programmatic mode switch (called by app.py for QuickActions)."""
switcher = self.query_one("#signal-modes", ContentSwitcher)
switcher.current = mode_key
# Update the ModeBar visual state to match.
mode_bar = self.query_one(ModeBar)
for btn in mode_bar.query(".mode-btn"):
btn.remove_class("active")
with contextlib.suppress(Exception):
mode_bar.query_one(f"#mode-{mode_key}").add_class("active")
# ------------------------------------------------------------------
# Input helpers
# ------------------------------------------------------------------
def _read_float(self, widget_id: str, fallback: float) -> float:
"""Read a float from an Input widget, returning *fallback* on error."""
try:
return float(self.query_one(f"#{widget_id}", Input).value)
except (ValueError, TypeError):
return fallback
def _read_int(self, widget_id: str, fallback: int) -> int:
"""Read an int from an Input widget, returning *fallback* on error."""
try:
return int(self.query_one(f"#{widget_id}", Input).value)
except (ValueError, TypeError):
return fallback
# ==================================================================
# MONITOR MODE
# ==================================================================
@work(thread=True, exclusive=True, group="signal-poll")
def _do_signal_poll(self) -> None:
"""Poll RSSI at the configured rate while monitoring is active."""
@ -117,7 +295,8 @@ class SignalScreen(Container):
self.app.call_from_thread(self._update_gauge, rssi_avg, rssi_cur, reads)
self.app.call_from_thread(self._push_dvb_spark, float(rssi_avg))
self.app.call_from_thread(self._update_stats)
self.app.call_from_thread(self._update_monitor_stats)
self.app.call_from_thread(self._update_status_strip_rssi, rssi_avg)
except Exception:
log.debug("DVB RSSI poll failed", exc_info=True)
@ -143,9 +322,7 @@ class SignalScreen(Container):
shutdown.wait(1.0 / rate)
# ------------------------------------------------------------------
# Thread-safe widget update callbacks
# ------------------------------------------------------------------
# -- Monitor thread-safe callbacks --
def _read_input(self, input_id: str) -> str:
"""Read an Input widget's value (must run on main thread)."""
@ -163,7 +340,7 @@ class SignalScreen(Container):
def _push_adc_spark(self, value: float) -> None:
self.query_one("#adc-spark", SparklineWidget).push(value)
def _update_stats(self) -> None:
def _update_monitor_stats(self) -> None:
self.query_one("#sample-count", Static).update(
f"Samples: {self._total_samples}"
)
@ -177,26 +354,17 @@ class SignalScreen(Container):
label = "ON" if self._lna_enabled else "OFF"
self.query_one("#lna-status", Static).update(f" LNA: {label}")
# ------------------------------------------------------------------
# Button handlers
# ------------------------------------------------------------------
def _update_status_strip_rssi(self, rssi_avg: int) -> None:
"""Push RSSI to the app-level StatusStrip."""
with contextlib.suppress(Exception):
self.app.query_one("#status-strip", StatusStrip).rssi = rssi_avg
def on_button_pressed(self, event: Button.Pressed) -> None:
button_id = event.button.id or ""
# -- Monitor button handlers --
if button_id == "btn-start":
self._handle_start()
elif button_id == "btn-stop":
self._handle_stop()
elif button_id == "btn-lna":
self._handle_lna()
def _handle_start(self) -> None:
"""Start signal monitoring."""
def _handle_monitor_start(self) -> None:
if self._device is None:
self.app.notify("No device connected", severity="warning")
return
if self._monitoring:
return
@ -207,8 +375,7 @@ class SignalScreen(Container):
self.query_one("#btn-start", Button).variant = "default"
self.query_one("#btn-stop", Button).variant = "warning"
def _handle_stop(self) -> None:
"""Stop signal monitoring."""
def _handle_monitor_stop(self) -> None:
self._monitoring = False
self.app.notify("Signal monitoring stopped")
@ -216,7 +383,6 @@ class SignalScreen(Container):
self.query_one("#btn-stop", Button).variant = "default"
def _handle_lna(self) -> None:
"""Toggle LNA enable (sends lnbdc odu to set 13V)."""
if self._device is None:
self.app.notify("No device connected", severity="warning")
return
@ -224,7 +390,6 @@ class SignalScreen(Container):
@work(thread=True, exclusive=False, group="signal-cmd")
def _do_enable_lna(self) -> None:
"""Enable LNA in a worker thread (blocks on serial I/O)."""
try:
self._device.enable_lna()
self._lna_enabled = True
@ -235,3 +400,490 @@ class SignalScreen(Container):
self.app.call_from_thread(
self.app.notify, "LNA enable failed", severity="error"
)
def _handle_reset_peak(self) -> None:
self._peak_rssi = 0
self._total_samples = 0
self._update_monitor_stats()
self.app.notify("Peak and sample counters reset")
@work(thread=True, exclusive=False, group="signal-cmd")
def _load_receiver_info(self) -> None:
"""Fetch channel params and DVB config in a worker thread."""
if self._device is None:
return
try:
channel = self._device.get_channel_params()
config = self._device.get_dvb_config()
self._receiver_loaded = True
self.app.call_from_thread(self._apply_receiver_info, channel, config)
except Exception:
log.debug("Receiver info load failed", exc_info=True)
def _apply_receiver_info(self, channel: str, config: str) -> None:
self.query_one("#receiver-info", ReceiverInfo).load_data(channel, config)
# ==================================================================
# SWEEP MODE (1D AZ scan)
# ==================================================================
@work(thread=True)
def _do_sweep(self) -> None:
"""Execute a 1D AZ sweep -- firmware-accelerated or software fallback."""
device = self._device
if device is None:
return
# Check if user forced software mode.
force_software = self.query_one("#sweep-software-mode", Checkbox).value
if not force_software and hasattr(device, "az_sweep_firmware"):
try:
self._do_sweep_firmware(device)
return
except Exception:
log.warning(
"Firmware sweep failed, falling back to software",
exc_info=True,
)
self.app.call_from_thread(
self._set_sweep_status, "Firmware sweep failed -- falling back..."
)
self._do_sweep_software(device)
def _do_sweep_firmware(self, device: DeviceLike) -> None:
"""Firmware-accelerated sweep via azscanwxp (runs in worker thread)."""
shutdown = self.app.shutdown_event
az_start = self._read_float("sweep-az-start", 160.0)
az_end = self._read_float("sweep-az-end", 220.0)
az_step = self._read_float("sweep-az-step", 1.5)
el_fixed = self._read_float("sweep-el-fixed", 38.0)
iterations = self._read_int("sweep-iters", 10)
span = az_end - az_start
if span <= 0 or az_step <= 0:
self.app.call_from_thread(
self._set_sweep_status, "Invalid sweep range -- check parameters"
)
return
step_cdeg = max(1, int(az_step * 100))
# Set EL once.
self.app.call_from_thread(self._set_sweep_status, "Setting elevation...")
device.move_motor(1, el_fixed)
# Let EL settle briefly.
shutdown.wait(0.5)
if not self._sweeping or shutdown.is_set():
return
self.app.call_from_thread(
self._set_sweep_status, "Firmware scan in progress..."
)
self.app.call_from_thread(
self._set_sweep_progress, 10, "Firmware scan in progress..."
)
results = device.az_sweep_firmware(
start_az=az_start,
span=span,
step_cdeg=step_cdeg,
num_xponders=iterations,
)
if not results:
self.app.call_from_thread(
self._set_sweep_status, "Firmware sweep returned no data"
)
return
# Populate the plot with all points at once.
sweep_plot = self.query_one("#sweep-plot", SweepPlot)
for pt in results:
az_val = pt["az"]
rssi = pt["rssi"]
self._sweep_data.append((az_val, rssi))
self.app.call_from_thread(sweep_plot.add_point, az_val, rssi)
total = len(results)
msg = f"Sweep complete -- {total} points (firmware)"
self.app.call_from_thread(self._set_sweep_progress, 100, msg)
self.app.call_from_thread(
self._set_sweep_status, f"Sweep complete -- {total} points (firmware)"
)
def _do_sweep_software(self, device: DeviceLike) -> None:
"""Software step-dwell-measure sweep (runs in worker thread)."""
worker = get_current_worker()
shutdown = self.app.shutdown_event
az_start = self._read_float("sweep-az-start", 160.0)
az_end = self._read_float("sweep-az-end", 220.0)
az_step = self._read_float("sweep-az-step", 1.5)
el_fixed = self._read_float("sweep-el-fixed", 38.0)
iterations = self._read_int("sweep-iters", 10)
if az_step <= 0:
az_step = 1.0
# Build AZ point list.
az_values: list[float] = []
az = az_start
while az <= az_end + 1e-9:
az_values.append(round(az, 2))
az += az_step
total = len(az_values)
if total == 0:
self.app.call_from_thread(
self._set_sweep_status, "No sweep points -- check parameters"
)
return
sweep_plot = self.query_one("#sweep-plot", SweepPlot)
# Set EL once at start, then only move AZ per point.
try:
device.move_motor(1, el_fixed)
except Exception:
log.exception("EL move failed")
for idx, az_val in enumerate(az_values):
if not self._sweeping or worker.is_cancelled or shutdown.is_set():
self.app.call_from_thread(self._set_sweep_status, "Sweep stopped")
return
# Move AZ only (EL already set).
try:
device.move_motor(0, az_val)
except Exception:
log.exception("AZ move failed at %.2f", az_val)
msg = f"Move error at AZ={az_val:.1f}"
self.app.call_from_thread(self._set_sweep_status, msg)
continue
# Settle time.
shutdown.wait(0.3)
# Read signal.
try:
rssi_data = device.get_rssi(iterations)
rssi = float(rssi_data.get("average", 0))
except Exception:
log.exception("get_rssi failed at AZ=%.2f", az_val)
rssi = 0.0
self._sweep_data.append((az_val, rssi))
# Update widgets.
self.app.call_from_thread(sweep_plot.add_point, az_val, rssi)
self.app.call_from_thread(sweep_plot.set_active, az_val)
done = idx + 1
pct = int(done * 100 / total)
status = f"AZ={az_val:.1f} RSSI={rssi:.0f} [{done}/{total}]"
self.app.call_from_thread(self._set_sweep_progress, pct, status)
self.app.call_from_thread(
self._set_sweep_status, f"Sweep complete -- {total} points"
)
# -- Sweep thread-safe callbacks --
def _set_sweep_status(self, text: str) -> None:
self.query_one("#sweep-status-text", Static).update(text)
def _set_sweep_progress(self, pct: int, status_text: str) -> None:
self.query_one("#sweep-progress", ProgressBar).update(progress=pct)
self.query_one("#sweep-status-text", Static).update(status_text)
# -- Sweep button handlers --
def _handle_sweep_start(self) -> None:
if self._device is None:
self.app.notify("No device connected", severity="warning")
return
if self._sweeping:
self.app.notify("Sweep already in progress", severity="warning")
return
sweep_plot = self.query_one("#sweep-plot", SweepPlot)
sweep_plot.clear()
self._sweep_data.clear()
self.query_one("#sweep-progress", ProgressBar).update(progress=0)
self._set_sweep_status("Starting sweep...")
self._sweeping = True
self._do_sweep()
def _handle_sweep_stop(self) -> None:
self._sweeping = False
self._set_sweep_status("Stopping...")
def _export_sweep_csv(self) -> None:
if not self._sweep_data:
self.app.notify("No sweep data to export", severity="warning")
return
output = Path("/tmp/birdcage_sweep.csv")
try:
with output.open("w", newline="") as fh:
writer = csv.writer(fh)
writer.writerow(["az", "rssi"])
for az, rssi in self._sweep_data:
writer.writerow([f"{az:.2f}", f"{rssi:.1f}"])
self.app.notify(f"Exported {len(self._sweep_data)} points to {output}")
except OSError as exc:
log.exception("Sweep CSV export failed")
self.app.notify(f"Export failed: {exc}", severity="error")
# ==================================================================
# SKY MAP MODE (2D AZ x EL scan)
# ==================================================================
@work(thread=True)
def _do_scan(self) -> None:
"""Execute the AZ/EL grid scan in a background thread."""
worker = get_current_worker()
shutdown = self.app.shutdown_event
device = self._device
if device is None:
return
az_start = self._read_float("scan-az-start", 160.0)
az_end = self._read_float("scan-az-end", 220.0)
az_step = self._read_float("scan-az-step", 1.5)
el_start = self._read_float("scan-el-start", 18.0)
el_end = self._read_float("scan-el-end", 65.0)
el_step = self._read_float("scan-el-step", 5.0)
iterations = self._read_int("xponder-input", 3)
if az_step <= 0:
az_step = 1.0
if el_step <= 0:
el_step = 1.0
# Build the grid point lists.
el_values: list[float] = []
el = el_start
while el <= el_end + 1e-9:
el_values.append(round(el, 2))
el += el_step
az_values: list[float] = []
az = az_start
while az <= az_end + 1e-9:
az_values.append(round(az, 2))
az += az_step
total_points = len(el_values) * len(az_values)
if total_points == 0:
self.app.call_from_thread(
self._set_scan_status, "No grid points -- check parameters"
)
return
heatmap = self.query_one("#heatmap", SkyHeatmap)
spark = self.query_one("#scan-spark", SparklineWidget)
force_software = self.query_one("#scan-software-mode", Checkbox).value
use_firmware = not force_software and hasattr(device, "az_sweep_firmware")
done = 0
az_span_grid = az_end - az_start + 1e-9
el_span_grid = el_end - el_start + 1e-9
span_deg = az_end - az_start
step_cdeg = max(1, int(az_step * 100))
for _el_idx, el_val in enumerate(el_values):
if not self._scanning or worker.is_cancelled or shutdown.is_set():
self.app.call_from_thread(self._set_scan_status, "Scan stopped")
return
# Try firmware sweep for this EL row.
if use_firmware:
try:
device.move_motor(1, el_val)
shutdown.wait(0.3)
status = f"EL={el_val:.1f} Firmware sweep..."
self.app.call_from_thread(self._set_scan_status, status)
results = device.az_sweep_firmware(
start_az=az_start,
span=span_deg,
step_cdeg=step_cdeg,
num_xponders=iterations,
)
for pt in results:
az_val = pt["az"]
rssi = pt["rssi"]
self._scan_data.append((az_val, el_val, rssi))
grid_az = min(
int((az_val - az_start) / az_span_grid * heatmap.az_bins),
heatmap.az_bins - 1,
)
grid_el = min(
int((el_val - el_start) / el_span_grid * heatmap.el_bins),
heatmap.el_bins - 1,
)
self.app.call_from_thread(
heatmap.set_point, grid_az, grid_el, rssi
)
self.app.call_from_thread(spark.push, rssi)
done += len(az_values)
pct = min(int(done * 100 / total_points), 100)
status = (
f"EL={el_val:.1f} {len(results)} pts (firmware) "
f"[{done}/{total_points}]"
)
self.app.call_from_thread(self._set_scan_progress, pct, status)
continue
except Exception:
log.warning(
"Firmware sweep failed at EL=%.1f, falling back",
el_val,
exc_info=True,
)
use_firmware = False # Don't retry firmware for remaining rows.
# Software fallback: point-by-point for this EL row.
try:
device.move_motor(1, el_val)
except Exception:
log.exception("EL move failed at EL=%.2f", el_val)
for _az_idx, az_val in enumerate(az_values):
if not self._scanning or worker.is_cancelled or shutdown.is_set():
self.app.call_from_thread(self._set_scan_status, "Scan stopped")
return
try:
device.move_motor(0, az_val)
except Exception:
log.exception("AZ move failed at AZ=%.2f EL=%.2f", az_val, el_val)
continue
shutdown.wait(0.3)
try:
rssi_data = device.get_rssi(iterations)
rssi = float(rssi_data.get("average", 0))
except Exception:
log.exception("get_rssi failed at AZ=%.2f EL=%.2f", az_val, el_val)
rssi = 0.0
self._scan_data.append((az_val, el_val, rssi))
grid_az = min(
int((az_val - az_start) / az_span_grid * heatmap.az_bins),
heatmap.az_bins - 1,
)
grid_el = min(
int((el_val - el_start) / el_span_grid * heatmap.el_bins),
heatmap.el_bins - 1,
)
self.app.call_from_thread(heatmap.set_point, grid_az, grid_el, rssi)
self.app.call_from_thread(heatmap.set_active, grid_az, grid_el)
self.app.call_from_thread(spark.push, rssi)
done += 1
pct = int(done * 100 / total_points)
status_text = (
f"AZ={az_val:.1f} EL={el_val:.1f} "
f"RSSI={rssi:.0f} [{done}/{total_points}]"
)
self.app.call_from_thread(self._set_scan_progress, pct, status_text)
msg = f"Scan complete -- {total_points} points"
self.app.call_from_thread(self._set_scan_status, msg)
# -- Scan thread-safe callbacks --
def _set_scan_status(self, text: str) -> None:
self.query_one("#scan-status-text", Static).update(text)
def _set_scan_progress(self, pct: int, status_text: str) -> None:
self.query_one("#scan-progress", ProgressBar).update(progress=pct)
self.query_one("#scan-status-text", Static).update(status_text)
# -- Scan button handlers --
def _handle_scan_start(self) -> None:
if self._device is None:
self.app.notify("No device connected", severity="warning")
return
if self._scanning:
self.app.notify("Scan already in progress", severity="warning")
return
heatmap = self.query_one("#heatmap", SkyHeatmap)
heatmap.clear()
self._scan_data.clear()
self.query_one("#scan-progress", ProgressBar).update(progress=0)
self._set_scan_status("Starting scan...")
self._scanning = True
self._do_scan()
def _handle_scan_stop(self) -> None:
self._scanning = False
self._set_scan_status("Stopping...")
def _export_scan_csv(self) -> None:
if not self._scan_data:
self.app.notify("No scan data to export", severity="warning")
return
output = Path("/tmp/birdcage_scan.csv")
try:
with output.open("w", newline="") as fh:
writer = csv.writer(fh)
writer.writerow(["az", "el", "rssi"])
for az, el, rssi in self._scan_data:
writer.writerow([f"{az:.2f}", f"{el:.2f}", f"{rssi:.1f}"])
self.app.notify(f"Exported {len(self._scan_data)} points to {output}")
except OSError as exc:
log.exception("Scan CSV export failed")
self.app.notify(f"Export failed: {exc}", severity="error")
# ==================================================================
# Unified button dispatcher
# ==================================================================
def on_button_pressed(self, event: Button.Pressed) -> None:
button_id = event.button.id or ""
# Monitor buttons
if button_id == "btn-start":
self._handle_monitor_start()
elif button_id == "btn-stop":
self._handle_monitor_stop()
elif button_id == "btn-lna":
self._handle_lna()
elif button_id == "btn-reset-peak":
self._handle_reset_peak()
# Sweep buttons
elif button_id == "btn-start-sweep":
self._handle_sweep_start()
elif button_id == "btn-stop-sweep":
self._handle_sweep_stop()
elif button_id == "btn-export-sweep":
self._export_sweep_csv()
# Sky Map buttons
elif button_id == "btn-start-scan":
self._handle_scan_start()
elif button_id == "btn-stop-scan":
self._handle_scan_stop()
elif button_id == "btn-export-scan":
self._export_scan_csv()