Implement the transmit/generate side as streaming GNU Radio blocks, complementing the existing receive chain. Each block maps to a physical instrument on CuriousMarc's Keysight bench: pcm_frame_source - PCM bit stream generator (sync_block + FrameSourceEngine) nrz_encoder - bits to NRZ waveform (+1/-1) with upsampling bpsk_subcarrier_mod - NRZ x cos(1.024 MHz) BPSK modulator fm_voice_subcarrier_mod - 1.25 MHz FM test tone source pm_mod - phase modulator: exp(j * deviation * input) usb_signal_source - convenience wrapper wiring all blocks together Includes GRC YAML definitions for all blocks under [Apollo USB] category, 49 new tests (271 total, all passing), and a loopback test that validates the full TX->RX round trip including frame recovery with 30 dB AWGN.
198 lines
6.6 KiB
Python
198 lines
6.6 KiB
Python
"""Tests for the BPSK subcarrier modulator block."""
|
|
|
|
import numpy as np
|
|
import pytest
|
|
|
|
try:
|
|
from gnuradio import blocks, gr
|
|
|
|
HAS_GNURADIO = True
|
|
except ImportError:
|
|
HAS_GNURADIO = False
|
|
|
|
from apollo.constants import PCM_SUBCARRIER_HZ, SAMPLE_RATE_BASEBAND
|
|
|
|
pytestmark = pytest.mark.skipif(not HAS_GNURADIO, reason="GNU Radio not installed")
|
|
|
|
|
|
class TestBPSKSubcarrierMod:
|
|
"""Test BPSK subcarrier modulation with synthetic NRZ inputs."""
|
|
|
|
def test_block_instantiation(self):
|
|
"""Block should instantiate with default parameters."""
|
|
from apollo.bpsk_subcarrier_mod import bpsk_subcarrier_mod
|
|
|
|
mod = bpsk_subcarrier_mod()
|
|
assert mod is not None
|
|
assert mod.subcarrier_freq == PCM_SUBCARRIER_HZ
|
|
assert mod.sample_rate == SAMPLE_RATE_BASEBAND
|
|
|
|
def test_constant_positive_input(self):
|
|
"""All +1.0 input should produce a pure cosine at 1.024 MHz."""
|
|
from apollo.bpsk_subcarrier_mod import bpsk_subcarrier_mod
|
|
|
|
sample_rate = SAMPLE_RATE_BASEBAND
|
|
n_samples = 51200
|
|
|
|
tb = gr.top_block()
|
|
src = blocks.vector_source_f([1.0] * n_samples)
|
|
mod = bpsk_subcarrier_mod(
|
|
subcarrier_freq=PCM_SUBCARRIER_HZ, sample_rate=sample_rate,
|
|
)
|
|
snk = blocks.vector_sink_f()
|
|
|
|
tb.connect(src, mod, snk)
|
|
tb.run()
|
|
|
|
data = np.array(snk.data())
|
|
assert len(data) == n_samples
|
|
|
|
# FFT: spectral energy should concentrate at 1.024 MHz
|
|
fft_vals = np.fft.fft(data)
|
|
freqs = np.fft.fftfreq(len(data), 1.0 / sample_rate)
|
|
|
|
pcm_mask = (np.abs(freqs) > 950_000) & (np.abs(freqs) < 1_100_000)
|
|
pcm_power = np.mean(np.abs(fft_vals[pcm_mask]) ** 2)
|
|
total_power = np.mean(np.abs(fft_vals) ** 2)
|
|
|
|
assert pcm_power > total_power * 0.1, (
|
|
f"PCM band power ({pcm_power:.1f}) is less than 10% of "
|
|
f"total power ({total_power:.1f})"
|
|
)
|
|
|
|
def test_constant_negative_input(self):
|
|
"""All -1.0 input should produce -cos (inverted cosine) at 1.024 MHz."""
|
|
from apollo.bpsk_subcarrier_mod import bpsk_subcarrier_mod
|
|
|
|
sample_rate = SAMPLE_RATE_BASEBAND
|
|
n_samples = 51200
|
|
|
|
tb = gr.top_block()
|
|
src = blocks.vector_source_f([-1.0] * n_samples)
|
|
mod = bpsk_subcarrier_mod(
|
|
subcarrier_freq=PCM_SUBCARRIER_HZ, sample_rate=sample_rate,
|
|
)
|
|
snk = blocks.vector_sink_f()
|
|
|
|
tb.connect(src, mod, snk)
|
|
tb.run()
|
|
|
|
data = np.array(snk.data())
|
|
assert len(data) == n_samples
|
|
|
|
# Inverted cosine still has energy at 1.024 MHz
|
|
fft_vals = np.fft.fft(data)
|
|
freqs = np.fft.fftfreq(len(data), 1.0 / sample_rate)
|
|
|
|
pcm_mask = (np.abs(freqs) > 950_000) & (np.abs(freqs) < 1_100_000)
|
|
pcm_power = np.mean(np.abs(fft_vals[pcm_mask]) ** 2)
|
|
total_power = np.mean(np.abs(fft_vals) ** 2)
|
|
|
|
assert pcm_power > total_power * 0.1, (
|
|
f"PCM band power ({pcm_power:.1f}) is less than 10% of "
|
|
f"total power ({total_power:.1f})"
|
|
)
|
|
|
|
def test_alternating_input_spectrum(self):
|
|
"""Alternating +1/-1 NRZ should still have spectral peak near subcarrier."""
|
|
from apollo.bpsk_subcarrier_mod import bpsk_subcarrier_mod
|
|
|
|
sample_rate = SAMPLE_RATE_BASEBAND
|
|
# Samples per bit at high rate: 5_120_000 / 51_200 = 100
|
|
samples_per_bit = int(sample_rate / 51_200)
|
|
n_bits = 512
|
|
n_samples = n_bits * samples_per_bit
|
|
|
|
# Build alternating NRZ: +1 for 100 samples, -1 for 100, ...
|
|
nrz = []
|
|
for i in range(n_bits):
|
|
val = 1.0 if i % 2 == 0 else -1.0
|
|
nrz.extend([val] * samples_per_bit)
|
|
|
|
tb = gr.top_block()
|
|
src = blocks.vector_source_f(nrz)
|
|
mod = bpsk_subcarrier_mod(
|
|
subcarrier_freq=PCM_SUBCARRIER_HZ, sample_rate=sample_rate,
|
|
)
|
|
snk = blocks.vector_sink_f()
|
|
|
|
tb.connect(src, mod, snk)
|
|
tb.run()
|
|
|
|
data = np.array(snk.data())
|
|
assert len(data) == n_samples
|
|
|
|
# BPSK with alternating data spreads energy around subcarrier +/- bit_rate,
|
|
# but the band near 1.024 MHz should still carry significant power
|
|
fft_vals = np.fft.fft(data)
|
|
freqs = np.fft.fftfreq(len(data), 1.0 / sample_rate)
|
|
|
|
pcm_mask = (np.abs(freqs) > 950_000) & (np.abs(freqs) < 1_100_000)
|
|
pcm_power = np.mean(np.abs(fft_vals[pcm_mask]) ** 2)
|
|
total_power = np.mean(np.abs(fft_vals) ** 2)
|
|
|
|
assert pcm_power > total_power * 0.1, (
|
|
f"PCM band power ({pcm_power:.1f}) is less than 10% of "
|
|
f"total power ({total_power:.1f})"
|
|
)
|
|
|
|
def test_amplitude_bounded(self):
|
|
"""Output amplitude should be <= 1.0 (product of +/-1 and cos)."""
|
|
from apollo.bpsk_subcarrier_mod import bpsk_subcarrier_mod
|
|
|
|
sample_rate = SAMPLE_RATE_BASEBAND
|
|
n_samples = 51200
|
|
|
|
tb = gr.top_block()
|
|
src = blocks.vector_source_f([1.0] * n_samples)
|
|
mod = bpsk_subcarrier_mod(
|
|
subcarrier_freq=PCM_SUBCARRIER_HZ, sample_rate=sample_rate,
|
|
)
|
|
snk = blocks.vector_sink_f()
|
|
|
|
tb.connect(src, mod, snk)
|
|
tb.run()
|
|
|
|
data = np.array(snk.data())
|
|
peak = np.max(np.abs(data))
|
|
|
|
# cos has peak 1.0, NRZ is +/-1.0, product peak should be ~1.0
|
|
assert peak <= 1.0 + 1e-6, (
|
|
f"Output peak amplitude {peak:.6f} exceeds 1.0"
|
|
)
|
|
assert peak > 0.9, (
|
|
f"Output peak amplitude {peak:.6f} is suspiciously low"
|
|
)
|
|
|
|
def test_custom_subcarrier_freq(self):
|
|
"""Custom subcarrier frequency should shift spectral peak."""
|
|
from apollo.bpsk_subcarrier_mod import bpsk_subcarrier_mod
|
|
|
|
sample_rate = SAMPLE_RATE_BASEBAND
|
|
n_samples = 51200
|
|
custom_freq = 500_000 # 500 kHz
|
|
|
|
tb = gr.top_block()
|
|
src = blocks.vector_source_f([1.0] * n_samples)
|
|
mod = bpsk_subcarrier_mod(
|
|
subcarrier_freq=custom_freq, sample_rate=sample_rate,
|
|
)
|
|
snk = blocks.vector_sink_f()
|
|
|
|
tb.connect(src, mod, snk)
|
|
tb.run()
|
|
|
|
data = np.array(snk.data())
|
|
fft_vals = np.fft.fft(data)
|
|
freqs = np.fft.fftfreq(len(data), 1.0 / sample_rate)
|
|
|
|
# Energy should be near 500 kHz, not 1.024 MHz
|
|
custom_mask = (np.abs(freqs) > 450_000) & (np.abs(freqs) < 550_000)
|
|
custom_power = np.mean(np.abs(fft_vals[custom_mask]) ** 2)
|
|
total_power = np.mean(np.abs(fft_vals) ** 2)
|
|
|
|
assert custom_power > total_power * 0.1, (
|
|
f"Custom freq band power ({custom_power:.1f}) is less than 10% of "
|
|
f"total power ({total_power:.1f})"
|
|
)
|