Complete signal processing pipeline from complex baseband to decoded PCM telemetry, verified against the 1965 NAA Study Guide (A-624):

Core demod (Phase 1):
- PM demodulator with carrier PLL recovery
- 1.024 MHz subcarrier extractor (bandpass + downconvert)
- BPSK demodulator with Costas loop + symbol sync
- Convenience hier_block2 combining subcarrier + BPSK

PCM frame processing (Phase 2):
- 32-bit frame sync with Hamming distance correlator
- SEARCH/VERIFY/LOCKED state machine, complement-on-odd handling
- Frame demultiplexer (128-word, A/D voltage scaling)
- AGC downlink decoder (15-bit word reassembly from DNTM1/DNTM2)

Voice and analog (Phase 3):
- 1.25 MHz FM voice subcarrier demod to 8 kHz audio
- SCO demodulator for 9 analog sensor channels (14.5-165 kHz)

Virtual AGC integration (Phase 4):
- TCP bridge client with auto-reconnect and channel filtering
- DSKY uplink encoder (VERB/NOUN/DATA command sequences)

Top-level receiver (Phase 5):
- usb_downlink_receiver hier_block2: one block, complex in, PDUs out
- 14 GRC block YAML definitions for GNU Radio Companion
- Example scripts for signal analysis and full-chain demo

Infrastructure:
- constants.py with all timing/frequency/frame parameters
- protocol.py for sync word + AGC packet encode/decode
- Synthetic USB signal generator for testing
- 222 tests passing, ruff lint clean
246 lines
8.9 KiB
Python
246 lines
8.9 KiB
Python
"""Tests for Apollo AGC downlink decoder."""
|
|
|
|
|
|
from apollo.constants import (
|
|
AGC_CH_DNTM1,
|
|
AGC_CH_DNTM2,
|
|
AGC_CH_OUTLINK,
|
|
DL_CM_COAST_ALIGN,
|
|
DL_CM_POWERED_LIST,
|
|
DL_LM_DESCENT_ASCENT,
|
|
DL_LM_ORBITAL_MANEUVERS,
|
|
)
|
|
from apollo.downlink_decoder import (
|
|
DL_LIST_NAMES,
|
|
DownlinkEngine,
|
|
identify_list_type,
|
|
reassemble_agc_word,
|
|
)
|
|
|
|
|
|
class TestAGCWordReassembly:
    """Check reassembly of 15-bit AGC words from channel 34/35 byte pairs."""

    def test_zero_word(self):
        """Zero on both channels reassembles to the word 0."""
        word = reassemble_agc_word(0, 0)
        assert word == 0

    def test_max_word(self):
        """DNTM1=0x7F with DNTM2=0xFF yields the largest word, 0x7FFF (32767)."""
        word = reassemble_agc_word(0x7F, 0xFF)
        assert word == 0x7FFF

    def test_high_byte_only(self):
        """DNTM1=0x01 with DNTM2=0x00 yields 0x0100 (256)."""
        word = reassemble_agc_word(0x01, 0x00)
        assert word == 0x0100

    def test_low_byte_only(self):
        """DNTM1=0x00 with DNTM2=0xFF yields 0x00FF (255)."""
        word = reassemble_agc_word(0x00, 0xFF)
        assert word == 0x00FF

    def test_known_value(self):
        """High byte 0x2A and low byte 0x55 combine to 0x2A55."""
        expected = (0x2A << 8) | 0x55
        assert reassemble_agc_word(0x2A, 0x55) == expected

    def test_high_byte_mask(self):
        """Bit 7 of DNTM1 is discarded, since the word is 15 bits, not 16."""
        # 0xFF and 0x7F differ only in bit 7, so both must give the same word.
        with_bit7 = reassemble_agc_word(0xFF, 0x00)
        without_bit7 = reassemble_agc_word(0x7F, 0x00)
        assert with_bit7 == without_bit7

    def test_roundtrip_encoding(self):
        """Splitting a word into bytes and reassembling returns the same word."""
        samples = (0, 1, 127, 255, 1000, 16383, 32767)
        for original in samples:
            # Encode: upper 7 bits go to the high byte, lower 8 to the low byte.
            high, low = (original >> 8) & 0x7F, original & 0xFF
            recovered = reassemble_agc_word(high, low)
            assert recovered == original, f"Failed roundtrip for {original}"
|
|
|
|
|
|
class TestDownlinkListIdentification:
    """Check identification of the downlink list type from the first buffer word."""

    def test_cm_powered_list(self):
        """DL_CM_POWERED_LIST is reported as ID 0, "CM Powered Flight"."""
        type_id, label = identify_list_type(DL_CM_POWERED_LIST)
        assert type_id == 0
        assert label == "CM Powered Flight"

    def test_lm_orbital(self):
        """DL_LM_ORBITAL_MANEUVERS is reported as ID 1, "LM Orbital Maneuvers"."""
        type_id, label = identify_list_type(DL_LM_ORBITAL_MANEUVERS)
        assert type_id == 1
        assert label == "LM Orbital Maneuvers"

    def test_cm_coast_align(self):
        """DL_CM_COAST_ALIGN is reported as ID 2, "CM Coast/Alignment"."""
        type_id, label = identify_list_type(DL_CM_COAST_ALIGN)
        assert type_id == 2
        assert label == "CM Coast/Alignment"

    def test_lm_descent_ascent(self):
        """DL_LM_DESCENT_ASCENT is reported as ID 7, "LM Descent/Ascent"."""
        type_id, label = identify_list_type(DL_LM_DESCENT_ASCENT)
        assert type_id == 7
        assert label == "LM Descent/Ascent"

    def test_unknown_type(self):
        """An undefined type ID is echoed back with an 'Unknown' name."""
        undefined_id = 0x0F  # 15, not defined
        type_id, label = identify_list_type(undefined_id)
        assert type_id == 15
        assert "Unknown" in label

    def test_list_id_extracted_from_lower_bits(self):
        """Only the lower 4 bits of the word select the list ID."""
        # Upper bits are all set; the low nibble is 2 (CM Coast/Align).
        first_word = 0x7FF2
        type_id, label = identify_list_type(first_word)
        assert type_id == 2
        assert label == "CM Coast/Alignment"

    def test_all_known_types(self):
        """Every entry in DL_LIST_NAMES is identified with its own ID and name."""
        for known_id, known_name in DL_LIST_NAMES.items():
            type_id, label = identify_list_type(known_id)
            assert type_id == known_id
            assert label == known_name
|
|
|
|
|
|
class TestDownlinkEngine:
    """Test the downlink decoding engine."""

    def test_empty_engine(self):
        """New engine should have empty buffers."""
        engine = DownlinkEngine()
        # Nothing has been fed, so there is nothing to flush.
        assert engine.force_flush() is None

    def test_single_word_pair(self):
        """Feeding one DNTM1/DNTM2 pair should buffer one word."""
        engine = DownlinkEngine(buffer_size=1)

        # DNTM1 alone should not produce output.
        result = engine.feed_agc_word(AGC_CH_DNTM1, 0x10)
        assert result is None

        # DNTM2 completes the pair -> with buffer_size=1, should emit.
        result = engine.feed_agc_word(AGC_CH_DNTM2, 0x20)
        assert result is not None
        assert result["word_count"] == 1
        assert result["words"][0] == reassemble_agc_word(0x10, 0x20)

    def test_buffer_fills_at_threshold(self):
        """Buffer should auto-emit when buffer_size words are collected."""
        buf_size = 5
        engine = DownlinkEngine(buffer_size=buf_size)

        # The first buf_size - 1 pairs leave the buffer one word short of the
        # threshold, so none of them (including the last one) may emit.
        for i in range(buf_size - 1):
            engine.feed_agc_word(AGC_CH_DNTM1, i & 0x7F)
            result = engine.feed_agc_word(AGC_CH_DNTM2, i & 0xFF)
            assert result is None, f"unexpected snapshot after pair {i + 1}"

        # One more pair brings the buffer to buf_size words and triggers emission.
        engine.feed_agc_word(AGC_CH_DNTM1, 0x7F)
        result = engine.feed_agc_word(AGC_CH_DNTM2, 0xFF)
        assert result is not None
        assert result["word_count"] == buf_size

    def test_list_type_in_snapshot(self):
        """Snapshot should identify the list type from the first word."""
        engine = DownlinkEngine(buffer_size=3)

        # First word has list type ID in lower 4 bits.
        # Use DL_CM_COAST_ALIGN (2) as the first word.
        first_high = 0x00
        first_low = DL_CM_COAST_ALIGN  # = 2
        engine.feed_agc_word(AGC_CH_DNTM1, first_high)
        engine.feed_agc_word(AGC_CH_DNTM2, first_low)

        # Two more filler words; the last DNTM2 fills the buffer.
        engine.feed_agc_word(AGC_CH_DNTM1, 0x00)
        engine.feed_agc_word(AGC_CH_DNTM2, 0x00)

        engine.feed_agc_word(AGC_CH_DNTM1, 0x00)
        result = engine.feed_agc_word(AGC_CH_DNTM2, 0x00)

        assert result is not None
        assert result["list_type_id"] == DL_CM_COAST_ALIGN
        assert result["list_name"] == "CM Coast/Alignment"

    def test_outlink_data_collected(self):
        """OUTLINK channel data should be accumulated in outlink_data."""
        engine = DownlinkEngine(buffer_size=1)

        # Feed outlink data before the buffer fills.
        engine.feed_agc_word(AGC_CH_OUTLINK, 0xAA)
        engine.feed_agc_word(AGC_CH_OUTLINK, 0xBB)

        # Now complete a word pair to trigger snapshot.
        engine.feed_agc_word(AGC_CH_DNTM1, 0x00)
        result = engine.feed_agc_word(AGC_CH_DNTM2, 0x00)

        assert result is not None
        assert result["outlink_data"] == [0xAA, 0xBB]

    def test_force_flush_partial(self):
        """force_flush should emit partial buffer contents."""
        # buffer_size is far above the two words fed, so nothing auto-emits.
        engine = DownlinkEngine(buffer_size=100)

        engine.feed_agc_word(AGC_CH_DNTM1, 0x10)
        engine.feed_agc_word(AGC_CH_DNTM2, 0x20)
        engine.feed_agc_word(AGC_CH_DNTM1, 0x30)
        engine.feed_agc_word(AGC_CH_DNTM2, 0x40)

        result = engine.force_flush()
        assert result is not None
        assert result["word_count"] == 2

    def test_reset_clears_state(self):
        """reset should clear all internal buffers."""
        engine = DownlinkEngine(buffer_size=100)

        engine.feed_agc_word(AGC_CH_DNTM1, 0x10)
        engine.feed_agc_word(AGC_CH_DNTM2, 0x20)
        engine.feed_agc_word(AGC_CH_OUTLINK, 0xFF)

        engine.reset()

        # After reset there must be nothing left to flush.
        assert engine.force_flush() is None

    def test_dntm1_without_dntm2_ignored(self):
        """A DNTM1 not followed by DNTM2 should be overwritten by next DNTM1."""
        engine = DownlinkEngine(buffer_size=1)

        # Two DNTM1s in a row: only the second should be used.
        engine.feed_agc_word(AGC_CH_DNTM1, 0xAA)
        engine.feed_agc_word(AGC_CH_DNTM1, 0xBB)  # overwrites 0xAA
        result = engine.feed_agc_word(AGC_CH_DNTM2, 0x00)

        assert result is not None
        expected = reassemble_agc_word(0xBB, 0x00)
        assert result["words"][0] == expected

    def test_unknown_channel_ignored(self):
        """Channels other than 34/35/57 should be silently ignored."""
        engine = DownlinkEngine(buffer_size=1)
        result = engine.feed_agc_word(99, 0xFF)
        assert result is None

    def test_multiple_snapshots(self):
        """Engine should produce multiple snapshots as buffer fills repeatedly."""
        engine = DownlinkEngine(buffer_size=2)

        snapshots = []
        for i in range(6):
            engine.feed_agc_word(AGC_CH_DNTM1, i & 0x7F)
            result = engine.feed_agc_word(AGC_CH_DNTM2, i & 0xFF)
            if result is not None:
                snapshots.append(result)

        # 6 pairs / buffer_size 2 = 3 snapshots.
        assert len(snapshots) == 3
        for snap in snapshots:
            assert snap["word_count"] == 2
|