gr-apollo/tests/test_usb_signal_source.py
Ryan Malloy 493c21c511 Add transmit chain: 6 composable GR source blocks mirroring CuriousMarc bench
Implement the transmit/generate side as streaming GNU Radio blocks,
complementing the existing receive chain. Each block maps to a physical
instrument on CuriousMarc's Keysight bench:

  pcm_frame_source  - PCM bit stream generator (sync_block + FrameSourceEngine)
  nrz_encoder       - bits to NRZ waveform (+1/-1) with upsampling
  bpsk_subcarrier_mod - NRZ x cos(1.024 MHz) BPSK modulator
  fm_voice_subcarrier_mod - 1.25 MHz FM test tone source
  pm_mod            - phase modulator: exp(j * deviation * input)
  usb_signal_source - convenience wrapper wiring all blocks together

Includes GRC YAML definitions for all blocks under [Apollo USB] category,
49 new tests (271 total, all passing), and a loopback test that validates
the full TX->RX round trip including frame recovery with 30 dB AWGN.
2026-02-21 18:55:50 -07:00

114 lines
3.9 KiB
Python

"""Tests for the USB signal source (complete transmit chain)."""
import numpy as np
import pytest
# GNU Radio is an optional dependency: probe for it once and record the result
# in HAS_GNURADIO so the module-level skip marker can disable these tests
# instead of the whole module failing at import time.
try:
    from gnuradio import blocks, gr
    HAS_GNURADIO = True
except ImportError:
    HAS_GNURADIO = False
from apollo.constants import (
PCM_HIGH_BIT_RATE,
PCM_HIGH_WORDS_PER_FRAME,
PCM_WORD_LENGTH,
SAMPLE_RATE_BASEBAND,
)
# Module-wide marker: skip every test in this file when the guarded
# `from gnuradio import ...` at the top of the module raised ImportError.
pytestmark = pytest.mark.skipif(not HAS_GNURADIO, reason="GNU Radio not installed")
class TestUSBSignalSource:
    """Test the convenience transmit wrapper.

    Each test builds a fresh flowgraph through ``_get_samples``, pulls a fixed
    number of complex samples out of ``usb_signal_source`` and checks one
    property of either the envelope or the spectrum of the demodulated phase.
    """

    def _get_samples(self, n_samples, **kwargs):
        """Helper: run usb_signal_source and return complex samples.

        Args:
            n_samples: Number of items the ``head`` block lets through before
                the flowgraph finishes.
            **kwargs: Forwarded unchanged to ``usb_signal_source``.

        Returns:
            NumPy array built from the vector sink's collected data.
        """
        # Imported here rather than at module level -- presumably so the file
        # can still be collected when GNU Radio is missing; TODO confirm.
        from apollo.usb_signal_source import usb_signal_source

        tb = gr.top_block()
        src = usb_signal_source(**kwargs)
        head = blocks.head(gr.sizeof_gr_complex, n_samples)
        snk = blocks.vector_sink_c()
        tb.connect(src, head, snk)
        tb.run()
        return np.array(snk.data())

    @staticmethod
    def _phase_band_power(data, f_lo, f_hi):
        """Measure the spectrum of the demodulated phase inside one band.

        The phase is taken with ``np.angle`` (equivalent to a PM demod), then
        transformed with an FFT whose bin frequencies are derived from
        ``SAMPLE_RATE_BASEBAND``.

        Args:
            data: Complex samples as returned by ``_get_samples``.
            f_lo: Lower band edge (exclusive), compared against ``|f|``.
            f_hi: Upper band edge (exclusive), compared against ``|f|``.

        Returns:
            Tuple ``(band_power, total_power)``: the mean squared FFT magnitude
            over the bins with ``f_lo < |f| < f_hi`` and over all bins.
        """
        phase = np.angle(data)
        power = np.abs(np.fft.fft(phase)) ** 2
        freqs = np.fft.fftfreq(len(phase), 1.0 / SAMPLE_RATE_BASEBAND)
        # Use |f| so positive and negative frequency bins are both counted.
        in_band = (np.abs(freqs) > f_lo) & (np.abs(freqs) < f_hi)
        return np.mean(power[in_band]), np.mean(power)

    def test_block_instantiation(self):
        """Block should instantiate with default parameters."""
        from apollo.usb_signal_source import usb_signal_source

        src = usb_signal_source()
        assert src is not None

    def test_produces_complex_output(self):
        """Output should be complex-valued samples."""
        n_samples = 51200  # ~10ms worth
        data = self._get_samples(n_samples)
        assert len(data) == n_samples
        # Accept any complex dtype instead of enumerating specific widths.
        assert np.iscomplexobj(data)

    def test_constant_envelope(self):
        """PM signal without noise should have near-constant envelope."""
        n_samples = 102400  # 1 frame worth
        data = self._get_samples(n_samples, snr_db=None)
        envelope = np.abs(data)
        # PM output: |exp(j*phi)| = 1.0 always
        np.testing.assert_allclose(envelope, 1.0, atol=1e-4)

    def test_spectral_content_pcm(self):
        """FFT of demodulated phase should show energy at 1.024 MHz."""
        n_samples = 102400
        data = self._get_samples(n_samples, snr_db=None)
        # Energy near 1.024 MHz
        pcm_power, total_power = self._phase_band_power(data, 950_000, 1_100_000)
        assert pcm_power > total_power * 0.01

    def test_with_voice(self):
        """With voice enabled, output should still be constant envelope."""
        n_samples = 51200
        data = self._get_samples(n_samples, voice_enabled=True, snr_db=None)
        envelope = np.abs(data)
        np.testing.assert_allclose(envelope, 1.0, atol=1e-4)

    def test_with_noise(self):
        """With noise, envelope should vary (not constant)."""
        n_samples = 51200
        data = self._get_samples(n_samples, snr_db=10.0)
        envelope = np.abs(data)
        # With noise, std(envelope) should be > 0
        assert np.std(envelope) > 0.01

    def test_voice_spectral_content(self):
        """With voice, phase should contain 1.25 MHz energy."""
        n_samples = 102400
        with_voice = self._get_samples(n_samples, voice_enabled=True, snr_db=None)
        without_voice = self._get_samples(n_samples, voice_enabled=False, snr_db=None)
        # Energy near 1.25 MHz
        voice_power, _ = self._phase_band_power(with_voice, 1_200_000, 1_300_000)
        baseline_power, _ = self._phase_band_power(without_voice, 1_200_000, 1_300_000)
        assert voice_power > 0
        # "> 0" alone is satisfied by any leakage into the band, so also
        # require that enabling voice actually raises the in-band power.
        assert voice_power > baseline_power

    def test_frame_duration(self):
        """One frame at 51.2 kbps should produce the right number of samples."""
        bits_per_frame = PCM_HIGH_WORDS_PER_FRAME * PCM_WORD_LENGTH
        samples_per_frame = int(bits_per_frame * SAMPLE_RATE_BASEBAND / PCM_HIGH_BIT_RATE)
        # The head block caps the count, so this checks the source can deliver
        # a full frame's worth of samples without the flowgraph ending early.
        data = self._get_samples(samples_per_frame)
        assert len(data) == samples_per_frame