gr-apollo/tests/test_pm_mod.py
Ryan Malloy 493c21c511 Add transmit chain: 6 composable GR source blocks mirroring CuriousMarc bench
Implement the transmit/generate side as streaming GNU Radio blocks,
complementing the existing receive chain. Each block maps to a physical
instrument on CuriousMarc's Keysight bench:

  pcm_frame_source  - PCM bit stream generator (sync_block + FrameSourceEngine)
  nrz_encoder       - bits to NRZ waveform (+1/-1) with upsampling
  bpsk_subcarrier_mod - NRZ x cos(1.024 MHz) BPSK modulator
  fm_voice_subcarrier_mod - 1.25 MHz FM test tone source
  pm_mod            - phase modulator: exp(j * deviation * input)
  usb_signal_source - convenience wrapper wiring all blocks together

Includes GRC YAML definitions for all blocks under [Apollo USB] category,
49 new tests (271 total, all passing), and a loopback test that validates
the full TX->RX round trip including frame recovery with 30 dB AWGN.
2026-02-21 18:55:50 -07:00

147 lines
4.6 KiB
Python

"""Tests for the PM modulator block."""
import numpy as np
import pytest
try:
from gnuradio import blocks, gr
HAS_GNURADIO = True
except ImportError:
HAS_GNURADIO = False
from apollo.constants import PM_PEAK_DEVIATION_RAD, SAMPLE_RATE_BASEBAND
pytestmark = pytest.mark.skipif(not HAS_GNURADIO, reason="GNU Radio not installed")
class TestPMMod:
"""Test PM modulation with synthetic signals."""
def test_zero_input_constant_envelope(self):
"""Zero input should produce exp(j*0) = 1+0j (unit carrier)."""
from apollo.pm_mod import pm_mod
tb = gr.top_block()
n_samples = 10000
data = [0.0] * n_samples
src = blocks.vector_source_f(data)
mod = pm_mod(pm_deviation=PM_PEAK_DEVIATION_RAD, sample_rate=SAMPLE_RATE_BASEBAND)
snk = blocks.vector_sink_c()
tb.connect(src, mod, snk)
tb.run()
output = np.array(snk.data())
assert len(output) == n_samples
# Magnitude should be 1.0 (constant envelope)
magnitudes = np.abs(output)
np.testing.assert_allclose(magnitudes, 1.0, atol=1e-6)
# Phase should be 0 (no modulation)
phases = np.angle(output)
np.testing.assert_allclose(phases, 0.0, atol=1e-6)
def test_sine_input_phase_deviation(self):
"""Sine wave input should produce phase swinging +/- pm_deviation."""
from apollo.pm_mod import pm_mod
tb = gr.top_block()
n_samples = 100000
sample_rate = SAMPLE_RATE_BASEBAND
# Unit-amplitude sine at 10 kHz as modulating signal
t = np.arange(n_samples, dtype=np.float64) / sample_rate
tone_freq = 10000.0
modulating = np.sin(2 * np.pi * tone_freq * t).astype(np.float32)
src = blocks.vector_source_f(modulating.tolist())
mod = pm_mod(pm_deviation=PM_PEAK_DEVIATION_RAD, sample_rate=sample_rate)
snk = blocks.vector_sink_c()
tb.connect(src, mod, snk)
tb.run()
output = np.array(snk.data())
phases = np.angle(output)
# Peak phase should be approximately pm_deviation
peak_phase = np.max(np.abs(phases))
assert peak_phase == pytest.approx(PM_PEAK_DEVIATION_RAD, abs=0.01), (
f"Peak phase {peak_phase} doesn't match deviation {PM_PEAK_DEVIATION_RAD}"
)
def test_constant_envelope(self):
"""PM output should always have |s(t)| = 1.0 regardless of input."""
from apollo.pm_mod import pm_mod
tb = gr.top_block()
n_samples = 50000
sample_rate = SAMPLE_RATE_BASEBAND
# Arbitrary varying input: sum of two tones
t = np.arange(n_samples, dtype=np.float64) / sample_rate
modulating = (
0.7 * np.sin(2 * np.pi * 5000 * t) + 0.3 * np.cos(2 * np.pi * 20000 * t)
).astype(np.float32)
src = blocks.vector_source_f(modulating.tolist())
mod = pm_mod(pm_deviation=PM_PEAK_DEVIATION_RAD, sample_rate=sample_rate)
snk = blocks.vector_sink_c()
tb.connect(src, mod, snk)
tb.run()
output = np.array(snk.data())
magnitudes = np.abs(output)
np.testing.assert_allclose(magnitudes, 1.0, atol=1e-6)
def test_custom_deviation(self):
"""Custom pm_deviation should scale output phase accordingly."""
from apollo.pm_mod import pm_mod
tb = gr.top_block()
n_samples = 100000
sample_rate = SAMPLE_RATE_BASEBAND
custom_dev = 0.5
# Unit-amplitude sine
t = np.arange(n_samples, dtype=np.float64) / sample_rate
tone_freq = 10000.0
modulating = np.sin(2 * np.pi * tone_freq * t).astype(np.float32)
src = blocks.vector_source_f(modulating.tolist())
mod = pm_mod(pm_deviation=custom_dev, sample_rate=sample_rate)
snk = blocks.vector_sink_c()
tb.connect(src, mod, snk)
tb.run()
output = np.array(snk.data())
phases = np.angle(output)
peak_phase = np.max(np.abs(phases))
assert peak_phase == pytest.approx(custom_dev, abs=0.02), (
f"Peak phase {peak_phase} doesn't match custom deviation {custom_dev}"
)
def test_block_instantiation(self):
"""Block should instantiate with default parameters."""
from apollo.pm_mod import pm_mod
mod = pm_mod()
assert mod is not None
assert mod.get_pm_deviation() == PM_PEAK_DEVIATION_RAD
def test_set_pm_deviation(self):
"""Runtime deviation update should take effect."""
from apollo.pm_mod import pm_mod
mod = pm_mod()
assert mod.get_pm_deviation() == PM_PEAK_DEVIATION_RAD
mod.set_pm_deviation(0.25)
assert mod.get_pm_deviation() == 0.25