#!/usr/bin/env python3
"""
Carrier survey CLI for the Genpix SkyWalker-1.

Subcommands:
  full-scan   Full six-stage carrier survey
  quick-scan  Fast sweep + peak detection only
  diff        Compare two saved survey catalogs
  export      Export a survey to CSV, JSON, or text
  view        View the latest or a specified survey
  qo100       QO-100 narrowband transponder survey with optimized params
"""

import sys
import os
import argparse
import csv
import io
import json
import time
from typing import Callable

# Ensure the tools directory is on the import path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from skywalker_lib import SkyWalker1
from signal_analysis import adaptive_noise_floor, detect_peaks_enhanced, classify_carrier
from carrier_catalog import CarrierCatalog, CarrierEntry, CatalogDiff, CATALOG_DIR
from survey_engine import SurveyEngine


# IF range used as the default scan window and for QO-100 validation (MHz).
IF_MIN_MHZ = 950
IF_MAX_MHZ = 2150

# QO-100 (Es'hail-2) narrowband transponder downlink edges (MHz).
QO100_NB_RF_START_MHZ = 10489.5
QO100_NB_RF_STOP_MHZ = 10489.8


def progress_callback(verbose: bool) -> Callable[[str, float, str], None]:
    """Return a callback function for SurveyEngine progress reporting.

    Args:
        verbose: When true, every update is printed on its own stderr line.
            Otherwise a single status line is rewritten in place using a
            carriage return, with the message truncated/padded to 60 columns.

    Returns:
        A ``cb(stage, pct, msg)`` callable that writes to stderr.
    """
    def cb(stage, pct, msg):
        if verbose:
            print(f" [{stage:>17s}] {pct:5.1f}% {msg}", file=sys.stderr)
        else:
            # Fixed-width message so a shorter update fully overwrites a
            # longer previous one.
            sys.stderr.write(f"\r {stage}: {pct:.0f}% {msg[:60]:<60s}")
            sys.stderr.flush()
    return cb


# -- Shared helpers --

def _clear_progress_line(verbose: bool) -> None:
    """Blank the in-place progress line left on stderr in non-verbose mode."""
    if not verbose:
        sys.stderr.write("\r" + " " * 80 + "\r")
        sys.stderr.flush()


def _load_catalog_or_exit(path: str) -> CarrierCatalog:
    """Load a survey catalog, or report the failure on stderr and exit(1).

    ``OSError`` covers a missing file as well as permission errors and
    directories passed by mistake; ``json.JSONDecodeError`` covers files that
    are not valid JSON.
    """
    try:
        return CarrierCatalog.load(path)
    except (OSError, json.JSONDecodeError) as e:
        print(f"Cannot load {path}: {e}", file=sys.stderr)
        sys.exit(1)


# -- Subcommand handlers --

def cmd_full_scan(args: argparse.Namespace) -> None:
    """Run a full six-stage carrier survey.

    Prints the scan parameters, runs ``SurveyEngine.run_full_scan`` on the
    device, stamps band/polarization/name onto the resulting catalog, saves
    it (to ``args.output`` when given, otherwise to the catalog's default
    location) and prints the catalog summary plus the saved path.
    """
    print("SkyWalker-1 Full Carrier Survey")
    print(f" Range: {args.start}-{args.stop} MHz")
    print(f" Coarse step: {args.coarse_step} MHz, Fine step: {args.fine_step} MHz")
    print(f" SR range: {args.sr_min / 1e6:.1f} - {args.sr_max / 1e6:.1f} Msps")
    print()

    cb = progress_callback(args.verbose)

    with SkyWalker1(verbose=args.verbose) as dev:
        dev.ensure_booted()

        # Only touch the LNB configuration when the user asked for it.
        if args.pol or args.band:
            dev.configure_lnb(pol=args.pol, band=args.band)

        engine = SurveyEngine(dev, callback=cb)
        catalog = engine.run_full_scan(
            start_mhz=args.start,
            stop_mhz=args.stop,
            coarse_step=args.coarse_step,
            fine_step=args.fine_step,
            sr_min=args.sr_min,
            sr_max=args.sr_max,
            sr_step=args.sr_step,
        )

    _clear_progress_line(args.verbose)

    # Set catalog metadata
    catalog.band = args.band or ""
    catalog.pol = args.pol or ""
    if args.name:
        catalog.name = args.name

    # Save
    path = catalog.save(args.output) if args.output else catalog.save()

    print()
    print(catalog.summary())
    print()
    print(f"Saved to: {path}")


def cmd_quick_scan(args: argparse.Namespace) -> None:
    """Quick sweep + peak detection, no blind scan.

    Runs ``SurveyEngine.run_quick_scan`` and prints a table of the detected
    peaks sorted by frequency. Nothing is saved.
    """
    print("SkyWalker-1 Quick Scan")
    print(f" Range: {args.start}-{args.stop} MHz, step: {args.step} MHz")
    print()

    cb = progress_callback(args.verbose)

    with SkyWalker1(verbose=args.verbose) as dev:
        dev.ensure_booted()

        if args.pol or args.band:
            dev.configure_lnb(pol=args.pol, band=args.band)

        engine = SurveyEngine(dev, callback=cb)
        peaks = engine.run_quick_scan(
            start_mhz=args.start,
            stop_mhz=args.stop,
            step=args.step,
        )

    _clear_progress_line(args.verbose)

    if not peaks:
        print("No peaks detected above noise floor.")
        return

    print(f"\nDetected {len(peaks)} carrier(s):\n")
    print(f" {'#':>3} {'Freq (MHz)':>10} {'Power (dB)':>10} "
          f"{'BW (MHz)':>8} {'Prominence':>10} Quality")
    print(f" {'---':>3} {'----------':>10} {'----------':>10} "
          f"{'--------':>8} {'----------':>10} -------")

    for i, p in enumerate(sorted(peaks, key=lambda x: x["freq"]), 1):
        # Classify only when the engine did not already attach a result, so
        # the classifier is not run needlessly for every peak.
        if "classification" in p:
            cls = p["classification"]
        else:
            cls = classify_carrier(p["width_mhz"], p["power"])
        quality = cls.get("signal_quality", "?")
        print(f" {i:3d} {p['freq']:>10.1f} {p['power']:>+10.1f} "
              f"{p['width_mhz']:>8.1f} {p['prominence_db']:>+10.1f} {quality}")


def cmd_diff(args: argparse.Namespace) -> None:
    """Compare two survey catalog files.

    Prints the formatted diff and, when ``args.output`` is set, also writes
    the raw diff structure to that file as JSON. Exits with status 1 if
    either catalog cannot be loaded.
    """
    old_cat = _load_catalog_or_exit(args.file1)
    new_cat = _load_catalog_or_exit(args.file2)

    print("Comparing surveys:")
    print(f" Old: {args.file1} ({old_cat.created})")
    print(f" New: {args.file2} ({new_cat.created})")
    print()

    diff = CatalogDiff.diff(old_cat, new_cat)
    print(CatalogDiff.format_diff(diff))

    if args.output:
        with open(args.output, 'w', encoding="utf-8") as f:
            json.dump(diff, f, indent=2)
        print(f"\nDiff saved to: {args.output}")


def cmd_export(args: argparse.Namespace) -> None:
    """Export a survey catalog to CSV, JSON, or text.

    The rendered output goes to ``args.output`` when given, otherwise to
    stdout. Exits with status 1 if the catalog cannot be loaded.
    """
    catalog = _load_catalog_or_exit(args.file)

    fmt = args.format
    if fmt == "json":
        output = json.dumps(catalog.to_dict(), indent=2)
    elif fmt == "csv":
        output = _catalog_to_csv(catalog)
    else:
        output = catalog.summary()

    if args.output:
        # The csv module already emits "\r\n" row terminators; disable newline
        # translation for CSV so they are not rewritten to "\r\r\n" on
        # platforms that translate "\n" on write.
        newline = "" if fmt == "csv" else None
        with open(args.output, 'w', encoding="utf-8", newline=newline) as f:
            f.write(output)
        print(f"Exported to: {args.output}")
    else:
        print(output)


def _print_carrier_details(index: int, c) -> None:
    """Print the verbose per-carrier section used by the ``view`` subcommand."""
    print(f"\n --- Carrier {index} ---")
    print(f" Frequency: {c.freq_mhz:.3f} MHz ({c.freq_khz} kHz)")
    print(f" Power: {c.power_db:+.1f} dB")
    print(f" SNR: {c.snr_db:.1f} dB")
    if c.sr_sps:
        print(f" Symbol rate: {c.sr_sps} sps ({c.sr_sps / 1e6:.3f} Msps)")
    if c.modulation:
        print(f" Modulation: {c.modulation}")
    if c.fec:
        print(f" FEC: {c.fec}")
    print(f" Locked: {c.locked}")
    print(f" Bandwidth: {c.bw_mhz:.1f} MHz")
    if c.services:
        print(f" Services: {', '.join(c.services)}")
    print(f" First seen: {c.first_seen}")
    print(f" Last seen: {c.last_seen}")
    print(f" Scan count: {c.scan_count}")

    cls = c.classification
    if cls:
        if "estimated_sr_range" in cls:
            sr_lo, sr_hi = cls["estimated_sr_range"]
            print(f" Est. SR: {sr_lo / 1e6:.1f} - {sr_hi / 1e6:.1f} Msps")
        if "likely_modulation" in cls:
            print(f" Likely mod: {', '.join(cls['likely_modulation'])}")
        if "signal_quality" in cls:
            print(f" Quality: {cls['signal_quality']}")


def cmd_view(args: argparse.Namespace) -> None:
    """View a specific survey or the latest one.

    Without ``args.file`` the first entry returned by
    ``CarrierCatalog.list_surveys()`` is shown. With ``--verbose`` a detailed
    section is printed for every carrier, ordered by frequency. Exits with
    status 1 when no survey exists or the file cannot be loaded.
    """
    if args.file:
        filename = args.file
    else:
        surveys = CarrierCatalog.list_surveys()
        if not surveys:
            # Error output goes to stderr, like every other failure path.
            print(f"No surveys found in {CATALOG_DIR}", file=sys.stderr)
            sys.exit(1)
        filename = surveys[0]["path"]
        print(f"(Showing latest: {surveys[0]['filename']})\n")

    catalog = _load_catalog_or_exit(filename)

    print(catalog.summary())

    if args.verbose and catalog.carriers:
        print("\nDetailed carrier info:")
        ordered = sorted(catalog.carriers, key=lambda x: x.freq_khz)
        for i, c in enumerate(ordered, 1):
            _print_carrier_details(i, c)


def cmd_qo100(args: argparse.Namespace) -> None:
    """
    QO-100 narrowband transponder survey with optimized parameters.

    QO-100 (Es'hail-2) narrowband transponder: 10489.500 - 10489.800 MHz
    With a typical LNB LO of 9750 MHz, the IF range is ~739.5 - 739.8 MHz.

    Since most QO-100 NB signals are very narrow (< 3 kHz audio, 1-2.7 ksps
    digital), this mode uses the finest practical sweep resolution and
    restricted SR range.

    NOTE(review): the IF computed for the 9750 MHz LO mentioned above (and in
    the CLI help/example) is below the 950 MHz lower limit enforced here, so
    that invocation exits with an error. Confirm which LO values are meant to
    be supported.
    """
    lnb_lo = args.lnb_lo

    # QO-100 NB transponder: 10489.500 - 10489.800 MHz
    rf_start = QO100_NB_RF_START_MHZ
    rf_stop = QO100_NB_RF_STOP_MHZ
    if_start = rf_start - lnb_lo
    if_stop = rf_stop - lnb_lo

    # Validate IF range is within device capability
    if if_start < IF_MIN_MHZ or if_stop > IF_MAX_MHZ:
        print(f"QO-100 IF range ({if_start:.1f} - {if_stop:.1f} MHz) is outside "
              f"the {IF_MIN_MHZ}-{IF_MAX_MHZ} MHz hardware range with LNB LO={lnb_lo} MHz.",
              file=sys.stderr)
        print("Check your LNB LO frequency.", file=sys.stderr)
        sys.exit(1)

    print("QO-100 Narrowband Transponder Survey")
    print(f" LNB LO: {lnb_lo} MHz")
    print(f" RF range: {rf_start:.3f} - {rf_stop:.3f} MHz")
    print(f" IF range: {if_start:.3f} - {if_stop:.3f} MHz")
    print()

    # QO-100 NB uses very low symbol rates (1-33 ksps typical for DVB-S)
    # The SkyWalker-1 minimum is 256 ksps, so we set a narrow range
    sr_min = 256_000
    sr_max = 2_000_000
    sr_step = 100_000

    cb = progress_callback(args.verbose)

    with SkyWalker1(verbose=args.verbose) as dev:
        dev.ensure_booted()

        # QO-100 is H-pol on most setups, high band for 10 GHz
        dev.configure_lnb(pol="H", band="high", lnb_lo=lnb_lo)

        engine = SurveyEngine(dev, callback=cb)
        catalog = engine.run_full_scan(
            start_mhz=if_start,
            stop_mhz=if_stop,
            coarse_step=0.5,  # 500 kHz steps for the narrow band
            fine_step=0.1,    # 100 kHz fine resolution
            sr_min=sr_min,
            sr_max=sr_max,
            sr_step=sr_step,
        )

    _clear_progress_line(args.verbose)

    catalog.name = "QO-100 Narrowband"
    catalog.band = "high"
    catalog.pol = "H"
    catalog.lnb_lo_mhz = lnb_lo
    catalog.notes = (f"QO-100 Es'hail-2 narrowband transponder. "
                     f"RF {rf_start}-{rf_stop} MHz, LNB LO {lnb_lo} MHz.")

    if args.output:
        path = catalog.save(args.output)
    else:
        path = catalog.save(f"survey-qo100-nb-{time.strftime('%Y-%m-%d')}.json")

    print()
    print(catalog.summary())
    print()
    print(f"Saved to: {path}")


# -- Helpers --

def _catalog_to_csv(catalog: CarrierCatalog) -> str:
    """Convert a catalog to CSV format.

    Returns the CSV text (header row plus one row per carrier, ordered by
    ``freq_khz``). Service names are joined with ``|`` into a single column.
    """
    buf = io.StringIO()
    writer = csv.writer(buf)
    writer.writerow([
        "freq_khz", "freq_mhz", "sr_sps", "modulation", "fec",
        "power_db", "snr_db", "locked", "bw_mhz", "services",
        "first_seen", "last_seen", "scan_count",
    ])
    for c in sorted(catalog.carriers, key=lambda x: x.freq_khz):
        writer.writerow([
            c.freq_khz,
            f"{c.freq_mhz:.3f}",
            c.sr_sps,
            c.modulation,
            c.fec,
            f"{c.power_db:.1f}",
            f"{c.snr_db:.1f}",
            c.locked,
            f"{c.bw_mhz:.1f}",
            # Tolerate a missing/None service list instead of crashing.
            "|".join(c.services or []),
            c.first_seen,
            c.last_seen,
            c.scan_count,
        ])
    return buf.getvalue()


# -- CLI --

def _add_lnb_args(sub_parser: argparse.ArgumentParser) -> None:
    """Add the --pol/--band LNB options shared by the scan subcommands."""
    sub_parser.add_argument('--pol', choices=['H', 'V', 'L', 'R'],
                            help="LNB polarization")
    sub_parser.add_argument('--band', choices=['low', 'high'],
                            help="LNB band (low/high)")


def build_parser() -> argparse.ArgumentParser:
    """Build the top-level argument parser with one subparser per subcommand.

    The selected subcommand name is stored in ``args.command``; ``-v`` is a
    top-level option and therefore precedes the subcommand on the command
    line.
    """
    parser = argparse.ArgumentParser(
        description="Carrier survey tool for the Genpix SkyWalker-1",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""\
examples:
  %(prog)s full-scan
  %(prog)s full-scan --start 1100 --stop 1200 --output my-scan.json
  %(prog)s quick-scan
  %(prog)s diff survey-2026-02-14-low-V.json survey-2026-02-15-low-V.json
  %(prog)s export survey-2026-02-15-low-V.json --format csv
  %(prog)s view
  %(prog)s qo100 --lnb-lo 9750
""")

    parser.add_argument('-v', '--verbose', action='store_true',
                        help="Verbose progress and debug output")

    sub = parser.add_subparsers(dest='command')

    # full-scan
    p_full = sub.add_parser('full-scan', help="Full six-stage carrier survey")
    p_full.add_argument('--start', type=float, default=IF_MIN_MHZ,
                        help="Start frequency in MHz (default: 950)")
    p_full.add_argument('--stop', type=float, default=IF_MAX_MHZ,
                        help="Stop frequency in MHz (default: 2150)")
    p_full.add_argument('--coarse-step', type=float, default=5.0,
                        help="Coarse sweep step in MHz (default: 5.0)")
    p_full.add_argument('--fine-step', type=float, default=1.0,
                        help="Fine sweep step in MHz (default: 1.0)")
    p_full.add_argument('--sr-min', type=int, default=1_000_000,
                        help="Min symbol rate for blind scan in sps (default: 1000000)")
    p_full.add_argument('--sr-max', type=int, default=30_000_000,
                        help="Max symbol rate for blind scan in sps (default: 30000000)")
    p_full.add_argument('--sr-step', type=int, default=1_000_000,
                        help="Symbol rate step for blind scan in sps (default: 1000000)")
    _add_lnb_args(p_full)
    p_full.add_argument('--name', type=str, default="",
                        help="Survey name/label")
    p_full.add_argument('--output', '-o', type=str, default=None,
                        help="Output filename (default: auto-generated)")

    # quick-scan
    p_quick = sub.add_parser('quick-scan', help="Quick sweep + peak detection")
    p_quick.add_argument('--start', type=float, default=IF_MIN_MHZ,
                         help="Start frequency in MHz (default: 950)")
    p_quick.add_argument('--stop', type=float, default=IF_MAX_MHZ,
                         help="Stop frequency in MHz (default: 2150)")
    p_quick.add_argument('--step', type=float, default=5.0,
                         help="Sweep step in MHz (default: 5.0)")
    _add_lnb_args(p_quick)

    # diff
    p_diff = sub.add_parser('diff', help="Compare two survey catalogs")
    p_diff.add_argument('file1', help="Older survey file")
    p_diff.add_argument('file2', help="Newer survey file")
    p_diff.add_argument('--output', '-o', type=str, default=None,
                        help="Save diff as JSON to this file")

    # export
    p_export = sub.add_parser('export', help="Export survey to CSV/JSON/text")
    p_export.add_argument('file', help="Survey file to export")
    p_export.add_argument('--format', '-f', choices=['csv', 'json', 'text'],
                          default='text',
                          help="Output format (default: text)")
    p_export.add_argument('--output', '-o', type=str, default=None,
                          help="Output file (default: stdout)")

    # view
    p_view = sub.add_parser('view', help="View a survey (latest if no file given)")
    p_view.add_argument('file', nargs='?', default=None,
                        help="Survey file to view (default: latest)")

    # qo100
    p_qo100 = sub.add_parser('qo100', help="QO-100 narrowband transponder survey")
    p_qo100.add_argument('--lnb-lo', type=float, required=True,
                         help="LNB local oscillator frequency in MHz "
                              "(e.g., 9750 for universal LNB low band)")
    p_qo100.add_argument('--output', '-o', type=str, default=None,
                         help="Output filename (default: auto-generated)")

    return parser


def main() -> None:
    """Parse the command line and run the selected subcommand.

    Prints the help text and exits with status 1 when no (or an unknown)
    subcommand is given.
    """
    parser = build_parser()
    args = parser.parse_args()

    dispatch = {
        'full-scan': cmd_full_scan,
        'quick-scan': cmd_quick_scan,
        'diff': cmd_diff,
        'export': cmd_export,
        'view': cmd_view,
        'qo100': cmd_qo100,
    }

    # args.command is None when no subcommand was given; .get() maps both
    # that and any unknown name to None.
    handler = dispatch.get(args.command)
    if handler is None:
        parser.print_help()
        sys.exit(1)

    handler(args)


if __name__ == '__main__':
    main()