gr-apollo/examples/loopback_demo.py
Ryan Malloy cd3a8cc6be Add SCO modulator, external audio input, and demo scripts
- sco_mod: 9-channel FM subcarrier oscillator modulator (inverse of sco_demod),
  with round-trip tests proving voltage recovery across all channels
- fm_voice_subcarrier_mod: add audio_input parameter to accept external float
  streams (e.g., Apollo mission voice recordings) instead of internal test tone
- loopback_demo.py: streaming TX->RX round-trip with CLI for noise/voice/frames
- agc_loopback_demo.py: full Virtual AGC integration via TCP bridge
2026-02-22 13:01:48 -07:00

134 lines
4.2 KiB
Python

#!/usr/bin/env python3
"""
Apollo USB Loopback Demo -- streaming TX -> RX round-trip.
Demonstrates the full gr-apollo block chain using GNU Radio streaming blocks:
TX: pcm_frame_source -> nrz_encoder -> bpsk_subcarrier_mod -> pm_mod
RX: pm_demod -> subcarrier_extract -> bpsk_demod -> pcm_frame_sync -> pcm_demux
All wrapped in the convenience blocks:
usb_signal_source -> usb_downlink_receiver
Prints decoded frames as they arrive, including sync word analysis.
Usage:
uv run python examples/loopback_demo.py
uv run python examples/loopback_demo.py --voice # include voice subcarrier
uv run python examples/loopback_demo.py --snr 20 # add noise at 20 dB SNR
uv run python examples/loopback_demo.py --frames 20 # generate 20 frames
"""
import argparse
import sys
import pmt
from gnuradio import blocks, gr
from apollo.constants import (
PCM_HIGH_BIT_RATE,
PCM_HIGH_WORDS_PER_FRAME,
PCM_WORD_LENGTH,
SAMPLE_RATE_BASEBAND,
)
from apollo.pcm_demux import DemuxEngine
from apollo.usb_downlink_receiver import usb_downlink_receiver
from apollo.usb_signal_source import usb_signal_source
def _frame_summary(index, result):
    """Format one decoded frame as a single report line.

    Args:
        index: Zero-based position of the frame among the recovered frames;
            it is displayed one-based.
        result: Mapping returned by ``DemuxEngine.process_frame``. Only its
            ``"sync"`` and ``"words"`` entries are read, and both fall back
            to empty defaults when absent.

    Returns:
        The formatted line, without a trailing newline.
    """
    sync = result.get("sync", {})
    frame_id = sync.get("frame_id", 0)
    parity = "odd" if frame_id % 2 == 1 else "even"
    words = result.get("words", [])
    # Show only the first few data words, as hex, to keep the line short.
    word_preview = " ".join(f"{w['raw_value']:02X}" for w in words[:8])
    return (
        f" Frame {index + 1:3d}: "
        f"ID={frame_id:>2} ({parity:4s}), "
        f"sync=0x{sync.get('word', 0):08X}, "
        f"{len(words)} words "
        f"[{word_preview} ...]"
    )


def main(argv=None):
    """Run the loopback demo: transmit frames, receive them, print a report.

    Builds ``usb_signal_source -> head -> usb_downlink_receiver``, collects
    the receiver's ``frames`` messages in a ``message_debug`` sink, then
    decodes each stored frame with ``DemuxEngine`` and prints one line per
    frame followed by the overall recovery rate.

    Args:
        argv: Optional list of command-line arguments (without the program
            name). ``None`` lets argparse read ``sys.argv[1:]``, which is
            what the script entry point relies on.

    Raises:
        SystemExit: With status 2 when ``--frames`` is not a positive
            integer (or on any other argparse error), and with status 1
            when no frames are recovered.
    """
    parser = argparse.ArgumentParser(description="Apollo USB loopback demo")
    parser.add_argument("--frames", type=int, default=10, help="Number of frames to generate")
    parser.add_argument("--snr", type=float, default=None, help="SNR in dB (None = no noise)")
    parser.add_argument("--voice", action="store_true", help="Enable voice subcarrier")
    args = parser.parse_args(argv)

    # Reject non-positive frame counts up front: they would otherwise turn
    # into a zero or negative sample count for the head block below and make
    # the final recovery-rate division meaningless.
    if args.frames < 1:
        parser.error(f"--frames must be a positive integer (got {args.frames})")

    bits_per_frame = PCM_HIGH_WORDS_PER_FRAME * PCM_WORD_LENGTH
    samples_per_frame = int(bits_per_frame * SAMPLE_RATE_BASEBAND / PCM_HIGH_BIT_RATE)
    n_samples = args.frames * samples_per_frame

    print("=" * 60)
    print("Apollo USB Loopback Demo")
    print("=" * 60)
    print(f" Frames to transmit: {args.frames}")
    print(f" Samples per frame: {samples_per_frame:,}")
    print(f" Total samples: {n_samples:,}")
    print(f" Duration: {n_samples / SAMPLE_RATE_BASEBAND:.3f} s")
    print(f" SNR: {'clean (no noise)' if args.snr is None else f'{args.snr} dB'}")
    print(f" Voice subcarrier: {'enabled' if args.voice else 'disabled'}")
    print()

    # Build the flowgraph
    print("Building flowgraph...")
    tb = gr.top_block()
    tx = usb_signal_source(
        sample_rate=SAMPLE_RATE_BASEBAND,
        bit_rate=PCM_HIGH_BIT_RATE,
        snr_db=args.snr,
        voice_enabled=args.voice,
    )
    # The head block caps the stream at n_samples items so that tb.run()
    # finishes instead of streaming forever.
    head = blocks.head(gr.sizeof_gr_complex, n_samples)
    rx = usb_downlink_receiver(
        sample_rate=SAMPLE_RATE_BASEBAND,
        bit_rate=PCM_HIGH_BIT_RATE,
        output_format="raw",
    )
    # Frames arrive as messages; the sink's "store" port keeps them so they
    # can be read back by index once the flowgraph has stopped.
    snk = blocks.message_debug()
    tb.connect(tx, head, rx)
    tb.msg_connect(rx, "frames", snk, "store")

    print("Running flowgraph (TX -> RX)...")
    print()
    tb.run()

    n_recovered = snk.num_messages()
    print(f"Recovered {n_recovered} frames from {args.frames} transmitted")
    print()
    if n_recovered == 0:
        print("No frames recovered. PLL may need more settling time.")
        print("Try increasing --frames to give the receiver more data.")
        sys.exit(1)

    # Decode and display each recovered frame
    demux = DemuxEngine(output_format="raw")
    print("-" * 60)
    for i in range(n_recovered):
        msg = snk.get_message(i)
        # A pair carries the byte vector in its cdr; anything else is
        # treated as the byte vector itself.
        payload = pmt.cdr(msg) if pmt.is_pair(msg) else msg
        frame_bytes = bytes(pmt.u8vector_elements(payload))
        print(_frame_summary(i, demux.process_frame(frame_bytes)))
    print("-" * 60)
    print()
    print(f"Recovery rate: {n_recovered}/{args.frames} "
          f"({100 * n_recovered / args.frames:.0f}%)")
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()