Add transmit chain: 6 composable GR source blocks mirroring CuriousMarc bench

Implement the transmit/generate side as streaming GNU Radio blocks,
complementing the existing receive chain. Each block maps to a physical
instrument on CuriousMarc's Keysight bench:

  pcm_frame_source  - PCM bit stream generator (sync_block + FrameSourceEngine)
  nrz_encoder       - bits to NRZ waveform (+1/-1) with upsampling
  bpsk_subcarrier_mod - NRZ x cos(1.024 MHz) BPSK modulator
  fm_voice_subcarrier_mod - 1.25 MHz FM test tone source
  pm_mod            - phase modulator: exp(j * deviation * input)
  usb_signal_source - convenience wrapper wiring all blocks together

Includes GRC YAML definitions for all blocks under [Apollo USB] category,
49 new tests (271 total, all passing), and a loopback test that validates
the full TX->RX round trip including frame recovery with 30 dB AWGN.
This commit is contained in:
Ryan Malloy 2026-02-21 18:55:50 -07:00
parent 0dffcdbb54
commit 493c21c511
20 changed files with 1993 additions and 10 deletions

View File

@ -0,0 +1,47 @@
id: apollo_bpsk_subcarrier_mod
label: Apollo BPSK Subcarrier Mod
category: '[Apollo USB]'
flags: [python]
parameters:
- id: subcarrier_freq
label: Subcarrier Frequency (Hz)
dtype: real
default: '1024000'
- id: sample_rate
label: Sample Rate (Hz)
dtype: real
default: '5120000'
inputs:
- label: in
domain: stream
dtype: float
outputs:
- label: out
domain: stream
dtype: float
templates:
imports: from apollo.bpsk_subcarrier_mod import bpsk_subcarrier_mod
make: apollo.bpsk_subcarrier_mod.bpsk_subcarrier_mod(subcarrier_freq=${subcarrier_freq}, sample_rate=${sample_rate})
documentation: |-
Apollo BPSK Subcarrier Modulator
Multiplies an NRZ baseband waveform (+1/-1) by a 1.024 MHz cosine to produce
a BPSK-modulated subcarrier: output(t) = nrz(t) * cos(2*pi*f_sc*t).
The cosine phase flips 180 degrees at each NRZ sign change, implementing
bi-phase shift keying. This is the transmit-side counterpart to the
Apollo BPSK Subcarrier Demod block.
On the real spacecraft, the PCM encoder drives the BPSK subcarrier modulator
before summing with the voice subcarrier for PM transmission.
Parameters:
subcarrier_freq: BPSK subcarrier frequency in Hz (default 1.024 MHz)
sample_rate: Sample rate in Hz (default 5.12 MHz)
file_format: 1

View File

@ -0,0 +1,58 @@
id: apollo_fm_voice_subcarrier_mod
label: Apollo FM Voice Subcarrier Mod
category: '[Apollo USB]'
flags: [python]
parameters:
- id: sample_rate
label: Sample Rate (Hz)
dtype: real
default: '5120000'
- id: subcarrier_freq
label: Subcarrier Frequency (Hz)
dtype: real
default: '1250000'
- id: fm_deviation
label: FM Deviation (Hz)
dtype: real
default: '29000'
- id: tone_freq
label: Test Tone Frequency (Hz)
dtype: real
default: '1000'
outputs:
- label: out
domain: stream
dtype: float
templates:
imports: from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod
make: >-
apollo.fm_voice_subcarrier_mod.fm_voice_subcarrier_mod(
sample_rate=${sample_rate},
subcarrier_freq=${subcarrier_freq},
fm_deviation=${fm_deviation},
tone_freq=${tone_freq})
documentation: |-
Apollo FM Voice Subcarrier Modulator
Source block that generates a 1.25 MHz FM subcarrier with a sinusoidal test
tone. Transmit-side counterpart to the voice subcarrier demodulator.
The signal chain is: test tone -> FM modulator -> upconvert to 1.25 MHz
-> real-valued output. With default parameters this produces a signal
matching the Apollo USB downlink voice subcarrier (+/-29 kHz deviation).
On the real spacecraft, voice audio (300-3000 Hz) drives an FM VCO at
113 kHz, which is mixed with the 512 kHz master clock and doubled to
produce the 1.25 MHz FM subcarrier.
Parameters:
sample_rate: Output sample rate in Hz (default 5.12 MHz)
subcarrier_freq: FM subcarrier center frequency in Hz (default 1.25 MHz)
fm_deviation: FM deviation in Hz (default +/-29 kHz)
tone_freq: Internal test tone frequency in Hz (default 1 kHz)
file_format: 1

View File

@ -0,0 +1,52 @@
id: apollo_nrz_encoder
label: Apollo NRZ Encoder
category: '[Apollo USB]'
flags: [python]
parameters:
- id: bit_rate
label: Bit Rate (bps)
dtype: int
default: '51200'
options: ['51200', '1600']
option_labels: ['High (51.2 kbps)', 'Low (1.6 kbps)']
- id: sample_rate
label: Sample Rate
dtype: real
default: '5120000'
inputs:
- label: in
domain: stream
dtype: byte
outputs:
- label: out
domain: stream
dtype: float
templates:
imports: from apollo.nrz_encoder import nrz_encoder
make: apollo.nrz_encoder.nrz_encoder(bit_rate=${bit_rate}, sample_rate=${sample_rate})
documentation: |-
Apollo NRZ Encoder
Converts a stream of byte values (0 or 1) to a Non-Return-to-Zero
baseband waveform at the specified sample rate.
Mapping:
bit 1 -> +1.0 (held for bit period)
bit 0 -> -1.0 (held for bit period)
Each bit value is upsampled by samples_per_bit = sample_rate / bit_rate.
At the default high rate (51.2 kbps, 5.12 MHz), this is 100 samples
per bit. At low rate (1.6 kbps), it is 3200 samples per bit.
This is the transmit-side counterpart to the slicer in bpsk_demod.
Parameters:
bit_rate: PCM bit rate in bps (51200 high, 1600 low)
sample_rate: Output sample rate in Hz (default 5.12 MHz)
file_format: 1

View File

@ -0,0 +1,47 @@
id: apollo_pcm_frame_source
label: Apollo PCM Frame Source
category: '[Apollo USB]'
flags: [python]
parameters:
- id: bit_rate
label: Bit Rate (bps)
dtype: int
default: '51200'
options: ['51200', '1600']
option_labels: ['High (51.2 kbps)', 'Low (1.6 kbps)']
inputs:
- label: frame_data
domain: message
optional: true
outputs:
- label: out
domain: stream
dtype: byte
templates:
imports: from apollo.pcm_frame_source import pcm_frame_source
make: apollo.pcm_frame_source.pcm_frame_source(bit_rate=${bit_rate})
documentation: |-
Apollo PCM Frame Source
Generates a continuous stream of NRZ-encoded PCM telemetry frame bits
(byte values 0 or 1). Frame IDs cycle 1 through 50 automatically,
with the 15-bit sync core complemented on odd-numbered frames.
This is the transmit-side counterpart to the PCM Frame Sync block.
The sync word format is:
[5-bit A][15-bit core][6-bit B][6-bit frame ID]
An optional message input (frame_data) accepts u8vector payloads that
will be used as data words for the next generated frame. Without
injected data, frames carry zero-fill.
Parameters:
bit_rate: 51200 (128 words/frame, 50 fps) or 1600 (200 words/frame, 1 fps)
file_format: 1

View File

@ -0,0 +1,44 @@
id: apollo_pm_mod
label: Apollo PM Mod
category: '[Apollo USB]'
flags: [python]
parameters:
- id: pm_deviation
label: PM Deviation (rad)
dtype: real
default: '0.133'
- id: sample_rate
label: Sample Rate
dtype: real
default: '5120000'
inputs:
- label: in
domain: stream
dtype: float
outputs:
- label: out
domain: stream
dtype: complex
templates:
imports: from apollo.pm_mod import pm_mod
make: apollo.pm_mod.pm_mod(pm_deviation=${pm_deviation}, sample_rate=${sample_rate})
documentation: |-
Apollo PM Modulator
Applies phase modulation to produce complex baseband signal.
Takes a composite modulating signal (sum of subcarriers) and outputs
s(t) = exp(j * pm_deviation * input(t)).
The spacecraft PM deviation is 0.133 rad (7.6 degrees) peak.
This is the transmit-side counterpart to Apollo PM Demod.
Parameters:
pm_deviation: Peak phase deviation in radians (default 0.133)
sample_rate: Sample rate in Hz (default 5.12 MHz)
file_format: 1

View File

@ -0,0 +1,80 @@
id: apollo_usb_signal_source
label: Apollo USB Signal Source
category: '[Apollo USB]'
flags: [python]
parameters:
- id: sample_rate
label: Sample Rate (Hz)
dtype: float
default: '5120000'
- id: bit_rate
label: PCM Bit Rate
dtype: int
default: '51200'
options: ['51200', '1600']
option_labels: ['51.2 kbps (high rate)', '1.6 kbps (low rate)']
- id: pm_deviation
label: PM Deviation (rad)
dtype: float
default: '0.133'
- id: voice_enabled
label: Voice Subcarrier
dtype: bool
default: 'False'
options: ['True', 'False']
option_labels: ['Enabled', 'Disabled']
- id: voice_tone_hz
label: Voice Test Tone (Hz)
dtype: float
default: '1000'
- id: snr_db
label: SNR (dB)
dtype: raw
default: 'None'
inputs:
- label: frame_data
domain: message
optional: true
outputs:
- label: out
domain: stream
dtype: complex
templates:
imports: from apollo.usb_signal_source import usb_signal_source
make: >-
apollo.usb_signal_source.usb_signal_source(
sample_rate=${sample_rate},
bit_rate=${bit_rate},
pm_deviation=${pm_deviation},
voice_enabled=${voice_enabled},
voice_tone_hz=${voice_tone_hz},
snr_db=${snr_db})
documentation: |-
Apollo USB Signal Source -- complete transmit chain in one block.
Generates a PM-modulated complex baseband signal containing:
- 1.024 MHz BPSK subcarrier with PCM telemetry frames
- Optional 1.25 MHz FM voice subcarrier (test tone)
- Optional AWGN noise
This is the transmit-side counterpart to the USB Downlink Receiver.
It mirrors CuriousMarc's bench: individual composable blocks wired
together as one convenience wrapper.
Message input:
frame_data -- inject custom payload bytes for the next PCM frame
Parameters:
sample_rate: Output sample rate (default 5.12 MHz)
bit_rate: PCM bit rate -- 51200 (high) or 1600 (low)
pm_deviation: Peak PM deviation in radians (default 0.133)
voice_enabled: Include 1.25 MHz FM voice subcarrier
voice_tone_hz: Voice test tone frequency in Hz
snr_db: Add AWGN noise at this SNR (None = no noise)
file_format: 1

View File

@ -1,7 +1,7 @@
"""
gr-apollo: Apollo Unified S-Band decoder for GNU Radio 3.10+
gr-apollo: Apollo Unified S-Band for GNU Radio 3.10+
Decodes Apollo-era Unified S-Band (USB) telecommunications:
Receive and transmit Apollo-era Unified S-Band (USB) telecommunications:
- 2287.5 MHz downlink with PM/FM modulation
- 1.024 MHz BPSK subcarrier (PCM telemetry @ 51.2 kbps)
- 1.25 MHz FM subcarrier (voice)
@ -10,21 +10,19 @@ Decodes Apollo-era Unified S-Band (USB) telecommunications:
__version__ = "0.1.0"
# Pure-python modules (always available)
# Pure-python modules and engines (always available, no GR dependency)
from apollo import constants as constants
from apollo import protocol as protocol
# Pure-python engines (always available, no GR dependency)
from apollo.agc_bridge import AGCBridgeClient as AGCBridgeClient
from apollo.downlink_decoder import DownlinkEngine as DownlinkEngine
from apollo.pcm_demux import DemuxEngine as DemuxEngine
from apollo.pcm_frame_source import FrameSourceEngine as FrameSourceEngine
from apollo.pcm_frame_sync import FrameSyncEngine as FrameSyncEngine
from apollo.uplink_encoder import UplinkEncoder as UplinkEncoder
from apollo.usb_signal_gen import generate_usb_baseband as generate_usb_baseband
# GNU Radio blocks (require gnuradio runtime)
# These are imported lazily to allow the package to be used
# for its pure-python utilities without GNU Radio installed.
# GNU Radio receive-side blocks (require gnuradio runtime)
# Imported lazily so the package works without GNU Radio installed.
try:
from apollo.bpsk_demod import bpsk_demod as bpsk_demod
from apollo.bpsk_subcarrier_demod import bpsk_subcarrier_demod as bpsk_subcarrier_demod
@ -33,8 +31,19 @@ try:
from apollo.subcarrier_extract import subcarrier_extract as subcarrier_extract
from apollo.voice_subcarrier_demod import voice_subcarrier_demod as voice_subcarrier_demod
except ImportError:
pass # GNU Radio not available — Phase 1/3 GR blocks won't be importable
pass # GNU Radio not available — receive-side GR blocks won't be importable
# GNU Radio transmit-side blocks
try:
from apollo.bpsk_subcarrier_mod import bpsk_subcarrier_mod as bpsk_subcarrier_mod
from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod as fm_voice_subcarrier_mod
from apollo.nrz_encoder import nrz_encoder as nrz_encoder
from apollo.pcm_frame_source import pcm_frame_source as pcm_frame_source
from apollo.pm_mod import pm_mod as pm_mod
except ImportError:
pass # GNU Radio not available — transmit-side GR blocks won't be importable
# GNU Radio composite blocks (depend on individual blocks above)
try:
from apollo.agc_bridge import agc_bridge as agc_bridge
from apollo.downlink_decoder import downlink_decoder as downlink_decoder
@ -42,5 +51,6 @@ try:
from apollo.pcm_frame_sync import pcm_frame_sync as pcm_frame_sync
from apollo.uplink_encoder import uplink_encoder as uplink_encoder
from apollo.usb_downlink_receiver import usb_downlink_receiver as usb_downlink_receiver
from apollo.usb_signal_source import usb_signal_source as usb_signal_source
except (ImportError, NameError):
pass # GNU Radio not available — Phase 2/4/5 GR blocks won't be importable
pass

View File

@ -0,0 +1,59 @@
"""
Apollo BPSK Subcarrier Modulator -- NRZ data onto 1.024 MHz subcarrier.
The transmit-side counterpart to bpsk_subcarrier_demod. Takes an NRZ baseband
waveform (+1/-1) and modulates it onto a 1.024 MHz cosine subcarrier via
simple multiplication: output(t) = nrz(t) * cos(2*pi*f_sc*t).
This is Bi-Phase Shift Keying (BPSK): the cosine phase flips 180 degrees
when the NRZ data changes sign.
On the real spacecraft, a 33522B AWG (or equivalent) generates this
BPSK-modulated subcarrier before summing with the voice subcarrier.
Reference: IMPLEMENTATION_SPEC.md section 4.2
"""
from gnuradio import analog, blocks, gr
from apollo.constants import PCM_SUBCARRIER_HZ, SAMPLE_RATE_BASEBAND
class bpsk_subcarrier_mod(gr.hier_block2):
"""BPSK modulator: NRZ float input -> BPSK subcarrier float output."""
def __init__(
self,
subcarrier_freq: float = PCM_SUBCARRIER_HZ,
sample_rate: float = SAMPLE_RATE_BASEBAND,
):
gr.hier_block2.__init__(
self,
"apollo_bpsk_subcarrier_mod",
gr.io_signature(1, 1, gr.sizeof_float),
gr.io_signature(1, 1, gr.sizeof_float),
)
self._subcarrier_freq = subcarrier_freq
self._sample_rate = sample_rate
# 1.024 MHz cosine subcarrier (continuous phase, maintained by sig_source)
self.carrier = analog.sig_source_f(
sample_rate, analog.GR_COS_WAVE, subcarrier_freq, 1.0, 0,
)
# Multiply NRZ data by subcarrier
self.mixer = blocks.multiply_ff(1)
# Connect: input (NRZ) -> mixer port 0, carrier -> mixer port 1 -> output
self.connect(self, (self.mixer, 0))
self.connect(self.carrier, (self.mixer, 1))
self.connect(self.mixer, self)
@property
def subcarrier_freq(self) -> float:
return self._subcarrier_freq
@property
def sample_rate(self) -> float:
return self._sample_rate

View File

@ -0,0 +1,93 @@
"""
Apollo FM Voice Subcarrier Modulator -- generates 1.25 MHz FM test tone.
Transmit-side counterpart to voice_subcarrier_demod. Generates a sinusoidal
test tone, FM-modulates it onto a 1.25 MHz subcarrier, and outputs the
real-valued subcarrier signal.
On the real spacecraft, voice audio (300-3000 Hz) drives an FM VCO at 113 kHz,
which is mixed with the 512 kHz master clock and doubled to produce the
1.25 MHz FM subcarrier with +/-29 kHz deviation.
For testing, an internal sine-wave test tone replaces live audio.
Reference: IMPLEMENTATION_SPEC.md section 4.2
"""
import math
from gnuradio import analog, blocks, gr
from apollo.constants import (
SAMPLE_RATE_BASEBAND,
VOICE_FM_DEVIATION_HZ,
VOICE_SUBCARRIER_HZ,
)
class fm_voice_subcarrier_mod(gr.hier_block2):
"""Source block: FM-modulated 1.25 MHz voice subcarrier with test tone.
Outputs:
float -- real-valued FM subcarrier at subcarrier_freq
"""
def __init__(
self,
sample_rate: float = SAMPLE_RATE_BASEBAND,
subcarrier_freq: float = VOICE_SUBCARRIER_HZ,
fm_deviation: float = VOICE_FM_DEVIATION_HZ,
tone_freq: float = 1000.0,
):
gr.hier_block2.__init__(
self,
"apollo_fm_voice_subcarrier_mod",
gr.io_signature(0, 0, 0), # source -- no input
gr.io_signature(1, 1, gr.sizeof_float), # float output
)
self._sample_rate = sample_rate
self._tone_freq = tone_freq
self._subcarrier_freq = subcarrier_freq
self._fm_deviation = fm_deviation
# Audio test tone generator
self.tone = analog.sig_source_f(
sample_rate, analog.GR_SIN_WAVE, tone_freq, 1.0, 0,
)
# FM modulator: sensitivity = 2*pi*deviation/sample_rate rad/sample
# With unit-amplitude sine input this gives +/-fm_deviation Hz.
fm_sensitivity = 2.0 * math.pi * fm_deviation / sample_rate
self.fm_mod = analog.frequency_modulator_fc(fm_sensitivity)
# LO at subcarrier frequency for upconversion
self.lo = analog.sig_source_c(
sample_rate, analog.GR_COS_WAVE, subcarrier_freq, 1.0, 0,
)
# Mixer: FM baseband x LO -> subcarrier
self.mixer = blocks.multiply_cc(1)
# Extract real part for float output
self.to_real = blocks.complex_to_real(1)
# Connect: tone -> FM mod -> mixer(x LO) -> real
self.connect(self.tone, self.fm_mod, (self.mixer, 0))
self.connect(self.lo, (self.mixer, 1))
self.connect(self.mixer, self.to_real, self)
@property
def tone_freq(self) -> float:
"""Test tone frequency in Hz."""
return self._tone_freq
@property
def subcarrier_freq(self) -> float:
"""Subcarrier center frequency in Hz."""
return self._subcarrier_freq
@property
def fm_deviation(self) -> float:
"""FM deviation in Hz."""
return self._fm_deviation

61
src/apollo/nrz_encoder.py Normal file
View File

@ -0,0 +1,61 @@
"""
Apollo NRZ Encoder -- converts PCM bit stream to NRZ baseband waveform.
Takes a stream of byte values (0 or 1) from pcm_frame_source and produces
a float waveform at the output sample rate where bit 1 -> +1.0 and
bit 0 -> -1.0, with each bit repeated for samples_per_bit samples.
This is the transmit-side counterpart to the slicer in bpsk_demod.
NRZ (Non-Return-to-Zero) encoding maps:
bit 1 -> +1.0 (held for bit period)
bit 0 -> -1.0 (held for bit period)
Reference: IMPLEMENTATION_SPEC.md section 5.1
"""
from gnuradio import blocks, gr
from apollo.constants import PCM_HIGH_BIT_RATE, SAMPLE_RATE_BASEBAND
class nrz_encoder(gr.hier_block2):
"""NRZ line encoder: byte (0/1) stream -> float (+1/-1) waveform.
Input: byte stream (values 0 or 1)
Output: float NRZ waveform at sample_rate
"""
def __init__(
self,
bit_rate: int = PCM_HIGH_BIT_RATE,
sample_rate: float = SAMPLE_RATE_BASEBAND,
):
gr.hier_block2.__init__(
self,
"apollo_nrz_encoder",
gr.io_signature(1, 1, gr.sizeof_char),
gr.io_signature(1, 1, gr.sizeof_float),
)
self._bit_rate = bit_rate
self._sample_rate = sample_rate
samples_per_bit = int(sample_rate / bit_rate)
# byte (0/1) -> float (0.0/1.0)
self.to_float = blocks.char_to_float(1, 1)
# float (0.0/1.0) -> float (0.0/2.0)
self.scale = blocks.multiply_const_ff(2.0)
# float (0.0/2.0) -> float (-1.0/+1.0)
self.offset = blocks.add_const_ff(-1.0)
# Upsample: repeat each value samples_per_bit times
self.upsample = blocks.repeat(gr.sizeof_float, samples_per_bit)
# Connect chain
self.connect(self, self.to_float, self.scale, self.offset, self.upsample, self)
@property
def samples_per_bit(self) -> int:
return int(self._sample_rate / self._bit_rate)

View File

@ -0,0 +1,153 @@
"""
Apollo PCM Frame Source -- generates a continuous NRZ bit stream of PCM frames.
The transmit-side counterpart to pcm_frame_sync. Produces a steady stream of
128-word (high rate, 51.2 kbps) or 200-word (low rate, 1.6 kbps) PCM frames,
each beginning with the standard 32-bit sync word:
[5-bit A][15-bit core][6-bit B][6-bit frame ID]
Frame IDs cycle 1 through 50 (one subframe), with the 15-bit core complemented
on odd-numbered frames. An optional message input allows dynamic payload
injection; otherwise frames carry zero-fill data.
The core logic lives in FrameSourceEngine (pure Python, testable without GNU
Radio). The GR sync_block wrapper bridges frame-granularity generation with
GR's sample-granularity scheduler via an internal bit buffer.
Reference: IMPLEMENTATION_SPEC.md sections 5.1, 5.2
"""
from collections import deque
import numpy as np
from apollo.constants import (
PCM_HIGH_BIT_RATE,
PCM_HIGH_WORDS_PER_FRAME,
PCM_LOW_WORDS_PER_FRAME,
)
from apollo.usb_signal_gen import generate_pcm_frame
class FrameSourceEngine:
"""PCM frame generation engine (pure Python, no GR dependency).
Maintains a rolling frame counter (1-50) and generates complete frames
on demand via next_frame(). Odd-numbered frames get a complemented
sync core automatically.
Args:
bit_rate: PCM bit rate in bps (51200 or 1600).
"""
def __init__(self, bit_rate: int = PCM_HIGH_BIT_RATE):
self.bit_rate = bit_rate
if bit_rate == PCM_HIGH_BIT_RATE:
self.words_per_frame = PCM_HIGH_WORDS_PER_FRAME
else:
self.words_per_frame = PCM_LOW_WORDS_PER_FRAME
self.frame_counter = 1
def next_frame(self, data: bytes | None = None) -> list[int]:
"""Generate the next PCM frame as a list of bits (0/1 values, MSB first).
Args:
data: Optional payload bytes for data words. If None, the frame
carries zero-fill (deterministic, unlike the random fill in
generate_pcm_frame when data=None for signal-gen use).
Returns:
List of bit values, length = words_per_frame * 8.
"""
frame_id = self.frame_counter
odd = (frame_id % 2) == 1
# Default to zero-fill rather than random for a transmit source --
# downstream blocks and tests need deterministic output.
if data is None:
data = bytes(self.words_per_frame)
bits = generate_pcm_frame(
frame_id=frame_id,
odd=odd,
data=data,
words_per_frame=self.words_per_frame,
)
# Advance counter: 1 -> 2 -> ... -> 50 -> 1
self.frame_counter = (self.frame_counter % 50) + 1
return bits
# ---------------------------------------------------------------------------
# GNU Radio block wrapper (optional -- only if gnuradio is available)
# ---------------------------------------------------------------------------
try:
import pmt
from gnuradio import gr
class pcm_frame_source(gr.sync_block):
"""GNU Radio source block: continuous PCM frame bit stream.
Outputs a stream of bytes (values 0 or 1) representing NRZ-encoded
PCM telemetry frames. Frame IDs cycle 1-50 automatically.
An optional ``frame_data`` message input accepts PMT u8vector payloads
that will be used as the data words for the next generated frame.
Parameters:
bit_rate: 51200 (128 words/frame) or 1600 (200 words/frame).
"""
def __init__(self, bit_rate: int = PCM_HIGH_BIT_RATE):
gr.sync_block.__init__(
self,
name="apollo_pcm_frame_source",
in_sig=None,
out_sig=[np.byte],
)
self._engine = FrameSourceEngine(bit_rate=bit_rate)
self._bit_buffer: deque[int] = deque()
self._pending_data: bytes | None = None
# Message input for dynamic payload injection
self.message_port_register_in(pmt.intern("frame_data"))
self.set_msg_handler(
pmt.intern("frame_data"), self._handle_frame_data
)
def _handle_frame_data(self, msg):
"""Store incoming PMT payload bytes for the next frame."""
if pmt.is_u8vector(msg):
self._pending_data = bytes(pmt.u8vector_elements(msg))
elif pmt.is_pair(msg):
# Accept PDU (car=meta, cdr=payload)
payload = pmt.cdr(msg)
if pmt.is_u8vector(payload):
self._pending_data = bytes(pmt.u8vector_elements(payload))
def work(self, input_items, output_items):
out = output_items[0]
n_out = len(out)
produced = 0
while produced < n_out:
if not self._bit_buffer:
frame_bits = self._engine.next_frame(data=self._pending_data)
self._pending_data = None
self._bit_buffer.extend(frame_bits)
chunk = min(n_out - produced, len(self._bit_buffer))
for i in range(chunk):
out[produced + i] = self._bit_buffer.popleft()
produced += chunk
return produced
except ImportError:
pass

53
src/apollo/pm_mod.py Normal file
View File

@ -0,0 +1,53 @@
"""
Apollo PM Modulator applies phase modulation to produce complex baseband.
The transmit-side counterpart to pm_demod. Takes a composite modulating signal
(sum of subcarriers) and produces PM complex baseband: s(t) = exp(j * phi(t))
where phi(t) = pm_deviation * modulating(t).
The spacecraft transmitter phase-modulates at 0.133 rad peak deviation (7.6 deg).
At this small deviation, the modulation is essentially linear.
Reference: IMPLEMENTATION_SPEC.md section 2.3
"""
from gnuradio import analog, blocks, gr
from apollo.constants import PM_PEAK_DEVIATION_RAD, SAMPLE_RATE_BASEBAND
class pm_mod(gr.hier_block2):
"""Phase modulator: float input -> PM complex baseband output."""
def __init__(
self,
pm_deviation: float = PM_PEAK_DEVIATION_RAD,
sample_rate: float = SAMPLE_RATE_BASEBAND,
):
gr.hier_block2.__init__(
self,
"apollo_pm_mod",
gr.io_signature(1, 1, gr.sizeof_float),
gr.io_signature(1, 1, gr.sizeof_gr_complex),
)
self._pm_deviation = pm_deviation
# Scale input by PM deviation so phase_modulator sees deviation-scaled values
self.gain = blocks.multiply_const_ff(pm_deviation)
# Phase modulate: output = exp(j * 1.0 * input)
# Sensitivity is 1.0 because we pre-scale by pm_deviation above
self.modulator = analog.phase_modulator_fc(1.0)
# Connect: input -> gain -> phase_mod -> output
self.connect(self, self.gain, self.modulator, self)
def get_pm_deviation(self) -> float:
"""Return current PM deviation in radians."""
return self._pm_deviation
def set_pm_deviation(self, dev: float):
"""Update PM deviation at runtime."""
self._pm_deviation = dev
self.gain.set_k(dev)

View File

@ -0,0 +1,146 @@
"""
Apollo USB Signal Source -- complete transmit chain in one block.
The transmit-side counterpart to usb_downlink_receiver. Wires together the
full modulation chain:
pcm_frame_source -> nrz_encoder -> bpsk_subcarrier_mod -+-> add_ff -> pm_mod -> [complex out]
|
fm_voice_subcarrier_mod --------+
(optional, scaled by 1.68/2.2)
This mirrors CuriousMarc's physical bench topology: the individual composable
blocks map 1:1 to Keysight instruments (EXG signal generator for PM, two
33522B AWGs for subcarrier modulation).
For finer control, use the individual blocks directly.
Reference: IMPLEMENTATION_SPEC.md -- full downlink transmit path
"""
import math
from gnuradio import analog, blocks, gr
from apollo.bpsk_subcarrier_mod import bpsk_subcarrier_mod
from apollo.constants import (
PCM_HIGH_BIT_RATE,
PCM_SUBCARRIER_HZ,
PM_PEAK_DEVIATION_RAD,
SAMPLE_RATE_BASEBAND,
VOICE_FM_DEVIATION_HZ,
VOICE_SUBCARRIER_HZ,
)
from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod
from apollo.nrz_encoder import nrz_encoder
from apollo.pcm_frame_source import pcm_frame_source
from apollo.pm_mod import pm_mod
class usb_signal_source(gr.hier_block2):
"""Apollo USB downlink signal source -- complex baseband output.
Outputs:
complex -- PM-modulated baseband at sample_rate (default 5.12 MHz)
Message inputs:
frame_data -- forwarded to pcm_frame_source for dynamic payload injection
The block generates PCM telemetry frames, NRZ-encodes them, BPSK-modulates
onto a 1.024 MHz subcarrier, optionally adds a 1.25 MHz FM voice subcarrier,
and applies PM modulation to produce complex baseband.
Optional AWGN noise can be added by setting snr_db to a finite value.
"""
def __init__(
self,
sample_rate: float = SAMPLE_RATE_BASEBAND,
bit_rate: int = PCM_HIGH_BIT_RATE,
pm_deviation: float = PM_PEAK_DEVIATION_RAD,
voice_enabled: bool = False,
voice_tone_hz: float = 1000.0,
snr_db: float | None = None,
):
gr.hier_block2.__init__(
self,
"apollo_usb_signal_source",
gr.io_signature(0, 0, 0), # source -- no input
gr.io_signature(1, 1, gr.sizeof_gr_complex),
)
self._sample_rate = sample_rate
self._voice_enabled = voice_enabled
# Forward the frame_data message port from pcm_frame_source
self.message_port_register_hier_in("frame_data")
# --- PCM telemetry path ---
# Stage 1: Generate PCM frame bits (0/1 byte stream)
self.frame_src = pcm_frame_source(bit_rate=bit_rate)
# Forward message port: hier input -> pcm_frame_source
self.msg_connect(self, "frame_data", self.frame_src, "frame_data")
# Stage 2: NRZ encode (byte 0/1 -> float +1/-1 at sample_rate)
self.nrz = nrz_encoder(bit_rate=bit_rate, sample_rate=sample_rate)
# Stage 3: BPSK modulate onto 1.024 MHz subcarrier
self.bpsk = bpsk_subcarrier_mod(
subcarrier_freq=PCM_SUBCARRIER_HZ,
sample_rate=sample_rate,
)
# Connect PCM chain: frame_src -> nrz -> bpsk
self.connect(self.frame_src, self.nrz, self.bpsk)
# --- Subcarrier summing ---
if voice_enabled:
# Voice subcarrier level relative to PCM:
# Per IMPL_SPEC: PCM = 2.2 Vpp, Voice = 1.68 Vpp
# The BPSK subcarrier has unity amplitude, so voice is scaled
# by 1.68/2.2 to maintain the correct power ratio.
voice_scale = 1.68 / 2.2
self.voice = fm_voice_subcarrier_mod(
sample_rate=sample_rate,
subcarrier_freq=VOICE_SUBCARRIER_HZ,
fm_deviation=VOICE_FM_DEVIATION_HZ,
tone_freq=voice_tone_hz,
)
self.voice_gain = blocks.multiply_const_ff(voice_scale)
self.adder = blocks.add_ff(1)
# PCM subcarrier -> adder port 0
self.connect(self.bpsk, (self.adder, 0))
# Voice subcarrier (scaled) -> adder port 1
self.connect(self.voice, self.voice_gain, (self.adder, 1))
composite = self.adder
else:
composite = self.bpsk
# --- PM modulation ---
self.pm = pm_mod(pm_deviation=pm_deviation, sample_rate=sample_rate)
self.connect(composite, self.pm)
# --- Optional AWGN ---
if snr_db is not None:
# Signal power is 1.0 (PM constant envelope)
noise_power = 1.0 / (10.0 ** (snr_db / 10.0))
noise_amplitude = math.sqrt(noise_power / 2.0)
self.noise = analog.noise_source_c(
analog.GR_GAUSSIAN, noise_amplitude, 0,
)
self.sum_noise = blocks.add_cc(1)
self.connect(self.pm, (self.sum_noise, 0))
self.connect(self.noise, (self.sum_noise, 1))
self.connect(self.sum_noise, self)
else:
self.connect(self.pm, self)

View File

@ -0,0 +1,197 @@
"""Tests for the BPSK subcarrier modulator block."""
import numpy as np
import pytest
try:
from gnuradio import blocks, gr
HAS_GNURADIO = True
except ImportError:
HAS_GNURADIO = False
from apollo.constants import PCM_SUBCARRIER_HZ, SAMPLE_RATE_BASEBAND
pytestmark = pytest.mark.skipif(not HAS_GNURADIO, reason="GNU Radio not installed")
class TestBPSKSubcarrierMod:
"""Test BPSK subcarrier modulation with synthetic NRZ inputs."""
def test_block_instantiation(self):
"""Block should instantiate with default parameters."""
from apollo.bpsk_subcarrier_mod import bpsk_subcarrier_mod
mod = bpsk_subcarrier_mod()
assert mod is not None
assert mod.subcarrier_freq == PCM_SUBCARRIER_HZ
assert mod.sample_rate == SAMPLE_RATE_BASEBAND
def test_constant_positive_input(self):
"""All +1.0 input should produce a pure cosine at 1.024 MHz."""
from apollo.bpsk_subcarrier_mod import bpsk_subcarrier_mod
sample_rate = SAMPLE_RATE_BASEBAND
n_samples = 51200
tb = gr.top_block()
src = blocks.vector_source_f([1.0] * n_samples)
mod = bpsk_subcarrier_mod(
subcarrier_freq=PCM_SUBCARRIER_HZ, sample_rate=sample_rate,
)
snk = blocks.vector_sink_f()
tb.connect(src, mod, snk)
tb.run()
data = np.array(snk.data())
assert len(data) == n_samples
# FFT: spectral energy should concentrate at 1.024 MHz
fft_vals = np.fft.fft(data)
freqs = np.fft.fftfreq(len(data), 1.0 / sample_rate)
pcm_mask = (np.abs(freqs) > 950_000) & (np.abs(freqs) < 1_100_000)
pcm_power = np.mean(np.abs(fft_vals[pcm_mask]) ** 2)
total_power = np.mean(np.abs(fft_vals) ** 2)
assert pcm_power > total_power * 0.1, (
f"PCM band power ({pcm_power:.1f}) is less than 10% of "
f"total power ({total_power:.1f})"
)
def test_constant_negative_input(self):
"""All -1.0 input should produce -cos (inverted cosine) at 1.024 MHz."""
from apollo.bpsk_subcarrier_mod import bpsk_subcarrier_mod
sample_rate = SAMPLE_RATE_BASEBAND
n_samples = 51200
tb = gr.top_block()
src = blocks.vector_source_f([-1.0] * n_samples)
mod = bpsk_subcarrier_mod(
subcarrier_freq=PCM_SUBCARRIER_HZ, sample_rate=sample_rate,
)
snk = blocks.vector_sink_f()
tb.connect(src, mod, snk)
tb.run()
data = np.array(snk.data())
assert len(data) == n_samples
# Inverted cosine still has energy at 1.024 MHz
fft_vals = np.fft.fft(data)
freqs = np.fft.fftfreq(len(data), 1.0 / sample_rate)
pcm_mask = (np.abs(freqs) > 950_000) & (np.abs(freqs) < 1_100_000)
pcm_power = np.mean(np.abs(fft_vals[pcm_mask]) ** 2)
total_power = np.mean(np.abs(fft_vals) ** 2)
assert pcm_power > total_power * 0.1, (
f"PCM band power ({pcm_power:.1f}) is less than 10% of "
f"total power ({total_power:.1f})"
)
def test_alternating_input_spectrum(self):
"""Alternating +1/-1 NRZ should still have spectral peak near subcarrier."""
from apollo.bpsk_subcarrier_mod import bpsk_subcarrier_mod
sample_rate = SAMPLE_RATE_BASEBAND
# Samples per bit at high rate: 5_120_000 / 51_200 = 100
samples_per_bit = int(sample_rate / 51_200)
n_bits = 512
n_samples = n_bits * samples_per_bit
# Build alternating NRZ: +1 for 100 samples, -1 for 100, ...
nrz = []
for i in range(n_bits):
val = 1.0 if i % 2 == 0 else -1.0
nrz.extend([val] * samples_per_bit)
tb = gr.top_block()
src = blocks.vector_source_f(nrz)
mod = bpsk_subcarrier_mod(
subcarrier_freq=PCM_SUBCARRIER_HZ, sample_rate=sample_rate,
)
snk = blocks.vector_sink_f()
tb.connect(src, mod, snk)
tb.run()
data = np.array(snk.data())
assert len(data) == n_samples
# BPSK with alternating data spreads energy around subcarrier +/- bit_rate,
# but the band near 1.024 MHz should still carry significant power
fft_vals = np.fft.fft(data)
freqs = np.fft.fftfreq(len(data), 1.0 / sample_rate)
pcm_mask = (np.abs(freqs) > 950_000) & (np.abs(freqs) < 1_100_000)
pcm_power = np.mean(np.abs(fft_vals[pcm_mask]) ** 2)
total_power = np.mean(np.abs(fft_vals) ** 2)
assert pcm_power > total_power * 0.1, (
f"PCM band power ({pcm_power:.1f}) is less than 10% of "
f"total power ({total_power:.1f})"
)
def test_amplitude_bounded(self):
"""Output amplitude should be <= 1.0 (product of +/-1 and cos)."""
from apollo.bpsk_subcarrier_mod import bpsk_subcarrier_mod
sample_rate = SAMPLE_RATE_BASEBAND
n_samples = 51200
tb = gr.top_block()
src = blocks.vector_source_f([1.0] * n_samples)
mod = bpsk_subcarrier_mod(
subcarrier_freq=PCM_SUBCARRIER_HZ, sample_rate=sample_rate,
)
snk = blocks.vector_sink_f()
tb.connect(src, mod, snk)
tb.run()
data = np.array(snk.data())
peak = np.max(np.abs(data))
# cos has peak 1.0, NRZ is +/-1.0, product peak should be ~1.0
assert peak <= 1.0 + 1e-6, (
f"Output peak amplitude {peak:.6f} exceeds 1.0"
)
assert peak > 0.9, (
f"Output peak amplitude {peak:.6f} is suspiciously low"
)
def test_custom_subcarrier_freq(self):
"""Custom subcarrier frequency should shift spectral peak."""
from apollo.bpsk_subcarrier_mod import bpsk_subcarrier_mod
sample_rate = SAMPLE_RATE_BASEBAND
n_samples = 51200
custom_freq = 500_000 # 500 kHz
tb = gr.top_block()
src = blocks.vector_source_f([1.0] * n_samples)
mod = bpsk_subcarrier_mod(
subcarrier_freq=custom_freq, sample_rate=sample_rate,
)
snk = blocks.vector_sink_f()
tb.connect(src, mod, snk)
tb.run()
data = np.array(snk.data())
fft_vals = np.fft.fft(data)
freqs = np.fft.fftfreq(len(data), 1.0 / sample_rate)
# Energy should be near 500 kHz, not 1.024 MHz
custom_mask = (np.abs(freqs) > 450_000) & (np.abs(freqs) < 550_000)
custom_power = np.mean(np.abs(fft_vals[custom_mask]) ** 2)
total_power = np.mean(np.abs(fft_vals) ** 2)
assert custom_power > total_power * 0.1, (
f"Custom freq band power ({custom_power:.1f}) is less than 10% of "
f"total power ({total_power:.1f})"
)

View File

@ -0,0 +1,149 @@
"""Tests for the FM voice subcarrier modulator block."""
import numpy as np
import pytest
try:
from gnuradio import blocks, gr
HAS_GNURADIO = True
except ImportError:
HAS_GNURADIO = False
from apollo.constants import (
SAMPLE_RATE_BASEBAND,
VOICE_SUBCARRIER_HZ,
)
pytestmark = pytest.mark.skipif(not HAS_GNURADIO, reason="GNU Radio not installed")
class TestFmVoiceModInstantiation:
"""Test block creation and parameter handling."""
def test_default_parameters(self):
"""Block should instantiate with default parameters."""
from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod
mod = fm_voice_subcarrier_mod()
assert mod is not None
def test_custom_tone_freq(self):
"""Block should accept a custom tone frequency and produce output."""
from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod
sample_rate = SAMPLE_RATE_BASEBAND
n_samples = 10240
tb = gr.top_block()
src = fm_voice_subcarrier_mod(sample_rate=sample_rate, tone_freq=2000.0)
head = blocks.head(gr.sizeof_float, n_samples)
snk = blocks.vector_sink_f()
tb.connect(src, head, snk)
tb.run()
data = np.array(snk.data())
assert len(data) == n_samples
assert np.any(data != 0), "Output is all zeros with tone_freq=2000"
def test_properties(self):
"""Properties should reflect constructor arguments."""
from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod
mod = fm_voice_subcarrier_mod(
tone_freq=1500.0,
subcarrier_freq=1_000_000,
fm_deviation=20_000,
)
assert mod.tone_freq == 1500.0
assert mod.subcarrier_freq == 1_000_000
assert mod.fm_deviation == 20_000
class TestFmVoiceModFunctional:
"""Functional tests with signal analysis."""
def test_produces_output(self):
"""Source block should produce non-zero float output."""
from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod
sample_rate = SAMPLE_RATE_BASEBAND
n_samples = 51200
tb = gr.top_block()
src = fm_voice_subcarrier_mod(sample_rate=sample_rate)
head = blocks.head(gr.sizeof_float, n_samples)
snk = blocks.vector_sink_f()
tb.connect(src, head, snk)
tb.run()
data = np.array(snk.data())
assert len(data) == n_samples, f"Expected {n_samples} samples, got {len(data)}"
assert np.any(data != 0), "Output is all zeros"
def test_output_is_float(self):
"""Output samples should be real-valued floats."""
from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod
sample_rate = SAMPLE_RATE_BASEBAND
n_samples = 1024
tb = gr.top_block()
src = fm_voice_subcarrier_mod(sample_rate=sample_rate)
head = blocks.head(gr.sizeof_float, n_samples)
snk = blocks.vector_sink_f()
tb.connect(src, head, snk)
tb.run()
data = np.array(snk.data())
assert data.dtype in (np.float32, np.float64), (
f"Expected float output, got {data.dtype}"
)
def test_spectral_energy_at_subcarrier(self):
"""Most spectral energy should be near the 1.25 MHz subcarrier."""
from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod
sample_rate = SAMPLE_RATE_BASEBAND
n_samples = 51200 # ~10 ms
tb = gr.top_block()
src = fm_voice_subcarrier_mod(sample_rate=sample_rate)
head = blocks.head(gr.sizeof_float, n_samples)
snk = blocks.vector_sink_f()
tb.connect(src, head, snk)
tb.run()
data = np.array(snk.data())
fft_vals = np.fft.fft(data)
freqs = np.fft.fftfreq(len(data), 1.0 / sample_rate)
# Energy in the 1.2-1.3 MHz band (subcarrier +/- deviation margin)
voice_mask = (np.abs(freqs) > 1_200_000) & (np.abs(freqs) < 1_300_000)
voice_power = np.mean(np.abs(fft_vals[voice_mask]) ** 2)
total_power = np.mean(np.abs(fft_vals) ** 2)
assert voice_power > total_power * 0.1, (
f"Subcarrier band power ({voice_power:.1f}) is less than 10% of "
f"total power ({total_power:.1f})"
)
def test_output_bounded(self):
"""Output amplitude should stay bounded (not blow up)."""
from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod
sample_rate = SAMPLE_RATE_BASEBAND
n_samples = 51200
tb = gr.top_block()
src = fm_voice_subcarrier_mod(sample_rate=sample_rate)
head = blocks.head(gr.sizeof_float, n_samples)
snk = blocks.vector_sink_f()
tb.connect(src, head, snk)
tb.run()
data = np.array(snk.data())
peak = np.max(np.abs(data))
# FM on a cosine carrier: peak should be around 1.0, certainly < 2.0
assert peak < 2.0, f"Output peak amplitude {peak:.3f} exceeds expected bound"
assert peak > 0.1, f"Output peak amplitude {peak:.3f} is suspiciously low"

144
tests/test_loopback.py Normal file
View File

@ -0,0 +1,144 @@
"""Loopback test: usb_signal_source -> usb_downlink_receiver round-trip.
The ultimate validation -- generates a PM-modulated signal with known PCM
frames using the transmit chain, feeds it through the complete receive chain,
and verifies that frames are recovered correctly.
This exercises every block in both the transmit and receive paths:
TX: pcm_frame_source -> nrz_encoder -> bpsk_subcarrier_mod -> pm_mod
RX: pm_demod -> subcarrier_extract -> bpsk_demod -> pcm_frame_sync -> pcm_demux
"""
import numpy as np
import pytest
try:
from gnuradio import blocks, gr
HAS_GNURADIO = True
except ImportError:
HAS_GNURADIO = False
from apollo.constants import (
PCM_HIGH_BIT_RATE,
PCM_HIGH_WORDS_PER_FRAME,
PCM_WORD_LENGTH,
SAMPLE_RATE_BASEBAND,
)
pytestmark = pytest.mark.skipif(not HAS_GNURADIO, reason="GNU Radio not installed")
class TestLoopback:
"""Round-trip: transmit -> receive -> verify."""
def test_loopback_recovers_frames(self):
"""TX signal source -> RX downlink receiver should produce frame PDUs."""
from apollo.usb_downlink_receiver import usb_downlink_receiver
from apollo.usb_signal_source import usb_signal_source
# Generate enough samples for several frames so the receiver PLL can settle.
# At 51.2 kbps high rate, one frame = 1024 bits = 102400 samples.
# Give the receiver 8 frames worth (~0.16 seconds).
bits_per_frame = PCM_HIGH_WORDS_PER_FRAME * PCM_WORD_LENGTH
samples_per_frame = int(bits_per_frame * SAMPLE_RATE_BASEBAND / PCM_HIGH_BIT_RATE)
n_frames = 8
n_samples = n_frames * samples_per_frame
tb = gr.top_block()
# Transmit chain (clean, no noise)
tx = usb_signal_source(
sample_rate=SAMPLE_RATE_BASEBAND,
bit_rate=PCM_HIGH_BIT_RATE,
snr_db=None,
)
head = blocks.head(gr.sizeof_gr_complex, n_samples)
# Receive chain
rx = usb_downlink_receiver(
sample_rate=SAMPLE_RATE_BASEBAND,
bit_rate=PCM_HIGH_BIT_RATE,
)
snk = blocks.message_debug()
tb.connect(tx, head, rx)
tb.msg_connect(rx, "frames", snk, "store")
tb.run()
n_recovered = snk.num_messages()
# The receiver needs ~1-2 frames for PLL settling, so we expect
# at least a few frames from 8 transmitted.
assert n_recovered >= 1, (
f"Loopback recovered {n_recovered} frames from {n_frames} transmitted, "
f"expected >= 1"
)
def test_loopback_frame_structure(self):
"""Recovered frames should have valid sync word structure."""
from apollo.pcm_demux import DemuxEngine
from apollo.usb_downlink_receiver import usb_downlink_receiver
from apollo.usb_signal_source import usb_signal_source
import pmt
bits_per_frame = PCM_HIGH_WORDS_PER_FRAME * PCM_WORD_LENGTH
samples_per_frame = int(bits_per_frame * SAMPLE_RATE_BASEBAND / PCM_HIGH_BIT_RATE)
n_samples = 8 * samples_per_frame
tb = gr.top_block()
tx = usb_signal_source(snr_db=None)
head = blocks.head(gr.sizeof_gr_complex, n_samples)
rx = usb_downlink_receiver(output_format="raw")
snk = blocks.message_debug()
tb.connect(tx, head, rx)
tb.msg_connect(rx, "frames", snk, "store")
tb.run()
n_recovered = snk.num_messages()
if n_recovered == 0:
pytest.skip("No frames recovered in loopback -- PLL may need tuning")
# Validate first recovered frame through the demux engine
msg = snk.get_message(0)
if pmt.is_pair(msg):
payload = pmt.cdr(msg)
else:
payload = msg
frame_bytes = bytes(pmt.u8vector_elements(payload))
demux = DemuxEngine(output_format="raw")
result = demux.process_frame(frame_bytes)
assert "sync" in result
assert "words" in result
assert result["sync"]["frame_id"] >= 1
assert result["sync"]["frame_id"] <= 50
def test_loopback_with_noise(self):
"""Loopback at 30 dB SNR should still recover frames."""
from apollo.usb_downlink_receiver import usb_downlink_receiver
from apollo.usb_signal_source import usb_signal_source
bits_per_frame = PCM_HIGH_WORDS_PER_FRAME * PCM_WORD_LENGTH
samples_per_frame = int(bits_per_frame * SAMPLE_RATE_BASEBAND / PCM_HIGH_BIT_RATE)
n_samples = 10 * samples_per_frame # more frames for noisy recovery
tb = gr.top_block()
tx = usb_signal_source(snr_db=30.0)
head = blocks.head(gr.sizeof_gr_complex, n_samples)
rx = usb_downlink_receiver()
snk = blocks.message_debug()
tb.connect(tx, head, rx)
tb.msg_connect(rx, "frames", snk, "store")
tb.run()
n_recovered = snk.num_messages()
# At 30 dB SNR with 10 frames, should get at least 1
assert n_recovered >= 1, (
f"Noisy loopback recovered {n_recovered} frames, expected >= 1"
)

135
tests/test_nrz_encoder.py Normal file
View File

@ -0,0 +1,135 @@
"""Tests for the NRZ encoder block."""
import numpy as np
import pytest
try:
from gnuradio import blocks, gr
HAS_GNURADIO = True
except ImportError:
HAS_GNURADIO = False
from apollo.constants import PCM_HIGH_BIT_RATE, SAMPLE_RATE_BASEBAND
pytestmark = pytest.mark.skipif(not HAS_GNURADIO, reason="GNU Radio not installed")
class TestNRZEncoder:
"""Test NRZ encoding of bit streams to baseband waveforms."""
def test_bit_one_maps_to_positive(self):
"""A single 1-bit should produce +1.0 repeated for samples_per_bit."""
from apollo.nrz_encoder import nrz_encoder
tb = gr.top_block()
samples_per_bit = int(SAMPLE_RATE_BASEBAND / PCM_HIGH_BIT_RATE)
src = blocks.vector_source_b([1])
enc = nrz_encoder(bit_rate=PCM_HIGH_BIT_RATE, sample_rate=SAMPLE_RATE_BASEBAND)
snk = blocks.vector_sink_f()
tb.connect(src, enc, snk)
tb.run()
output = np.array(snk.data())
assert len(output) == samples_per_bit
np.testing.assert_allclose(output, 1.0, atol=1e-6)
def test_bit_zero_maps_to_negative(self):
"""A single 0-bit should produce -1.0 repeated for samples_per_bit."""
from apollo.nrz_encoder import nrz_encoder
tb = gr.top_block()
samples_per_bit = int(SAMPLE_RATE_BASEBAND / PCM_HIGH_BIT_RATE)
src = blocks.vector_source_b([0])
enc = nrz_encoder(bit_rate=PCM_HIGH_BIT_RATE, sample_rate=SAMPLE_RATE_BASEBAND)
snk = blocks.vector_sink_f()
tb.connect(src, enc, snk)
tb.run()
output = np.array(snk.data())
assert len(output) == samples_per_bit
np.testing.assert_allclose(output, -1.0, atol=1e-6)
def test_alternating_bits(self):
"""Alternating [1,0,1,0] should produce +1*N, -1*N, +1*N, -1*N."""
from apollo.nrz_encoder import nrz_encoder
tb = gr.top_block()
samples_per_bit = int(SAMPLE_RATE_BASEBAND / PCM_HIGH_BIT_RATE)
bits = [1, 0, 1, 0]
src = blocks.vector_source_b(bits)
enc = nrz_encoder(bit_rate=PCM_HIGH_BIT_RATE, sample_rate=SAMPLE_RATE_BASEBAND)
snk = blocks.vector_sink_f()
tb.connect(src, enc, snk)
tb.run()
output = np.array(snk.data())
expected_levels = [1.0, -1.0, 1.0, -1.0]
for i, level in enumerate(expected_levels):
start = i * samples_per_bit
end = (i + 1) * samples_per_bit
segment = output[start:end]
np.testing.assert_allclose(
segment, level, atol=1e-6,
err_msg=f"Bit {i} (value {bits[i]}): expected {level}",
)
def test_output_length(self):
"""4 bits at 51200/5120000 (100 samp/bit) should produce 400 samples."""
from apollo.nrz_encoder import nrz_encoder
tb = gr.top_block()
n_bits = 4
samples_per_bit = int(SAMPLE_RATE_BASEBAND / PCM_HIGH_BIT_RATE) # 100
src = blocks.vector_source_b([1, 0, 1, 1])
enc = nrz_encoder(bit_rate=PCM_HIGH_BIT_RATE, sample_rate=SAMPLE_RATE_BASEBAND)
snk = blocks.vector_sink_f()
tb.connect(src, enc, snk)
tb.run()
output = np.array(snk.data())
assert len(output) == n_bits * samples_per_bit
def test_upsampling_ratio(self):
"""Each NRZ level should be held for exactly samples_per_bit samples."""
from apollo.nrz_encoder import nrz_encoder
tb = gr.top_block()
# Use a different rate pair to verify generality: 1600 bps at 5.12 MHz
# gives 3200 samples per bit
bit_rate = 1600
sample_rate = SAMPLE_RATE_BASEBAND
samples_per_bit = int(sample_rate / bit_rate) # 3200
bits = [1, 0]
src = blocks.vector_source_b(bits)
enc = nrz_encoder(bit_rate=bit_rate, sample_rate=sample_rate)
snk = blocks.vector_sink_f()
tb.connect(src, enc, snk)
tb.run()
output = np.array(snk.data())
assert len(output) == len(bits) * samples_per_bit
# First bit (1) -> +1.0 held for samples_per_bit
np.testing.assert_allclose(output[:samples_per_bit], 1.0, atol=1e-6)
# Second bit (0) -> -1.0 held for samples_per_bit
np.testing.assert_allclose(output[samples_per_bit:], -1.0, atol=1e-6)
def test_block_instantiation(self):
"""Block should instantiate with default parameters."""
from apollo.nrz_encoder import nrz_encoder
enc = nrz_encoder()
assert enc is not None
assert enc.samples_per_bit == int(SAMPLE_RATE_BASEBAND / PCM_HIGH_BIT_RATE)

View File

@ -0,0 +1,196 @@
"""Tests for the PCM frame source block."""
import numpy as np
import pytest
try:
from gnuradio import blocks, gr
HAS_GNURADIO = True
except ImportError:
HAS_GNURADIO = False
from apollo.constants import (
PCM_HIGH_WORDS_PER_FRAME,
PCM_LOW_WORDS_PER_FRAME,
PCM_SYNC_WORD_LENGTH,
PCM_WORD_LENGTH,
SUBFRAME_FRAMES,
)
from apollo.pcm_frame_source import FrameSourceEngine
from apollo.protocol import bits_to_sync_word, parse_sync_word
class TestFrameSourceEngine:
"""Test the pure-Python frame generation engine (no GR needed)."""
def test_frame_length(self):
"""High-rate frame should be 128 words * 8 bits = 1024 bits."""
engine = FrameSourceEngine(bit_rate=51200)
bits = engine.next_frame()
assert len(bits) == PCM_HIGH_WORDS_PER_FRAME * PCM_WORD_LENGTH
def test_frame_length_low_rate(self):
"""Low-rate frame should be 200 words * 8 bits = 1600 bits."""
engine = FrameSourceEngine(bit_rate=1600)
bits = engine.next_frame()
assert len(bits) == PCM_LOW_WORDS_PER_FRAME * PCM_WORD_LENGTH
def test_bits_are_binary(self):
"""Every output value should be 0 or 1."""
engine = FrameSourceEngine()
bits = engine.next_frame()
assert all(b in (0, 1) for b in bits)
def test_frame_counter_wraps(self):
"""Frame counter should cycle 1 -> 50 -> 1."""
engine = FrameSourceEngine()
assert engine.frame_counter == 1
# Generate 50 frames (one full subframe)
for expected_id in range(1, SUBFRAME_FRAMES + 1):
assert engine.frame_counter == expected_id
engine.next_frame()
# Should wrap back to 1
assert engine.frame_counter == 1
# One more frame to confirm it keeps going
engine.next_frame()
assert engine.frame_counter == 2
def test_frame_id_in_sync_word(self):
"""The 6-bit frame ID field in the sync word should match the counter."""
engine = FrameSourceEngine()
for expected_id in range(1, 6):
bits = engine.next_frame()
sync_word = bits_to_sync_word(bits[:PCM_SYNC_WORD_LENGTH])
parsed = parse_sync_word(sync_word)
assert parsed["frame_id"] == expected_id
def test_odd_even_sync(self):
"""Odd frames should have complemented sync core vs even frames."""
engine = FrameSourceEngine()
# Frame 1 (odd) and frame 2 (even) should differ in the core field
bits_1 = engine.next_frame()
bits_2 = engine.next_frame()
sync_1 = bits_to_sync_word(bits_1[:PCM_SYNC_WORD_LENGTH])
sync_2 = bits_to_sync_word(bits_2[:PCM_SYNC_WORD_LENGTH])
parsed_1 = parse_sync_word(sync_1)
parsed_2 = parse_sync_word(sync_2)
# Cores should be bitwise complements (within 15 bits)
assert (parsed_1["core"] ^ parsed_2["core"]) == 0x7FFF
def test_custom_payload(self):
"""Injected data bytes should appear in the data portion of the frame."""
engine = FrameSourceEngine()
payload = bytes([0xAA, 0x55, 0xDE, 0xAD])
bits = engine.next_frame(data=payload)
# Data starts after the 32-bit sync word
data_start = PCM_SYNC_WORD_LENGTH
for byte_idx, expected_byte in enumerate(payload):
byte_bits = bits[data_start + byte_idx * 8 : data_start + (byte_idx + 1) * 8]
recovered = 0
for b in byte_bits:
recovered = (recovered << 1) | b
assert recovered == expected_byte, (
f"Byte {byte_idx}: expected 0x{expected_byte:02x}, got 0x{recovered:02x}"
)
def test_default_zero_fill(self):
"""Without explicit data, payload should be zero-filled."""
engine = FrameSourceEngine()
bits = engine.next_frame()
# All data bits after sync should be zero
data_bits = bits[PCM_SYNC_WORD_LENGTH:]
assert all(b == 0 for b in data_bits)
@pytest.mark.skipif(not HAS_GNURADIO, reason="GNU Radio not installed")
class TestPCMFrameSourceBlock:
"""Test the GNU Radio sync_block wrapper."""
def test_block_instantiation(self):
"""Block should instantiate with default parameters."""
from apollo.pcm_frame_source import pcm_frame_source
src = pcm_frame_source()
assert src is not None
def test_produces_output(self):
"""Source should produce a stream of 0s and 1s."""
from apollo.pcm_frame_source import pcm_frame_source
tb = gr.top_block()
n_samples = 2048
src = pcm_frame_source(bit_rate=51200)
head = blocks.head(gr.sizeof_char, n_samples)
snk = blocks.vector_sink_b()
tb.connect(src, head, snk)
tb.run()
data = np.array(snk.data(), dtype=np.uint8)
assert len(data) == n_samples
# All values should be 0 or 1
assert np.all((data == 0) | (data == 1))
def test_frame_boundary(self):
"""Getting exactly one frame's worth of bits should work."""
from apollo.pcm_frame_source import pcm_frame_source
tb = gr.top_block()
frame_bits = PCM_HIGH_WORDS_PER_FRAME * PCM_WORD_LENGTH
src = pcm_frame_source(bit_rate=51200)
head = blocks.head(gr.sizeof_char, frame_bits)
snk = blocks.vector_sink_b()
tb.connect(src, head, snk)
tb.run()
data = snk.data()
assert len(data) == frame_bits
def test_continuous_stream(self):
"""Multiple frames should produce the expected total length."""
from apollo.pcm_frame_source import pcm_frame_source
tb = gr.top_block()
n_frames = 5
frame_bits = PCM_HIGH_WORDS_PER_FRAME * PCM_WORD_LENGTH
total_bits = n_frames * frame_bits
src = pcm_frame_source(bit_rate=51200)
head = blocks.head(gr.sizeof_char, total_bits)
snk = blocks.vector_sink_b()
tb.connect(src, head, snk)
tb.run()
data = snk.data()
assert len(data) == total_bits
def test_low_rate(self):
"""Low-rate source should produce 200-word frames."""
from apollo.pcm_frame_source import pcm_frame_source
tb = gr.top_block()
frame_bits = PCM_LOW_WORDS_PER_FRAME * PCM_WORD_LENGTH
src = pcm_frame_source(bit_rate=1600)
head = blocks.head(gr.sizeof_char, frame_bits)
snk = blocks.vector_sink_b()
tb.connect(src, head, snk)
tb.run()
data = snk.data()
assert len(data) == frame_bits

146
tests/test_pm_mod.py Normal file
View File

@ -0,0 +1,146 @@
"""Tests for the PM modulator block."""
import numpy as np
import pytest
try:
from gnuradio import blocks, gr
HAS_GNURADIO = True
except ImportError:
HAS_GNURADIO = False
from apollo.constants import PM_PEAK_DEVIATION_RAD, SAMPLE_RATE_BASEBAND
pytestmark = pytest.mark.skipif(not HAS_GNURADIO, reason="GNU Radio not installed")
class TestPMMod:
"""Test PM modulation with synthetic signals."""
def test_zero_input_constant_envelope(self):
"""Zero input should produce exp(j*0) = 1+0j (unit carrier)."""
from apollo.pm_mod import pm_mod
tb = gr.top_block()
n_samples = 10000
data = [0.0] * n_samples
src = blocks.vector_source_f(data)
mod = pm_mod(pm_deviation=PM_PEAK_DEVIATION_RAD, sample_rate=SAMPLE_RATE_BASEBAND)
snk = blocks.vector_sink_c()
tb.connect(src, mod, snk)
tb.run()
output = np.array(snk.data())
assert len(output) == n_samples
# Magnitude should be 1.0 (constant envelope)
magnitudes = np.abs(output)
np.testing.assert_allclose(magnitudes, 1.0, atol=1e-6)
# Phase should be 0 (no modulation)
phases = np.angle(output)
np.testing.assert_allclose(phases, 0.0, atol=1e-6)
def test_sine_input_phase_deviation(self):
"""Sine wave input should produce phase swinging +/- pm_deviation."""
from apollo.pm_mod import pm_mod
tb = gr.top_block()
n_samples = 100000
sample_rate = SAMPLE_RATE_BASEBAND
# Unit-amplitude sine at 10 kHz as modulating signal
t = np.arange(n_samples, dtype=np.float64) / sample_rate
tone_freq = 10000.0
modulating = np.sin(2 * np.pi * tone_freq * t).astype(np.float32)
src = blocks.vector_source_f(modulating.tolist())
mod = pm_mod(pm_deviation=PM_PEAK_DEVIATION_RAD, sample_rate=sample_rate)
snk = blocks.vector_sink_c()
tb.connect(src, mod, snk)
tb.run()
output = np.array(snk.data())
phases = np.angle(output)
# Peak phase should be approximately pm_deviation
peak_phase = np.max(np.abs(phases))
assert peak_phase == pytest.approx(PM_PEAK_DEVIATION_RAD, abs=0.01), (
f"Peak phase {peak_phase} doesn't match deviation {PM_PEAK_DEVIATION_RAD}"
)
def test_constant_envelope(self):
"""PM output should always have |s(t)| = 1.0 regardless of input."""
from apollo.pm_mod import pm_mod
tb = gr.top_block()
n_samples = 50000
sample_rate = SAMPLE_RATE_BASEBAND
# Arbitrary varying input: sum of two tones
t = np.arange(n_samples, dtype=np.float64) / sample_rate
modulating = (
0.7 * np.sin(2 * np.pi * 5000 * t) + 0.3 * np.cos(2 * np.pi * 20000 * t)
).astype(np.float32)
src = blocks.vector_source_f(modulating.tolist())
mod = pm_mod(pm_deviation=PM_PEAK_DEVIATION_RAD, sample_rate=sample_rate)
snk = blocks.vector_sink_c()
tb.connect(src, mod, snk)
tb.run()
output = np.array(snk.data())
magnitudes = np.abs(output)
np.testing.assert_allclose(magnitudes, 1.0, atol=1e-6)
def test_custom_deviation(self):
"""Custom pm_deviation should scale output phase accordingly."""
from apollo.pm_mod import pm_mod
tb = gr.top_block()
n_samples = 100000
sample_rate = SAMPLE_RATE_BASEBAND
custom_dev = 0.5
# Unit-amplitude sine
t = np.arange(n_samples, dtype=np.float64) / sample_rate
tone_freq = 10000.0
modulating = np.sin(2 * np.pi * tone_freq * t).astype(np.float32)
src = blocks.vector_source_f(modulating.tolist())
mod = pm_mod(pm_deviation=custom_dev, sample_rate=sample_rate)
snk = blocks.vector_sink_c()
tb.connect(src, mod, snk)
tb.run()
output = np.array(snk.data())
phases = np.angle(output)
peak_phase = np.max(np.abs(phases))
assert peak_phase == pytest.approx(custom_dev, abs=0.02), (
f"Peak phase {peak_phase} doesn't match custom deviation {custom_dev}"
)
def test_block_instantiation(self):
"""Block should instantiate with default parameters."""
from apollo.pm_mod import pm_mod
mod = pm_mod()
assert mod is not None
assert mod.get_pm_deviation() == PM_PEAK_DEVIATION_RAD
def test_set_pm_deviation(self):
"""Runtime deviation update should take effect."""
from apollo.pm_mod import pm_mod
mod = pm_mod()
assert mod.get_pm_deviation() == PM_PEAK_DEVIATION_RAD
mod.set_pm_deviation(0.25)
assert mod.get_pm_deviation() == 0.25

View File

@ -0,0 +1,113 @@
"""Tests for the USB signal source (complete transmit chain)."""
import numpy as np
import pytest
try:
from gnuradio import blocks, gr
HAS_GNURADIO = True
except ImportError:
HAS_GNURADIO = False
from apollo.constants import (
PCM_HIGH_BIT_RATE,
PCM_HIGH_WORDS_PER_FRAME,
PCM_WORD_LENGTH,
SAMPLE_RATE_BASEBAND,
)
pytestmark = pytest.mark.skipif(not HAS_GNURADIO, reason="GNU Radio not installed")
class TestUSBSignalSource:
"""Test the convenience transmit wrapper."""
def _get_samples(self, n_samples, **kwargs):
"""Helper: run usb_signal_source and return complex samples."""
from apollo.usb_signal_source import usb_signal_source
tb = gr.top_block()
src = usb_signal_source(**kwargs)
head = blocks.head(gr.sizeof_gr_complex, n_samples)
snk = blocks.vector_sink_c()
tb.connect(src, head, snk)
tb.run()
return np.array(snk.data())
def test_block_instantiation(self):
"""Block should instantiate with default parameters."""
from apollo.usb_signal_source import usb_signal_source
src = usb_signal_source()
assert src is not None
def test_produces_complex_output(self):
"""Output should be complex-valued samples."""
n_samples = 51200 # ~10ms worth
data = self._get_samples(n_samples)
assert len(data) == n_samples
assert data.dtype == np.complex128 or data.dtype == np.complex64
def test_constant_envelope(self):
"""PM signal without noise should have near-constant envelope."""
n_samples = 102400 # 1 frame worth
data = self._get_samples(n_samples, snr_db=None)
envelope = np.abs(data)
# PM output: |exp(j*phi)| = 1.0 always
np.testing.assert_allclose(envelope, 1.0, atol=1e-4)
def test_spectral_content_pcm(self):
"""FFT of demodulated phase should show energy at 1.024 MHz."""
n_samples = 102400
data = self._get_samples(n_samples, snr_db=None)
# Extract phase (equivalent to PM demod)
phase = np.angle(data)
fft = np.fft.fft(phase)
freqs = np.fft.fftfreq(len(phase), 1.0 / SAMPLE_RATE_BASEBAND)
# Energy near 1.024 MHz
pcm_mask = (np.abs(freqs) > 950_000) & (np.abs(freqs) < 1_100_000)
pcm_power = np.mean(np.abs(fft[pcm_mask]) ** 2)
total_power = np.mean(np.abs(fft) ** 2)
assert pcm_power > total_power * 0.01
def test_with_voice(self):
"""With voice enabled, output should still be constant envelope."""
n_samples = 51200
data = self._get_samples(n_samples, voice_enabled=True, snr_db=None)
envelope = np.abs(data)
np.testing.assert_allclose(envelope, 1.0, atol=1e-4)
def test_with_noise(self):
"""With noise, envelope should vary (not constant)."""
n_samples = 51200
data = self._get_samples(n_samples, snr_db=10.0)
envelope = np.abs(data)
# With noise, std(envelope) should be > 0
assert np.std(envelope) > 0.01
def test_voice_spectral_content(self):
"""With voice, phase should contain 1.25 MHz energy."""
n_samples = 102400
data = self._get_samples(n_samples, voice_enabled=True, snr_db=None)
phase = np.angle(data)
fft = np.fft.fft(phase)
freqs = np.fft.fftfreq(len(phase), 1.0 / SAMPLE_RATE_BASEBAND)
# Energy near 1.25 MHz
voice_mask = (np.abs(freqs) > 1_200_000) & (np.abs(freqs) < 1_300_000)
voice_power = np.mean(np.abs(fft[voice_mask]) ** 2)
assert voice_power > 0
def test_frame_duration(self):
"""One frame at 51.2 kbps should produce the right number of samples."""
bits_per_frame = PCM_HIGH_WORDS_PER_FRAME * PCM_WORD_LENGTH
samples_per_frame = int(bits_per_frame * SAMPLE_RATE_BASEBAND / PCM_HIGH_BIT_RATE)
data = self._get_samples(samples_per_frame)
assert len(data) == samples_per_frame