gr-apollo/tests/test_protocol.py
Ryan Malloy 0ee7ff0ad7 Implement full Apollo USB downlink decoder chain
Complete signal processing pipeline from complex baseband to decoded
PCM telemetry, verified against the 1965 NAA Study Guide (A-624):

Core demod (Phase 1):
  - PM demodulator with carrier PLL recovery
  - 1.024 MHz subcarrier extractor (bandpass + downconvert)
  - BPSK demodulator with Costas loop + symbol sync
  - Convenience hier_block2 combining subcarrier + BPSK

PCM frame processing (Phase 2):
  - 32-bit frame sync with Hamming distance correlator
  - SEARCH/VERIFY/LOCKED state machine, complement-on-odd handling
  - Frame demultiplexer (128-word, A/D voltage scaling)
  - AGC downlink decoder (15-bit word reassembly from DNTM1/DNTM2)

Voice and analog (Phase 3):
  - 1.25 MHz FM voice subcarrier demod to 8 kHz audio
  - SCO demodulator for 9 analog sensor channels (14.5-165 kHz)

Virtual AGC integration (Phase 4):
  - TCP bridge client with auto-reconnect and channel filtering
  - DSKY uplink encoder (VERB/NOUN/DATA command sequences)

Top-level receiver (Phase 5):
  - usb_downlink_receiver hier_block2: one block, complex in, PDUs out
  - 14 GRC block YAML definitions for GNU Radio Companion
  - Example scripts for signal analysis and full-chain demo

Infrastructure:
  - constants.py with all timing/frequency/frame parameters
  - protocol.py for sync word + AGC packet encode/decode
  - Synthetic USB signal generator for testing
  - 222 tests passing, ruff lint clean
2026-02-20 13:18:42 -07:00

171 lines
5.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Tests for Apollo protocol utilities — sync words and AGC packets."""
import pytest
from apollo.constants import (
AGC_CH_DNTM1,
AGC_CH_INLINK,
AGC_CH_OUTLINK,
DEFAULT_SYNC_A,
DEFAULT_SYNC_B,
DEFAULT_SYNC_CORE,
)
from apollo.protocol import (
adc_to_voltage,
bits_to_sync_word,
form_io_packet,
generate_sync_word,
parse_io_packet,
parse_sync_word,
sync_word_to_bits,
voltage_to_adc,
)
class TestSyncWordGeneration:
    """Exercise building and decoding of the 32-bit PCM frame sync word."""

    def test_roundtrip(self):
        """Fields parsed from a generated word match the defaults and the requested frame ID."""
        parsed = parse_sync_word(generate_sync_word(frame_id=1))
        assert parsed["a_bits"] == DEFAULT_SYNC_A
        assert parsed["core"] == DEFAULT_SYNC_CORE
        assert parsed["b_bits"] == DEFAULT_SYNC_B
        assert parsed["frame_id"] == 1

    def test_frame_id_range(self):
        """Every frame ID from 1 through 50 survives a generate/parse cycle."""
        for frame_id in range(1, 51):
            sync_word = generate_sync_word(frame_id=frame_id)
            parsed = parse_sync_word(sync_word)
            assert parsed["frame_id"] == frame_id

    def test_invalid_frame_id(self):
        """Frame IDs just outside the 1-50 range (0 and 51) raise ValueError."""
        for out_of_range_id in (0, 51):
            with pytest.raises(ValueError):
                generate_sync_word(frame_id=out_of_range_id)

    def test_odd_frame_complements_core(self):
        """An odd frame carries the bitwise complement of the even frame's core."""
        even_word = generate_sync_word(frame_id=2, odd=False)
        odd_word = generate_sync_word(frame_id=1, odd=True)
        even_core = parse_sync_word(even_word)["core"]
        odd_core = parse_sync_word(odd_word)["core"]
        # The core is 15 bits wide, so a full complement differs in all 15 positions.
        differing_bits = even_core ^ odd_core
        assert differing_bits == 0x7FFF

    def test_word_is_32_bits(self):
        """The generated word is non-negative and fits in 32 bits."""
        sync_word = generate_sync_word(frame_id=25)
        assert 0 <= sync_word < 2**32

    def test_bits_roundtrip(self):
        """Expanding a word to bits and packing it again returns the same word."""
        sync_word = generate_sync_word(frame_id=42)
        bit_list = sync_word_to_bits(sync_word)
        assert len(bit_list) == 32
        assert all(bit in (0, 1) for bit in bit_list)
        assert bits_to_sync_word(bit_list) == sync_word

    def test_bits_msb_first(self):
        """The first element of the bit list is the word's most significant bit."""
        sync_word = generate_sync_word(frame_id=1)
        bit_list = sync_word_to_bits(sync_word)
        # Bit 31 is the MSB of a 32-bit word.
        assert bit_list[0] == (sync_word >> 31) & 1
class TestAGCPacketProtocol:
    """Exercise encoding and decoding of Virtual AGC socket I/O packets."""

    def test_roundtrip_basic(self):
        """A packet decodes back to the channel and value it was built from."""
        encoded = form_io_packet(channel=AGC_CH_OUTLINK, value=12345)
        channel, value, _ = parse_io_packet(encoded)
        assert channel == AGC_CH_OUTLINK
        assert value == 12345

    def test_roundtrip_all_telecom_channels(self):
        """Each telecom channel roundtrips a spread of values up to 0x7FFF."""
        telecom_channels = [AGC_CH_INLINK, AGC_CH_OUTLINK, AGC_CH_DNTM1]
        sample_values = [0, 1, 0x3FFF, 0x7FFF]
        for ch in telecom_channels:
            for val in sample_values:
                got_ch, got_val, _ = parse_io_packet(
                    form_io_packet(channel=ch, value=val)
                )
                assert got_ch == ch, f"Channel mismatch: {got_ch} != {ch}"
                assert got_val == val, f"Value mismatch: {got_val} != {val}"

    def test_packet_length(self):
        """An encoded packet is exactly four bytes long."""
        assert len(form_io_packet(channel=0, value=0)) == 4

    def test_signature_bits(self):
        """The top two bits of bytes 0..3 are 00, 01, 10 and 11 respectively."""
        encoded = form_io_packet(channel=100, value=5000)
        expected_signatures = (0x00, 0x40, 0x80, 0xC0)
        for index, signature in enumerate(expected_signatures):
            assert (encoded[index] & 0xC0) == signature

    def test_invalid_packet_length(self):
        """A three-byte input is rejected with ValueError."""
        truncated = b"\x00\x40\x80"
        with pytest.raises(ValueError):
            parse_io_packet(truncated)

    def test_invalid_signature(self):
        """A packet whose first byte has the wrong signature bits raises ValueError."""
        bad_first_byte = b"\xFF\x40\x80\xC0"
        with pytest.raises(ValueError):
            parse_io_packet(bad_first_byte)

    def test_zero_values(self):
        """Channel 0 with value 0 roundtrips unchanged."""
        channel, value, _ = parse_io_packet(form_io_packet(channel=0, value=0))
        assert channel == 0
        assert value == 0

    def test_max_values(self):
        """Channel 0x1FF with value 0x7FFF roundtrips unchanged."""
        encoded = form_io_packet(channel=0x1FF, value=0x7FFF)
        channel, value, _ = parse_io_packet(encoded)
        assert channel == 0x1FF
        assert value == 0x7FFF
class TestADCConversion:
    """Check A/D converter code ↔ voltage conversion (IMPL_SPEC section 5.3)."""

    def test_zero_code(self):
        """Code 1 maps to exactly 0 V."""
        assert adc_to_voltage(1) == 0.0

    def test_fullscale_code(self):
        """Code 254 maps to 4.98 V, within 1 mV."""
        error = abs(adc_to_voltage(254) - 4.98)
        assert error < 0.001

    def test_overflow_code(self):
        """Code 255 (over-range) is reported as exactly 5.0 V."""
        assert adc_to_voltage(255) == 5.0

    def test_midscale(self):
        """Code 128 lands within 100 mV of 2.5 V."""
        midscale_voltage = adc_to_voltage(128)
        assert abs(midscale_voltage - 2.5) < 0.1  # within 100mV

    def test_voltage_roundtrip(self):
        """Converting a code to volts and back recovers it to within one count."""
        for code in (1, 50, 127, 200, 254):
            v = adc_to_voltage(code)
            recovered = voltage_to_adc(v)
            assert abs(recovered - code) <= 1, f"Code {code}: {v}V → {recovered}"

    def test_low_level_scaling(self):
        """Low-level inputs use ×125 gain: 0-40 mV → 0-5V internal."""
        # 40 mV at low-level = 40 * 125 = 5000 mV = 5V internal → code 254
        low_level_voltage = adc_to_voltage(254, low_level=True)
        # 4.98V / 125 ≈ 0.03984V
        assert abs(low_level_voltage - 0.03984) < 0.001

    def test_step_size(self):
        """Adjacent codes differ by about 19.7 mV (checked to within 0.1 mV)."""
        lower, upper = adc_to_voltage(100), adc_to_voltage(101)
        step_mv = (upper - lower) * 1000
        assert abs(step_mv - 19.7) < 0.1