#!/usr/bin/env python3 """ Apollo FM Downlink Loopback Demo -- SCO round-trip verification. Demonstrates the FM downlink block chain using GNU Radio streaming blocks: TX: dc_sources -> sco_mods -> add -> fm_mod RX: fm_demod -> sco_demods -> recovered voltages All wrapped in the convenience blocks: fm_signal_source -> fm_downlink_receiver Generates FM signal with SCO channels at known DC voltages, demodulates, and compares recovered vs input voltages. Usage: uv run python examples/fm_loopback_demo.py uv run python examples/fm_loopback_demo.py --channels 1 5 9 uv run python examples/fm_loopback_demo.py --snr 30 uv run python examples/fm_loopback_demo.py --samples 1024000 """ import argparse import sys import numpy as np from gnuradio import blocks, gr from apollo.constants import SAMPLE_RATE_BASEBAND, SCO_FREQUENCIES from apollo.fm_downlink_receiver import fm_downlink_receiver from apollo.fm_signal_source import fm_signal_source # Default test voltages: spread across the 0-5V range DEFAULT_VOLTAGES = {1: 1.0, 5: 2.5, 9: 4.0} def main(): parser = argparse.ArgumentParser(description="Apollo FM downlink loopback demo") parser.add_argument( "--channels", type=int, nargs="+", default=[1, 5, 9], help="SCO channel numbers to test (default: 1 5 9)", ) parser.add_argument( "--snr", type=float, default=None, help="SNR in dB (default: no noise)", ) parser.add_argument( "--samples", type=int, default=10 * 102400, help="Number of samples to process (default: 1024000)", ) args = parser.parse_args() channels = args.channels n_samples = args.samples # Assign test voltages: spread evenly across 0-5V range if set(channels) == {1, 5, 9}: test_voltages = dict(DEFAULT_VOLTAGES) else: step = 4.0 / max(1, len(channels) - 1) if len(channels) > 1 else 0 test_voltages = {ch: 0.5 + i * step for i, ch in enumerate(channels)} print("=" * 60) print("Apollo FM Downlink Loopback Demo") print("=" * 60) print(f" Channels: {channels}") print(f" Input voltages:") for ch in channels: v = test_voltages[ch] freq = SCO_FREQUENCIES[ch] print(f" SCO {ch} ({freq:,} Hz): {v:.2f} V") print(f" Samples: {n_samples:,}") print(f" Duration: {n_samples / SAMPLE_RATE_BASEBAND:.3f} s") print(f" SNR: {'clean (no noise)' if args.snr is None else f'{args.snr} dB'}") print() # Build the flowgraph print("Building flowgraph...") tb = gr.top_block() tx = fm_signal_source( channels=channels, test_voltages=test_voltages, snr_db=args.snr, ) head = blocks.head(gr.sizeof_gr_complex, n_samples) rx = fm_downlink_receiver(channels=channels) tb.connect(tx, head, rx) # One vector sink per output channel sinks = [] for idx in range(len(channels)): snk = blocks.vector_sink_f() tb.connect((rx, idx), snk) sinks.append(snk) print("Running flowgraph (TX -> RX)...") print() tb.run() # Analyze results print("-" * 60) print(f" {'Channel':>10} {'Freq':>10} {'Input':>8} {'Recovered':>10} {'Error':>8}") print("-" * 60) max_error = 0.0 for idx, ch in enumerate(channels): data = np.array(sinks[idx].data()) if len(data) == 0: print(f" SCO {ch:>2} {SCO_FREQUENCIES[ch]:>8,} Hz {test_voltages[ch]:>6.2f} V {'NO DATA':>10} {'N/A':>8}") continue # Skip first 20% for filter settling settle = len(data) // 5 settled = data[settle:] if len(settled) == 0: mean_v = np.mean(data) else: mean_v = np.mean(settled) error = abs(test_voltages[ch] - mean_v) max_error = max(max_error, error) print( f" SCO {ch:>2} {SCO_FREQUENCIES[ch]:>8,} Hz " f"{test_voltages[ch]:>6.2f} V {mean_v:>8.3f} V {error:>6.3f} V" ) print("-" * 60) print() if max_error > 0.5: print(f"Max error: {max_error:.3f} V -- EXCESSIVE (> 0.5V)") print("PLL may need more settling time. Try increasing --samples.") sys.exit(1) elif max_error > 0.1: print(f"Max error: {max_error:.3f} V -- MODERATE") print("Consider increasing --samples or --snr for better accuracy.") else: print(f"Max error: {max_error:.3f} V -- GOOD") print("FM loopback complete.") if __name__ == "__main__": main()