skywalker-1/tools/survey.py
Ryan Malloy bbdcb243dc Normalize line endings to LF across entire repository
Apply .gitattributes normalization to convert all CRLF line
endings inherited from Windows-origin source files to Unix LF.
175 files, zero content changes.
2026-02-20 10:55:50 -07:00

456 lines
16 KiB
Python

#!/usr/bin/env python3
"""
Carrier survey CLI for the Genpix SkyWalker-1.
Subcommands:
full-scan Full six-stage carrier survey
quick-scan Fast sweep + peak detection only
diff Compare two saved survey catalogs
export Export a survey to CSV, JSON, or text
view View the latest or a specified survey
qo100 QO-100 narrowband transponder survey with optimized params
"""
import sys
import os
import argparse
import csv
import io
import json
import time
# Ensure the tools directory is on the import path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from skywalker_lib import SkyWalker1
from signal_analysis import adaptive_noise_floor, detect_peaks_enhanced, classify_carrier
from carrier_catalog import CarrierCatalog, CarrierEntry, CatalogDiff, CATALOG_DIR
from survey_engine import SurveyEngine
def progress_callback(verbose: bool):
"""Return a callback function for SurveyEngine progress reporting."""
def cb(stage, pct, msg):
if verbose:
print(f" [{stage:>17s}] {pct:5.1f}% {msg}", file=sys.stderr)
else:
sys.stderr.write(f"\r {stage}: {pct:.0f}% {msg[:60]:<60s}")
sys.stderr.flush()
return cb
# -- Subcommand handlers --
def cmd_full_scan(args: argparse.Namespace) -> None:
"""Run a full six-stage carrier survey."""
print(f"SkyWalker-1 Full Carrier Survey")
print(f" Range: {args.start}-{args.stop} MHz")
print(f" Coarse step: {args.coarse_step} MHz, Fine step: {args.fine_step} MHz")
print(f" SR range: {args.sr_min / 1e6:.1f} - {args.sr_max / 1e6:.1f} Msps")
print()
cb = progress_callback(args.verbose)
with SkyWalker1(verbose=args.verbose) as dev:
dev.ensure_booted()
if args.pol or args.band:
dev.configure_lnb(pol=args.pol, band=args.band)
engine = SurveyEngine(dev, callback=cb)
catalog = engine.run_full_scan(
start_mhz=args.start,
stop_mhz=args.stop,
coarse_step=args.coarse_step,
fine_step=args.fine_step,
sr_min=args.sr_min,
sr_max=args.sr_max,
sr_step=args.sr_step,
)
if not args.verbose:
sys.stderr.write("\r" + " " * 80 + "\r")
sys.stderr.flush()
# Set catalog metadata
catalog.band = args.band or ""
catalog.pol = args.pol or ""
if args.name:
catalog.name = args.name
# Save
if args.output:
path = catalog.save(args.output)
else:
path = catalog.save()
print()
print(catalog.summary())
print()
print(f"Saved to: {path}")
def cmd_quick_scan(args: argparse.Namespace) -> None:
"""Quick sweep + peak detection, no blind scan."""
print(f"SkyWalker-1 Quick Scan")
print(f" Range: {args.start}-{args.stop} MHz, step: {args.step} MHz")
print()
cb = progress_callback(args.verbose)
with SkyWalker1(verbose=args.verbose) as dev:
dev.ensure_booted()
if args.pol or args.band:
dev.configure_lnb(pol=args.pol, band=args.band)
engine = SurveyEngine(dev, callback=cb)
peaks = engine.run_quick_scan(
start_mhz=args.start,
stop_mhz=args.stop,
step=args.step,
)
if not args.verbose:
sys.stderr.write("\r" + " " * 80 + "\r")
sys.stderr.flush()
if not peaks:
print("No peaks detected above noise floor.")
return
print(f"\nDetected {len(peaks)} carrier(s):\n")
print(f" {'#':>3} {'Freq (MHz)':>10} {'Power (dB)':>10} "
f"{'BW (MHz)':>8} {'Prominence':>10} Quality")
print(f" {'---':>3} {'----------':>10} {'----------':>10} "
f"{'--------':>8} {'----------':>10} -------")
for i, p in enumerate(sorted(peaks, key=lambda x: x["freq"]), 1):
cls = p.get("classification", classify_carrier(p["width_mhz"], p["power"]))
quality = cls.get("signal_quality", "?")
print(f" {i:3d} {p['freq']:>10.1f} {p['power']:>+10.1f} "
f"{p['width_mhz']:>8.1f} {p['prominence_db']:>+10.1f} {quality}")
def cmd_diff(args: argparse.Namespace) -> None:
"""Compare two survey catalog files."""
try:
old_cat = CarrierCatalog.load(args.file1)
except (FileNotFoundError, json.JSONDecodeError) as e:
print(f"Cannot load {args.file1}: {e}", file=sys.stderr)
sys.exit(1)
try:
new_cat = CarrierCatalog.load(args.file2)
except (FileNotFoundError, json.JSONDecodeError) as e:
print(f"Cannot load {args.file2}: {e}", file=sys.stderr)
sys.exit(1)
print(f"Comparing surveys:")
print(f" Old: {args.file1} ({old_cat.created})")
print(f" New: {args.file2} ({new_cat.created})")
print()
diff = CatalogDiff.diff(old_cat, new_cat)
print(CatalogDiff.format_diff(diff))
if args.output:
with open(args.output, 'w') as f:
json.dump(diff, f, indent=2)
print(f"\nDiff saved to: {args.output}")
def cmd_export(args: argparse.Namespace) -> None:
"""Export a survey catalog to CSV, JSON, or text."""
try:
catalog = CarrierCatalog.load(args.file)
except (FileNotFoundError, json.JSONDecodeError) as e:
print(f"Cannot load {args.file}: {e}", file=sys.stderr)
sys.exit(1)
fmt = args.format
if fmt == "json":
output = json.dumps(catalog.to_dict(), indent=2)
elif fmt == "csv":
output = _catalog_to_csv(catalog)
else:
output = catalog.summary()
if args.output:
with open(args.output, 'w') as f:
f.write(output)
print(f"Exported to: {args.output}")
else:
print(output)
def cmd_view(args: argparse.Namespace) -> None:
"""View a specific survey or the latest one."""
if args.file:
filename = args.file
else:
surveys = CarrierCatalog.list_surveys()
if not surveys:
print(f"No surveys found in {CATALOG_DIR}")
sys.exit(1)
filename = surveys[0]["path"]
print(f"(Showing latest: {surveys[0]['filename']})\n")
try:
catalog = CarrierCatalog.load(filename)
except (FileNotFoundError, json.JSONDecodeError) as e:
print(f"Cannot load {filename}: {e}", file=sys.stderr)
sys.exit(1)
print(catalog.summary())
if args.verbose and catalog.carriers:
print(f"\nDetailed carrier info:")
for i, c in enumerate(sorted(catalog.carriers, key=lambda x: x.freq_khz), 1):
print(f"\n --- Carrier {i} ---")
print(f" Frequency: {c.freq_mhz:.3f} MHz ({c.freq_khz} kHz)")
print(f" Power: {c.power_db:+.1f} dB")
print(f" SNR: {c.snr_db:.1f} dB")
if c.sr_sps:
print(f" Symbol rate: {c.sr_sps} sps ({c.sr_sps / 1e6:.3f} Msps)")
if c.modulation:
print(f" Modulation: {c.modulation}")
if c.fec:
print(f" FEC: {c.fec}")
print(f" Locked: {c.locked}")
print(f" Bandwidth: {c.bw_mhz:.1f} MHz")
if c.services:
print(f" Services: {', '.join(c.services)}")
print(f" First seen: {c.first_seen}")
print(f" Last seen: {c.last_seen}")
print(f" Scan count: {c.scan_count}")
if c.classification:
cls = c.classification
if "estimated_sr_range" in cls:
sr_lo, sr_hi = cls["estimated_sr_range"]
print(f" Est. SR: {sr_lo / 1e6:.1f} - {sr_hi / 1e6:.1f} Msps")
if "likely_modulation" in cls:
print(f" Likely mod: {', '.join(cls['likely_modulation'])}")
if "signal_quality" in cls:
print(f" Quality: {cls['signal_quality']}")
def cmd_qo100(args: argparse.Namespace) -> None:
"""
QO-100 narrowband transponder survey with optimized parameters.
QO-100 (Es'hail-2) narrowband transponder: 10489.500 - 10489.800 MHz
With a typical LNB LO of 9750 MHz, the IF range is ~739.5 - 739.8 MHz.
Since most QO-100 NB signals are very narrow (< 3 kHz audio, 1-2.7 ksps
digital), this mode uses the finest practical sweep resolution and
restricted SR range.
"""
lnb_lo = args.lnb_lo
# QO-100 NB transponder: 10489.500 - 10489.800 MHz
rf_start = 10489.5
rf_stop = 10489.8
if_start = rf_start - lnb_lo
if_stop = rf_stop - lnb_lo
# Validate IF range is within device capability
if if_start < 950 or if_stop > 2150:
print(f"QO-100 IF range ({if_start:.1f} - {if_stop:.1f} MHz) is outside "
f"the 950-2150 MHz hardware range with LNB LO={lnb_lo} MHz.",
file=sys.stderr)
print(f"Check your LNB LO frequency.", file=sys.stderr)
sys.exit(1)
print(f"QO-100 Narrowband Transponder Survey")
print(f" LNB LO: {lnb_lo} MHz")
print(f" RF range: {rf_start:.3f} - {rf_stop:.3f} MHz")
print(f" IF range: {if_start:.3f} - {if_stop:.3f} MHz")
print()
# QO-100 NB uses very low symbol rates (1-33 ksps typical for DVB-S)
# The SkyWalker-1 minimum is 256 ksps, so we set a narrow range
sr_min = 256_000
sr_max = 2_000_000
sr_step = 100_000
cb = progress_callback(args.verbose)
with SkyWalker1(verbose=args.verbose) as dev:
dev.ensure_booted()
# QO-100 is H-pol on most setups, high band for 10 GHz
dev.configure_lnb(pol="H", band="high", lnb_lo=lnb_lo)
engine = SurveyEngine(dev, callback=cb)
catalog = engine.run_full_scan(
start_mhz=if_start,
stop_mhz=if_stop,
coarse_step=0.5, # 500 kHz steps for the narrow band
fine_step=0.1, # 100 kHz fine resolution
sr_min=sr_min,
sr_max=sr_max,
sr_step=sr_step,
)
if not args.verbose:
sys.stderr.write("\r" + " " * 80 + "\r")
sys.stderr.flush()
catalog.name = "QO-100 Narrowband"
catalog.band = "high"
catalog.pol = "H"
catalog.lnb_lo_mhz = lnb_lo
catalog.notes = (f"QO-100 Es'hail-2 narrowband transponder. "
f"RF {rf_start}-{rf_stop} MHz, LNB LO {lnb_lo} MHz.")
if args.output:
path = catalog.save(args.output)
else:
path = catalog.save(f"survey-qo100-nb-{time.strftime('%Y-%m-%d')}.json")
print()
print(catalog.summary())
print()
print(f"Saved to: {path}")
# -- Helpers --
def _catalog_to_csv(catalog: CarrierCatalog) -> str:
"""Convert a catalog to CSV format."""
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerow([
"freq_khz", "freq_mhz", "sr_sps", "modulation", "fec",
"power_db", "snr_db", "locked", "bw_mhz", "services",
"first_seen", "last_seen", "scan_count",
])
for c in sorted(catalog.carriers, key=lambda x: x.freq_khz):
writer.writerow([
c.freq_khz, f"{c.freq_mhz:.3f}", c.sr_sps,
c.modulation, c.fec,
f"{c.power_db:.1f}", f"{c.snr_db:.1f}",
c.locked, f"{c.bw_mhz:.1f}",
"|".join(c.services),
c.first_seen, c.last_seen, c.scan_count,
])
return buf.getvalue()
# -- CLI --
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="Carrier survey tool for the Genpix SkyWalker-1",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""\
examples:
%(prog)s full-scan
%(prog)s full-scan --start 1100 --stop 1200 --output my-scan.json
%(prog)s quick-scan
%(prog)s diff survey-2026-02-14-low-V.json survey-2026-02-15-low-V.json
%(prog)s export survey-2026-02-15-low-V.json --format csv
%(prog)s view
%(prog)s qo100 --lnb-lo 9750
""")
parser.add_argument('-v', '--verbose', action='store_true',
help="Verbose progress and debug output")
sub = parser.add_subparsers(dest='command')
# full-scan
p_full = sub.add_parser('full-scan', help="Full six-stage carrier survey")
p_full.add_argument('--start', type=float, default=950,
help="Start frequency in MHz (default: 950)")
p_full.add_argument('--stop', type=float, default=2150,
help="Stop frequency in MHz (default: 2150)")
p_full.add_argument('--coarse-step', type=float, default=5.0,
help="Coarse sweep step in MHz (default: 5.0)")
p_full.add_argument('--fine-step', type=float, default=1.0,
help="Fine sweep step in MHz (default: 1.0)")
p_full.add_argument('--sr-min', type=int, default=1_000_000,
help="Min symbol rate for blind scan in sps (default: 1000000)")
p_full.add_argument('--sr-max', type=int, default=30_000_000,
help="Max symbol rate for blind scan in sps (default: 30000000)")
p_full.add_argument('--sr-step', type=int, default=1_000_000,
help="Symbol rate step for blind scan in sps (default: 1000000)")
p_full.add_argument('--pol', choices=['H', 'V', 'L', 'R'],
help="LNB polarization")
p_full.add_argument('--band', choices=['low', 'high'],
help="LNB band (low/high)")
p_full.add_argument('--name', type=str, default="",
help="Survey name/label")
p_full.add_argument('--output', '-o', type=str, default=None,
help="Output filename (default: auto-generated)")
# quick-scan
p_quick = sub.add_parser('quick-scan', help="Quick sweep + peak detection")
p_quick.add_argument('--start', type=float, default=950,
help="Start frequency in MHz (default: 950)")
p_quick.add_argument('--stop', type=float, default=2150,
help="Stop frequency in MHz (default: 2150)")
p_quick.add_argument('--step', type=float, default=5.0,
help="Sweep step in MHz (default: 5.0)")
p_quick.add_argument('--pol', choices=['H', 'V', 'L', 'R'],
help="LNB polarization")
p_quick.add_argument('--band', choices=['low', 'high'],
help="LNB band (low/high)")
# diff
p_diff = sub.add_parser('diff', help="Compare two survey catalogs")
p_diff.add_argument('file1', help="Older survey file")
p_diff.add_argument('file2', help="Newer survey file")
p_diff.add_argument('--output', '-o', type=str, default=None,
help="Save diff as JSON to this file")
# export
p_export = sub.add_parser('export', help="Export survey to CSV/JSON/text")
p_export.add_argument('file', help="Survey file to export")
p_export.add_argument('--format', '-f', choices=['csv', 'json', 'text'],
default='text', help="Output format (default: text)")
p_export.add_argument('--output', '-o', type=str, default=None,
help="Output file (default: stdout)")
# view
p_view = sub.add_parser('view', help="View a survey (latest if no file given)")
p_view.add_argument('file', nargs='?', default=None,
help="Survey file to view (default: latest)")
# qo100
p_qo100 = sub.add_parser('qo100',
help="QO-100 narrowband transponder survey")
p_qo100.add_argument('--lnb-lo', type=float, required=True,
help="LNB local oscillator frequency in MHz "
"(e.g., 9750 for universal LNB low band)")
p_qo100.add_argument('--output', '-o', type=str, default=None,
help="Output filename (default: auto-generated)")
return parser
def main():
parser = build_parser()
args = parser.parse_args()
if not args.command:
parser.print_help()
sys.exit(1)
dispatch = {
'full-scan': cmd_full_scan,
'quick-scan': cmd_quick_scan,
'diff': cmd_diff,
'export': cmd_export,
'view': cmd_view,
'qo100': cmd_qo100,
}
handler = dispatch.get(args.command)
if handler is None:
parser.print_help()
sys.exit(1)
handler(args)
if __name__ == '__main__':
main()