Ryan Malloy 64c33985a3 Add Textual TUI for SkyWalker-1 RF tool
Separate entry point (skywalker-tui) that reuses skywalker_lib.py
unchanged. Five RF modes: spectrum, scan, monitor, lband, track —
each with threaded USB bridge workers for non-blocking I/O.

Includes --demo mode with synthetic signal generation (Gaussian
peaks, noise floor, AGC simulation) for development without hardware.

Custom widgets: spectrum bar chart, rolling waterfall, signal gauge,
sparkline history, transponder table, device status bar.
2026-02-13 04:39:55 -07:00

95 lines
3.3 KiB
Python

"""Thread-safe bridge between Textual async event loop and blocking pyusb calls.
All SkyWalker1 (or DemoDevice) methods are blocking I/O — they perform USB control
transfers that can take 2-200ms each. Textual's event loop is asyncio-based, so
calling these directly would freeze the UI.
The USBBridge wraps every device method behind a threading.Lock to prevent concurrent
USB access (the BCM4500 can't handle overlapping control transfers) and exposes them
as plain synchronous methods meant to be called from Textual @work(thread=True) workers.
"""
import threading
class USBBridge:
"""Thread-safe wrapper around SkyWalker1 or DemoDevice."""
def __init__(self, device):
self._dev = device
self._lock = threading.Lock()
@property
def is_demo(self) -> bool:
return hasattr(self._dev, "_demo")
def open(self):
with self._lock:
if hasattr(self._dev, "open"):
self._dev.open()
def close(self):
with self._lock:
if hasattr(self._dev, "close"):
self._dev.close()
def get_fw_version(self) -> dict:
with self._lock:
return self._dev.get_fw_version()
def get_config(self) -> int:
with self._lock:
return self._dev.get_config()
def ensure_booted(self):
with self._lock:
self._dev.ensure_booted()
def signal_monitor(self) -> dict:
with self._lock:
return self._dev.signal_monitor()
def sweep_spectrum(self, start_mhz: float, stop_mhz: float,
step_mhz: float = 5.0, dwell_ms: int = 10,
sr_ksps: int = 20000, mod_index: int = 0,
fec_index: int = 5, callback=None) -> tuple:
with self._lock:
return self._dev.sweep_spectrum(
start_mhz, stop_mhz, step_mhz, dwell_ms,
sr_ksps, mod_index, fec_index, callback,
)
def tune_monitor(self, symbol_rate_sps: int, freq_khz: int,
mod_index: int, fec_index: int,
dwell_ms: int = 10) -> dict:
with self._lock:
return self._dev.tune_monitor(
symbol_rate_sps, freq_khz, mod_index, fec_index, dwell_ms,
)
def tune(self, symbol_rate_sps: int, freq_khz: int,
mod_index: int, fec_index: int):
with self._lock:
self._dev.tune(symbol_rate_sps, freq_khz, mod_index, fec_index)
def set_lnb_voltage(self, high: bool):
with self._lock:
self._dev.set_lnb_voltage(high)
def set_22khz_tone(self, on: bool):
with self._lock:
self._dev.set_22khz_tone(on)
def configure_lnb(self, pol=None, band=None, lnb_lo=None,
disable_lnb=False) -> float:
with self._lock:
return self._dev.configure_lnb(pol, band, lnb_lo, disable_lnb)
def blind_scan(self, freq_khz: int, sr_min: int, sr_max: int,
sr_step: int) -> dict | None:
"""Run blind scan at a single frequency. Returns result dict or None."""
with self._lock:
if hasattr(self._dev, "blind_scan"):
return self._dev.blind_scan(freq_khz, sr_min, sr_max, sr_step)
return None