diff --git a/.gitignore b/.gitignore index 589e27d..05149df 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,10 @@ firmware/fx2lib/ firmware/build/ tools/__pycache__/ +# TUI +tui/.venv/ +tui/__pycache__/ + # Documentation site site/node_modules/ site/dist/ diff --git a/tui/pyproject.toml b/tui/pyproject.toml new file mode 100644 index 0000000..bd72fb8 --- /dev/null +++ b/tui/pyproject.toml @@ -0,0 +1,24 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "skywalker-tui" +version = "0.1.0" +description = "Textual TUI for Genpix SkyWalker-1 DVB-S receiver" +requires-python = ">=3.11" +authors = [{name = "Ryan Malloy", email = "ryan@supported.systems"}] +dependencies = [ + "textual>=3.0", + "pyusb>=1.3", +] + +[project.scripts] +skywalker-tui = "skywalker_tui.app:main" + +[tool.hatch.build.targets.wheel] +packages = ["src/skywalker_tui"] + +[tool.ruff] +target-version = "py311" +line-length = 100 diff --git a/tui/src/skywalker_tui/__init__.py b/tui/src/skywalker_tui/__init__.py new file mode 100644 index 0000000..83dd68a --- /dev/null +++ b/tui/src/skywalker_tui/__init__.py @@ -0,0 +1,3 @@ +"""Textual TUI for Genpix SkyWalker-1 DVB-S receiver.""" + +__version__ = "0.1.0" diff --git a/tui/src/skywalker_tui/app.py b/tui/src/skywalker_tui/app.py new file mode 100644 index 0000000..b5da6d1 --- /dev/null +++ b/tui/src/skywalker_tui/app.py @@ -0,0 +1,165 @@ +"""SkyWalker-1 TUI — main application. + +Provides mode switching between 5 RF operating modes via a sidebar and F-key +shortcuts. Each mode is a Screen subclass that manages its own workers. + +Note: We use "rf_mode" terminology for our 5 operating modes to avoid colliding +with Textual's built-in App.mode / _current_mode / _screen_stacks system. +""" + +import argparse +import sys +import os +from pathlib import Path + +# Add tools directory to path for skywalker_lib import +_tools_dir = str(Path(__file__).resolve().parent.parent.parent.parent / "tools") +if _tools_dir not in sys.path: + sys.path.insert(0, _tools_dir) + +from textual.app import App, ComposeResult +from textual.binding import Binding +from textual.containers import Horizontal, Vertical +from textual.widgets import Header, Footer, Button, Label, Static, ContentSwitcher + +from skywalker_tui.bridge import USBBridge +from skywalker_tui.demo import DemoDevice +from skywalker_tui.widgets.status_bar import DeviceStatusBar + +from skywalker_tui.screens.spectrum import SpectrumScreen +from skywalker_tui.screens.scan import ScanScreen +from skywalker_tui.screens.monitor import MonitorScreen +from skywalker_tui.screens.lband import LBandScreen +from skywalker_tui.screens.track import TrackScreen + + +MODES = { + "spectrum": ("F1 Spectrum", SpectrumScreen), + "scan": ("F2 Scan", ScanScreen), + "monitor": ("F3 Monitor", MonitorScreen), + "lband": ("F4 L-Band", LBandScreen), + "track": ("F5 Track", TrackScreen), +} + + +class SkyWalkerApp(App): + """Textual TUI for Genpix SkyWalker-1 DVB-S receiver.""" + + TITLE = "SkyWalker-1" + SUB_TITLE = "DVB-S RF Tool" + CSS_PATH = "theme.tcss" + + BINDINGS = [ + Binding("f1", "rf_mode('spectrum')", "Spectrum", show=True), + Binding("f2", "rf_mode('scan')", "Scan", show=True), + Binding("f3", "rf_mode('monitor')", "Monitor", show=True), + Binding("f4", "rf_mode('lband')", "L-Band", show=True), + Binding("f5", "rf_mode('track')", "Track", show=True), + Binding("q", "quit", "Quit", show=True), + Binding("d", "toggle_dark", "Theme", show=True), + ] + + def __init__(self, bridge: USBBridge, initial_mode: str = "spectrum"): + super().__init__() + self._bridge = bridge + self._initial_rf_mode = initial_mode + self._active_rf_mode = initial_mode + self._rf_screens: dict[str, object] = {} + + def compose(self) -> ComposeResult: + yield Header() + with Horizontal(): + with Vertical(id="sidebar"): + yield Label("[bold #00d4aa]SkyWalker-1[/]", classes="sidebar-heading") + yield Label("[#506878]DVB-S RF Tool[/]", classes="sidebar-heading") + yield Static("") + for mode_key, (label, _cls) in MODES.items(): + yield Button(label, id=f"btn-{mode_key}", classes="mode-button") + yield Static("") + yield DeviceStatusBar(self._bridge) + yield ContentSwitcher(id="content-area") + yield Footer() + + def on_mount(self) -> None: + # Initialize status bar + status = self.query_one(DeviceStatusBar) + status.update_status(self._bridge) + + # Install all mode screens into the content switcher + switcher = self.query_one("#content-area", ContentSwitcher) + for mode_key, (_label, cls) in MODES.items(): + screen = cls(self._bridge, id=f"screen-{mode_key}") + self._rf_screens[mode_key] = screen + switcher.mount(screen) + + # Activate initial mode + self.action_rf_mode(self._initial_rf_mode) + + def action_rf_mode(self, mode: str) -> None: + """Switch to a different RF operating mode.""" + if mode not in MODES: + return + + self._active_rf_mode = mode + switcher = self.query_one("#content-area", ContentSwitcher) + switcher.current = f"screen-{mode}" + + # Update sidebar button highlights + for mode_key in MODES: + btn = self.query_one(f"#btn-{mode_key}", Button) + btn.remove_class("-active") + self.query_one(f"#btn-{mode}", Button).add_class("-active") + + self.sub_title = f"DVB-S RF Tool — {MODES[mode][0]}" + + def on_button_pressed(self, event: Button.Pressed) -> None: + """Handle sidebar mode button clicks.""" + btn_id = event.button.id or "" + if btn_id.startswith("btn-"): + mode = btn_id[4:] + if mode in MODES: + self.action_rf_mode(mode) + + def action_toggle_dark(self) -> None: + self.dark = not self.dark + + +def main(): + parser = argparse.ArgumentParser( + prog="skywalker-tui", + description="Textual TUI for Genpix SkyWalker-1 DVB-S receiver", + ) + parser.add_argument( + "--demo", action="store_true", + help="Use synthetic signal data (no hardware required)", + ) + parser.add_argument( + "mode", nargs="?", default="spectrum", + choices=list(MODES.keys()), + help="Initial mode (default: spectrum)", + ) + parser.add_argument( + "-v", "--verbose", action="store_true", + help="Verbose USB logging (hardware mode only)", + ) + args = parser.parse_args() + + if args.demo: + device = DemoDevice() + bridge = USBBridge(device) + else: + try: + from skywalker_lib import SkyWalker1 + device = SkyWalker1(verbose=args.verbose) + device.open() + bridge = USBBridge(device) + except Exception as e: + print(f"Cannot open SkyWalker-1: {e}", file=sys.stderr) + print("Use --demo for synthetic signal data.", file=sys.stderr) + sys.exit(1) + + app = SkyWalkerApp(bridge=bridge, initial_mode=args.mode) + try: + app.run() + finally: + bridge.close() diff --git a/tui/src/skywalker_tui/bridge.py b/tui/src/skywalker_tui/bridge.py new file mode 100644 index 0000000..8a0f78d --- /dev/null +++ b/tui/src/skywalker_tui/bridge.py @@ -0,0 +1,94 @@ +"""Thread-safe bridge between Textual async event loop and blocking pyusb calls. + +All SkyWalker1 (or DemoDevice) methods are blocking I/O — they perform USB control +transfers that can take 2-200ms each. Textual's event loop is asyncio-based, so +calling these directly would freeze the UI. + +The USBBridge wraps every device method behind a threading.Lock to prevent concurrent +USB access (the BCM4500 can't handle overlapping control transfers) and exposes them +as plain synchronous methods meant to be called from Textual @work(thread=True) workers. +""" + +import threading + + +class USBBridge: + """Thread-safe wrapper around SkyWalker1 or DemoDevice.""" + + def __init__(self, device): + self._dev = device + self._lock = threading.Lock() + + @property + def is_demo(self) -> bool: + return hasattr(self._dev, "_demo") + + def open(self): + with self._lock: + if hasattr(self._dev, "open"): + self._dev.open() + + def close(self): + with self._lock: + if hasattr(self._dev, "close"): + self._dev.close() + + def get_fw_version(self) -> dict: + with self._lock: + return self._dev.get_fw_version() + + def get_config(self) -> int: + with self._lock: + return self._dev.get_config() + + def ensure_booted(self): + with self._lock: + self._dev.ensure_booted() + + def signal_monitor(self) -> dict: + with self._lock: + return self._dev.signal_monitor() + + def sweep_spectrum(self, start_mhz: float, stop_mhz: float, + step_mhz: float = 5.0, dwell_ms: int = 10, + sr_ksps: int = 20000, mod_index: int = 0, + fec_index: int = 5, callback=None) -> tuple: + with self._lock: + return self._dev.sweep_spectrum( + start_mhz, stop_mhz, step_mhz, dwell_ms, + sr_ksps, mod_index, fec_index, callback, + ) + + def tune_monitor(self, symbol_rate_sps: int, freq_khz: int, + mod_index: int, fec_index: int, + dwell_ms: int = 10) -> dict: + with self._lock: + return self._dev.tune_monitor( + symbol_rate_sps, freq_khz, mod_index, fec_index, dwell_ms, + ) + + def tune(self, symbol_rate_sps: int, freq_khz: int, + mod_index: int, fec_index: int): + with self._lock: + self._dev.tune(symbol_rate_sps, freq_khz, mod_index, fec_index) + + def set_lnb_voltage(self, high: bool): + with self._lock: + self._dev.set_lnb_voltage(high) + + def set_22khz_tone(self, on: bool): + with self._lock: + self._dev.set_22khz_tone(on) + + def configure_lnb(self, pol=None, band=None, lnb_lo=None, + disable_lnb=False) -> float: + with self._lock: + return self._dev.configure_lnb(pol, band, lnb_lo, disable_lnb) + + def blind_scan(self, freq_khz: int, sr_min: int, sr_max: int, + sr_step: int) -> dict | None: + """Run blind scan at a single frequency. Returns result dict or None.""" + with self._lock: + if hasattr(self._dev, "blind_scan"): + return self._dev.blind_scan(freq_khz, sr_min, sr_max, sr_step) + return None diff --git a/tui/src/skywalker_tui/demo.py b/tui/src/skywalker_tui/demo.py new file mode 100644 index 0000000..cd71a1a --- /dev/null +++ b/tui/src/skywalker_tui/demo.py @@ -0,0 +1,243 @@ +"""Synthetic signal generator for --demo mode. + +DemoDevice mimics the SkyWalker1 API without any USB hardware. It generates +realistic-looking signal data with: + - Gaussian peaks at known satellite transponder positions + - Thermal noise floor with Gaussian jitter + - AGC values inversely correlated to signal strength + - Occasional lock/unlock transitions based on SNR threshold + - Slow drift in SNR to simulate atmospheric effects + +This enables full TUI development and testing without hardware. +""" + +import math +import random +import time + + +# Simulated transponder positions (IF MHz) and relative strengths +_TRANSPONDERS = [ + (1050, -12.0, 27500), # strong Ku-band TP + (1220, -18.0, 22000), # moderate TP + (1480, -25.0, 30000), # weak TP + (1750, -15.0, 20000), # moderate TP + (1950, -22.0, 13000), # weaker TP +] + +_NOISE_FLOOR = -35.0 +_LOCK_THRESHOLD_DB = 3.5 + + +class DemoDevice: + """Drop-in replacement for SkyWalker1 that generates synthetic data.""" + + _demo = True # marker for bridge.is_demo + + def __init__(self): + self._booted = False + self._lnb_on = False + self._lnb_voltage_high = False + self._tone_22khz = False + self._tuned_freq_khz = 0 + self._tuned_sr_sps = 0 + self._start_time = time.monotonic() + self._sample_count = 0 + + def open(self): + pass + + def close(self): + pass + + def __enter__(self): + return self + + def __exit__(self, *exc): + pass + + def get_fw_version(self) -> dict: + return { + "major": 3, + "minor": 2, + "patch": 0, + "version": "3.02.0", + "date": "2025-02-10", + } + + def get_config(self) -> int: + bits = 0 + if self._booted: + bits |= 0x01 # 8PSK started + bits |= 0x02 # FW loaded + if self._lnb_on: + bits |= 0x04 # LNB power + if self._lnb_voltage_high: + bits |= 0x20 # 18V + if self._tone_22khz: + bits |= 0x10 # 22 kHz + if self._tuned_freq_khz > 0: + bits |= 0x40 # tuned + return bits + + def ensure_booted(self): + self._booted = True + self._lnb_on = True + + def set_lnb_voltage(self, high: bool): + self._lnb_voltage_high = high + + def set_22khz_tone(self, on: bool): + self._tone_22khz = on + + def configure_lnb(self, pol=None, band=None, lnb_lo=None, + disable_lnb=False) -> float: + if disable_lnb: + self._lnb_on = False + return 0.0 + if pol: + self._lnb_voltage_high = pol.upper() in ("H", "L") + if band: + self._tone_22khz = band == "high" + if lnb_lo is not None: + return lnb_lo + elif band == "high": + return 10600.0 + else: + return 9750.0 + + def start_intersil(self, on: bool = True): + self._lnb_on = on + + def tune(self, symbol_rate_sps: int, freq_khz: int, + mod_index: int, fec_index: int): + self._tuned_freq_khz = freq_khz + self._tuned_sr_sps = symbol_rate_sps + + def signal_monitor(self) -> dict: + """Generate synthetic signal data at the currently tuned frequency.""" + self._sample_count += 1 + elapsed = time.monotonic() - self._start_time + + freq_mhz = self._tuned_freq_khz / 1000.0 if self._tuned_freq_khz else 1200.0 + power_db = self._power_at(freq_mhz, elapsed) + snr_db = max(0.0, power_db - _NOISE_FLOOR + random.gauss(0, 0.3)) + locked = snr_db > _LOCK_THRESHOLD_DB + + snr_raw = int(snr_db * 256) + agc1, agc2 = self._power_to_agc(power_db) + + return { + "snr_raw": snr_raw, + "snr_db": snr_db, + "snr_pct": min(100.0, snr_raw * 17 / 65535 * 100), + "agc1": agc1, + "agc2": agc2, + "power_db": power_db, + "lock": 0x20 if locked else 0x00, + "locked": locked, + "status": 0x01 if self._booted else 0x00, + } + + def tune_monitor(self, symbol_rate_sps: int, freq_khz: int, + mod_index: int, fec_index: int, + dwell_ms: int = 10) -> dict: + """Simulate a tune + measure at a single frequency.""" + # Simulate dwell time (scaled down for responsiveness) + time.sleep(min(dwell_ms, 5) / 1000.0) + + freq_mhz = freq_khz / 1000.0 + elapsed = time.monotonic() - self._start_time + power_db = self._power_at(freq_mhz, elapsed) + snr_db = max(0.0, power_db - _NOISE_FLOOR + random.gauss(0, 0.2)) + locked = snr_db > _LOCK_THRESHOLD_DB + + snr_raw = int(snr_db * 256) + agc1, agc2 = self._power_to_agc(power_db) + + return { + "snr_raw": snr_raw, + "snr_db": snr_db, + "agc1": agc1, + "agc2": agc2, + "power_db": power_db, + "lock": 0x20 if locked else 0x00, + "locked": locked, + "status": 0x01, + "dwell_ms": dwell_ms, + } + + def sweep_spectrum(self, start_mhz: float, stop_mhz: float, + step_mhz: float = 5.0, dwell_ms: int = 10, + sr_ksps: int = 20000, mod_index: int = 0, + fec_index: int = 5, callback=None) -> tuple: + """Sweep with synthetic data. Same return signature as SkyWalker1.""" + sr_sps = sr_ksps * 1000 + freqs = [] + powers = [] + results = [] + + freq = start_mhz + steps = max(1, int((stop_mhz - start_mhz) / step_mhz) + 1) + step_num = 0 + + while freq <= stop_mhz: + freq_khz = int(freq * 1000) + result = self.tune_monitor(sr_sps, freq_khz, mod_index, fec_index, dwell_ms) + freqs.append(freq) + powers.append(result["power_db"]) + results.append(result) + + if callback: + callback(freq, step_num, steps, result) + + step_num += 1 + freq += step_mhz + + return freqs, powers, results + + def blind_scan(self, freq_khz: int, sr_min: int, sr_max: int, + sr_step: int) -> dict | None: + """Simulate blind scan — lock onto nearby transponders.""" + freq_mhz = freq_khz / 1000.0 + for tp_freq, tp_power, tp_sr in _TRANSPONDERS: + if abs(freq_mhz - tp_freq) < 15: + sr_sps = tp_sr * 1000 + if sr_min <= sr_sps <= sr_max: + return { + "freq_khz": int(tp_freq * 1000), + "sr_sps": sr_sps, + "locked": True, + } + return None + + # --- Internal signal model --- + + def _power_at(self, freq_mhz: float, elapsed: float) -> float: + """Calculate synthetic power at a given frequency and time.""" + # Start with noise floor + jitter + power = _NOISE_FLOOR + random.gauss(0, 0.5) + + # Add Gaussian peaks for each simulated transponder + for tp_freq, tp_peak, _sr in _TRANSPONDERS: + # Bandwidth ~15 MHz sigma + dist = (freq_mhz - tp_freq) + gauss = math.exp(-(dist ** 2) / (2 * 12.0 ** 2)) + # Slow atmospheric drift: +-2 dB over 30s period + drift = 2.0 * math.sin(elapsed / 30.0 * 2 * math.pi + tp_freq / 100.0) + power += (tp_peak - _NOISE_FLOOR + drift) * gauss + + return power + + @staticmethod + def _power_to_agc(power_db: float) -> tuple[int, int]: + """Convert power_db back to simulated AGC register values.""" + # Invert the agc_to_power_db formula: power = -40 * (combined / 65535) + # combined = power / -40 * 65535 + if power_db >= 0: + combined = 0 + else: + combined = int(min(65535, abs(power_db) / 40.0 * 65535)) + agc1 = min(65535, combined) + agc2 = random.randint(0, 255) << 4 # fine adjustment noise + return agc1, agc2 diff --git a/tui/src/skywalker_tui/screens/__init__.py b/tui/src/skywalker_tui/screens/__init__.py new file mode 100644 index 0000000..4934449 --- /dev/null +++ b/tui/src/skywalker_tui/screens/__init__.py @@ -0,0 +1 @@ +"""Mode screens for SkyWalker-1 TUI.""" diff --git a/tui/src/skywalker_tui/screens/lband.py b/tui/src/skywalker_tui/screens/lband.py new file mode 100644 index 0000000..e1a26c8 --- /dev/null +++ b/tui/src/skywalker_tui/screens/lband.py @@ -0,0 +1,221 @@ +"""L-Band screen — direct input spectrum analyzer with allocation annotations. + +Same sweep mechanics as the spectrum screen, but with LNB disabled (direct input) +and band allocation overlays showing what service each frequency range belongs to. +""" + +import sys +import os + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "..", "tools")) + +from textual.app import ComposeResult +from textual.containers import Horizontal, Vertical +from textual.screen import Screen +from textual.widgets import Label, Input, Button, Static, ProgressBar, Checkbox +from textual import work +from textual.worker import Worker + +from skywalker_lib import LBAND_ALLOCATIONS + +from skywalker_tui.widgets.spectrum_plot import SpectrumPlot +from skywalker_tui.widgets.waterfall import WaterfallDisplay + + +def _alloc_table(start: float, stop: float) -> str: + """Build a Rich-markup allocation reference for the visible range.""" + lines = ["[#00d4aa bold]L-Band Allocations in range:[/]"] + colors = ["#60a0c0", "#80b060", "#c0a050", "#a06080", "#50a0a0", "#a08060", "#6080a0"] + for i, (lo, hi, name) in enumerate(LBAND_ALLOCATIONS): + if lo < stop and hi > start: + overlap_lo = max(lo, start) + overlap_hi = min(hi, stop) + c = colors[i % len(colors)] + lines.append(f" [{c}]{overlap_lo:.0f}-{overlap_hi:.0f} MHz {name}[/]") + if len(lines) == 1: + lines.append(" [#506878](none in range)[/]") + return "\n".join(lines) + + +class LBandScreen(Screen): + """L-band direct input analyzer with allocation annotations.""" + + DEFAULT_CSS = """ + LBandScreen { + layout: vertical; + } + LBandScreen #lband-main { + height: 1fr; + layout: horizontal; + } + LBandScreen #lband-plot-col { + width: 2fr; + layout: vertical; + } + LBandScreen #lband-info-col { + width: 1fr; + padding: 1; + background: #0e1420; + border-left: solid #1a2a3a; + layout: vertical; + } + LBandScreen #lband-alloc-panel { + height: auto; + padding: 1; + } + LBandScreen #lband-progress { + height: 3; + layout: horizontal; + padding: 0 2; + background: #0e1018; + } + LBandScreen #lband-progress Static { + width: auto; + margin: 1 1 0 0; + } + LBandScreen #lband-progress ProgressBar { + width: 1fr; + margin: 1 1 0 0; + } + LBandScreen #lband-controls { + height: auto; + padding: 1 2; + background: #0e1018; + border-top: solid #1a2a3a; + layout: horizontal; + } + LBandScreen #lband-controls Label { + width: auto; + margin: 1 1 0 0; + color: #506878; + } + LBandScreen #lband-controls Input { + width: 10; + margin: 0 1; + } + LBandScreen #lband-controls Button { + margin: 0 1; + } + """ + + def __init__(self, bridge, **kwargs): + super().__init__(**kwargs) + self._bridge = bridge + self._sweeping = False + self._sweep_worker: Worker | None = None + + def compose(self) -> ComposeResult: + with Horizontal(id="lband-main"): + with Vertical(id="lband-plot-col"): + yield SpectrumPlot(title="L-Band Spectrum (Direct Input)", + id="lband-plot") + yield WaterfallDisplay(title="Waterfall", id="lband-waterfall") + with Vertical(id="lband-info-col"): + yield Static(_alloc_table(950, 2150), id="lband-alloc-panel") + + with Horizontal(id="lband-progress"): + yield Static("[#506878]Ready[/]", id="lband-status") + yield ProgressBar(total=100, show_eta=False, id="lband-pbar") + + with Horizontal(id="lband-controls"): + yield Label("Start:") + yield Input("950", id="lband-start") + yield Label("Stop:") + yield Input("2150", id="lband-stop") + yield Label("Step:") + yield Input("2", id="lband-step") + yield Label("Dwell:") + yield Input("20", id="lband-dwell") + yield Button("23cm", id="lband-23cm-btn") + yield Button("Sweep", id="lband-sweep-btn", variant="success") + yield Button("Stop", id="lband-stop-btn", variant="error") + + def on_mount(self) -> None: + if self._bridge.is_demo: + self._start_sweep() + + def on_unmount(self) -> None: + self._stop_sweep() + + def on_button_pressed(self, event: Button.Pressed) -> None: + if event.button.id == "lband-sweep-btn": + self._start_sweep() + elif event.button.id == "lband-stop-btn": + self._stop_sweep() + elif event.button.id == "lband-23cm-btn": + self.query_one("#lband-start", Input).value = "1240" + self.query_one("#lband-stop", Input).value = "1300" + self.query_one("#lband-step", Input).value = "0.5" + # Update allocation display + self.query_one("#lband-alloc-panel", Static).update( + _alloc_table(1240, 1300) + ) + + def _start_sweep(self) -> None: + if self._sweeping: + return + self._sweeping = True + + start = float(self.query_one("#lband-start", Input).value or "950") + stop = float(self.query_one("#lband-stop", Input).value or "2150") + step = float(self.query_one("#lband-step", Input).value or "2") + dwell = int(self.query_one("#lband-dwell", Input).value or "20") + + # Update allocation panel for current range + self.query_one("#lband-alloc-panel", Static).update( + _alloc_table(start, stop) + ) + + self._sweep_worker = self._do_sweep(start, stop, step, dwell) + + def _stop_sweep(self) -> None: + self._sweeping = False + if self._sweep_worker: + self._sweep_worker.cancel() + self._sweep_worker = None + + @work(thread=True) + def _do_sweep(self, start: float, stop: float, step: float, dwell: int) -> None: + """L-band sweep with LNB disabled.""" + try: + self._bridge.ensure_booted() + # Disable LNB for direct input + self._bridge.configure_lnb(disable_lnb=True) + except Exception: + pass + + def progress_cb(freq, step_num, total, result): + pct = (step_num + 1) / total * 100 + self.app.call_from_thread(self._update_progress, pct, freq) + + self.app.call_from_thread(self._set_status, "Sweeping...") + + freqs, powers, results = self._bridge.sweep_spectrum( + start, stop, step, dwell, sr_ksps=20000, callback=progress_cb, + ) + + self.app.call_from_thread(self._show_results, freqs, powers, results) + self._sweeping = False + + def _update_progress(self, pct: float, freq: float) -> None: + if not self.is_mounted: + return + self.query_one("#lband-pbar", ProgressBar).update(progress=pct) + self.query_one("#lband-status", Static).update( + f"[#00d4aa]{freq:.1f} MHz[/]" + ) + + def _set_status(self, msg: str) -> None: + if not self.is_mounted: + return + self.query_one("#lband-status", Static).update(f"[#506878]{msg}[/]") + + def _show_results(self, freqs, powers, results) -> None: + if not self.is_mounted: + return + self.query_one("#lband-plot", SpectrumPlot).update_data( + freqs, powers, results, lnb_lo=0.0, + ) + self.query_one("#lband-waterfall", WaterfallDisplay).add_sweep(powers) + self.query_one("#lband-status", Static).update("[#506878]Complete[/]") + self.query_one("#lband-pbar", ProgressBar).update(progress=100) diff --git a/tui/src/skywalker_tui/screens/monitor.py b/tui/src/skywalker_tui/screens/monitor.py new file mode 100644 index 0000000..a08e193 --- /dev/null +++ b/tui/src/skywalker_tui/screens/monitor.py @@ -0,0 +1,206 @@ +"""Monitor screen — real-time signal strength at a single frequency. + +This is the dish-alignment / signal-monitoring mode. It polls signal_monitor() +at a configurable rate and displays SNR, power, lock state, and a rolling +sparkline history. +""" + +from textual.app import ComposeResult +from textual.containers import Horizontal, Vertical +from textual.screen import Screen +from textual.widgets import Label, Input, Button, Static +from textual import work +from textual.worker import Worker, WorkerState + +from skywalker_tui.widgets.signal_gauge import SignalGauge +from skywalker_tui.widgets.sparkline_widget import SparklineWidget + + +class MonitorScreen(Screen): + """Real-time signal monitor with gauge and sparkline.""" + + DEFAULT_CSS = """ + MonitorScreen { + layout: vertical; + } + MonitorScreen #monitor-main { + height: 1fr; + layout: vertical; + padding: 1 2; + } + MonitorScreen #monitor-controls { + height: auto; + padding: 1 2; + background: #0e1018; + border-top: solid #1a2a3a; + layout: horizontal; + } + MonitorScreen #monitor-controls Label { + width: auto; + margin: 1 1 0 0; + color: #506878; + } + MonitorScreen #monitor-controls Input { + width: 14; + margin: 0 1; + } + MonitorScreen #monitor-controls Button { + margin: 0 1; + } + MonitorScreen #monitor-stats { + height: 3; + layout: horizontal; + padding: 0 2; + } + MonitorScreen #monitor-stats Static { + width: 1fr; + height: 3; + content-align: center middle; + background: #121c2a; + border: round #1a3050; + margin: 0 1 0 0; + } + """ + + BINDINGS = [ + ("space", "toggle_poll", "Start/Stop"), + ] + + def __init__(self, bridge, **kwargs): + super().__init__(**kwargs) + self._bridge = bridge + self._polling = False + self._poll_worker: Worker | None = None + self._sample_count = 0 + self._peak_snr = 0.0 + + def compose(self) -> ComposeResult: + with Vertical(id="monitor-main"): + yield SignalGauge(id="monitor-gauge") + yield SparklineWidget(title="SNR History", color="#00d4aa", + id="snr-sparkline") + yield SparklineWidget(title="Power History", color="#2196f3", + id="power-sparkline") + with Horizontal(id="monitor-stats"): + yield Static("[#506878]Samples:[/] [#00d4aa]0[/]", id="stat-samples") + yield Static("[#506878]Peak SNR:[/] [#00d4aa]0.0 dB[/]", id="stat-peak") + yield Static("[#506878]Status:[/] [#e8a020]Stopped[/]", id="stat-status") + + with Horizontal(id="monitor-controls"): + yield Label("Freq (MHz):") + yield Input("1200", id="mon-freq") + yield Label("SR (ksps):") + yield Input("20000", id="mon-sr") + yield Label("Rate (Hz):") + yield Input("5", id="mon-rate") + yield Button("Start", id="mon-start", variant="success") + yield Button("Stop", id="mon-stop", variant="error") + + def on_mount(self) -> None: + # Auto-start polling in demo mode + if self._bridge.is_demo: + self._start_polling() + + def on_unmount(self) -> None: + self._stop_polling() + + def on_button_pressed(self, event: Button.Pressed) -> None: + if event.button.id == "mon-start": + self._start_polling() + elif event.button.id == "mon-stop": + self._stop_polling() + + def action_toggle_poll(self) -> None: + if self._polling: + self._stop_polling() + else: + self._start_polling() + + def _start_polling(self) -> None: + if self._polling: + return + self._polling = True + self._sample_count = 0 + self._peak_snr = 0.0 + + freq_mhz = float(self.query_one("#mon-freq", Input).value or "1200") + sr_ksps = int(self.query_one("#mon-sr", Input).value or "20000") + rate = float(self.query_one("#mon-rate", Input).value or "5") + + self.query_one("#stat-status", Static).update( + "[#506878]Status:[/] [bold #00d4aa]Running[/]" + ) + + self._poll_worker = self._do_poll(freq_mhz, sr_ksps, rate) + + def _stop_polling(self) -> None: + self._polling = False + if self._poll_worker is not None: + self._poll_worker.cancel() + self._poll_worker = None + try: + self.query_one("#stat-status", Static).update( + "[#506878]Status:[/] [#e8a020]Stopped[/]" + ) + except Exception: + pass + + @staticmethod + def _parse_input(input_widget: Input, default: float) -> float: + try: + return float(input_widget.value) + except (ValueError, TypeError): + return default + + @work(thread=True) + def _do_poll(self, freq_mhz: float, sr_ksps: int, rate: float) -> None: + """Background worker that polls signal_monitor() in a thread.""" + import time + + interval = 1.0 / max(0.5, rate) + freq_khz = int(freq_mhz * 1000) + sr_sps = sr_ksps * 1000 + + # Initial tune + try: + self._bridge.ensure_booted() + self._bridge.tune(sr_sps, freq_khz, 0, 5) + time.sleep(0.3) + except Exception: + pass + + while self._polling: + t0 = time.monotonic() + try: + sig = self._bridge.signal_monitor() + except Exception: + time.sleep(interval) + continue + + self._sample_count += 1 + snr_db = sig.get("snr_db", 0.0) + self._peak_snr = max(self._peak_snr, snr_db) + + # Post updates to the UI thread + self.app.call_from_thread(self._update_ui, sig) + + elapsed = time.monotonic() - t0 + sleep = interval - elapsed + if sleep > 0: + time.sleep(sleep) + + def _update_ui(self, sig: dict) -> None: + """Called from the main thread to update widgets.""" + if not self.is_mounted: + return + + self.query_one("#monitor-gauge", SignalGauge).update_signal(sig) + self.query_one("#snr-sparkline", SparklineWidget).push(sig.get("snr_db", 0)) + self.query_one("#power-sparkline", SparklineWidget).push(sig.get("power_db", -40)) + + self.query_one("#stat-samples", Static).update( + f"[#506878]Samples:[/] [#00d4aa]{self._sample_count}[/]" + ) + self.query_one("#stat-peak", Static).update( + f"[#506878]Peak SNR:[/] [#00d4aa]{self._peak_snr:.1f} dB[/]" + ) diff --git a/tui/src/skywalker_tui/screens/scan.py b/tui/src/skywalker_tui/screens/scan.py new file mode 100644 index 0000000..c064934 --- /dev/null +++ b/tui/src/skywalker_tui/screens/scan.py @@ -0,0 +1,262 @@ +"""Scan screen — automated transponder discovery. + +Multi-phase pipeline: coarse sweep → peak detection → fine sweep → blind scan. +Shows progress, spectrum visualization, and a results table. +""" + +import struct +import time + +from textual.app import ComposeResult +from textual.containers import Horizontal, Vertical +from textual.screen import Screen +from textual.widgets import Label, Input, Button, Static, ProgressBar +from textual import work +from textual.worker import Worker + +from skywalker_tui.widgets.spectrum_plot import SpectrumPlot +from skywalker_tui.widgets.frequency_table import FrequencyTable + + +class ScanScreen(Screen): + """Multi-phase transponder scanner with progress and results table.""" + + DEFAULT_CSS = """ + ScanScreen { + layout: vertical; + } + ScanScreen #scan-main { + height: 1fr; + layout: vertical; + } + ScanScreen #scan-upper { + height: 1fr; + layout: horizontal; + } + ScanScreen #scan-spectrum-col { + width: 1fr; + } + ScanScreen #scan-results-col { + width: 1fr; + } + ScanScreen #scan-progress { + height: auto; + padding: 1 2; + background: #0e1018; + layout: vertical; + } + ScanScreen #scan-progress-row { + height: 3; + layout: horizontal; + } + ScanScreen #scan-progress Static { + width: auto; + margin: 0 1 0 0; + } + ScanScreen #scan-progress ProgressBar { + width: 1fr; + margin: 0 1; + } + ScanScreen #scan-controls { + height: auto; + padding: 1 2; + background: #0e1018; + border-top: solid #1a2a3a; + layout: horizontal; + } + ScanScreen #scan-controls Label { + width: auto; + margin: 1 1 0 0; + color: #506878; + } + ScanScreen #scan-controls Input { + width: 10; + margin: 0 1; + } + ScanScreen #scan-controls Button { + margin: 0 1; + } + """ + + def __init__(self, bridge, **kwargs): + super().__init__(**kwargs) + self._bridge = bridge + self._scanning = False + self._scan_worker: Worker | None = None + + def compose(self) -> ComposeResult: + with Vertical(id="scan-main"): + with Horizontal(id="scan-upper"): + with Vertical(id="scan-spectrum-col"): + yield SpectrumPlot(title="Coarse Sweep", id="scan-spectrum") + with Vertical(id="scan-results-col"): + yield Static("[#00d4aa bold]Transponders Found[/]", + id="scan-results-title") + yield FrequencyTable(id="scan-table") + + with Vertical(id="scan-progress"): + yield Static("[#506878]Ready[/]", id="scan-phase") + with Horizontal(id="scan-progress-row"): + yield ProgressBar(total=100, show_eta=False, id="scan-pbar") + + with Horizontal(id="scan-controls"): + yield Label("Start:") + yield Input("950", id="scan-start") + yield Label("Stop:") + yield Input("2150", id="scan-stop") + yield Label("LNB LO:") + yield Input("9750", id="scan-lnb") + yield Label("Threshold:") + yield Input("3", id="scan-thresh") + yield Button("Scan", id="scan-start-btn", variant="success") + yield Button("Stop", id="scan-stop-btn", variant="error") + + def on_unmount(self) -> None: + self._stop_scan() + + def on_button_pressed(self, event: Button.Pressed) -> None: + if event.button.id == "scan-start-btn": + self._start_scan() + elif event.button.id == "scan-stop-btn": + self._stop_scan() + + def _start_scan(self) -> None: + if self._scanning: + return + self._scanning = True + + start = float(self.query_one("#scan-start", Input).value or "950") + stop = float(self.query_one("#scan-stop", Input).value or "2150") + lnb_lo = float(self.query_one("#scan-lnb", Input).value or "9750") + threshold = float(self.query_one("#scan-thresh", Input).value or "3") + + # Clear previous results + self.query_one("#scan-table", FrequencyTable).clear_table() + + self._scan_worker = self._do_scan(start, stop, lnb_lo, threshold) + + def _stop_scan(self) -> None: + self._scanning = False + if self._scan_worker: + self._scan_worker.cancel() + self._scan_worker = None + + @work(thread=True) + def _do_scan(self, start: float, stop: float, lnb_lo: float, + threshold: float) -> None: + """Multi-phase scan pipeline in a background thread.""" + import sys + import os + sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "..", "tools")) + from skywalker_lib import detect_peaks, if_to_rf + + try: + self._bridge.ensure_booted() + except Exception: + pass + + # Phase 1: Coarse sweep + self.app.call_from_thread(self._set_phase, "Phase 1: Coarse sweep", 0) + coarse_step = 10 + total_steps = max(1, int((stop - start) / coarse_step) + 1) + + def coarse_cb(freq, step_num, total, result): + pct = (step_num + 1) / total * 100 + self.app.call_from_thread(self._set_progress, pct) + + freqs, powers, results = self._bridge.sweep_spectrum( + start, stop, coarse_step, dwell_ms=15, sr_ksps=20000, + callback=coarse_cb, + ) + + if not self._scanning: + return + + self.app.call_from_thread(self._update_spectrum, freqs, powers, results, lnb_lo) + + # Phase 2: Peak detection + self.app.call_from_thread(self._set_phase, "Phase 2: Peak detection", 50) + peaks = detect_peaks(freqs, powers, threshold_db=threshold) + + if not peaks: + self.app.call_from_thread(self._set_phase, "No peaks found", 100) + self._scanning = False + return + + # Phase 3: Fine sweep around peaks + self.app.call_from_thread( + self._set_phase, + f"Phase 3: Fine sweep ({len(peaks)} peaks)", 60, + ) + refined = [] + for i, (freq, pwr, idx) in enumerate(peaks): + if not self._scanning: + return + fine_start = max(start, freq - 15) + fine_stop = min(stop, freq + 15) + fine_freqs, fine_powers, fine_results = self._bridge.sweep_spectrum( + fine_start, fine_stop, step_mhz=2.0, dwell_ms=20, sr_ksps=20000, + ) + if fine_powers: + best_idx = fine_powers.index(max(fine_powers)) + refined.append(( + fine_freqs[best_idx], fine_powers[best_idx], + fine_results[best_idx], + )) + pct = 60 + (i + 1) / len(peaks) * 20 + self.app.call_from_thread(self._set_progress, pct) + + # Phase 4: Blind scan + self.app.call_from_thread( + self._set_phase, + f"Phase 4: Blind scan ({len(refined)} candidates)", 80, + ) + sr_min = 1000 * 1000 + sr_max = 30000 * 1000 + sr_step = 500 * 1000 + + for i, (freq, pwr, result) in enumerate(refined): + if not self._scanning: + return + freq_khz = int(freq * 1000) + bs_result = self._bridge.blind_scan(freq_khz, sr_min, sr_max, sr_step) + if bs_result and bs_result.get("locked"): + tp = { + "if_mhz": bs_result.get("freq_khz", freq_khz) / 1000.0, + "rf_mhz": if_to_rf( + bs_result.get("freq_khz", freq_khz) / 1000.0, lnb_lo + ), + "sr_ksps": bs_result.get("sr_sps", 0) // 1000, + "power_db": pwr, + "locked": True, + } + self.app.call_from_thread(self._add_transponder, tp) + + pct = 80 + (i + 1) / len(refined) * 20 + self.app.call_from_thread(self._set_progress, pct) + + self.app.call_from_thread(self._set_phase, "Scan complete", 100) + self._scanning = False + + def _set_phase(self, text: str, progress: float) -> None: + if not self.is_mounted: + return + self.query_one("#scan-phase", Static).update(f"[#00d4aa]{text}[/]") + self.query_one("#scan-pbar", ProgressBar).update(progress=progress) + + def _set_progress(self, pct: float) -> None: + if not self.is_mounted: + return + self.query_one("#scan-pbar", ProgressBar).update(progress=pct) + + def _update_spectrum(self, freqs, powers, results, lnb_lo) -> None: + if not self.is_mounted: + return + self.query_one("#scan-spectrum", SpectrumPlot).update_data( + freqs, powers, results, lnb_lo=lnb_lo, + ) + + def _add_transponder(self, tp: dict) -> None: + if not self.is_mounted: + return + self.query_one("#scan-table", FrequencyTable).add_transponder(tp) diff --git a/tui/src/skywalker_tui/screens/spectrum.py b/tui/src/skywalker_tui/screens/spectrum.py new file mode 100644 index 0000000..c19e9a7 --- /dev/null +++ b/tui/src/skywalker_tui/screens/spectrum.py @@ -0,0 +1,193 @@ +"""Spectrum screen — sweep analyzer across the IF range. + +Displays a bar chart of power vs. frequency, optionally with a rolling +waterfall beneath it. Uses threaded workers for the blocking USB sweep. +""" + +from textual.app import ComposeResult +from textual.containers import Horizontal, Vertical +from textual.screen import Screen +from textual.widgets import Label, Input, Button, Static, ProgressBar, Checkbox +from textual import work +from textual.worker import Worker + +from skywalker_tui.widgets.spectrum_plot import SpectrumPlot +from skywalker_tui.widgets.waterfall import WaterfallDisplay + + +class SpectrumScreen(Screen): + """Spectrum analyzer with bar chart and optional waterfall.""" + + DEFAULT_CSS = """ + SpectrumScreen { + layout: vertical; + } + SpectrumScreen #spec-main { + height: 1fr; + layout: vertical; + } + SpectrumScreen #spec-progress-row { + height: 3; + layout: horizontal; + padding: 0 2; + background: #0e1018; + } + SpectrumScreen #spec-progress-row Static { + width: auto; + margin: 1 1 0 0; + } + SpectrumScreen #spec-progress-row ProgressBar { + width: 1fr; + margin: 1 1 0 0; + } + SpectrumScreen #spec-controls { + height: auto; + padding: 1 2; + background: #0e1018; + border-top: solid #1a2a3a; + layout: horizontal; + } + SpectrumScreen #spec-controls Label { + width: auto; + margin: 1 1 0 0; + color: #506878; + } + SpectrumScreen #spec-controls Input { + width: 10; + margin: 0 1; + } + SpectrumScreen #spec-controls Button { + margin: 0 1; + } + SpectrumScreen #spec-controls Checkbox { + margin: 1 1 0 0; + } + """ + + def __init__(self, bridge, **kwargs): + super().__init__(**kwargs) + self._bridge = bridge + self._sweeping = False + self._sweep_worker: Worker | None = None + + def compose(self) -> ComposeResult: + with Vertical(id="spec-main"): + yield SpectrumPlot(title="Spectrum Analyzer", id="spec-plot") + yield WaterfallDisplay(title="Waterfall", id="spec-waterfall") + + with Horizontal(id="spec-progress-row"): + yield Static("[#506878]Ready[/]", id="spec-status") + yield ProgressBar(total=100, show_eta=False, id="spec-pbar") + + with Horizontal(id="spec-controls"): + yield Label("Start:") + yield Input("950", id="spec-start") + yield Label("Stop:") + yield Input("2150", id="spec-stop") + yield Label("Step:") + yield Input("5", id="spec-step") + yield Label("Dwell:") + yield Input("10", id="spec-dwell") + yield Label("LNB LO:") + yield Input("0", id="spec-lnb") + yield Checkbox("Continuous", id="spec-continuous") + yield Button("Sweep", id="spec-sweep-btn", variant="success") + yield Button("Stop", id="spec-stop-btn", variant="error") + + def on_mount(self) -> None: + if self._bridge.is_demo: + self._start_sweep() + + def on_unmount(self) -> None: + self._stop_sweep() + + def on_button_pressed(self, event: Button.Pressed) -> None: + if event.button.id == "spec-sweep-btn": + self._start_sweep() + elif event.button.id == "spec-stop-btn": + self._stop_sweep() + + def _start_sweep(self) -> None: + if self._sweeping: + return + self._sweeping = True + + start = float(self.query_one("#spec-start", Input).value or "950") + stop = float(self.query_one("#spec-stop", Input).value or "2150") + step = float(self.query_one("#spec-step", Input).value or "5") + dwell = int(self.query_one("#spec-dwell", Input).value or "10") + lnb_lo = float(self.query_one("#spec-lnb", Input).value or "0") + continuous = self.query_one("#spec-continuous", Checkbox).value + + self._sweep_worker = self._do_sweep(start, stop, step, dwell, lnb_lo, continuous) + + def _stop_sweep(self) -> None: + self._sweeping = False + if self._sweep_worker: + self._sweep_worker.cancel() + self._sweep_worker = None + + @work(thread=True) + def _do_sweep(self, start: float, stop: float, step: float, + dwell: int, lnb_lo: float, continuous: bool) -> None: + """Background sweep worker.""" + import time + + try: + self._bridge.ensure_booted() + except Exception: + pass + + sweep_num = 0 + while self._sweeping: + sweep_num += 1 + total_steps = max(1, int((stop - start) / step) + 1) + step_count = [0] + + def progress_cb(freq, step_num, total, result): + step_count[0] = step_num + 1 + pct = (step_num + 1) / total * 100 + self.app.call_from_thread(self._update_progress, pct, freq, sweep_num) + + self.app.call_from_thread(self._update_status, f"Sweeping #{sweep_num}...") + + freqs, powers, results = self._bridge.sweep_spectrum( + start, stop, step, dwell, sr_ksps=20000, callback=progress_cb, + ) + + self.app.call_from_thread(self._update_plot, freqs, powers, results, lnb_lo) + self.app.call_from_thread(self._update_waterfall, powers) + + if not continuous: + break + + time.sleep(0.1) + + self._sweeping = False + self.app.call_from_thread(self._update_status, "Complete") + + def _update_progress(self, pct: float, freq: float, sweep_num: int) -> None: + if not self.is_mounted: + return + self.query_one("#spec-pbar", ProgressBar).update(progress=pct) + self.query_one("#spec-status", Static).update( + f"[#00d4aa]Sweep #{sweep_num}[/] [#506878]{freq:.0f} MHz[/]" + ) + + def _update_status(self, msg: str) -> None: + if not self.is_mounted: + return + self.query_one("#spec-status", Static).update(f"[#506878]{msg}[/]") + self.query_one("#spec-pbar", ProgressBar).update(progress=0) + + def _update_plot(self, freqs, powers, results, lnb_lo) -> None: + if not self.is_mounted: + return + self.query_one("#spec-plot", SpectrumPlot).update_data( + freqs, powers, results, lnb_lo=lnb_lo, + ) + + def _update_waterfall(self, powers) -> None: + if not self.is_mounted: + return + self.query_one("#spec-waterfall", WaterfallDisplay).add_sweep(powers) diff --git a/tui/src/skywalker_tui/screens/track.py b/tui/src/skywalker_tui/screens/track.py new file mode 100644 index 0000000..c405da4 --- /dev/null +++ b/tui/src/skywalker_tui/screens/track.py @@ -0,0 +1,301 @@ +"""Track screen — carrier/beacon tracker with logging and export. + +Locks to a single frequency and records SNR, power, lock state over time. +Displays dual sparklines, an event log for lock transitions, and stats. +Supports CSV/JSONL export. +""" + +import csv +import json +import time +from datetime import datetime +from pathlib import Path + +from textual.app import ComposeResult +from textual.containers import Horizontal, Vertical +from textual.screen import Screen +from textual.widgets import Label, Input, Button, Static, RichLog +from textual import work +from textual.worker import Worker + +from skywalker_tui.widgets.signal_gauge import SignalGauge +from skywalker_tui.widgets.sparkline_widget import SparklineWidget + + +class TrackScreen(Screen): + """Long-running carrier tracker with event log and export.""" + + DEFAULT_CSS = """ + TrackScreen { + layout: vertical; + } + TrackScreen #track-main { + height: 1fr; + layout: vertical; + padding: 1 2; + } + TrackScreen #track-upper { + height: auto; + layout: horizontal; + } + TrackScreen #track-gauge-col { + width: 1fr; + } + TrackScreen #track-sparklines { + width: 1fr; + layout: vertical; + } + TrackScreen #track-log-container { + height: 10; + background: #0e1018; + border: round #1a2a3a; + margin: 1 0; + } + TrackScreen #track-log-title { + height: 1; + color: #00d4aa; + text-style: bold; + padding: 0 1; + } + TrackScreen #track-log { + height: 1fr; + } + TrackScreen #track-stats { + height: 3; + layout: horizontal; + } + TrackScreen #track-stats Static { + width: 1fr; + height: 3; + content-align: center middle; + background: #121c2a; + border: round #1a3050; + margin: 0 1 0 0; + } + TrackScreen #track-controls { + height: auto; + padding: 1 2; + background: #0e1018; + border-top: solid #1a2a3a; + layout: horizontal; + } + TrackScreen #track-controls Label { + width: auto; + margin: 1 1 0 0; + color: #506878; + } + TrackScreen #track-controls Input { + width: 14; + margin: 0 1; + } + TrackScreen #track-controls Button { + margin: 0 1; + } + """ + + def __init__(self, bridge, **kwargs): + super().__init__(**kwargs) + self._bridge = bridge + self._tracking = False + self._track_worker: Worker | None = None + self._sample_count = 0 + self._peak_snr = 0.0 + self._start_time = 0.0 + self._was_locked: bool | None = None + self._records: list[dict] = [] + + def compose(self) -> ComposeResult: + with Vertical(id="track-main"): + with Horizontal(id="track-upper"): + with Vertical(id="track-gauge-col"): + yield SignalGauge(id="track-gauge") + with Vertical(id="track-sparklines"): + yield SparklineWidget(title="SNR (dB)", color="#00d4aa", + id="track-snr-spark") + yield SparklineWidget(title="Power (dB)", color="#2196f3", + id="track-power-spark") + + with Vertical(id="track-log-container"): + yield Static("[#00d4aa bold]Event Log[/]", id="track-log-title") + yield RichLog(id="track-log", wrap=True, markup=True) + + with Horizontal(id="track-stats"): + yield Static("[#506878]Samples:[/] [#00d4aa]0[/]", id="trk-samples") + yield Static("[#506878]Elapsed:[/] [#00d4aa]0s[/]", id="trk-elapsed") + yield Static("[#506878]Peak SNR:[/] [#00d4aa]0.0 dB[/]", id="trk-peak") + yield Static("[#506878]Status:[/] [#e8a020]Stopped[/]", id="trk-status") + + with Horizontal(id="track-controls"): + yield Label("Freq (MHz):") + yield Input("1200", id="trk-freq") + yield Label("SR (ksps):") + yield Input("20000", id="trk-sr") + yield Label("Rate (Hz):") + yield Input("1", id="trk-rate") + yield Button("Start", id="trk-start-btn", variant="success") + yield Button("Stop", id="trk-stop-btn", variant="error") + yield Button("Export CSV", id="trk-csv-btn") + yield Button("Export JSONL", id="trk-jsonl-btn") + + def on_mount(self) -> None: + if self._bridge.is_demo: + self._start_tracking() + + def on_unmount(self) -> None: + self._stop_tracking() + + def on_button_pressed(self, event: Button.Pressed) -> None: + if event.button.id == "trk-start-btn": + self._start_tracking() + elif event.button.id == "trk-stop-btn": + self._stop_tracking() + elif event.button.id == "trk-csv-btn": + self._export_csv() + elif event.button.id == "trk-jsonl-btn": + self._export_jsonl() + + def _start_tracking(self) -> None: + if self._tracking: + return + self._tracking = True + self._sample_count = 0 + self._peak_snr = 0.0 + self._was_locked = None + self._records.clear() + self._start_time = time.monotonic() + + freq = float(self.query_one("#trk-freq", Input).value or "1200") + sr = int(self.query_one("#trk-sr", Input).value or "20000") + rate = float(self.query_one("#trk-rate", Input).value or "1") + + self.query_one("#trk-status", Static).update( + "[#506878]Status:[/] [bold #00d4aa]Tracking[/]" + ) + log = self.query_one("#track-log", RichLog) + log.clear() + log.write("[#506878]Tracking started[/]") + + self._track_worker = self._do_track(freq, sr, rate) + + def _stop_tracking(self) -> None: + self._tracking = False + if self._track_worker: + self._track_worker.cancel() + self._track_worker = None + try: + self.query_one("#trk-status", Static).update( + "[#506878]Status:[/] [#e8a020]Stopped[/]" + ) + log = self.query_one("#track-log", RichLog) + log.write(f"[#506878]Stopped. {self._sample_count} samples.[/]") + except Exception: + pass + + @work(thread=True) + def _do_track(self, freq_mhz: float, sr_ksps: int, rate: float) -> None: + """Background tracking loop.""" + interval = 1.0 / max(0.1, rate) + freq_khz = int(freq_mhz * 1000) + sr_sps = sr_ksps * 1000 + + try: + self._bridge.ensure_booted() + self._bridge.tune(sr_sps, freq_khz, 0, 5) + time.sleep(0.3) + except Exception: + pass + + while self._tracking: + t0 = time.monotonic() + try: + sig = self._bridge.signal_monitor() + except Exception: + time.sleep(interval) + continue + + self._sample_count += 1 + snr_db = sig.get("snr_db", 0.0) + locked = sig.get("locked", False) + self._peak_snr = max(self._peak_snr, snr_db) + elapsed = time.monotonic() - self._start_time + + record = { + "ts": datetime.now().isoformat(), + "elapsed": round(elapsed, 3), + "snr_db": round(snr_db, 2), + "agc1": sig.get("agc1", 0), + "agc2": sig.get("agc2", 0), + "power_db": round(sig.get("power_db", -40), 2), + "locked": locked, + } + self._records.append(record) + + # Lock transition detection + lock_event = None + if self._was_locked is not None and locked != self._was_locked: + if locked: + lock_event = ("lock", snr_db) + else: + lock_event = ("unlock", snr_db) + self._was_locked = locked + + self.app.call_from_thread( + self._update_ui, sig, elapsed, lock_event, + ) + + sleep = interval - (time.monotonic() - t0) + if sleep > 0: + time.sleep(sleep) + + def _update_ui(self, sig: dict, elapsed: float, + lock_event: tuple | None) -> None: + if not self.is_mounted: + return + + self.query_one("#track-gauge", SignalGauge).update_signal(sig) + self.query_one("#track-snr-spark", SparklineWidget).push(sig.get("snr_db", 0)) + self.query_one("#track-power-spark", SparklineWidget).push(sig.get("power_db", -40)) + + self.query_one("#trk-samples", Static).update( + f"[#506878]Samples:[/] [#00d4aa]{self._sample_count}[/]" + ) + self.query_one("#trk-elapsed", Static).update( + f"[#506878]Elapsed:[/] [#00d4aa]{elapsed:.0f}s[/]" + ) + self.query_one("#trk-peak", Static).update( + f"[#506878]Peak SNR:[/] [#00d4aa]{self._peak_snr:.1f} dB[/]" + ) + + if lock_event: + ts = datetime.now().strftime("%H:%M:%S.%f")[:-3] + log = self.query_one("#track-log", RichLog) + if lock_event[0] == "lock": + log.write( + f"[#506878]{ts}[/] [bold #00e060]LOCK ACQUIRED[/]" + f" SNR {lock_event[1]:.1f} dB" + ) + else: + log.write( + f"[#506878]{ts}[/] [bold #e04040]LOCK LOST[/]" + ) + + def _export_csv(self) -> None: + if not self._records: + return + path = Path(f"skywalker-track-{datetime.now().strftime('%Y%m%d-%H%M%S')}.csv") + with open(path, "w", newline="") as f: + w = csv.DictWriter(f, fieldnames=list(self._records[0].keys())) + w.writeheader() + w.writerows(self._records) + log = self.query_one("#track-log", RichLog) + log.write(f"[#00d4aa]CSV exported: {path}[/]") + + def _export_jsonl(self) -> None: + if not self._records: + return + path = Path(f"skywalker-track-{datetime.now().strftime('%Y%m%d-%H%M%S')}.jsonl") + with open(path, "w") as f: + for rec in self._records: + f.write(json.dumps(rec) + "\n") + log = self.query_one("#track-log", RichLog) + log.write(f"[#00d4aa]JSONL exported: {path}[/]") diff --git a/tui/src/skywalker_tui/theme.tcss b/tui/src/skywalker_tui/theme.tcss new file mode 100644 index 0000000..5919970 --- /dev/null +++ b/tui/src/skywalker_tui/theme.tcss @@ -0,0 +1,305 @@ +/* SkyWalker-1 TUI — dark RF theme + * + * Design: dark background with teal/cyan data accents. + * Signal gradient: blue → green → yellow → red (cold → hot). + * No purple per user preference. + */ + +/* ─── Global ─── */ + +Screen { + background: #0a0a12; + color: #c8d0d8; +} + +Header { + background: #0e1018; + color: #00d4aa; + dock: top; +} + +Footer { + background: #0e1018; + dock: bottom; +} + +/* ─── Sidebar ─── */ + +#sidebar { + width: 26; + background: #0e1420; + border-right: solid #1a2a3a; + padding: 1 1; +} + +#sidebar .mode-button { + width: 100%; + margin: 0 0 1 0; + min-height: 3; + background: #121c2a; + color: #7090a8; + border: round #1a3050; + text-align: center; +} + +#sidebar .mode-button:hover { + background: #1a2a40; + color: #00d4aa; + border: round #00d4aa; +} + +#sidebar .mode-button.-active { + background: #0a2a3a; + color: #00d4aa; + border: round #00d4aa; + text-style: bold; +} + +#sidebar Label.sidebar-heading { + color: #506878; + text-style: bold; + margin: 1 0 0 0; + text-align: center; +} + +/* ─── Content area ─── */ + +#content-area { + background: #0a0a12; +} + +/* ─── Status bar widget ─── */ + +#device-status { + height: 3; + background: #0e1420; + border-top: solid #1a2a3a; + padding: 0 1; + dock: bottom; +} + +#device-status .status-label { + color: #506878; +} + +#device-status .status-value { + color: #00d4aa; +} + +#device-status .status-connected { + color: #00d4aa; + text-style: bold; +} + +#device-status .status-demo { + color: #e8a020; + text-style: bold; +} + +#device-status .status-disconnected { + color: #e04040; + text-style: bold; +} + +/* ─── Signal gauge ─── */ + +.signal-gauge { + height: auto; + padding: 1; +} + +.signal-gauge .snr-value { + color: #00d4aa; + text-style: bold; +} + +.signal-gauge .lock-yes { + color: #00e060; + text-style: bold; +} + +.signal-gauge .lock-no { + color: #e04040; +} + +/* ─── Spectrum plot ─── */ + +.spectrum-plot { + min-height: 12; +} + +/* ─── Panels and containers ─── */ + +.panel { + background: #0e1420; + border: round #1a2a3a; + padding: 1; + margin: 0 0 1 0; +} + +.panel-title { + color: #00d4aa; + text-style: bold; + margin: 0 0 1 0; +} + +/* ─── Controls / Input areas ─── */ + +.controls { + height: auto; + padding: 1; + background: #0e1018; + border-top: solid #1a2a3a; + dock: bottom; +} + +.controls Label { + color: #506878; + width: auto; + margin: 0 1 0 0; +} + +.controls Input { + width: 14; + background: #121c2a; + border: round #1a3050; + color: #c8d0d8; +} + +.controls Input:focus { + border: round #00d4aa; +} + +.controls Button { + margin: 0 1; + background: #1a2a40; + color: #00d4aa; + border: round #1a3050; +} + +.controls Button:hover { + background: #00d4aa; + color: #0a0a12; +} + +/* ─── Data table ─── */ + +DataTable { + background: #0a0a12; +} + +DataTable > .datatable--header { + background: #0e1420; + color: #00d4aa; + text-style: bold; +} + +DataTable > .datatable--cursor { + background: #1a2a40; + color: #ffffff; +} + +/* ─── Progress bar ─── */ + +ProgressBar Bar { + color: #00d4aa; + background: #121c2a; +} + +/* ─── Sparkline ─── */ + +.sparkline-widget { + height: 3; + padding: 0 1; +} + +/* ─── Waterfall ─── */ + +.waterfall { + min-height: 10; +} + +/* ─── Log / event list ─── */ + +#event-log { + height: 8; + background: #0e1018; + border: round #1a2a3a; + padding: 0 1; + overflow-y: auto; +} + +#event-log .log-lock { + color: #00e060; +} + +#event-log .log-unlock { + color: #e04040; +} + +#event-log .log-time { + color: #506878; +} + +/* ─── Stats panel ─── */ + +.stats-grid { + layout: grid; + grid-size: 4; + grid-gutter: 1; + height: auto; + padding: 1; +} + +.stat-box { + height: 3; + background: #121c2a; + border: round #1a3050; + padding: 0 1; + content-align: center middle; +} + +.stat-box .stat-value { + color: #00d4aa; + text-style: bold; +} + +.stat-box .stat-label { + color: #506878; +} + +/* ─── L-band allocation ─── */ + +.alloc-tag { + background: #1a2a40; + color: #60a0c0; + padding: 0 1; + margin: 0 1 0 0; +} + +/* ─── Mode-specific screen layouts ─── */ + +.mode-screen { + layout: vertical; +} + +.top-panel { + height: 1fr; + min-height: 10; +} + +.bottom-panel { + height: auto; +} + +.split-horizontal { + layout: horizontal; +} + +.left-panel { + width: 1fr; +} + +.right-panel { + width: 1fr; +} diff --git a/tui/src/skywalker_tui/widgets/__init__.py b/tui/src/skywalker_tui/widgets/__init__.py new file mode 100644 index 0000000..00f5419 --- /dev/null +++ b/tui/src/skywalker_tui/widgets/__init__.py @@ -0,0 +1 @@ +"""Custom widgets for SkyWalker-1 TUI.""" diff --git a/tui/src/skywalker_tui/widgets/frequency_table.py b/tui/src/skywalker_tui/widgets/frequency_table.py new file mode 100644 index 0000000..442b4f0 --- /dev/null +++ b/tui/src/skywalker_tui/widgets/frequency_table.py @@ -0,0 +1,81 @@ +"""DataTable wrapper for transponder scan results.""" + +import sys +import os + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "..", "..", "tools")) + +from textual.widget import Widget +from textual.widgets import DataTable +from textual.app import ComposeResult + +from skywalker_lib import LBAND_ALLOCATIONS + + +def _freq_allocation(freq_mhz: float) -> str: + """Return allocation name for a given frequency.""" + for lo, hi, name in LBAND_ALLOCATIONS: + if lo <= freq_mhz <= hi: + return name + return "" + + +class FrequencyTable(Widget): + """Sortable table of discovered transponders or sweep results.""" + + DEFAULT_CSS = """ + FrequencyTable { + height: 1fr; + min-height: 6; + } + """ + + def __init__(self, show_allocation: bool = False, **kwargs): + super().__init__(**kwargs) + self._show_allocation = show_allocation + self._rows: list[dict] = [] + + def compose(self) -> ComposeResult: + table = DataTable(id="freq-table") + table.cursor_type = "row" + yield table + + def on_mount(self) -> None: + table = self.query_one("#freq-table", DataTable) + cols = ["IF MHz", "RF MHz", "SR ksps", "Power dB", "Locked"] + if self._show_allocation: + cols.append("Allocation") + for col in cols: + table.add_column(col, key=col) + + def add_transponder(self, tp: dict) -> None: + """Add a single transponder result.""" + self._rows.append(tp) + table = self.query_one("#freq-table", DataTable) + + if_mhz = tp.get("if_mhz", 0) + rf_mhz = tp.get("rf_mhz", 0) + sr_ksps = tp.get("sr_ksps", 0) + power_db = tp.get("power_db", 0) + locked = "Yes" if tp.get("locked", False) else "No" + + row = [f"{if_mhz:.1f}", f"{rf_mhz:.0f}", str(sr_ksps), + f"{power_db:.1f}", locked] + if self._show_allocation: + row.append(_freq_allocation(if_mhz)) + + table.add_row(*row) + + def clear_table(self) -> None: + """Remove all rows.""" + self._rows.clear() + table = self.query_one("#freq-table", DataTable) + table.clear() + + def get_selected_transponder(self) -> dict | None: + """Return the currently selected transponder dict.""" + table = self.query_one("#freq-table", DataTable) + cursor_row = table.cursor_row + if cursor_row is not None and 0 <= cursor_row < len(self._rows): + return self._rows[cursor_row] + return None diff --git a/tui/src/skywalker_tui/widgets/signal_gauge.py b/tui/src/skywalker_tui/widgets/signal_gauge.py new file mode 100644 index 0000000..26e3630 --- /dev/null +++ b/tui/src/skywalker_tui/widgets/signal_gauge.py @@ -0,0 +1,120 @@ +"""Large signal strength gauge with SNR bar and lock indicator.""" + +from textual.app import ComposeResult +from textual.widget import Widget +from textual.widgets import Label, Static +from textual.reactive import reactive + + +# Bar characters for sub-block resolution +_BARS = " ▏▎▍▌▋▊▉█" + + +def _snr_color(snr_db: float) -> str: + """Map SNR to a hex color: blue → cyan → green → yellow → red.""" + if snr_db < 2: + return "#1565c0" + elif snr_db < 4: + return "#0097a7" + elif snr_db < 6: + return "#00bfa5" + elif snr_db < 8: + return "#00d4aa" + elif snr_db < 10: + return "#4caf50" + elif snr_db < 12: + return "#8bc34a" + elif snr_db < 14: + return "#cddc39" + elif snr_db < 16: + return "#ffc107" + else: + return "#f44336" + + +def _build_bar(pct: float, width: int = 40) -> str: + """Build a Unicode block bar string with sub-character precision.""" + pct = max(0.0, min(100.0, pct)) + ratio = pct / 100.0 + full = int(ratio * width) + remainder = (ratio * width) - full + partial = int(remainder * (len(_BARS) - 1)) + bar = "█" * full + if full < width: + bar += _BARS[partial] + bar += " " * (width - full - 1) + return bar + + +class SignalGauge(Widget): + """Large signal strength display with SNR, power, and lock state.""" + + DEFAULT_CSS = """ + SignalGauge { + height: auto; + padding: 1 2; + background: #0e1420; + border: round #1a2a3a; + margin: 0 0 1 0; + } + SignalGauge #gauge-header { + height: 1; + margin: 0 0 1 0; + } + SignalGauge #gauge-bar-line { + height: 1; + } + SignalGauge #gauge-details { + height: 1; + margin: 1 0 0 0; + color: #506878; + } + """ + + snr_db = reactive(0.0) + snr_pct = reactive(0.0) + power_db = reactive(-40.0) + locked = reactive(False) + agc1 = reactive(0) + + def compose(self) -> ComposeResult: + yield Static("", id="gauge-header") + yield Static("", id="gauge-bar-line") + yield Static("", id="gauge-details") + + def watch_snr_db(self) -> None: + self._refresh_display() + + def watch_locked(self) -> None: + self._refresh_display() + + def update_signal(self, sig: dict) -> None: + """Update from a signal_monitor() result dict.""" + self.snr_db = sig.get("snr_db", 0.0) + self.snr_pct = sig.get("snr_pct", 0.0) + self.power_db = sig.get("power_db", -40.0) + self.locked = sig.get("locked", False) + self.agc1 = sig.get("agc1", 0) + + def _refresh_display(self) -> None: + if not self.is_mounted: + return + + color = _snr_color(self.snr_db) + lock_str = "[bold #00e060]LOCK[/]" if self.locked else "[#e04040]NO LOCK[/]" + + header = self.query_one("#gauge-header", Static) + header.update( + f" {lock_str} " + f"[bold {color}]{self.snr_db:6.1f} dB[/] " + f"[#506878]{self.snr_pct:5.1f}%[/]" + ) + + bar_str = _build_bar(self.snr_pct, width=50) + bar_line = self.query_one("#gauge-bar-line", Static) + bar_line.update(f" [{color}]{bar_str}[/]") + + details = self.query_one("#gauge-details", Static) + details.update( + f" Power: {self.power_db:6.1f} dB AGC: {self.agc1:5d}" + ) diff --git a/tui/src/skywalker_tui/widgets/sparkline_widget.py b/tui/src/skywalker_tui/widgets/sparkline_widget.py new file mode 100644 index 0000000..b485310 --- /dev/null +++ b/tui/src/skywalker_tui/widgets/sparkline_widget.py @@ -0,0 +1,71 @@ +"""Rolling sparkline time series widget using Unicode spark characters.""" + +from collections import deque + +from textual.widget import Widget +from textual.widgets import Static +from textual.app import ComposeResult + +_SPARKS = "▁▂▃▄▅▆▇█" + + +class SparklineWidget(Widget): + """Rolling time-series display using Unicode block characters.""" + + DEFAULT_CSS = """ + SparklineWidget { + height: 3; + padding: 0 1; + background: #0e1420; + border: round #1a2a3a; + margin: 0 0 1 0; + } + SparklineWidget #spark-label { + height: 1; + color: #506878; + } + SparklineWidget #spark-line { + height: 1; + } + """ + + def __init__(self, title: str = "History", max_width: int = 80, + color: str = "#00d4aa", **kwargs): + super().__init__(**kwargs) + self._title = title + self._max_width = max_width + self._color = color + self._values: deque[float] = deque(maxlen=max_width) + + def compose(self) -> ComposeResult: + yield Static(f"[#506878]{self._title}[/]", id="spark-label") + yield Static("", id="spark-line") + + def push(self, value: float) -> None: + """Add a new data point and refresh the display.""" + self._values.append(value) + self._refresh() + + def clear(self) -> None: + self._values.clear() + if self.is_mounted: + self.query_one("#spark-line", Static).update("") + + def _refresh(self) -> None: + if not self.is_mounted or not self._values: + return + + vals = list(self._values) + mn = min(vals) + mx = max(vals) + rng = mx - mn if mx != mn else 1.0 + + chars = [] + for v in vals: + idx = int((v - mn) / rng * (len(_SPARKS) - 1)) + idx = max(0, min(len(_SPARKS) - 1, idx)) + chars.append(_SPARKS[idx]) + + spark_str = "".join(chars) + line = self.query_one("#spark-line", Static) + line.update(f"[{self._color}]{spark_str}[/] [{mn:.1f} .. {mx:.1f}]") diff --git a/tui/src/skywalker_tui/widgets/spectrum_plot.py b/tui/src/skywalker_tui/widgets/spectrum_plot.py new file mode 100644 index 0000000..647f372 --- /dev/null +++ b/tui/src/skywalker_tui/widgets/spectrum_plot.py @@ -0,0 +1,155 @@ +"""Terminal-native spectrum plot using Unicode block characters and Rich markup. + +Renders a horizontal bar chart where each frequency bin gets a colored bar +proportional to its power level. The color gradient goes from cold (blue) +to hot (red), same concept as the CLI tool's WATERFALL_COLORS but using +Rich style strings instead of raw ANSI escapes. +""" + +import sys +import os + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "..", "..", "tools")) + +from textual.widget import Widget +from textual.widgets import Static +from textual.app import ComposeResult +from textual.containers import VerticalScroll + +from skywalker_lib import detect_peaks, if_to_rf + +# Sub-block characters for fractional bar width +_BARS = " ▏▎▍▌▋▊▉█" + +# Power-to-color gradient (16 steps: blue → cyan → green → yellow → red) +_POWER_COLORS = [ + "#1a237e", # dark blue (weakest) + "#1565c0", + "#0277bd", + "#00838f", + "#00897b", # teal + "#2e7d32", # green + "#558b2f", + "#9e9d24", + "#f9a825", # yellow + "#ff8f00", + "#ef6c00", # orange + "#e65100", + "#d84315", + "#c62828", # red + "#b71c1c", + "#880e0e", # dark red (strongest) +] + + +def _power_to_color(power_db: float, floor: float, ceiling: float) -> str: + """Map a power value to a color from the gradient.""" + if ceiling == floor: + return _POWER_COLORS[len(_POWER_COLORS) // 2] + ratio = (power_db - floor) / (ceiling - floor) + ratio = max(0.0, min(1.0, ratio)) + idx = int(ratio * (len(_POWER_COLORS) - 1)) + return _POWER_COLORS[idx] + + +class SpectrumPlot(Widget): + """Bar-chart spectrum display rendered with Unicode blocks and Rich styles.""" + + DEFAULT_CSS = """ + SpectrumPlot { + height: 1fr; + min-height: 12; + background: #0a0a12; + padding: 0 1; + } + SpectrumPlot #spectrum-title { + height: 1; + color: #00d4aa; + text-style: bold; + } + SpectrumPlot #spectrum-body { + height: 1fr; + } + """ + + def __init__(self, title: str = "Spectrum", bar_width: int = 40, + lnb_lo: float = 0.0, **kwargs): + super().__init__(**kwargs) + self._title = title + self._bar_width = bar_width + self._lnb_lo = lnb_lo + self._freqs: list[float] = [] + self._powers: list[float] = [] + self._results: list[dict] = [] + + def compose(self) -> ComposeResult: + yield Static(f"[#00d4aa bold]{self._title}[/]", id="spectrum-title") + yield VerticalScroll(Static("", id="spectrum-lines"), id="spectrum-body") + + def update_data(self, freqs: list[float], powers: list[float], + results: list[dict] | None = None, lnb_lo: float | None = None): + """Update with new sweep data and redraw.""" + self._freqs = freqs + self._powers = powers + self._results = results or [{} for _ in freqs] + if lnb_lo is not None: + self._lnb_lo = lnb_lo + self._refresh() + + def _refresh(self) -> None: + if not self.is_mounted or not self._freqs: + return + + p_min = min(self._powers) + p_max = max(self._powers) + p_range = p_max - p_min if p_max != p_min else 1.0 + + # Detect peaks for markers + peaks_set = set() + peaks = detect_peaks(self._freqs, self._powers, threshold_db=3.0) + for _f, _p, idx in peaks: + peaks_set.add(idx) + + lines = [] + for i, (f, p) in enumerate(zip(self._freqs, self._powers)): + # Frequency label (RF or IF) + if self._lnb_lo > 0: + label = f"{if_to_rf(f, self._lnb_lo):7.0f}" + else: + label = f"{f:7.1f}" + + # Bar + ratio = max(0.0, min(1.0, (p - p_min) / p_range)) + full = int(ratio * self._bar_width) + remainder = (ratio * self._bar_width) - full + partial = int(remainder * (len(_BARS) - 1)) + color = _power_to_color(p, p_min, p_max) + + bar = "█" * full + if full < self._bar_width: + bar += _BARS[partial] + bar += " " * (self._bar_width - full - 1) + + locked = self._results[i].get("locked", False) if i < len(self._results) else False + lock_mark = " [bold #00e060]*[/]" if locked else "" + peak_mark = " [bold #f44336]^[/]" if i in peaks_set else "" + + lines.append( + f"[#506878]{label}[/] [{color}]{bar}[/] [#7090a8]{p:6.1f}[/]{lock_mark}{peak_mark}" + ) + + # Peak summary at bottom + if peaks: + lines.append("") + lines.append(f"[#00d4aa bold]Peaks ({len(peaks)}):[/]") + for freq, pwr, idx in peaks: + if self._lnb_lo > 0: + fl = f"{if_to_rf(freq, self._lnb_lo):.0f} MHz RF" + else: + fl = f"{freq:.1f} MHz" + locked = self._results[idx].get("locked", False) if idx < len(self._results) else False + lock_s = " [bold #00e060]LOCKED[/]" if locked else "" + lines.append(f" [#c8d0d8]{fl} {pwr:.1f} dB{lock_s}[/]") + + body = self.query_one("#spectrum-lines", Static) + body.update("\n".join(lines)) diff --git a/tui/src/skywalker_tui/widgets/status_bar.py b/tui/src/skywalker_tui/widgets/status_bar.py new file mode 100644 index 0000000..a771eae --- /dev/null +++ b/tui/src/skywalker_tui/widgets/status_bar.py @@ -0,0 +1,71 @@ +"""Device status bar — connection state, firmware version, config bits.""" + +import sys +import os + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "..", "..", "tools")) + +from textual.app import ComposeResult +from textual.containers import Horizontal +from textual.widget import Widget +from textual.widgets import Label + +from skywalker_lib import format_config_bits + + +class DeviceStatusBar(Widget): + """Bottom status bar showing device connection and configuration.""" + + DEFAULT_CSS = """ + DeviceStatusBar { + height: 3; + background: #0e1420; + border-top: solid #1a2a3a; + padding: 0 1; + dock: bottom; + layout: horizontal; + } + DeviceStatusBar Label { + width: auto; + margin: 1 2 0 0; + } + """ + + def __init__(self, bridge=None): + super().__init__(id="device-status") + self._bridge = bridge + + def compose(self) -> ComposeResult: + yield Label("", id="status-conn") + yield Label("", id="status-fw") + yield Label("", id="status-config") + + def update_status(self, bridge=None): + if bridge is not None: + self._bridge = bridge + if self._bridge is None: + return + + conn_label = self.query_one("#status-conn", Label) + fw_label = self.query_one("#status-fw", Label) + config_label = self.query_one("#status-config", Label) + + if self._bridge.is_demo: + conn_label.update("[bold #e8a020]DEMO[/]") + else: + conn_label.update("[bold #00d4aa]CONNECTED[/]") + + try: + fw = self._bridge.get_fw_version() + fw_label.update(f"[#506878]FW:[/] [#c8d0d8]{fw['version']}[/]") + except Exception: + fw_label.update("[#506878]FW:[/] [#e04040]error[/]") + + try: + config = self._bridge.get_config() + bits = format_config_bits(config) + active = [name for name, is_set in bits if is_set] + config_str = " | ".join(active) if active else "idle" + config_label.update(f"[#506878]Config:[/] [#7090a8]{config_str}[/]") + except Exception: + config_label.update("[#506878]Config:[/] [#e04040]error[/]") diff --git a/tui/src/skywalker_tui/widgets/waterfall.py b/tui/src/skywalker_tui/widgets/waterfall.py new file mode 100644 index 0000000..49f8787 --- /dev/null +++ b/tui/src/skywalker_tui/widgets/waterfall.py @@ -0,0 +1,98 @@ +"""Rolling waterfall display — each row is one sweep, color = power level. + +The waterfall auto-scrolls: new sweeps appear at the top, older rows shift down. +Uses Rich markup with the same 16-color power gradient as the spectrum plot. +""" + +from collections import deque +from datetime import datetime + +from textual.widget import Widget +from textual.widgets import Static +from textual.app import ComposeResult +from textual.containers import VerticalScroll + + +# Same gradient as spectrum_plot +_WATERFALL_COLORS = [ + "#1a237e", "#1565c0", "#0277bd", "#00838f", + "#00897b", "#2e7d32", "#558b2f", "#9e9d24", + "#f9a825", "#ff8f00", "#ef6c00", "#e65100", + "#d84315", "#c62828", "#b71c1c", "#880e0e", +] + + +class WaterfallDisplay(Widget): + """Scrolling waterfall of spectrum sweeps.""" + + DEFAULT_CSS = """ + WaterfallDisplay { + height: 1fr; + min-height: 8; + background: #0a0a12; + padding: 0 1; + } + WaterfallDisplay #waterfall-title { + height: 1; + color: #00d4aa; + text-style: bold; + } + WaterfallDisplay #waterfall-body { + height: 1fr; + } + """ + + def __init__(self, title: str = "Waterfall", max_rows: int = 50, **kwargs): + super().__init__(**kwargs) + self._title = title + self._max_rows = max_rows + self._rows: deque[tuple[str, list[float]]] = deque(maxlen=max_rows) + self._global_min: float = -40.0 + self._global_max: float = 0.0 + + def compose(self) -> ComposeResult: + yield Static(f"[#00d4aa bold]{self._title}[/]", id="waterfall-title") + yield VerticalScroll(Static("", id="waterfall-lines"), id="waterfall-body") + + def add_sweep(self, powers: list[float]) -> None: + """Add a new sweep row and refresh.""" + ts = datetime.now().strftime("%H:%M:%S") + self._rows.appendleft((ts, list(powers))) + + # Update global range for consistent coloring + if powers: + self._global_min = min(self._global_min, min(powers)) + self._global_max = max(self._global_max, max(powers)) + + self._refresh() + + def clear(self) -> None: + self._rows.clear() + self._global_min = -40.0 + self._global_max = 0.0 + if self.is_mounted: + self.query_one("#waterfall-lines", Static).update("") + + def _refresh(self) -> None: + if not self.is_mounted or not self._rows: + return + + rng = self._global_max - self._global_min + if rng == 0: + rng = 1.0 + + lines = [] + for ts, powers in self._rows: + chars = [] + for p in powers: + ratio = (p - self._global_min) / rng + ratio = max(0.0, min(1.0, ratio)) + idx = int(ratio * (len(_WATERFALL_COLORS) - 1)) + color = _WATERFALL_COLORS[idx] + chars.append(f"[{color}]█[/]") + + line = "".join(chars) + lines.append(f"[#506878]{ts}[/] {line}") + + body = self.query_one("#waterfall-lines", Static) + body.update("\n".join(lines)) diff --git a/tui/uv.lock b/tui/uv.lock new file mode 100644 index 0000000..15101ac --- /dev/null +++ b/tui/uv.lock @@ -0,0 +1,143 @@ +version = 1 +revision = 3 +requires-python = ">=3.11" + +[[package]] +name = "linkify-it-py" +version = "2.0.3" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "uc-micro-py" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/2a/ae/bb56c6828e4797ba5a4821eec7c43b8bf40f69cda4d4f5f8c8a2810ec96a/linkify-it-py-2.0.3.tar.gz", hash = "sha256:68cda27e162e9215c17d786649d1da0021a451bdc436ef9e0fa0ba5234b9b048", size = 27946, upload-time = "2024-02-04T14:48:04.179Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/04/1e/b832de447dee8b582cac175871d2f6c3d5077cc56d5575cadba1fd1cccfa/linkify_it_py-2.0.3-py3-none-any.whl", hash = "sha256:6bcbc417b0ac14323382aef5c5192c0075bf8a9d6b41820a2b66371eac6b6d79", size = 19820, upload-time = "2024-02-04T14:48:02.496Z" }, +] + +[[package]] +name = "markdown-it-py" +version = "4.0.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "mdurl" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/5b/f5/4ec618ed16cc4f8fb3b701563655a69816155e79e24a17b651541804721d/markdown_it_py-4.0.0.tar.gz", hash = "sha256:cb0a2b4aa34f932c007117b194e945bd74e0ec24133ceb5bac59009cda1cb9f3", size = 73070, upload-time = "2025-08-11T12:57:52.854Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/94/54/e7d793b573f298e1c9013b8c4dade17d481164aa517d1d7148619c2cedbf/markdown_it_py-4.0.0-py3-none-any.whl", hash = "sha256:87327c59b172c5011896038353a81343b6754500a08cd7a4973bb48c6d578147", size = 87321, upload-time = "2025-08-11T12:57:51.923Z" }, +] + +[package.optional-dependencies] +linkify = [ + { name = "linkify-it-py" }, +] + +[[package]] +name = "mdit-py-plugins" +version = "0.5.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "markdown-it-py" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/b2/fd/a756d36c0bfba5f6e39a1cdbdbfdd448dc02692467d83816dff4592a1ebc/mdit_py_plugins-0.5.0.tar.gz", hash = "sha256:f4918cb50119f50446560513a8e311d574ff6aaed72606ddae6d35716fe809c6", size = 44655, upload-time = "2025-08-11T07:25:49.083Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/fb/86/dd6e5db36df29e76c7a7699123569a4a18c1623ce68d826ed96c62643cae/mdit_py_plugins-0.5.0-py3-none-any.whl", hash = "sha256:07a08422fc1936a5d26d146759e9155ea466e842f5ab2f7d2266dd084c8dab1f", size = 57205, upload-time = "2025-08-11T07:25:47.597Z" }, +] + +[[package]] +name = "mdurl" +version = "0.1.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/d6/54/cfe61301667036ec958cb99bd3efefba235e65cdeb9c84d24a8293ba1d90/mdurl-0.1.2.tar.gz", hash = "sha256:bb413d29f5eea38f31dd4754dd7377d4465116fb207585f97bf925588687c1ba", size = 8729, upload-time = "2022-08-14T12:40:10.846Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/b3/38/89ba8ad64ae25be8de66a6d463314cf1eb366222074cfda9ee839c56a4b4/mdurl-0.1.2-py3-none-any.whl", hash = "sha256:84008a41e51615a49fc9966191ff91509e3c40b939176e643fd50a5c2196b8f8", size = 9979, upload-time = "2022-08-14T12:40:09.779Z" }, +] + +[[package]] +name = "platformdirs" +version = "4.7.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/71/25/ccd8e88fcd16a4eb6343a8b4b9635e6f3928a7ebcd82822a14d20e3ca29f/platformdirs-4.7.0.tar.gz", hash = "sha256:fd1a5f8599c85d49b9ac7d6e450bc2f1aaf4a23f1fe86d09952fe20ad365cf36", size = 23118, upload-time = "2026-02-12T22:21:53.764Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/cb/e3/1eddccb2c39ecfbe09b3add42a04abcc3fa5b468aa4224998ffb8a7e9c8f/platformdirs-4.7.0-py3-none-any.whl", hash = "sha256:1ed8db354e344c5bb6039cd727f096af975194b508e37177719d562b2b540ee6", size = 18983, upload-time = "2026-02-12T22:21:52.237Z" }, +] + +[[package]] +name = "pygments" +version = "2.19.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/b0/77/a5b8c569bf593b0140bde72ea885a803b82086995367bf2037de0159d924/pygments-2.19.2.tar.gz", hash = "sha256:636cb2477cec7f8952536970bc533bc43743542f70392ae026374600add5b887", size = 4968631, upload-time = "2025-06-21T13:39:12.283Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/c7/21/705964c7812476f378728bdf590ca4b771ec72385c533964653c68e86bdc/pygments-2.19.2-py3-none-any.whl", hash = "sha256:86540386c03d588bb81d44bc3928634ff26449851e99741617ecb9037ee5ec0b", size = 1225217, upload-time = "2025-06-21T13:39:07.939Z" }, +] + +[[package]] +name = "pyusb" +version = "1.3.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/00/6b/ce3727395e52b7b76dfcf0c665e37d223b680b9becc60710d4bc08b7b7cb/pyusb-1.3.1.tar.gz", hash = "sha256:3af070b607467c1c164f49d5b0caabe8ac78dbed9298d703a8dbf9df4052d17e", size = 77281, upload-time = "2025-01-08T23:45:01.866Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/28/b8/27e6312e86408a44fe16bd28ee12dd98608b39f7e7e57884a24e8f29b573/pyusb-1.3.1-py3-none-any.whl", hash = "sha256:bf9b754557af4717fe80c2b07cc2b923a9151f5c08d17bdb5345dac09d6a0430", size = 58465, upload-time = "2025-01-08T23:45:00.029Z" }, +] + +[[package]] +name = "rich" +version = "14.3.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "markdown-it-py" }, + { name = "pygments" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/74/99/a4cab2acbb884f80e558b0771e97e21e939c5dfb460f488d19df485e8298/rich-14.3.2.tar.gz", hash = "sha256:e712f11c1a562a11843306f5ed999475f09ac31ffb64281f73ab29ffdda8b3b8", size = 230143, upload-time = "2026-02-01T16:20:47.908Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ef/45/615f5babd880b4bd7d405cc0dc348234c5ffb6ed1ea33e152ede08b2072d/rich-14.3.2-py3-none-any.whl", hash = "sha256:08e67c3e90884651da3239ea668222d19bea7b589149d8014a21c633420dbb69", size = 309963, upload-time = "2026-02-01T16:20:46.078Z" }, +] + +[[package]] +name = "skywalker-tui" +version = "0.1.0" +source = { editable = "." } +dependencies = [ + { name = "pyusb" }, + { name = "textual" }, +] + +[package.metadata] +requires-dist = [ + { name = "pyusb", specifier = ">=1.3" }, + { name = "textual", specifier = ">=3.0" }, +] + +[[package]] +name = "textual" +version = "7.5.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "markdown-it-py", extra = ["linkify"] }, + { name = "mdit-py-plugins" }, + { name = "platformdirs" }, + { name = "pygments" }, + { name = "rich" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/9f/38/7d169a765993efde5095c70a668bf4f5831bb7ac099e932f2783e9b71abf/textual-7.5.0.tar.gz", hash = "sha256:c730cba1e3d704e8f1ca915b6a3af01451e3bca380114baacf6abf87e9dac8b6", size = 1592319, upload-time = "2026-01-30T13:46:39.881Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/9c/78/96ddb99933e11d91bc6e05edae23d2687e44213066bcbaca338898c73c47/textual-7.5.0-py3-none-any.whl", hash = "sha256:849dfee9d705eab3b2d07b33152b7bd74fb1f5056e002873cc448bce500c6374", size = 718164, upload-time = "2026-01-30T13:46:37.635Z" }, +] + +[[package]] +name = "typing-extensions" +version = "4.15.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/72/94/1a15dd82efb362ac84269196e94cf00f187f7ed21c242792a923cdb1c61f/typing_extensions-4.15.0.tar.gz", hash = "sha256:0cea48d173cc12fa28ecabc3b837ea3cf6f38c6d1136f85cbaaf598984861466", size = 109391, upload-time = "2025-08-25T13:49:26.313Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/18/67/36e9267722cc04a6b9f15c7f3441c2363321a3ea07da7ae0c0707beb2a9c/typing_extensions-4.15.0-py3-none-any.whl", hash = "sha256:f0fa19c6845758ab08074a0cfa8b7aecb71c999ca73d62883bc25cc018c4e548", size = 44614, upload-time = "2025-08-25T13:49:24.86Z" }, +] + +[[package]] +name = "uc-micro-py" +version = "1.0.3" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/91/7a/146a99696aee0609e3712f2b44c6274566bc368dfe8375191278045186b8/uc-micro-py-1.0.3.tar.gz", hash = "sha256:d321b92cff673ec58027c04015fcaa8bb1e005478643ff4a500882eaab88c48a", size = 6043, upload-time = "2024-02-09T16:52:01.654Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/37/87/1f677586e8ac487e29672e4b17455758fce261de06a0d086167bb760361a/uc_micro_py-1.0.3-py3-none-any.whl", hash = "sha256:db1dffff340817673d7b466ec86114a9dc0e9d4d9b5ba229d9d60e5c12600cd5", size = 6229, upload-time = "2024-02-09T16:52:00.371Z" }, +]