skywalker-1/tools/beacon_logger.py
Ryan Malloy bbdcb243dc Normalize line endings to LF across entire repository
Apply .gitattributes normalization to convert all CRLF line
endings inherited from Windows-origin source files to Unix LF.
175 files, zero content changes.
2026-02-20 10:55:50 -07:00

377 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Long-term satellite beacon logger for the Genpix SkyWalker-1.
Locks onto a stable Ku-band transponder and logs SNR/AGC at configurable
intervals for hours, days, or weeks. Produces propagation datasets useful
for rain fade analysis, diurnal thermal drift measurement, antenna mount
stability assessment, and ITU propagation model validation.
Usage:
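    python beacon_logger.py --freq 1265000 --sr 20000000             # log to stdout
    python beacon_logger.py --freq 1265000 --sr 20000000 -o log.csv  # log to CSV
    python beacon_logger.py --freq 1265000 --sr 20000000 --daemon    # quiet mode (no progress output)
    python beacon_logger.py --generate-systemd --freq 1265000        # print unit file
Note: --freq is the IF frequency in kHz and --sr is the symbol rate in
symbols per second; --freq is required even with --generate-systemd.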
The tool automatically re-locks on signal loss and logs statistics per
reporting interval (min/max/mean/stddev of SNR over each window).
"""
import sys
import os
import argparse
import time
import csv
import math
import json
import signal
from datetime import datetime, timezone
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from skywalker_lib import SkyWalker1, MODULATIONS, MOD_FEC_GROUP, FEC_RATES
def compute_stats(values: list[float]) -> dict:
    """Summarise a batch of measurements.

    Returns a dict with the keys ``min``, ``max``, ``mean``, ``stddev`` and
    ``count`` (in that order). The standard deviation is the population
    form (divides by ``n``); all float fields are rounded to 3 decimals.
    An empty input yields zeros for every field.
    """
    if not values:
        return dict.fromkeys(("min", "max", "mean", "stddev", "count"), 0)

    count = len(values)
    average = sum(values) / count
    if count > 1:
        spread = sum((sample - average) ** 2 for sample in values) / count
    else:
        # A single sample has no spread by definition.
        spread = 0

    raw = {
        "min": min(values),
        "max": max(values),
        "mean": average,
        "stddev": math.sqrt(spread),
    }
    summary = {key: round(number, 3) for key, number in raw.items()}
    summary["count"] = count
    return summary
class BeaconLogger:
    """Persistent signal logger with auto-relock and statistics.

    Drives a device handle that provides ``tune()`` and ``signal_monitor()``:
    the handle is tuned once, polled every ``sample_interval`` seconds, and
    the locked samples are summarised every ``report_interval`` seconds.
    """

    def __init__(self, sw: SkyWalker1, freq_khz: int, sr_sps: int,
                 mod_index: int = 0, fec_index: int = 5,
                 sample_interval: float = 1.0, report_interval: float = 60.0):
        """Store the tuning parameters and reset the session counters.

        Args:
            sw: Device handle used for tuning and signal polling.
            freq_khz: Frequency handed to ``sw.tune()``.
            sr_sps: Symbol rate handed to ``sw.tune()``.
            mod_index: Modulation index handed to ``sw.tune()``.
            fec_index: FEC index handed to ``sw.tune()``.
            sample_interval: Seconds to sleep between samples.
            report_interval: Seconds covered by each statistics report.
        """
        self.sw = sw
        self.freq_khz = freq_khz
        self.sr_sps = sr_sps
        self.mod_index = mod_index
        self.fec_index = fec_index
        self.sample_interval = sample_interval
        self.report_interval = report_interval
        self._running = False
        self._relock_count = 0
        self._total_samples = 0

    def tune_and_lock(self) -> bool:
        """Tune to the beacon frequency and check for lock.

        Returns:
            The ``"locked"`` entry of the signal reading taken after tuning,
            or False when the reading has no such entry.
        """
        self.sw.tune(self.sr_sps, self.freq_khz, self.mod_index, self.fec_index)
        # Brief pause before polling the lock status (presumably settle time).
        time.sleep(0.5)
        sig = self.sw.signal_monitor()
        return sig.get("locked", False)

    def _build_report(self, elapsed: float, samples: int, locked_samples: int,
                      snr: list[float], power: list[float],
                      agc1: list[float]) -> dict:
        """Assemble the statistics record for one reporting window."""
        return {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "elapsed_s": round(elapsed, 1),
            "samples": samples,
            # max(..., 1) guards the division when a window has no samples.
            "lock_pct": round(100 * locked_samples / max(samples, 1), 1),
            "snr": compute_stats(snr),
            "power": compute_stats(power),
            "agc1": compute_stats(agc1),
            "relock_count": self._relock_count,
        }

    def _emit_report(self, report: dict, elapsed: float, json_file,
                     quiet: bool) -> None:
        """Print one window summary and append it to the JSONL log if open."""
        if not quiet:
            snr_s = report["snr"]
            print(f" [{elapsed:7.0f}s] SNR {snr_s['mean']:5.1f} dB "
                  f"(min {snr_s['min']:.1f}, max {snr_s['max']:.1f}, "
                  f"std {snr_s['stddev']:.2f}) "
                  f"lock {report['lock_pct']:.0f}% "
                  f"relocks {self._relock_count}")
        if json_file:
            json_file.write(json.dumps(report) + "\n")
            json_file.flush()

    def run(self, duration_secs: float, csv_path: str | None = None,
            json_path: str | None = None, quiet: bool = False) -> None:
        """Main logging loop.

        Samples the signal every ``sample_interval`` seconds, writes each raw
        sample to the CSV file (if any) and emits min/max/mean/stddev
        statistics of the locked samples every ``report_interval`` seconds to
        stdout and the JSONL file (if any). Samples still pending when the
        loop ends are flushed as one final, shorter report.

        Args:
            duration_secs: Stop after this many seconds of logging.
            csv_path: Raw-sample CSV file; truncated on open.
            json_path: Per-window JSONL file; opened in append mode.
            quiet: Suppress progress and summary output.

        SIGTERM and SIGINT request a clean stop; the handlers that were
        active before the call are restored on exit. Errors raised by the
        device while sampling or re-locking are reported and retried at the
        next sample instead of ending the session.
        """
        self._running = True

        def _stop(signum, frame):
            # Only flip the flag: the loop exits at its next check so the
            # cleanup below still runs.
            self._running = False

        previous_handlers = {}
        csv_file = None
        csv_writer = None
        json_file = None
        try:
            for signum in (signal.SIGTERM, signal.SIGINT):
                previous_handlers[signum] = signal.signal(signum, _stop)

            # Initial tune. A device error here propagates: nothing has been
            # logged yet, so failing fast is preferable.
            if not self.tune_and_lock():
                print(f"Warning: no lock at {self.freq_khz} kHz, will keep trying",
                      file=sys.stderr)

            # Files are opened inside the try so a failure on the second
            # open cannot leak the first handle.
            # NOTE(review): 'w' truncates an existing CSV on every start
            # (including service restarts), while the JSONL log appends.
            if csv_path:
                csv_file = open(csv_path, 'w', newline='', encoding='utf-8')
                csv_writer = csv.writer(csv_file)
                csv_writer.writerow([
                    "timestamp", "elapsed_s", "snr_db", "agc1", "agc2",
                    "power_db", "locked", "relock_count",
                ])
            if json_path:
                json_file = open(json_path, 'a', encoding='utf-8')

            # Monotonic clock: elapsed time and report cadence must not jump
            # when the wall clock is stepped (NTP, manual change).
            start_time = time.monotonic()
            last_report = start_time
            window_snr = []
            window_power = []
            window_agc1 = []
            lock_count_window = 0
            sample_count_window = 0

            while self._running and (time.monotonic() - start_time) < duration_secs:
                now = time.monotonic()
                elapsed = now - start_time

                # Sample
                try:
                    sig = self.sw.signal_monitor()
                except Exception as e:
                    if not quiet:
                        print(f" USB error: {e}", file=sys.stderr)
                    time.sleep(self.sample_interval)
                    continue

                self._total_samples += 1
                sample_count_window += 1
                snr_db = sig["snr_db"]
                agc1 = sig["agc1"]
                agc2 = sig["agc2"]
                power_db = sig["power_db"]
                locked = sig["locked"]

                # Only locked samples feed the statistics windows.
                if locked:
                    lock_count_window += 1
                    window_snr.append(snr_db)
                    window_power.append(power_db)
                    window_agc1.append(agc1)

                # Every sample, locked or not, goes to the raw CSV log.
                if csv_writer:
                    csv_writer.writerow([
                        datetime.now(timezone.utc).isoformat(),
                        f"{elapsed:.1f}",
                        f"{snr_db:.3f}",
                        agc1, agc2,
                        f"{power_db:.3f}",
                        int(locked),
                        self._relock_count,
                    ])
                    # Flush per row so a crash loses at most one sample.
                    csv_file.flush()

                # Auto-relock
                if not locked:
                    if not quiet:
                        print(f" [{elapsed:.0f}s] Signal lost, attempting relock...",
                              file=sys.stderr)
                    try:
                        relocked = self.tune_and_lock()
                    except Exception as e:
                        # Same policy as the sampling call above: a device
                        # error must not end the session.
                        relocked = False
                        if not quiet:
                            print(f" [{elapsed:.0f}s] Relock failed: {e}",
                                  file=sys.stderr)
                    if relocked:
                        self._relock_count += 1
                        if not quiet:
                            print(f" [{elapsed:.0f}s] Relocked (count: {self._relock_count})",
                                  file=sys.stderr)

                # Periodic report
                if now - last_report >= self.report_interval:
                    report = self._build_report(
                        elapsed, sample_count_window, lock_count_window,
                        window_snr, window_power, window_agc1,
                    )
                    self._emit_report(report, elapsed, json_file, quiet)
                    # Reset window
                    window_snr.clear()
                    window_power.clear()
                    window_agc1.clear()
                    lock_count_window = 0
                    sample_count_window = 0
                    last_report = now

                time.sleep(self.sample_interval)

            # Flush samples gathered since the last periodic report so the
            # tail of the session is not lost.
            if sample_count_window:
                elapsed = time.monotonic() - start_time
                report = self._build_report(
                    elapsed, sample_count_window, lock_count_window,
                    window_snr, window_power, window_agc1,
                )
                self._emit_report(report, elapsed, json_file, quiet)
        finally:
            for signum, handler in previous_handlers.items():
                # signal.signal() returns None for handlers that were not
                # installed from Python; those cannot be re-installed.
                if handler is not None:
                    signal.signal(signum, handler)
            for handle in (csv_file, json_file):
                if handle is not None:
                    handle.close()

        total_elapsed = time.monotonic() - start_time
        if not quiet:
            print(f"\n Session complete: {self._total_samples} samples in "
                  f"{total_elapsed:.0f}s, {self._relock_count} relocks")
def generate_systemd_unit(args) -> str:
    """Generate a systemd unit file for daemon operation.

    The ExecStart line re-invokes this script with the options found on
    ``args``, always adding ``--quiet``.

    Args:
        args: Parsed command-line namespace. ``freq``, ``sr``, ``output``,
            ``json_output``, ``duration``, ``sample_interval`` and
            ``report_interval`` are required; ``mod``, ``fec``, ``pol``,
            ``band`` and ``verbose`` are forwarded when present and set.

    Returns:
        The unit file text.
    """

    def _quote(arg: str) -> str:
        """Escape one argument for a systemd command line."""
        # '%' starts a unit specifier and '$' an environment expansion;
        # both are written doubled to get the literal character.
        text = arg.replace("%", "%%").replace("$", "$$")
        needs_quotes = not text or any(
            ch.isspace() or ch in "\"'\\;" for ch in text
        )
        if not needs_quotes:
            return text
        text = text.replace("\\", "\\\\").replace('"', '\\"')
        return f'"{text}"'

    # Use the running interpreter so the service gets the same environment
    # (e.g. a virtualenv) that generated the unit.
    cmd_parts = [sys.executable or "python3", os.path.abspath(__file__)]
    cmd_parts.extend(["--freq", str(args.freq)])
    cmd_parts.extend(["--sr", str(args.sr)])

    # Tuning/LNB options: without these the service would run with the
    # parser defaults instead of what the user specified.
    for flag, attr in (("--mod", "mod"), ("--fec", "fec"),
                       ("--pol", "pol"), ("--band", "band")):
        value = getattr(args, attr, None)
        if value:
            cmd_parts.extend([flag, str(value)])
    if getattr(args, "verbose", False):
        cmd_parts.append("--verbose")

    # Output paths are made absolute: the service's working directory is
    # not the directory the unit was generated from.
    if args.output:
        cmd_parts.extend(["--output", os.path.abspath(args.output)])
    if args.json_output:
        cmd_parts.extend(["--json-output", os.path.abspath(args.json_output)])
    cmd_parts.extend(["--duration", str(args.duration)])
    cmd_parts.extend(["--sample-interval", str(args.sample_interval)])
    cmd_parts.extend(["--report-interval", str(args.report_interval)])
    cmd_parts.append("--quiet")

    exec_start = ' '.join(_quote(part) for part in cmd_parts)
    return f"""[Unit]
Description=SkyWalker-1 Beacon Logger ({args.freq} kHz)
After=network.target
[Service]
Type=simple
ExecStart={exec_start}
Restart=on-failure
RestartSec=30
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
"""
def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the beacon logger.

    Time options are validated while parsing: ``--sample-interval`` and
    ``--report-interval`` must be positive finite numbers and ``--duration``
    must be positive (``inf`` is accepted for an open-ended run). Invalid
    values produce the usual argparse usage error.

    Returns:
        The configured parser.
    """

    def _seconds(text: str, allow_inf: bool = False) -> float:
        """Parse a positive number of seconds or raise an argparse error."""
        try:
            value = float(text)
        except ValueError:
            raise argparse.ArgumentTypeError(
                f"invalid number: {text!r}") from None
        if math.isnan(value) or value <= 0 or (
                math.isinf(value) and not allow_inf):
            raise argparse.ArgumentTypeError(
                f"must be a positive number of seconds, got {text!r}")
        return value

    def _duration(text: str) -> float:
        """Like _seconds, but an infinite duration is allowed."""
        return _seconds(text, allow_inf=True)

    parser = argparse.ArgumentParser(
        prog="beacon_logger.py",
        description="Long-term satellite beacon logger for SkyWalker-1",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        # Examples use the units the options actually take: IF frequency in
        # kHz and symbol rate in symbols per second.
        epilog="""\
examples:
  %(prog)s --freq 1265000 --sr 20000000                              # Ku-band beacon, stdout
  %(prog)s --freq 1265000 --sr 20000000 -o beacon.csv                # log to CSV
  %(prog)s --freq 1265000 --sr 20000000 --json-output beacon.jsonl   # per-minute JSON
  %(prog)s --freq 1265000 --sr 20000000 --duration 86400             # 24-hour log
  %(prog)s --freq 1265000 --sr 20000000 --daemon                     # quiet mode
  %(prog)s --generate-systemd --freq 1265000 --sr 20000000           # print unit file
The --freq is in kHz (IF frequency), not MHz. For Ku-band with a universal
LNB at LO 10750 MHz, a transponder at 12015 MHz has IF = 12015 - 10750 = 1265 MHz,
so you'd use --freq 1265000.
For IF frequencies, multiply MHz by 1000 (e.g., 1265 MHz = 1265000 kHz).
""",
    )
    parser.add_argument('-v', '--verbose', action='store_true')
    parser.add_argument('--freq', type=int, required=True,
                        help="IF frequency in kHz (e.g., 1265000 for 1265 MHz)")
    parser.add_argument('--sr', type=int, default=20000000,
                        help="Symbol rate in sps (default: 20000000)")
    parser.add_argument('--mod', type=str, default="qpsk",
                        help="Modulation type (default: qpsk)")
    parser.add_argument('--fec', type=str, default="auto",
                        help="FEC rate (default: auto)")
    parser.add_argument('--output', '-o', type=str, default=None,
                        help="CSV output file (raw samples)")
    parser.add_argument('--json-output', type=str, default=None,
                        help="JSONL output file (per-interval statistics)")
    # Non-string defaults bypass the type converters, so the default values
    # stay exactly as written here.
    parser.add_argument('--duration', type=_duration, default=3600,
                        help="Logging duration in seconds (default: 3600)")
    parser.add_argument('--sample-interval', type=_seconds, default=1.0,
                        help="Seconds between samples (default: 1.0)")
    parser.add_argument('--report-interval', type=_seconds, default=60.0,
                        help="Seconds between summary reports (default: 60)")
    parser.add_argument('--pol', type=str, default=None, choices=['H', 'V'],
                        help="LNB polarization (H=18V, V=13V)")
    parser.add_argument('--band', type=str, default=None, choices=['low', 'high'],
                        help="LNB band (low=no tone, high=22kHz)")
    parser.add_argument('--daemon', action='store_true',
                        help="Run as daemon (suppress stdout)")
    parser.add_argument('--quiet', action='store_true',
                        help="Suppress progress output to stderr")
    parser.add_argument('--generate-systemd', action='store_true',
                        help="Print a systemd unit file and exit")
    return parser
def main():
    """Command-line entry point.

    Parses the arguments, then either prints a systemd unit file and returns
    or opens the device, applies the optional LNB settings and runs the
    logging session. Exits with status 1 when the modulation name is not
    found in ``MODULATIONS``. An FEC name that is not in the table for the
    selected modulation falls back to that table's "auto" entry (or 0), with
    a warning on stderr.
    """
    parser = build_parser()
    args = parser.parse_args()

    if args.generate_systemd:
        print(generate_systemd_unit(args))
        return

    # Resolve modulation/FEC indices
    mod_entry = MODULATIONS.get(args.mod)
    if mod_entry is None:
        print(f"Unknown modulation '{args.mod}'. Valid: {list(MODULATIONS.keys())}",
              file=sys.stderr)
        sys.exit(1)
    mod_idx = mod_entry[0]

    fec_group = MOD_FEC_GROUP.get(args.mod, "dvbs")
    fec_table = FEC_RATES.get(fec_group, {})
    if args.fec in fec_table:
        fec_idx = fec_table[args.fec]
    else:
        fec_idx = fec_table.get("auto", 0)
        # Keep the fallback, but do not let a mistyped or unsupported FEC
        # name pass unnoticed.
        if args.fec != "auto":
            print(f"Warning: FEC '{args.fec}' not valid for '{args.mod}', "
                  f"using auto. Valid: {list(fec_table.keys())}",
                  file=sys.stderr)

    # --daemon is treated as an alias for --quiet here.
    quiet = args.daemon or args.quiet

    with SkyWalker1(verbose=args.verbose) as sw:
        sw.ensure_booted()

        # Configure LNB
        if args.pol:
            sw.set_lnb_voltage(args.pol.upper() in ("H", "L"))
        if args.band:
            sw.set_22khz_tone(args.band == "high")

        freq_mhz = args.freq / 1000.0
        sr_msps = args.sr / 1e6
        if not quiet:
            print("Beacon Logger")
            print(f" Frequency: {freq_mhz:.3f} MHz IF ({args.freq} kHz)")
            print(f" Symbol rate: {sr_msps:.3f} Msps")
            print(f" Modulation: {args.mod}, FEC: {args.fec}")
            print(f" Sample interval: {args.sample_interval}s")
            print(f" Report interval: {args.report_interval}s")
            print(f" Duration: {args.duration}s ({args.duration/3600:.1f}h)")
            if args.output:
                print(f" CSV output: {args.output}")
            if args.json_output:
                print(f" JSON output: {args.json_output}")
            print()

        logger = BeaconLogger(
            sw, args.freq, args.sr,
            mod_index=mod_idx, fec_index=fec_idx,
            sample_interval=args.sample_interval,
            report_interval=args.report_interval,
        )
        logger.run(
            duration_secs=args.duration,
            csv_path=args.output,
            json_path=args.json_output,
            quiet=quiet,
        )
# Run the command-line interface only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()