Complete signal processing pipeline from complex baseband to decoded PCM telemetry, verified against the 1965 NAA Study Guide (A-624): Core demod (Phase 1): - PM demodulator with carrier PLL recovery - 1.024 MHz subcarrier extractor (bandpass + downconvert) - BPSK demodulator with Costas loop + symbol sync - Convenience hier_block2 combining subcarrier + BPSK PCM frame processing (Phase 2): - 32-bit frame sync with Hamming distance correlator - SEARCH/VERIFY/LOCKED state machine, complement-on-odd handling - Frame demultiplexer (128-word, A/D voltage scaling) - AGC downlink decoder (15-bit word reassembly from DNTM1/DNTM2) Voice and analog (Phase 3): - 1.25 MHz FM voice subcarrier demod to 8 kHz audio - SCO demodulator for 9 analog sensor channels (14.5-165 kHz) Virtual AGC integration (Phase 4): - TCP bridge client with auto-reconnect and channel filtering - DSKY uplink encoder (VERB/NOUN/DATA command sequences) Top-level receiver (Phase 5): - usb_downlink_receiver hier_block2: one block, complex in, PDUs out - 14 GRC block YAML definitions for GNU Radio Companion - Example scripts for signal analysis and full-chain demo Infrastructure: - constants.py with all timing/frequency/frame parameters - protocol.py for sync word + AGC packet encode/decode - Synthetic USB signal generator for testing - 222 tests passing, ruff lint clean
82 lines
2.8 KiB
Python
82 lines
2.8 KiB
Python
"""Tests for the PM demodulator block."""
|
|
|
|
import numpy as np
|
|
import pytest
|
|
|
|
try:
|
|
from gnuradio import blocks, gr
|
|
|
|
HAS_GNURADIO = True
|
|
except ImportError:
|
|
HAS_GNURADIO = False
|
|
|
|
from apollo.constants import PM_PEAK_DEVIATION_RAD, SAMPLE_RATE_BASEBAND
|
|
|
|
pytestmark = pytest.mark.skipif(not HAS_GNURADIO, reason="GNU Radio not installed")
|
|
|
|
|
|
class TestPMDemod:
|
|
"""Test PM demodulation with synthetic signals."""
|
|
|
|
def test_pure_carrier_zero_output(self):
|
|
"""Unmodulated carrier should produce near-zero PM demod output."""
|
|
from apollo.pm_demod import pm_demod
|
|
|
|
tb = gr.top_block()
|
|
n_samples = 50000
|
|
|
|
# Pure carrier (no modulation) = constant complex exponential
|
|
carrier = np.ones(n_samples, dtype=np.complex64)
|
|
src = blocks.vector_source_c(carrier.tolist())
|
|
demod = pm_demod(carrier_pll_bw=0.02, sample_rate=SAMPLE_RATE_BASEBAND)
|
|
snk = blocks.vector_sink_f()
|
|
|
|
tb.connect(src, demod, snk)
|
|
tb.run()
|
|
|
|
output = np.array(snk.data())
|
|
# After PLL settles (skip first 1000 samples), output should be near zero
|
|
settled = output[1000:]
|
|
assert len(settled) > 0
|
|
assert np.std(settled) < 0.1, f"Unmodulated carrier std too high: {np.std(settled)}"
|
|
|
|
def test_known_pm_recovery(self):
|
|
"""PM-modulated signal should recover the modulating waveform."""
|
|
from apollo.pm_demod import pm_demod
|
|
|
|
tb = gr.top_block()
|
|
n_samples = 100000
|
|
sample_rate = SAMPLE_RATE_BASEBAND
|
|
|
|
# Generate a test tone PM signal
|
|
t = np.arange(n_samples, dtype=np.float64) / sample_rate
|
|
tone_freq = 10000 # 10 kHz test tone
|
|
modulating = PM_PEAK_DEVIATION_RAD * np.sin(2 * np.pi * tone_freq * t)
|
|
signal = np.exp(1j * modulating).astype(np.complex64)
|
|
|
|
src = blocks.vector_source_c(signal.tolist())
|
|
demod = pm_demod(carrier_pll_bw=0.02, sample_rate=sample_rate)
|
|
snk = blocks.vector_sink_f()
|
|
|
|
tb.connect(src, demod, snk)
|
|
tb.run()
|
|
|
|
output = np.array(snk.data())
|
|
# After PLL settles, the output should correlate with the modulating signal
|
|
settled_out = output[5000:]
|
|
settled_mod = modulating[5000 : 5000 + len(settled_out)]
|
|
if len(settled_out) > len(settled_mod):
|
|
settled_out = settled_out[: len(settled_mod)]
|
|
|
|
# Normalize both and check correlation
|
|
if np.std(settled_out) > 0.01:
|
|
correlation = np.corrcoef(settled_out, settled_mod)[0, 1]
|
|
assert abs(correlation) > 0.8, f"PM recovery correlation too low: {correlation}"
|
|
|
|
def test_block_instantiation(self):
|
|
"""Block should instantiate with default parameters."""
|
|
from apollo.pm_demod import pm_demod
|
|
|
|
demod = pm_demod()
|
|
assert demod is not None
|