gr-apollo/examples/fm_loopback_demo.py
Ryan Malloy 7d48398551 Add FM downlink mode: carrier blocks, convenience wrappers, and loopback demo
FM mode now has the same three-layer architecture as PM mode:
- fm_mod/fm_demod for carrier-level FM modulation
- fm_signal_source/fm_downlink_receiver convenience wrappers
- fm_loopback_demo.py verifying round-trip SCO voltage recovery

Includes GRC YAML for all 4 blocks and doc updates across
blocks reference, SCO guide, and signal architecture pages.
2026-02-24 10:18:42 -07:00

150 lines
4.5 KiB
Python

#!/usr/bin/env python3
"""
Apollo FM Downlink Loopback Demo -- SCO round-trip verification.

Demonstrates the FM downlink block chain using GNU Radio streaming blocks:

    TX: dc_sources -> sco_mods -> add -> fm_mod
    RX: fm_demod -> sco_demods -> recovered voltages

All wrapped in the convenience blocks:

    fm_signal_source -> fm_downlink_receiver

Generates an FM signal with SCO channels at known DC voltages, demodulates it,
and compares the recovered voltages against the input voltages.

Usage:
    uv run python examples/fm_loopback_demo.py
    uv run python examples/fm_loopback_demo.py --channels 1 5 9
    uv run python examples/fm_loopback_demo.py --snr 30
    uv run python examples/fm_loopback_demo.py --samples 1024000
"""
import argparse
import sys
import numpy as np
from gnuradio import blocks, gr
from apollo.constants import SAMPLE_RATE_BASEBAND, SCO_FREQUENCIES
from apollo.fm_downlink_receiver import fm_downlink_receiver
from apollo.fm_signal_source import fm_signal_source
# Baseline DC test voltages used for the default channel set, keyed by SCO
# channel number; the values are spread across the 0-5 V range.
DEFAULT_VOLTAGES = {
    1: 1.0,
    5: 2.5,
    9: 4.0,
}
def _positive_int(text):
    """Parse a command-line value as a strictly positive integer.

    Used as an argparse ``type=`` callable so that zero or negative sample
    counts are rejected with a normal usage error.

    Raises:
        argparse.ArgumentTypeError: if *text* is not an integer or is < 1.
    """
    try:
        value = int(text)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {text!r}") from None
    if value < 1:
        raise argparse.ArgumentTypeError(f"must be a positive integer, got {value}")
    return value


def _assign_test_voltages(channels):
    """Return a ``{channel: voltage}`` mapping of DC test voltages.

    The channel set {1, 5, 9} gets a copy of ``DEFAULT_VOLTAGES``.  Any other
    selection is spread evenly from 0.5 V to 4.5 V in list order; a single
    channel gets 0.5 V.
    """
    if set(channels) == {1, 5, 9}:
        return dict(DEFAULT_VOLTAGES)
    step = 4.0 / (len(channels) - 1) if len(channels) > 1 else 0.0
    return {ch: 0.5 + i * step for i, ch in enumerate(channels)}


def _settled_mean(samples):
    """Return the mean of *samples* after discarding the first 20%.

    The leading fifth is dropped to skip filter settling.  Returns ``None``
    when *samples* is empty.  For non-empty input the remaining slice is
    never empty, because ``len // 5`` is always smaller than ``len``.
    """
    data = np.asarray(samples, dtype=float)
    if data.size == 0:
        return None
    return float(np.mean(data[data.size // 5:]))


def main():
    """Run the FM loopback demo and report per-channel voltage recovery.

    Parses ``--channels``, ``--snr`` and ``--samples`` from the command line,
    connects ``fm_signal_source -> head -> fm_downlink_receiver`` with one
    vector sink per receiver output, runs the flowgraph, and prints a table
    comparing each input voltage with the mean of the recovered samples.

    Exit status:
        0 -- every channel produced data and the largest error is <= 0.5 V.
        1 -- the largest error exceeds 0.5 V, or at least one channel
             produced no data or a non-finite result.
        2 -- invalid command-line arguments (standard argparse usage error).
    """
    parser = argparse.ArgumentParser(description="Apollo FM downlink loopback demo")
    parser.add_argument(
        "--channels", type=int, nargs="+", default=[1, 5, 9],
        help="SCO channel numbers to test (default: 1 5 9)",
    )
    parser.add_argument(
        "--snr", type=float, default=None,
        help="SNR in dB (default: no noise)",
    )
    parser.add_argument(
        "--samples", type=_positive_int, default=10 * 102400,
        help="Number of samples to process (default: 1024000)",
    )
    args = parser.parse_args()

    # Drop repeated channel numbers (keeping first-seen order): the voltage
    # table is keyed by channel, so duplicates would collide there.
    channels = list(dict.fromkeys(args.channels))
    n_samples = args.samples

    # Resolve every channel up front so an unknown channel number becomes a
    # usage error instead of a traceback halfway through the banner.
    frequencies = {}
    for ch in channels:
        try:
            frequencies[ch] = SCO_FREQUENCIES[ch]
        except (KeyError, IndexError):
            parser.error(f"unknown SCO channel: {ch}")

    test_voltages = _assign_test_voltages(channels)
    snr_label = "clean (no noise)" if args.snr is None else f"{args.snr} dB"

    print("=" * 60)
    print("Apollo FM Downlink Loopback Demo")
    print("=" * 60)
    print(f" Channels: {channels}")
    print(" Input voltages:")
    for ch in channels:
        print(f" SCO {ch} ({frequencies[ch]:,} Hz): {test_voltages[ch]:.2f} V")
    print(f" Samples: {n_samples:,}")
    print(f" Duration: {n_samples / SAMPLE_RATE_BASEBAND:.3f} s")
    print(f" SNR: {snr_label}")
    print()

    # Build the flowgraph: TX -> head (limits the run to n_samples) -> RX.
    print("Building flowgraph...")
    tb = gr.top_block()
    tx = fm_signal_source(
        channels=channels,
        test_voltages=test_voltages,
        snr_db=args.snr,
    )
    head = blocks.head(gr.sizeof_gr_complex, n_samples)
    rx = fm_downlink_receiver(channels=channels)
    tb.connect(tx, head, rx)

    # One vector sink per receiver output, in the same order as `channels`.
    sinks = []
    for port, _ in enumerate(channels):
        sink = blocks.vector_sink_f()
        tb.connect((rx, port), sink)
        sinks.append(sink)

    print("Running flowgraph (TX -> RX)...")
    print()
    tb.run()

    # Analyze results.
    print("-" * 60)
    print(f" {'Channel':>10} {'Freq':>10} {'Input':>8} {'Recovered':>10} {'Error':>8}")
    print("-" * 60)
    max_error = 0.0
    unusable = []  # channels with no data or a non-finite recovered value
    for ch, sink in zip(channels, sinks):
        expected = test_voltages[ch]
        mean_v = _settled_mean(sink.data())
        if mean_v is None:
            print(
                f" SCO {ch:>2} {frequencies[ch]:>8,} Hz "
                f"{expected:>6.2f} V {'NO DATA':>10} {'N/A':>8}"
            )
            unusable.append(ch)
            continue
        error = abs(expected - mean_v)
        print(
            f" SCO {ch:>2} {frequencies[ch]:>8,} Hz "
            f"{expected:>6.2f} V {mean_v:>8.3f} V {error:>6.3f} V"
        )
        if np.isfinite(error):
            max_error = max(max_error, error)
        else:
            # max() silently ignores NaN, so track it as a failure instead.
            unusable.append(ch)
    print("-" * 60)
    print()

    if unusable:
        # A channel without a usable measurement must not be reported as GOOD.
        listed = ", ".join(str(ch) for ch in unusable)
        print(f"No usable data recovered for SCO channel(s): {listed}")
        sys.exit(1)

    if max_error > 0.5:
        print(f"Max error: {max_error:.3f} V -- EXCESSIVE (> 0.5V)")
        print("PLL may need more settling time. Try increasing --samples.")
        sys.exit(1)
    elif max_error > 0.1:
        print(f"Max error: {max_error:.3f} V -- MODERATE")
        print("Consider increasing --samples or --snr for better accuracy.")
    else:
        print(f"Max error: {max_error:.3f} V -- GOOD")
    print("FM loopback complete.")
# Run the demo only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()