gr-apollo/tests/test_pcm_frame_source.py
Ryan Malloy 493c21c511 Add transmit chain: 6 composable GR source blocks mirroring CuriousMarc bench
Implement the transmit/generate side as streaming GNU Radio blocks,
complementing the existing receive chain. Each block maps to a physical
instrument on CuriousMarc's Keysight bench:

  pcm_frame_source  - PCM bit stream generator (sync_block + FrameSourceEngine)
  nrz_encoder       - bits to NRZ waveform (+1/-1) with upsampling
  bpsk_subcarrier_mod - NRZ x cos(1.024 MHz) BPSK modulator
  fm_voice_subcarrier_mod - 1.25 MHz FM test tone source
  pm_mod            - phase modulator: exp(j * deviation * input)
  usb_signal_source - convenience wrapper wiring all blocks together

Includes GRC YAML definitions for all blocks under [Apollo USB] category,
49 new tests (271 total, all passing), and a loopback test that validates
the full TX->RX round trip including frame recovery with 30 dB AWGN.
2026-02-21 18:55:50 -07:00

197 lines
6.4 KiB
Python

"""Tests for the PCM frame source block."""
import numpy as np
import pytest
try:
from gnuradio import blocks, gr
HAS_GNURADIO = True
except ImportError:
HAS_GNURADIO = False
from apollo.constants import (
PCM_HIGH_WORDS_PER_FRAME,
PCM_LOW_WORDS_PER_FRAME,
PCM_SYNC_WORD_LENGTH,
PCM_WORD_LENGTH,
SUBFRAME_FRAMES,
)
from apollo.pcm_frame_source import FrameSourceEngine
from apollo.protocol import bits_to_sync_word, parse_sync_word
class TestFrameSourceEngine:
"""Test the pure-Python frame generation engine (no GR needed)."""
def test_frame_length(self):
"""High-rate frame should be 128 words * 8 bits = 1024 bits."""
engine = FrameSourceEngine(bit_rate=51200)
bits = engine.next_frame()
assert len(bits) == PCM_HIGH_WORDS_PER_FRAME * PCM_WORD_LENGTH
def test_frame_length_low_rate(self):
"""Low-rate frame should be 200 words * 8 bits = 1600 bits."""
engine = FrameSourceEngine(bit_rate=1600)
bits = engine.next_frame()
assert len(bits) == PCM_LOW_WORDS_PER_FRAME * PCM_WORD_LENGTH
def test_bits_are_binary(self):
"""Every output value should be 0 or 1."""
engine = FrameSourceEngine()
bits = engine.next_frame()
assert all(b in (0, 1) for b in bits)
def test_frame_counter_wraps(self):
"""Frame counter should cycle 1 -> 50 -> 1."""
engine = FrameSourceEngine()
assert engine.frame_counter == 1
# Generate 50 frames (one full subframe)
for expected_id in range(1, SUBFRAME_FRAMES + 1):
assert engine.frame_counter == expected_id
engine.next_frame()
# Should wrap back to 1
assert engine.frame_counter == 1
# One more frame to confirm it keeps going
engine.next_frame()
assert engine.frame_counter == 2
def test_frame_id_in_sync_word(self):
"""The 6-bit frame ID field in the sync word should match the counter."""
engine = FrameSourceEngine()
for expected_id in range(1, 6):
bits = engine.next_frame()
sync_word = bits_to_sync_word(bits[:PCM_SYNC_WORD_LENGTH])
parsed = parse_sync_word(sync_word)
assert parsed["frame_id"] == expected_id
def test_odd_even_sync(self):
"""Odd frames should have complemented sync core vs even frames."""
engine = FrameSourceEngine()
# Frame 1 (odd) and frame 2 (even) should differ in the core field
bits_1 = engine.next_frame()
bits_2 = engine.next_frame()
sync_1 = bits_to_sync_word(bits_1[:PCM_SYNC_WORD_LENGTH])
sync_2 = bits_to_sync_word(bits_2[:PCM_SYNC_WORD_LENGTH])
parsed_1 = parse_sync_word(sync_1)
parsed_2 = parse_sync_word(sync_2)
# Cores should be bitwise complements (within 15 bits)
assert (parsed_1["core"] ^ parsed_2["core"]) == 0x7FFF
def test_custom_payload(self):
"""Injected data bytes should appear in the data portion of the frame."""
engine = FrameSourceEngine()
payload = bytes([0xAA, 0x55, 0xDE, 0xAD])
bits = engine.next_frame(data=payload)
# Data starts after the 32-bit sync word
data_start = PCM_SYNC_WORD_LENGTH
for byte_idx, expected_byte in enumerate(payload):
byte_bits = bits[data_start + byte_idx * 8 : data_start + (byte_idx + 1) * 8]
recovered = 0
for b in byte_bits:
recovered = (recovered << 1) | b
assert recovered == expected_byte, (
f"Byte {byte_idx}: expected 0x{expected_byte:02x}, got 0x{recovered:02x}"
)
def test_default_zero_fill(self):
"""Without explicit data, payload should be zero-filled."""
engine = FrameSourceEngine()
bits = engine.next_frame()
# All data bits after sync should be zero
data_bits = bits[PCM_SYNC_WORD_LENGTH:]
assert all(b == 0 for b in data_bits)
@pytest.mark.skipif(not HAS_GNURADIO, reason="GNU Radio not installed")
class TestPCMFrameSourceBlock:
"""Test the GNU Radio sync_block wrapper."""
def test_block_instantiation(self):
"""Block should instantiate with default parameters."""
from apollo.pcm_frame_source import pcm_frame_source
src = pcm_frame_source()
assert src is not None
def test_produces_output(self):
"""Source should produce a stream of 0s and 1s."""
from apollo.pcm_frame_source import pcm_frame_source
tb = gr.top_block()
n_samples = 2048
src = pcm_frame_source(bit_rate=51200)
head = blocks.head(gr.sizeof_char, n_samples)
snk = blocks.vector_sink_b()
tb.connect(src, head, snk)
tb.run()
data = np.array(snk.data(), dtype=np.uint8)
assert len(data) == n_samples
# All values should be 0 or 1
assert np.all((data == 0) | (data == 1))
def test_frame_boundary(self):
"""Getting exactly one frame's worth of bits should work."""
from apollo.pcm_frame_source import pcm_frame_source
tb = gr.top_block()
frame_bits = PCM_HIGH_WORDS_PER_FRAME * PCM_WORD_LENGTH
src = pcm_frame_source(bit_rate=51200)
head = blocks.head(gr.sizeof_char, frame_bits)
snk = blocks.vector_sink_b()
tb.connect(src, head, snk)
tb.run()
data = snk.data()
assert len(data) == frame_bits
def test_continuous_stream(self):
"""Multiple frames should produce the expected total length."""
from apollo.pcm_frame_source import pcm_frame_source
tb = gr.top_block()
n_frames = 5
frame_bits = PCM_HIGH_WORDS_PER_FRAME * PCM_WORD_LENGTH
total_bits = n_frames * frame_bits
src = pcm_frame_source(bit_rate=51200)
head = blocks.head(gr.sizeof_char, total_bits)
snk = blocks.vector_sink_b()
tb.connect(src, head, snk)
tb.run()
data = snk.data()
assert len(data) == total_bits
def test_low_rate(self):
"""Low-rate source should produce 200-word frames."""
from apollo.pcm_frame_source import pcm_frame_source
tb = gr.top_block()
frame_bits = PCM_LOW_WORDS_PER_FRAME * PCM_WORD_LENGTH
src = pcm_frame_source(bit_rate=1600)
head = blocks.head(gr.sizeof_char, frame_bits)
snk = blocks.vector_sink_b()
tb.connect(src, head, snk)
tb.run()
data = snk.data()
assert len(data) == frame_bits