Uplink word codec (uplink_word_codec.py): - UplinkSerializerEngine: (channel, value) pairs to 15-bit NRZ bit stream with configurable inter-word gap for UPRUPT timing - UplinkDeserializerEngine: two-phase state machine (acquisition + fixed framing) recovers words from NRZ bits, handles leading-zero data words - GR wrappers: uplink_word_serializer (sync_block source) and uplink_word_deserializer (basic_block sink with message output) TX source (usb_uplink_source.py): - hier_block2 wiring: word_serializer -> nrz_encoder -> FM mod (4 kHz dev) -> 70 kHz upconvert -> complex_to_real -> PM mod (1.0 rad) -> [AWGN] - Message input "words" forwards PDUs from uplink_encoder RX receiver (usb_uplink_receiver.py): - hier_block2 wiring: PM demod -> subcarrier_extract (70 kHz, 20 kHz BW) -> quadrature_demod -> matched filter -> decimate -> slicer -> deserializer - Message output "commands" emits recovered (channel, value) PDUs GRC block definitions for both source and receiver. Loopback demo (uplink_loopback_demo.py): - Encodes V16N36E, serializes with pure-Python engine, runs through GR RF chain (FM + PM + noise + demod), deserializes, compares TX vs RX words
251 lines
7.9 KiB
Python
251 lines
7.9 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Apollo Uplink Loopback Demo -- encode V16N36E, modulate, demodulate, verify.
|
|
|
|
Demonstrates the full uplink signal chain using a mix of pure-Python engines
|
|
(for bit-level serialization/deserialization) and GNU Radio blocks (for the
|
|
RF modulation/demodulation path):
|
|
|
|
TX (ground station):
|
|
UplinkEncoder -> UplinkSerializerEngine -> [bits]
|
|
-> GR: nrz_encoder -> FM mod -> 70 kHz upconvert -> PM mod
|
|
|
|
RX (spacecraft):
|
|
GR: PM demod -> 70 kHz extract -> FM demod -> matched filter -> slicer
|
|
-> [bits] -> UplinkDeserializerEngine
|
|
|
|
The pure-Python engines handle word<->bit conversion at the endpoints, while
|
|
the GR streaming chain proves the RF modulation path works end-to-end.
|
|
|
|
Usage:
|
|
uv run python examples/uplink_loopback_demo.py
|
|
uv run python examples/uplink_loopback_demo.py --snr 20
|
|
uv run python examples/uplink_loopback_demo.py --snr 10 --verb 37 --noun 0
|
|
"""
|
|
|
|
import argparse
|
|
import math
|
|
import sys
|
|
|
|
import numpy as np
|
|
from gnuradio import analog, blocks, digital, filter, gr
|
|
|
|
from apollo.constants import (
|
|
SAMPLE_RATE_BASEBAND,
|
|
UPLINK_DATA_SUBCARRIER_HZ,
|
|
)
|
|
from apollo.nrz_encoder import nrz_encoder
|
|
from apollo.pm_demod import pm_demod
|
|
from apollo.pm_mod import pm_mod
|
|
from apollo.subcarrier_extract import subcarrier_extract
|
|
from apollo.uplink_encoder import UplinkEncoder
|
|
from apollo.uplink_word_codec import (
|
|
UPLINK_WORD_BITS,
|
|
UplinkDeserializerEngine,
|
|
UplinkSerializerEngine,
|
|
)
|
|
|
|
# Uplink parameters (local definitions)
# PM deviation handed to pm_mod, in radians.
UPLINK_PM_DEVIATION_RAD = 1.0
# Data bit rate in bits per second; sample_rate / bit_rate gives samples per bit.
UPLINK_DATA_BIT_RATE = 2_000
# FM deviation in Hz; sets both the modulator sensitivity and the demod gain.
UPLINK_DATA_FM_DEVIATION_HZ = 4_000
# Gap between consecutive words, in bit periods (added to UPLINK_WORD_BITS
# when sizing the transmitted stream).
UPLINK_INTER_WORD_GAP = 3
|
|
|
|
|
|
def _print_word_table(pairs):
    """Print one line per (channel, value) pair.

    Each line shows the index, the channel in octal, and the value in octal,
    decimal and 15-digit binary.
    """
    for i, (ch, val) in enumerate(pairs):
        print(f" [{i}] ch={ch:03o} val={val:05o} ({val:>5d}) "
              f"bits={val:015b}")


def _compare_word_values(tx_pairs, rx_pairs):
    """Compare transmitted and recovered word values position by position.

    Only the first ``min(len(tx_pairs), len(rx_pairs))`` positions are
    compared, and only the *value* of each pair is checked; the channel
    element is ignored.

    Returns:
        A ``(matches, errors, n_compare)`` tuple where ``errors`` is a list of
        ``(index, tx_value, rx_value)`` for every mismatching position.
    """
    n_compare = min(len(tx_pairs), len(rx_pairs))
    # zip() stops at the shorter sequence, i.e. after n_compare positions.
    errors = [
        (i, tx_val, rx_val)
        for i, ((_, tx_val), (_, rx_val)) in enumerate(zip(tx_pairs, rx_pairs))
        if tx_val != rx_val
    ]
    return n_compare - len(errors), errors, n_compare


def _run_rf_loopback(tx_bytes, sample_rate, bit_rate, samples_per_bit, snr_db):
    """Build and run the TX -> RX GNU Radio flowgraph; return the sliced bits.

    TX: vector_source_b -> nrz -> FM mod -> upconvert 70 kHz -> to_real -> PM mod
    RX: PM demod -> extract 70 kHz -> FM demod -> matched filter
        -> decimate -> slicer -> vector_sink

    Args:
        tx_bytes: numpy array of bits to transmit.
        sample_rate: sample rate used by every block in the chain.
        bit_rate: data bit rate handed to the NRZ encoder.
        samples_per_bit: matched-filter length and decimation factor.
        snr_db: SNR in dB for the added complex Gaussian noise, or ``None``
            to connect the modulator straight to the demodulator.

    Returns:
        The contents of the vector sink as a list.
    """
    print("Building flowgraph...")
    tb = gr.top_block()

    # TX chain (source does not repeat, so the flowgraph terminates by itself)
    src = blocks.vector_source_b(tx_bytes.tolist(), False)
    nrz = nrz_encoder(bit_rate=bit_rate, sample_rate=sample_rate)

    fm_sensitivity = 2.0 * math.pi * UPLINK_DATA_FM_DEVIATION_HZ / sample_rate
    fm_mod = analog.frequency_modulator_fc(fm_sensitivity)

    lo = analog.sig_source_c(
        sample_rate, analog.GR_COS_WAVE,
        UPLINK_DATA_SUBCARRIER_HZ, 1.0, 0,
    )
    mixer = blocks.multiply_cc(1)
    to_real = blocks.complex_to_real(1)

    pm = pm_mod(pm_deviation=UPLINK_PM_DEVIATION_RAD, sample_rate=sample_rate)

    # RX chain
    pm_rx = pm_demod(carrier_pll_bw=0.02, sample_rate=sample_rate)

    sc_extract = subcarrier_extract(
        center_freq=UPLINK_DATA_SUBCARRIER_HZ,
        bandwidth=20_000,
        sample_rate=sample_rate,
    )

    # Demod gain is the reciprocal of the modulator sensitivity.
    fm_gain = sample_rate / (2.0 * math.pi * UPLINK_DATA_FM_DEVIATION_HZ)
    fm_demod = analog.quadrature_demod_cf(fm_gain)

    # Moving average over one bit period, then one sample kept per bit.
    matched_taps = [1.0 / samples_per_bit] * samples_per_bit
    matched = filter.fir_filter_fff(1, matched_taps)
    decim = blocks.keep_one_in_n(gr.sizeof_float, samples_per_bit)
    slicer = digital.binary_slicer_fb()

    snk = blocks.vector_sink_b()

    # Optional noise
    if snr_db is not None:
        noise_power = 1.0 / (10.0 ** (snr_db / 10.0))
        noise_amplitude = math.sqrt(noise_power / 2.0)
        noise = analog.noise_source_c(analog.GR_GAUSSIAN, noise_amplitude, 0)
        add_noise = blocks.add_cc(1)

        tb.connect(pm, (add_noise, 0))
        tb.connect(noise, (add_noise, 1))
        noise_out = add_noise
    else:
        noise_out = pm

    # Wire TX
    tb.connect(src, nrz, fm_mod, (mixer, 0))
    tb.connect(lo, (mixer, 1))
    tb.connect(mixer, to_real, pm)

    # Wire RX
    tb.connect(noise_out, pm_rx, sc_extract, fm_demod, matched, decim, slicer, snk)

    print("Running flowgraph (TX -> RX)...")
    tb.run()
    print()

    return list(snk.data())


def main():
    """Run the uplink loopback demo from the command line.

    Encodes a verb/noun command (``--verb``, ``--noun``), serializes it to
    bits, passes the bits through the GNU Radio modulate/demodulate chain
    (optionally with noise, ``--snr``), deserializes the sliced bits and
    prints a TX-vs-RX comparison.

    Exit status: returns normally only when every transmitted word was
    recovered with a matching value. The process exits with status 1 when no
    words are recovered, when any compared word differs, or when the number
    of recovered words differs from the number sent.
    """
    parser = argparse.ArgumentParser(description="Apollo uplink loopback demo")
    parser.add_argument(
        "--verb", type=int, default=16, help="Verb number (default: 16)"
    )
    parser.add_argument(
        "--noun", type=int, default=36, help="Noun number (default: 36)"
    )
    parser.add_argument(
        "--snr", type=float, default=None, help="SNR in dB (None = no noise)"
    )
    args = parser.parse_args()

    sample_rate = SAMPLE_RATE_BASEBAND
    bit_rate = UPLINK_DATA_BIT_RATE

    # --- Encode the command ---

    encoder = UplinkEncoder()
    tx_pairs = encoder.encode_verb_noun(verb=args.verb, noun=args.noun)

    print("=" * 60)
    print("Apollo Uplink Loopback Demo")
    print("=" * 60)
    print(f" Command: V{args.verb:02d}N{args.noun:02d}E")
    print(f" Uplink words: {len(tx_pairs)}")
    print(f" SNR: {'clean (no noise)' if args.snr is None else f'{args.snr} dB'}")
    print()

    print("TX word sequence:")
    _print_word_table(tx_pairs)
    print()

    # --- Serialize to bits using pure-Python engine ---

    serializer = UplinkSerializerEngine(inter_word_gap=UPLINK_INTER_WORD_GAP)
    serializer.add_words(tx_pairs)

    bits_per_word = UPLINK_WORD_BITS + UPLINK_INTER_WORD_GAP
    total_data_bits = len(tx_pairs) * bits_per_word

    # Add leading and trailing idle for PLL settling
    # NOTE(review): the words are queued before next_bits() is called, so
    # whether any idle actually precedes the data depends on
    # UplinkSerializerEngine -- confirm against its implementation.
    pll_settle_bits = int(bit_rate * 0.5)  # 0.5 seconds of idle
    total_bits = pll_settle_bits + total_data_bits + pll_settle_bits

    tx_bits = serializer.next_bits(total_bits)
    tx_bytes = np.array(tx_bits, dtype=np.byte)

    samples_per_bit = int(sample_rate / bit_rate)
    n_samples = total_bits * samples_per_bit

    print(f" Total bits: {total_bits} ({total_data_bits} data + "
          f"{2 * pll_settle_bits} idle)")
    print(f" Samples per bit: {samples_per_bit}")
    print(f" Total samples: {n_samples:,}")
    print(f" Duration: {n_samples / sample_rate:.3f} s")
    print()

    # --- Run the bits through the GR RF path ---

    rx_bits = _run_rf_loopback(
        tx_bytes, sample_rate, bit_rate, samples_per_bit, args.snr
    )

    # --- Deserialize recovered bits ---

    print(f"Recovered {len(rx_bits)} bits from slicer")

    deserializer = UplinkDeserializerEngine()
    rx_pairs = deserializer.process_bits(rx_bits)

    print(f"Recovered {len(rx_pairs)} words (expected {len(tx_pairs)})")
    print()

    if not rx_pairs:
        print("No words recovered. PLL may need more settling time or")
        print("the subcarrier filter bandwidth may need adjustment.")
        sys.exit(1)

    # --- Compare TX vs RX ---

    print("RX word sequence:")
    _print_word_table(rx_pairs)
    print()

    matches, errors, n_compare = _compare_word_values(tx_pairs, rx_pairs)

    print("-" * 60)
    print(f" Words transmitted: {len(tx_pairs)}")
    print(f" Words recovered: {len(rx_pairs)}")
    print(f" Matches: {matches}/{n_compare}")

    if errors:
        print(f" Errors: {len(errors)}")
        for idx, tx_v, rx_v in errors:
            # Count differing bits
            n_bit_err = bin(tx_v ^ rx_v).count("1")
            print(f" Word {idx}: TX={tx_v:05o} RX={rx_v:05o} "
                  f"({n_bit_err} bit errors)")

    if n_compare > 0:
        wer = 1.0 - (matches / n_compare)
        print(f" Word error rate: {wer:.1%}")
    print("-" * 60)

    all_compared_match = matches == n_compare
    if all_compared_match and len(rx_pairs) == len(tx_pairs):
        print()
        print(f"V{args.verb:02d}N{args.noun:02d}E round-trip: all {matches} words match.")
        return

    if all_compared_match:
        print()
        print(f"All compared words match, but word count differs "
              f"({len(rx_pairs)} recovered vs {len(tx_pairs)} sent).")

    # Verification failed (word errors and/or wrong word count): report it
    # through the exit status, like the "no words recovered" path above.
    sys.exit(1)
|
|
|
|
|
|
# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|