From 493c21c5118efb2d781f4d5e4ec339beae58c2ca Mon Sep 17 00:00:00 2001 From: Ryan Malloy Date: Sat, 21 Feb 2026 18:55:50 -0700 Subject: [PATCH] Add transmit chain: 6 composable GR source blocks mirroring CuriousMarc bench Implement the transmit/generate side as streaming GNU Radio blocks, complementing the existing receive chain. Each block maps to a physical instrument on CuriousMarc's Keysight bench: pcm_frame_source - PCM bit stream generator (sync_block + FrameSourceEngine) nrz_encoder - bits to NRZ waveform (+1/-1) with upsampling bpsk_subcarrier_mod - NRZ x cos(1.024 MHz) BPSK modulator fm_voice_subcarrier_mod - 1.25 MHz FM test tone source pm_mod - phase modulator: exp(j * deviation * input) usb_signal_source - convenience wrapper wiring all blocks together Includes GRC YAML definitions for all blocks under [Apollo USB] category, 49 new tests (271 total, all passing), and a loopback test that validates the full TX->RX round trip including frame recovery with 30 dB AWGN. --- grc/apollo_bpsk_subcarrier_mod.block.yml | 47 +++++ grc/apollo_fm_voice_subcarrier_mod.block.yml | 58 ++++++ grc/apollo_nrz_encoder.block.yml | 52 +++++ grc/apollo_pcm_frame_source.block.yml | 47 +++++ grc/apollo_pm_mod.block.yml | 44 +++++ grc/apollo_usb_signal_source.block.yml | 80 ++++++++ src/apollo/__init__.py | 30 ++- src/apollo/bpsk_subcarrier_mod.py | 59 ++++++ src/apollo/fm_voice_subcarrier_mod.py | 93 +++++++++ src/apollo/nrz_encoder.py | 61 ++++++ src/apollo/pcm_frame_source.py | 153 ++++++++++++++ src/apollo/pm_mod.py | 53 +++++ src/apollo/usb_signal_source.py | 146 ++++++++++++++ tests/test_bpsk_subcarrier_mod.py | 197 +++++++++++++++++++ tests/test_fm_voice_subcarrier_mod.py | 149 ++++++++++++++ tests/test_loopback.py | 144 ++++++++++++++ tests/test_nrz_encoder.py | 135 +++++++++++++ tests/test_pcm_frame_source.py | 196 ++++++++++++++++++ tests/test_pm_mod.py | 146 ++++++++++++++ tests/test_usb_signal_source.py | 113 +++++++++++ 20 files changed, 1993 insertions(+), 10 deletions(-) create mode 100644 grc/apollo_bpsk_subcarrier_mod.block.yml create mode 100644 grc/apollo_fm_voice_subcarrier_mod.block.yml create mode 100644 grc/apollo_nrz_encoder.block.yml create mode 100644 grc/apollo_pcm_frame_source.block.yml create mode 100644 grc/apollo_pm_mod.block.yml create mode 100644 grc/apollo_usb_signal_source.block.yml create mode 100644 src/apollo/bpsk_subcarrier_mod.py create mode 100644 src/apollo/fm_voice_subcarrier_mod.py create mode 100644 src/apollo/nrz_encoder.py create mode 100644 src/apollo/pcm_frame_source.py create mode 100644 src/apollo/pm_mod.py create mode 100644 src/apollo/usb_signal_source.py create mode 100644 tests/test_bpsk_subcarrier_mod.py create mode 100644 tests/test_fm_voice_subcarrier_mod.py create mode 100644 tests/test_loopback.py create mode 100644 tests/test_nrz_encoder.py create mode 100644 tests/test_pcm_frame_source.py create mode 100644 tests/test_pm_mod.py create mode 100644 tests/test_usb_signal_source.py diff --git a/grc/apollo_bpsk_subcarrier_mod.block.yml b/grc/apollo_bpsk_subcarrier_mod.block.yml new file mode 100644 index 0000000..18b6a01 --- /dev/null +++ b/grc/apollo_bpsk_subcarrier_mod.block.yml @@ -0,0 +1,47 @@ +id: apollo_bpsk_subcarrier_mod +label: Apollo BPSK Subcarrier Mod +category: '[Apollo USB]' +flags: [python] + +parameters: +- id: subcarrier_freq + label: Subcarrier Frequency (Hz) + dtype: real + default: '1024000' +- id: sample_rate + label: Sample Rate (Hz) + dtype: real + default: '5120000' + +inputs: +- label: in + domain: stream + dtype: float + +outputs: +- label: out + domain: stream + dtype: float + +templates: + imports: from apollo.bpsk_subcarrier_mod import bpsk_subcarrier_mod + make: apollo.bpsk_subcarrier_mod.bpsk_subcarrier_mod(subcarrier_freq=${subcarrier_freq}, sample_rate=${sample_rate}) + +documentation: |- + Apollo BPSK Subcarrier Modulator + + Multiplies an NRZ baseband waveform (+1/-1) by a 1.024 MHz cosine to produce + a BPSK-modulated subcarrier: output(t) = nrz(t) * cos(2*pi*f_sc*t). + + The cosine phase flips 180 degrees at each NRZ sign change, implementing + bi-phase shift keying. This is the transmit-side counterpart to the + Apollo BPSK Subcarrier Demod block. + + On the real spacecraft, the PCM encoder drives the BPSK subcarrier modulator + before summing with the voice subcarrier for PM transmission. + + Parameters: + subcarrier_freq: BPSK subcarrier frequency in Hz (default 1.024 MHz) + sample_rate: Sample rate in Hz (default 5.12 MHz) + +file_format: 1 diff --git a/grc/apollo_fm_voice_subcarrier_mod.block.yml b/grc/apollo_fm_voice_subcarrier_mod.block.yml new file mode 100644 index 0000000..7da803a --- /dev/null +++ b/grc/apollo_fm_voice_subcarrier_mod.block.yml @@ -0,0 +1,58 @@ +id: apollo_fm_voice_subcarrier_mod +label: Apollo FM Voice Subcarrier Mod +category: '[Apollo USB]' +flags: [python] + +parameters: +- id: sample_rate + label: Sample Rate (Hz) + dtype: real + default: '5120000' +- id: subcarrier_freq + label: Subcarrier Frequency (Hz) + dtype: real + default: '1250000' +- id: fm_deviation + label: FM Deviation (Hz) + dtype: real + default: '29000' +- id: tone_freq + label: Test Tone Frequency (Hz) + dtype: real + default: '1000' + +outputs: +- label: out + domain: stream + dtype: float + +templates: + imports: from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod + make: >- + apollo.fm_voice_subcarrier_mod.fm_voice_subcarrier_mod( + sample_rate=${sample_rate}, + subcarrier_freq=${subcarrier_freq}, + fm_deviation=${fm_deviation}, + tone_freq=${tone_freq}) + +documentation: |- + Apollo FM Voice Subcarrier Modulator + + Source block that generates a 1.25 MHz FM subcarrier with a sinusoidal test + tone. Transmit-side counterpart to the voice subcarrier demodulator. + + The signal chain is: test tone -> FM modulator -> upconvert to 1.25 MHz + -> real-valued output. With default parameters this produces a signal + matching the Apollo USB downlink voice subcarrier (+/-29 kHz deviation). + + On the real spacecraft, voice audio (300-3000 Hz) drives an FM VCO at + 113 kHz, which is mixed with the 512 kHz master clock and doubled to + produce the 1.25 MHz FM subcarrier. + + Parameters: + sample_rate: Output sample rate in Hz (default 5.12 MHz) + subcarrier_freq: FM subcarrier center frequency in Hz (default 1.25 MHz) + fm_deviation: FM deviation in Hz (default +/-29 kHz) + tone_freq: Internal test tone frequency in Hz (default 1 kHz) + +file_format: 1 diff --git a/grc/apollo_nrz_encoder.block.yml b/grc/apollo_nrz_encoder.block.yml new file mode 100644 index 0000000..781956a --- /dev/null +++ b/grc/apollo_nrz_encoder.block.yml @@ -0,0 +1,52 @@ +id: apollo_nrz_encoder +label: Apollo NRZ Encoder +category: '[Apollo USB]' +flags: [python] + +parameters: +- id: bit_rate + label: Bit Rate (bps) + dtype: int + default: '51200' + options: ['51200', '1600'] + option_labels: ['High (51.2 kbps)', 'Low (1.6 kbps)'] +- id: sample_rate + label: Sample Rate + dtype: real + default: '5120000' + +inputs: +- label: in + domain: stream + dtype: byte + +outputs: +- label: out + domain: stream + dtype: float + +templates: + imports: from apollo.nrz_encoder import nrz_encoder + make: apollo.nrz_encoder.nrz_encoder(bit_rate=${bit_rate}, sample_rate=${sample_rate}) + +documentation: |- + Apollo NRZ Encoder + + Converts a stream of byte values (0 or 1) to a Non-Return-to-Zero + baseband waveform at the specified sample rate. + + Mapping: + bit 1 -> +1.0 (held for bit period) + bit 0 -> -1.0 (held for bit period) + + Each bit value is upsampled by samples_per_bit = sample_rate / bit_rate. + At the default high rate (51.2 kbps, 5.12 MHz), this is 100 samples + per bit. At low rate (1.6 kbps), it is 3200 samples per bit. + + This is the transmit-side counterpart to the slicer in bpsk_demod. + + Parameters: + bit_rate: PCM bit rate in bps (51200 high, 1600 low) + sample_rate: Output sample rate in Hz (default 5.12 MHz) + +file_format: 1 diff --git a/grc/apollo_pcm_frame_source.block.yml b/grc/apollo_pcm_frame_source.block.yml new file mode 100644 index 0000000..fff1d46 --- /dev/null +++ b/grc/apollo_pcm_frame_source.block.yml @@ -0,0 +1,47 @@ +id: apollo_pcm_frame_source +label: Apollo PCM Frame Source +category: '[Apollo USB]' +flags: [python] + +parameters: +- id: bit_rate + label: Bit Rate (bps) + dtype: int + default: '51200' + options: ['51200', '1600'] + option_labels: ['High (51.2 kbps)', 'Low (1.6 kbps)'] + +inputs: +- label: frame_data + domain: message + optional: true + +outputs: +- label: out + domain: stream + dtype: byte + +templates: + imports: from apollo.pcm_frame_source import pcm_frame_source + make: apollo.pcm_frame_source.pcm_frame_source(bit_rate=${bit_rate}) + +documentation: |- + Apollo PCM Frame Source + + Generates a continuous stream of NRZ-encoded PCM telemetry frame bits + (byte values 0 or 1). Frame IDs cycle 1 through 50 automatically, + with the 15-bit sync core complemented on odd-numbered frames. + + This is the transmit-side counterpart to the PCM Frame Sync block. + + The sync word format is: + [5-bit A][15-bit core][6-bit B][6-bit frame ID] + + An optional message input (frame_data) accepts u8vector payloads that + will be used as data words for the next generated frame. Without + injected data, frames carry zero-fill. + + Parameters: + bit_rate: 51200 (128 words/frame, 50 fps) or 1600 (200 words/frame, 1 fps) + +file_format: 1 diff --git a/grc/apollo_pm_mod.block.yml b/grc/apollo_pm_mod.block.yml new file mode 100644 index 0000000..fb91585 --- /dev/null +++ b/grc/apollo_pm_mod.block.yml @@ -0,0 +1,44 @@ +id: apollo_pm_mod +label: Apollo PM Mod +category: '[Apollo USB]' +flags: [python] + +parameters: +- id: pm_deviation + label: PM Deviation (rad) + dtype: real + default: '0.133' +- id: sample_rate + label: Sample Rate + dtype: real + default: '5120000' + +inputs: +- label: in + domain: stream + dtype: float + +outputs: +- label: out + domain: stream + dtype: complex + +templates: + imports: from apollo.pm_mod import pm_mod + make: apollo.pm_mod.pm_mod(pm_deviation=${pm_deviation}, sample_rate=${sample_rate}) + +documentation: |- + Apollo PM Modulator + + Applies phase modulation to produce complex baseband signal. + Takes a composite modulating signal (sum of subcarriers) and outputs + s(t) = exp(j * pm_deviation * input(t)). + + The spacecraft PM deviation is 0.133 rad (7.6 degrees) peak. + This is the transmit-side counterpart to Apollo PM Demod. + + Parameters: + pm_deviation: Peak phase deviation in radians (default 0.133) + sample_rate: Sample rate in Hz (default 5.12 MHz) + +file_format: 1 diff --git a/grc/apollo_usb_signal_source.block.yml b/grc/apollo_usb_signal_source.block.yml new file mode 100644 index 0000000..93a6482 --- /dev/null +++ b/grc/apollo_usb_signal_source.block.yml @@ -0,0 +1,80 @@ +id: apollo_usb_signal_source +label: Apollo USB Signal Source +category: '[Apollo USB]' +flags: [python] + +parameters: +- id: sample_rate + label: Sample Rate (Hz) + dtype: float + default: '5120000' +- id: bit_rate + label: PCM Bit Rate + dtype: int + default: '51200' + options: ['51200', '1600'] + option_labels: ['51.2 kbps (high rate)', '1.6 kbps (low rate)'] +- id: pm_deviation + label: PM Deviation (rad) + dtype: float + default: '0.133' +- id: voice_enabled + label: Voice Subcarrier + dtype: bool + default: 'False' + options: ['True', 'False'] + option_labels: ['Enabled', 'Disabled'] +- id: voice_tone_hz + label: Voice Test Tone (Hz) + dtype: float + default: '1000' +- id: snr_db + label: SNR (dB) + dtype: raw + default: 'None' + +inputs: +- label: frame_data + domain: message + optional: true + +outputs: +- label: out + domain: stream + dtype: complex + +templates: + imports: from apollo.usb_signal_source import usb_signal_source + make: >- + apollo.usb_signal_source.usb_signal_source( + sample_rate=${sample_rate}, + bit_rate=${bit_rate}, + pm_deviation=${pm_deviation}, + voice_enabled=${voice_enabled}, + voice_tone_hz=${voice_tone_hz}, + snr_db=${snr_db}) + +documentation: |- + Apollo USB Signal Source -- complete transmit chain in one block. + + Generates a PM-modulated complex baseband signal containing: + - 1.024 MHz BPSK subcarrier with PCM telemetry frames + - Optional 1.25 MHz FM voice subcarrier (test tone) + - Optional AWGN noise + + This is the transmit-side counterpart to the USB Downlink Receiver. + It mirrors CuriousMarc's bench: individual composable blocks wired + together as one convenience wrapper. + + Message input: + frame_data -- inject custom payload bytes for the next PCM frame + + Parameters: + sample_rate: Output sample rate (default 5.12 MHz) + bit_rate: PCM bit rate -- 51200 (high) or 1600 (low) + pm_deviation: Peak PM deviation in radians (default 0.133) + voice_enabled: Include 1.25 MHz FM voice subcarrier + voice_tone_hz: Voice test tone frequency in Hz + snr_db: Add AWGN noise at this SNR (None = no noise) + +file_format: 1 diff --git a/src/apollo/__init__.py b/src/apollo/__init__.py index 2f6b56a..52a088b 100644 --- a/src/apollo/__init__.py +++ b/src/apollo/__init__.py @@ -1,7 +1,7 @@ """ -gr-apollo: Apollo Unified S-Band decoder for GNU Radio 3.10+ +gr-apollo: Apollo Unified S-Band for GNU Radio 3.10+ -Decodes Apollo-era Unified S-Band (USB) telecommunications: +Receive and transmit Apollo-era Unified S-Band (USB) telecommunications: - 2287.5 MHz downlink with PM/FM modulation - 1.024 MHz BPSK subcarrier (PCM telemetry @ 51.2 kbps) - 1.25 MHz FM subcarrier (voice) @@ -10,21 +10,19 @@ Decodes Apollo-era Unified S-Band (USB) telecommunications: __version__ = "0.1.0" -# Pure-python modules (always available) +# Pure-python modules and engines (always available, no GR dependency) from apollo import constants as constants from apollo import protocol as protocol - -# Pure-python engines (always available, no GR dependency) from apollo.agc_bridge import AGCBridgeClient as AGCBridgeClient from apollo.downlink_decoder import DownlinkEngine as DownlinkEngine from apollo.pcm_demux import DemuxEngine as DemuxEngine +from apollo.pcm_frame_source import FrameSourceEngine as FrameSourceEngine from apollo.pcm_frame_sync import FrameSyncEngine as FrameSyncEngine from apollo.uplink_encoder import UplinkEncoder as UplinkEncoder from apollo.usb_signal_gen import generate_usb_baseband as generate_usb_baseband -# GNU Radio blocks (require gnuradio runtime) -# These are imported lazily to allow the package to be used -# for its pure-python utilities without GNU Radio installed. +# GNU Radio receive-side blocks (require gnuradio runtime) +# Imported lazily so the package works without GNU Radio installed. try: from apollo.bpsk_demod import bpsk_demod as bpsk_demod from apollo.bpsk_subcarrier_demod import bpsk_subcarrier_demod as bpsk_subcarrier_demod @@ -33,8 +31,19 @@ try: from apollo.subcarrier_extract import subcarrier_extract as subcarrier_extract from apollo.voice_subcarrier_demod import voice_subcarrier_demod as voice_subcarrier_demod except ImportError: - pass # GNU Radio not available — Phase 1/3 GR blocks won't be importable + pass # GNU Radio not available — receive-side GR blocks won't be importable +# GNU Radio transmit-side blocks +try: + from apollo.bpsk_subcarrier_mod import bpsk_subcarrier_mod as bpsk_subcarrier_mod + from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod as fm_voice_subcarrier_mod + from apollo.nrz_encoder import nrz_encoder as nrz_encoder + from apollo.pcm_frame_source import pcm_frame_source as pcm_frame_source + from apollo.pm_mod import pm_mod as pm_mod +except ImportError: + pass # GNU Radio not available — transmit-side GR blocks won't be importable + +# GNU Radio composite blocks (depend on individual blocks above) try: from apollo.agc_bridge import agc_bridge as agc_bridge from apollo.downlink_decoder import downlink_decoder as downlink_decoder @@ -42,5 +51,6 @@ try: from apollo.pcm_frame_sync import pcm_frame_sync as pcm_frame_sync from apollo.uplink_encoder import uplink_encoder as uplink_encoder from apollo.usb_downlink_receiver import usb_downlink_receiver as usb_downlink_receiver + from apollo.usb_signal_source import usb_signal_source as usb_signal_source except (ImportError, NameError): - pass # GNU Radio not available — Phase 2/4/5 GR blocks won't be importable + pass diff --git a/src/apollo/bpsk_subcarrier_mod.py b/src/apollo/bpsk_subcarrier_mod.py new file mode 100644 index 0000000..1b866be --- /dev/null +++ b/src/apollo/bpsk_subcarrier_mod.py @@ -0,0 +1,59 @@ +""" +Apollo BPSK Subcarrier Modulator -- NRZ data onto 1.024 MHz subcarrier. + +The transmit-side counterpart to bpsk_subcarrier_demod. Takes an NRZ baseband +waveform (+1/-1) and modulates it onto a 1.024 MHz cosine subcarrier via +simple multiplication: output(t) = nrz(t) * cos(2*pi*f_sc*t). + +This is Bi-Phase Shift Keying (BPSK): the cosine phase flips 180 degrees +when the NRZ data changes sign. + +On the real spacecraft, a 33522B AWG (or equivalent) generates this +BPSK-modulated subcarrier before summing with the voice subcarrier. + +Reference: IMPLEMENTATION_SPEC.md section 4.2 +""" + +from gnuradio import analog, blocks, gr + +from apollo.constants import PCM_SUBCARRIER_HZ, SAMPLE_RATE_BASEBAND + + +class bpsk_subcarrier_mod(gr.hier_block2): + """BPSK modulator: NRZ float input -> BPSK subcarrier float output.""" + + def __init__( + self, + subcarrier_freq: float = PCM_SUBCARRIER_HZ, + sample_rate: float = SAMPLE_RATE_BASEBAND, + ): + gr.hier_block2.__init__( + self, + "apollo_bpsk_subcarrier_mod", + gr.io_signature(1, 1, gr.sizeof_float), + gr.io_signature(1, 1, gr.sizeof_float), + ) + + self._subcarrier_freq = subcarrier_freq + self._sample_rate = sample_rate + + # 1.024 MHz cosine subcarrier (continuous phase, maintained by sig_source) + self.carrier = analog.sig_source_f( + sample_rate, analog.GR_COS_WAVE, subcarrier_freq, 1.0, 0, + ) + + # Multiply NRZ data by subcarrier + self.mixer = blocks.multiply_ff(1) + + # Connect: input (NRZ) -> mixer port 0, carrier -> mixer port 1 -> output + self.connect(self, (self.mixer, 0)) + self.connect(self.carrier, (self.mixer, 1)) + self.connect(self.mixer, self) + + @property + def subcarrier_freq(self) -> float: + return self._subcarrier_freq + + @property + def sample_rate(self) -> float: + return self._sample_rate diff --git a/src/apollo/fm_voice_subcarrier_mod.py b/src/apollo/fm_voice_subcarrier_mod.py new file mode 100644 index 0000000..bc5de35 --- /dev/null +++ b/src/apollo/fm_voice_subcarrier_mod.py @@ -0,0 +1,93 @@ +""" +Apollo FM Voice Subcarrier Modulator -- generates 1.25 MHz FM test tone. + +Transmit-side counterpart to voice_subcarrier_demod. Generates a sinusoidal +test tone, FM-modulates it onto a 1.25 MHz subcarrier, and outputs the +real-valued subcarrier signal. + +On the real spacecraft, voice audio (300-3000 Hz) drives an FM VCO at 113 kHz, +which is mixed with the 512 kHz master clock and doubled to produce the +1.25 MHz FM subcarrier with +/-29 kHz deviation. + +For testing, an internal sine-wave test tone replaces live audio. + +Reference: IMPLEMENTATION_SPEC.md section 4.2 +""" + +import math + +from gnuradio import analog, blocks, gr + +from apollo.constants import ( + SAMPLE_RATE_BASEBAND, + VOICE_FM_DEVIATION_HZ, + VOICE_SUBCARRIER_HZ, +) + + +class fm_voice_subcarrier_mod(gr.hier_block2): + """Source block: FM-modulated 1.25 MHz voice subcarrier with test tone. + + Outputs: + float -- real-valued FM subcarrier at subcarrier_freq + """ + + def __init__( + self, + sample_rate: float = SAMPLE_RATE_BASEBAND, + subcarrier_freq: float = VOICE_SUBCARRIER_HZ, + fm_deviation: float = VOICE_FM_DEVIATION_HZ, + tone_freq: float = 1000.0, + ): + gr.hier_block2.__init__( + self, + "apollo_fm_voice_subcarrier_mod", + gr.io_signature(0, 0, 0), # source -- no input + gr.io_signature(1, 1, gr.sizeof_float), # float output + ) + + self._sample_rate = sample_rate + self._tone_freq = tone_freq + self._subcarrier_freq = subcarrier_freq + self._fm_deviation = fm_deviation + + # Audio test tone generator + self.tone = analog.sig_source_f( + sample_rate, analog.GR_SIN_WAVE, tone_freq, 1.0, 0, + ) + + # FM modulator: sensitivity = 2*pi*deviation/sample_rate rad/sample + # With unit-amplitude sine input this gives +/-fm_deviation Hz. + fm_sensitivity = 2.0 * math.pi * fm_deviation / sample_rate + self.fm_mod = analog.frequency_modulator_fc(fm_sensitivity) + + # LO at subcarrier frequency for upconversion + self.lo = analog.sig_source_c( + sample_rate, analog.GR_COS_WAVE, subcarrier_freq, 1.0, 0, + ) + + # Mixer: FM baseband x LO -> subcarrier + self.mixer = blocks.multiply_cc(1) + + # Extract real part for float output + self.to_real = blocks.complex_to_real(1) + + # Connect: tone -> FM mod -> mixer(x LO) -> real + self.connect(self.tone, self.fm_mod, (self.mixer, 0)) + self.connect(self.lo, (self.mixer, 1)) + self.connect(self.mixer, self.to_real, self) + + @property + def tone_freq(self) -> float: + """Test tone frequency in Hz.""" + return self._tone_freq + + @property + def subcarrier_freq(self) -> float: + """Subcarrier center frequency in Hz.""" + return self._subcarrier_freq + + @property + def fm_deviation(self) -> float: + """FM deviation in Hz.""" + return self._fm_deviation diff --git a/src/apollo/nrz_encoder.py b/src/apollo/nrz_encoder.py new file mode 100644 index 0000000..3df8dc4 --- /dev/null +++ b/src/apollo/nrz_encoder.py @@ -0,0 +1,61 @@ +""" +Apollo NRZ Encoder -- converts PCM bit stream to NRZ baseband waveform. + +Takes a stream of byte values (0 or 1) from pcm_frame_source and produces +a float waveform at the output sample rate where bit 1 -> +1.0 and +bit 0 -> -1.0, with each bit repeated for samples_per_bit samples. + +This is the transmit-side counterpart to the slicer in bpsk_demod. + +NRZ (Non-Return-to-Zero) encoding maps: + bit 1 -> +1.0 (held for bit period) + bit 0 -> -1.0 (held for bit period) + +Reference: IMPLEMENTATION_SPEC.md section 5.1 +""" +from gnuradio import blocks, gr + +from apollo.constants import PCM_HIGH_BIT_RATE, SAMPLE_RATE_BASEBAND + + +class nrz_encoder(gr.hier_block2): + """NRZ line encoder: byte (0/1) stream -> float (+1/-1) waveform. + + Input: byte stream (values 0 or 1) + Output: float NRZ waveform at sample_rate + """ + + def __init__( + self, + bit_rate: int = PCM_HIGH_BIT_RATE, + sample_rate: float = SAMPLE_RATE_BASEBAND, + ): + gr.hier_block2.__init__( + self, + "apollo_nrz_encoder", + gr.io_signature(1, 1, gr.sizeof_char), + gr.io_signature(1, 1, gr.sizeof_float), + ) + + self._bit_rate = bit_rate + self._sample_rate = sample_rate + samples_per_bit = int(sample_rate / bit_rate) + + # byte (0/1) -> float (0.0/1.0) + self.to_float = blocks.char_to_float(1, 1) + + # float (0.0/1.0) -> float (0.0/2.0) + self.scale = blocks.multiply_const_ff(2.0) + + # float (0.0/2.0) -> float (-1.0/+1.0) + self.offset = blocks.add_const_ff(-1.0) + + # Upsample: repeat each value samples_per_bit times + self.upsample = blocks.repeat(gr.sizeof_float, samples_per_bit) + + # Connect chain + self.connect(self, self.to_float, self.scale, self.offset, self.upsample, self) + + @property + def samples_per_bit(self) -> int: + return int(self._sample_rate / self._bit_rate) diff --git a/src/apollo/pcm_frame_source.py b/src/apollo/pcm_frame_source.py new file mode 100644 index 0000000..fde69cc --- /dev/null +++ b/src/apollo/pcm_frame_source.py @@ -0,0 +1,153 @@ +""" +Apollo PCM Frame Source -- generates a continuous NRZ bit stream of PCM frames. + +The transmit-side counterpart to pcm_frame_sync. Produces a steady stream of +128-word (high rate, 51.2 kbps) or 200-word (low rate, 1.6 kbps) PCM frames, +each beginning with the standard 32-bit sync word: + + [5-bit A][15-bit core][6-bit B][6-bit frame ID] + +Frame IDs cycle 1 through 50 (one subframe), with the 15-bit core complemented +on odd-numbered frames. An optional message input allows dynamic payload +injection; otherwise frames carry zero-fill data. + +The core logic lives in FrameSourceEngine (pure Python, testable without GNU +Radio). The GR sync_block wrapper bridges frame-granularity generation with +GR's sample-granularity scheduler via an internal bit buffer. + +Reference: IMPLEMENTATION_SPEC.md sections 5.1, 5.2 +""" + +from collections import deque + +import numpy as np + +from apollo.constants import ( + PCM_HIGH_BIT_RATE, + PCM_HIGH_WORDS_PER_FRAME, + PCM_LOW_WORDS_PER_FRAME, +) +from apollo.usb_signal_gen import generate_pcm_frame + + +class FrameSourceEngine: + """PCM frame generation engine (pure Python, no GR dependency). + + Maintains a rolling frame counter (1-50) and generates complete frames + on demand via next_frame(). Odd-numbered frames get a complemented + sync core automatically. + + Args: + bit_rate: PCM bit rate in bps (51200 or 1600). + """ + + def __init__(self, bit_rate: int = PCM_HIGH_BIT_RATE): + self.bit_rate = bit_rate + if bit_rate == PCM_HIGH_BIT_RATE: + self.words_per_frame = PCM_HIGH_WORDS_PER_FRAME + else: + self.words_per_frame = PCM_LOW_WORDS_PER_FRAME + + self.frame_counter = 1 + + def next_frame(self, data: bytes | None = None) -> list[int]: + """Generate the next PCM frame as a list of bits (0/1 values, MSB first). + + Args: + data: Optional payload bytes for data words. If None, the frame + carries zero-fill (deterministic, unlike the random fill in + generate_pcm_frame when data=None for signal-gen use). + + Returns: + List of bit values, length = words_per_frame * 8. + """ + frame_id = self.frame_counter + odd = (frame_id % 2) == 1 + + # Default to zero-fill rather than random for a transmit source -- + # downstream blocks and tests need deterministic output. + if data is None: + data = bytes(self.words_per_frame) + + bits = generate_pcm_frame( + frame_id=frame_id, + odd=odd, + data=data, + words_per_frame=self.words_per_frame, + ) + + # Advance counter: 1 -> 2 -> ... -> 50 -> 1 + self.frame_counter = (self.frame_counter % 50) + 1 + + return bits + + +# --------------------------------------------------------------------------- +# GNU Radio block wrapper (optional -- only if gnuradio is available) +# --------------------------------------------------------------------------- + +try: + import pmt + from gnuradio import gr + + class pcm_frame_source(gr.sync_block): + """GNU Radio source block: continuous PCM frame bit stream. + + Outputs a stream of bytes (values 0 or 1) representing NRZ-encoded + PCM telemetry frames. Frame IDs cycle 1-50 automatically. + + An optional ``frame_data`` message input accepts PMT u8vector payloads + that will be used as the data words for the next generated frame. + + Parameters: + bit_rate: 51200 (128 words/frame) or 1600 (200 words/frame). + """ + + def __init__(self, bit_rate: int = PCM_HIGH_BIT_RATE): + gr.sync_block.__init__( + self, + name="apollo_pcm_frame_source", + in_sig=None, + out_sig=[np.byte], + ) + + self._engine = FrameSourceEngine(bit_rate=bit_rate) + self._bit_buffer: deque[int] = deque() + self._pending_data: bytes | None = None + + # Message input for dynamic payload injection + self.message_port_register_in(pmt.intern("frame_data")) + self.set_msg_handler( + pmt.intern("frame_data"), self._handle_frame_data + ) + + def _handle_frame_data(self, msg): + """Store incoming PMT payload bytes for the next frame.""" + if pmt.is_u8vector(msg): + self._pending_data = bytes(pmt.u8vector_elements(msg)) + elif pmt.is_pair(msg): + # Accept PDU (car=meta, cdr=payload) + payload = pmt.cdr(msg) + if pmt.is_u8vector(payload): + self._pending_data = bytes(pmt.u8vector_elements(payload)) + + def work(self, input_items, output_items): + out = output_items[0] + n_out = len(out) + produced = 0 + + while produced < n_out: + if not self._bit_buffer: + frame_bits = self._engine.next_frame(data=self._pending_data) + self._pending_data = None + self._bit_buffer.extend(frame_bits) + + chunk = min(n_out - produced, len(self._bit_buffer)) + for i in range(chunk): + out[produced + i] = self._bit_buffer.popleft() + produced += chunk + + return produced + +except ImportError: + pass diff --git a/src/apollo/pm_mod.py b/src/apollo/pm_mod.py new file mode 100644 index 0000000..377592b --- /dev/null +++ b/src/apollo/pm_mod.py @@ -0,0 +1,53 @@ +""" +Apollo PM Modulator — applies phase modulation to produce complex baseband. + +The transmit-side counterpart to pm_demod. Takes a composite modulating signal +(sum of subcarriers) and produces PM complex baseband: s(t) = exp(j * phi(t)) +where phi(t) = pm_deviation * modulating(t). + +The spacecraft transmitter phase-modulates at 0.133 rad peak deviation (7.6 deg). +At this small deviation, the modulation is essentially linear. + +Reference: IMPLEMENTATION_SPEC.md section 2.3 +""" + +from gnuradio import analog, blocks, gr + +from apollo.constants import PM_PEAK_DEVIATION_RAD, SAMPLE_RATE_BASEBAND + + +class pm_mod(gr.hier_block2): + """Phase modulator: float input -> PM complex baseband output.""" + + def __init__( + self, + pm_deviation: float = PM_PEAK_DEVIATION_RAD, + sample_rate: float = SAMPLE_RATE_BASEBAND, + ): + gr.hier_block2.__init__( + self, + "apollo_pm_mod", + gr.io_signature(1, 1, gr.sizeof_float), + gr.io_signature(1, 1, gr.sizeof_gr_complex), + ) + + self._pm_deviation = pm_deviation + + # Scale input by PM deviation so phase_modulator sees deviation-scaled values + self.gain = blocks.multiply_const_ff(pm_deviation) + + # Phase modulate: output = exp(j * 1.0 * input) + # Sensitivity is 1.0 because we pre-scale by pm_deviation above + self.modulator = analog.phase_modulator_fc(1.0) + + # Connect: input -> gain -> phase_mod -> output + self.connect(self, self.gain, self.modulator, self) + + def get_pm_deviation(self) -> float: + """Return current PM deviation in radians.""" + return self._pm_deviation + + def set_pm_deviation(self, dev: float): + """Update PM deviation at runtime.""" + self._pm_deviation = dev + self.gain.set_k(dev) diff --git a/src/apollo/usb_signal_source.py b/src/apollo/usb_signal_source.py new file mode 100644 index 0000000..fc1aa06 --- /dev/null +++ b/src/apollo/usb_signal_source.py @@ -0,0 +1,146 @@ +""" +Apollo USB Signal Source -- complete transmit chain in one block. + +The transmit-side counterpart to usb_downlink_receiver. Wires together the +full modulation chain: + + pcm_frame_source -> nrz_encoder -> bpsk_subcarrier_mod -+-> add_ff -> pm_mod -> [complex out] + | + fm_voice_subcarrier_mod --------+ + (optional, scaled by 1.68/2.2) + +This mirrors CuriousMarc's physical bench topology: the individual composable +blocks map 1:1 to Keysight instruments (EXG signal generator for PM, two +33522B AWGs for subcarrier modulation). + +For finer control, use the individual blocks directly. + +Reference: IMPLEMENTATION_SPEC.md -- full downlink transmit path +""" + +import math + +from gnuradio import analog, blocks, gr + +from apollo.bpsk_subcarrier_mod import bpsk_subcarrier_mod +from apollo.constants import ( + PCM_HIGH_BIT_RATE, + PCM_SUBCARRIER_HZ, + PM_PEAK_DEVIATION_RAD, + SAMPLE_RATE_BASEBAND, + VOICE_FM_DEVIATION_HZ, + VOICE_SUBCARRIER_HZ, +) +from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod +from apollo.nrz_encoder import nrz_encoder +from apollo.pcm_frame_source import pcm_frame_source +from apollo.pm_mod import pm_mod + + +class usb_signal_source(gr.hier_block2): + """Apollo USB downlink signal source -- complex baseband output. + + Outputs: + complex -- PM-modulated baseband at sample_rate (default 5.12 MHz) + + Message inputs: + frame_data -- forwarded to pcm_frame_source for dynamic payload injection + + The block generates PCM telemetry frames, NRZ-encodes them, BPSK-modulates + onto a 1.024 MHz subcarrier, optionally adds a 1.25 MHz FM voice subcarrier, + and applies PM modulation to produce complex baseband. + + Optional AWGN noise can be added by setting snr_db to a finite value. + """ + + def __init__( + self, + sample_rate: float = SAMPLE_RATE_BASEBAND, + bit_rate: int = PCM_HIGH_BIT_RATE, + pm_deviation: float = PM_PEAK_DEVIATION_RAD, + voice_enabled: bool = False, + voice_tone_hz: float = 1000.0, + snr_db: float | None = None, + ): + gr.hier_block2.__init__( + self, + "apollo_usb_signal_source", + gr.io_signature(0, 0, 0), # source -- no input + gr.io_signature(1, 1, gr.sizeof_gr_complex), + ) + + self._sample_rate = sample_rate + self._voice_enabled = voice_enabled + + # Forward the frame_data message port from pcm_frame_source + self.message_port_register_hier_in("frame_data") + + # --- PCM telemetry path --- + + # Stage 1: Generate PCM frame bits (0/1 byte stream) + self.frame_src = pcm_frame_source(bit_rate=bit_rate) + + # Forward message port: hier input -> pcm_frame_source + self.msg_connect(self, "frame_data", self.frame_src, "frame_data") + + # Stage 2: NRZ encode (byte 0/1 -> float +1/-1 at sample_rate) + self.nrz = nrz_encoder(bit_rate=bit_rate, sample_rate=sample_rate) + + # Stage 3: BPSK modulate onto 1.024 MHz subcarrier + self.bpsk = bpsk_subcarrier_mod( + subcarrier_freq=PCM_SUBCARRIER_HZ, + sample_rate=sample_rate, + ) + + # Connect PCM chain: frame_src -> nrz -> bpsk + self.connect(self.frame_src, self.nrz, self.bpsk) + + # --- Subcarrier summing --- + + if voice_enabled: + # Voice subcarrier level relative to PCM: + # Per IMPL_SPEC: PCM = 2.2 Vpp, Voice = 1.68 Vpp + # The BPSK subcarrier has unity amplitude, so voice is scaled + # by 1.68/2.2 to maintain the correct power ratio. + voice_scale = 1.68 / 2.2 + + self.voice = fm_voice_subcarrier_mod( + sample_rate=sample_rate, + subcarrier_freq=VOICE_SUBCARRIER_HZ, + fm_deviation=VOICE_FM_DEVIATION_HZ, + tone_freq=voice_tone_hz, + ) + self.voice_gain = blocks.multiply_const_ff(voice_scale) + self.adder = blocks.add_ff(1) + + # PCM subcarrier -> adder port 0 + self.connect(self.bpsk, (self.adder, 0)) + # Voice subcarrier (scaled) -> adder port 1 + self.connect(self.voice, self.voice_gain, (self.adder, 1)) + + composite = self.adder + else: + composite = self.bpsk + + # --- PM modulation --- + + self.pm = pm_mod(pm_deviation=pm_deviation, sample_rate=sample_rate) + self.connect(composite, self.pm) + + # --- Optional AWGN --- + + if snr_db is not None: + # Signal power is 1.0 (PM constant envelope) + noise_power = 1.0 / (10.0 ** (snr_db / 10.0)) + noise_amplitude = math.sqrt(noise_power / 2.0) + + self.noise = analog.noise_source_c( + analog.GR_GAUSSIAN, noise_amplitude, 0, + ) + self.sum_noise = blocks.add_cc(1) + + self.connect(self.pm, (self.sum_noise, 0)) + self.connect(self.noise, (self.sum_noise, 1)) + self.connect(self.sum_noise, self) + else: + self.connect(self.pm, self) diff --git a/tests/test_bpsk_subcarrier_mod.py b/tests/test_bpsk_subcarrier_mod.py new file mode 100644 index 0000000..2275ef2 --- /dev/null +++ b/tests/test_bpsk_subcarrier_mod.py @@ -0,0 +1,197 @@ +"""Tests for the BPSK subcarrier modulator block.""" + +import numpy as np +import pytest + +try: + from gnuradio import blocks, gr + + HAS_GNURADIO = True +except ImportError: + HAS_GNURADIO = False + +from apollo.constants import PCM_SUBCARRIER_HZ, SAMPLE_RATE_BASEBAND + +pytestmark = pytest.mark.skipif(not HAS_GNURADIO, reason="GNU Radio not installed") + + +class TestBPSKSubcarrierMod: + """Test BPSK subcarrier modulation with synthetic NRZ inputs.""" + + def test_block_instantiation(self): + """Block should instantiate with default parameters.""" + from apollo.bpsk_subcarrier_mod import bpsk_subcarrier_mod + + mod = bpsk_subcarrier_mod() + assert mod is not None + assert mod.subcarrier_freq == PCM_SUBCARRIER_HZ + assert mod.sample_rate == SAMPLE_RATE_BASEBAND + + def test_constant_positive_input(self): + """All +1.0 input should produce a pure cosine at 1.024 MHz.""" + from apollo.bpsk_subcarrier_mod import bpsk_subcarrier_mod + + sample_rate = SAMPLE_RATE_BASEBAND + n_samples = 51200 + + tb = gr.top_block() + src = blocks.vector_source_f([1.0] * n_samples) + mod = bpsk_subcarrier_mod( + subcarrier_freq=PCM_SUBCARRIER_HZ, sample_rate=sample_rate, + ) + snk = blocks.vector_sink_f() + + tb.connect(src, mod, snk) + tb.run() + + data = np.array(snk.data()) + assert len(data) == n_samples + + # FFT: spectral energy should concentrate at 1.024 MHz + fft_vals = np.fft.fft(data) + freqs = np.fft.fftfreq(len(data), 1.0 / sample_rate) + + pcm_mask = (np.abs(freqs) > 950_000) & (np.abs(freqs) < 1_100_000) + pcm_power = np.mean(np.abs(fft_vals[pcm_mask]) ** 2) + total_power = np.mean(np.abs(fft_vals) ** 2) + + assert pcm_power > total_power * 0.1, ( + f"PCM band power ({pcm_power:.1f}) is less than 10% of " + f"total power ({total_power:.1f})" + ) + + def test_constant_negative_input(self): + """All -1.0 input should produce -cos (inverted cosine) at 1.024 MHz.""" + from apollo.bpsk_subcarrier_mod import bpsk_subcarrier_mod + + sample_rate = SAMPLE_RATE_BASEBAND + n_samples = 51200 + + tb = gr.top_block() + src = blocks.vector_source_f([-1.0] * n_samples) + mod = bpsk_subcarrier_mod( + subcarrier_freq=PCM_SUBCARRIER_HZ, sample_rate=sample_rate, + ) + snk = blocks.vector_sink_f() + + tb.connect(src, mod, snk) + tb.run() + + data = np.array(snk.data()) + assert len(data) == n_samples + + # Inverted cosine still has energy at 1.024 MHz + fft_vals = np.fft.fft(data) + freqs = np.fft.fftfreq(len(data), 1.0 / sample_rate) + + pcm_mask = (np.abs(freqs) > 950_000) & (np.abs(freqs) < 1_100_000) + pcm_power = np.mean(np.abs(fft_vals[pcm_mask]) ** 2) + total_power = np.mean(np.abs(fft_vals) ** 2) + + assert pcm_power > total_power * 0.1, ( + f"PCM band power ({pcm_power:.1f}) is less than 10% of " + f"total power ({total_power:.1f})" + ) + + def test_alternating_input_spectrum(self): + """Alternating +1/-1 NRZ should still have spectral peak near subcarrier.""" + from apollo.bpsk_subcarrier_mod import bpsk_subcarrier_mod + + sample_rate = SAMPLE_RATE_BASEBAND + # Samples per bit at high rate: 5_120_000 / 51_200 = 100 + samples_per_bit = int(sample_rate / 51_200) + n_bits = 512 + n_samples = n_bits * samples_per_bit + + # Build alternating NRZ: +1 for 100 samples, -1 for 100, ... + nrz = [] + for i in range(n_bits): + val = 1.0 if i % 2 == 0 else -1.0 + nrz.extend([val] * samples_per_bit) + + tb = gr.top_block() + src = blocks.vector_source_f(nrz) + mod = bpsk_subcarrier_mod( + subcarrier_freq=PCM_SUBCARRIER_HZ, sample_rate=sample_rate, + ) + snk = blocks.vector_sink_f() + + tb.connect(src, mod, snk) + tb.run() + + data = np.array(snk.data()) + assert len(data) == n_samples + + # BPSK with alternating data spreads energy around subcarrier +/- bit_rate, + # but the band near 1.024 MHz should still carry significant power + fft_vals = np.fft.fft(data) + freqs = np.fft.fftfreq(len(data), 1.0 / sample_rate) + + pcm_mask = (np.abs(freqs) > 950_000) & (np.abs(freqs) < 1_100_000) + pcm_power = np.mean(np.abs(fft_vals[pcm_mask]) ** 2) + total_power = np.mean(np.abs(fft_vals) ** 2) + + assert pcm_power > total_power * 0.1, ( + f"PCM band power ({pcm_power:.1f}) is less than 10% of " + f"total power ({total_power:.1f})" + ) + + def test_amplitude_bounded(self): + """Output amplitude should be <= 1.0 (product of +/-1 and cos).""" + from apollo.bpsk_subcarrier_mod import bpsk_subcarrier_mod + + sample_rate = SAMPLE_RATE_BASEBAND + n_samples = 51200 + + tb = gr.top_block() + src = blocks.vector_source_f([1.0] * n_samples) + mod = bpsk_subcarrier_mod( + subcarrier_freq=PCM_SUBCARRIER_HZ, sample_rate=sample_rate, + ) + snk = blocks.vector_sink_f() + + tb.connect(src, mod, snk) + tb.run() + + data = np.array(snk.data()) + peak = np.max(np.abs(data)) + + # cos has peak 1.0, NRZ is +/-1.0, product peak should be ~1.0 + assert peak <= 1.0 + 1e-6, ( + f"Output peak amplitude {peak:.6f} exceeds 1.0" + ) + assert peak > 0.9, ( + f"Output peak amplitude {peak:.6f} is suspiciously low" + ) + + def test_custom_subcarrier_freq(self): + """Custom subcarrier frequency should shift spectral peak.""" + from apollo.bpsk_subcarrier_mod import bpsk_subcarrier_mod + + sample_rate = SAMPLE_RATE_BASEBAND + n_samples = 51200 + custom_freq = 500_000 # 500 kHz + + tb = gr.top_block() + src = blocks.vector_source_f([1.0] * n_samples) + mod = bpsk_subcarrier_mod( + subcarrier_freq=custom_freq, sample_rate=sample_rate, + ) + snk = blocks.vector_sink_f() + + tb.connect(src, mod, snk) + tb.run() + + data = np.array(snk.data()) + fft_vals = np.fft.fft(data) + freqs = np.fft.fftfreq(len(data), 1.0 / sample_rate) + + # Energy should be near 500 kHz, not 1.024 MHz + custom_mask = (np.abs(freqs) > 450_000) & (np.abs(freqs) < 550_000) + custom_power = np.mean(np.abs(fft_vals[custom_mask]) ** 2) + total_power = np.mean(np.abs(fft_vals) ** 2) + + assert custom_power > total_power * 0.1, ( + f"Custom freq band power ({custom_power:.1f}) is less than 10% of " + f"total power ({total_power:.1f})" + ) diff --git a/tests/test_fm_voice_subcarrier_mod.py b/tests/test_fm_voice_subcarrier_mod.py new file mode 100644 index 0000000..ce0b71d --- /dev/null +++ b/tests/test_fm_voice_subcarrier_mod.py @@ -0,0 +1,149 @@ +"""Tests for the FM voice subcarrier modulator block.""" + +import numpy as np +import pytest + +try: + from gnuradio import blocks, gr + + HAS_GNURADIO = True +except ImportError: + HAS_GNURADIO = False + +from apollo.constants import ( + SAMPLE_RATE_BASEBAND, + VOICE_SUBCARRIER_HZ, +) + +pytestmark = pytest.mark.skipif(not HAS_GNURADIO, reason="GNU Radio not installed") + + +class TestFmVoiceModInstantiation: + """Test block creation and parameter handling.""" + + def test_default_parameters(self): + """Block should instantiate with default parameters.""" + from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod + + mod = fm_voice_subcarrier_mod() + assert mod is not None + + def test_custom_tone_freq(self): + """Block should accept a custom tone frequency and produce output.""" + from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod + + sample_rate = SAMPLE_RATE_BASEBAND + n_samples = 10240 + + tb = gr.top_block() + src = fm_voice_subcarrier_mod(sample_rate=sample_rate, tone_freq=2000.0) + head = blocks.head(gr.sizeof_float, n_samples) + snk = blocks.vector_sink_f() + tb.connect(src, head, snk) + tb.run() + + data = np.array(snk.data()) + assert len(data) == n_samples + assert np.any(data != 0), "Output is all zeros with tone_freq=2000" + + def test_properties(self): + """Properties should reflect constructor arguments.""" + from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod + + mod = fm_voice_subcarrier_mod( + tone_freq=1500.0, + subcarrier_freq=1_000_000, + fm_deviation=20_000, + ) + assert mod.tone_freq == 1500.0 + assert mod.subcarrier_freq == 1_000_000 + assert mod.fm_deviation == 20_000 + + +class TestFmVoiceModFunctional: + """Functional tests with signal analysis.""" + + def test_produces_output(self): + """Source block should produce non-zero float output.""" + from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod + + sample_rate = SAMPLE_RATE_BASEBAND + n_samples = 51200 + + tb = gr.top_block() + src = fm_voice_subcarrier_mod(sample_rate=sample_rate) + head = blocks.head(gr.sizeof_float, n_samples) + snk = blocks.vector_sink_f() + tb.connect(src, head, snk) + tb.run() + + data = np.array(snk.data()) + assert len(data) == n_samples, f"Expected {n_samples} samples, got {len(data)}" + assert np.any(data != 0), "Output is all zeros" + + def test_output_is_float(self): + """Output samples should be real-valued floats.""" + from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod + + sample_rate = SAMPLE_RATE_BASEBAND + n_samples = 1024 + + tb = gr.top_block() + src = fm_voice_subcarrier_mod(sample_rate=sample_rate) + head = blocks.head(gr.sizeof_float, n_samples) + snk = blocks.vector_sink_f() + tb.connect(src, head, snk) + tb.run() + + data = np.array(snk.data()) + assert data.dtype in (np.float32, np.float64), ( + f"Expected float output, got {data.dtype}" + ) + + def test_spectral_energy_at_subcarrier(self): + """Most spectral energy should be near the 1.25 MHz subcarrier.""" + from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod + + sample_rate = SAMPLE_RATE_BASEBAND + n_samples = 51200 # ~10 ms + + tb = gr.top_block() + src = fm_voice_subcarrier_mod(sample_rate=sample_rate) + head = blocks.head(gr.sizeof_float, n_samples) + snk = blocks.vector_sink_f() + tb.connect(src, head, snk) + tb.run() + + data = np.array(snk.data()) + fft_vals = np.fft.fft(data) + freqs = np.fft.fftfreq(len(data), 1.0 / sample_rate) + + # Energy in the 1.2-1.3 MHz band (subcarrier +/- deviation margin) + voice_mask = (np.abs(freqs) > 1_200_000) & (np.abs(freqs) < 1_300_000) + voice_power = np.mean(np.abs(fft_vals[voice_mask]) ** 2) + total_power = np.mean(np.abs(fft_vals) ** 2) + + assert voice_power > total_power * 0.1, ( + f"Subcarrier band power ({voice_power:.1f}) is less than 10% of " + f"total power ({total_power:.1f})" + ) + + def test_output_bounded(self): + """Output amplitude should stay bounded (not blow up).""" + from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod + + sample_rate = SAMPLE_RATE_BASEBAND + n_samples = 51200 + + tb = gr.top_block() + src = fm_voice_subcarrier_mod(sample_rate=sample_rate) + head = blocks.head(gr.sizeof_float, n_samples) + snk = blocks.vector_sink_f() + tb.connect(src, head, snk) + tb.run() + + data = np.array(snk.data()) + peak = np.max(np.abs(data)) + # FM on a cosine carrier: peak should be around 1.0, certainly < 2.0 + assert peak < 2.0, f"Output peak amplitude {peak:.3f} exceeds expected bound" + assert peak > 0.1, f"Output peak amplitude {peak:.3f} is suspiciously low" diff --git a/tests/test_loopback.py b/tests/test_loopback.py new file mode 100644 index 0000000..c42eb59 --- /dev/null +++ b/tests/test_loopback.py @@ -0,0 +1,144 @@ +"""Loopback test: usb_signal_source -> usb_downlink_receiver round-trip. + +The ultimate validation -- generates a PM-modulated signal with known PCM +frames using the transmit chain, feeds it through the complete receive chain, +and verifies that frames are recovered correctly. + +This exercises every block in both the transmit and receive paths: + + TX: pcm_frame_source -> nrz_encoder -> bpsk_subcarrier_mod -> pm_mod + RX: pm_demod -> subcarrier_extract -> bpsk_demod -> pcm_frame_sync -> pcm_demux +""" + +import numpy as np +import pytest + +try: + from gnuradio import blocks, gr + + HAS_GNURADIO = True +except ImportError: + HAS_GNURADIO = False + +from apollo.constants import ( + PCM_HIGH_BIT_RATE, + PCM_HIGH_WORDS_PER_FRAME, + PCM_WORD_LENGTH, + SAMPLE_RATE_BASEBAND, +) + +pytestmark = pytest.mark.skipif(not HAS_GNURADIO, reason="GNU Radio not installed") + + +class TestLoopback: + """Round-trip: transmit -> receive -> verify.""" + + def test_loopback_recovers_frames(self): + """TX signal source -> RX downlink receiver should produce frame PDUs.""" + from apollo.usb_downlink_receiver import usb_downlink_receiver + from apollo.usb_signal_source import usb_signal_source + + # Generate enough samples for several frames so the receiver PLL can settle. + # At 51.2 kbps high rate, one frame = 1024 bits = 102400 samples. + # Give the receiver 8 frames worth (~0.16 seconds). + bits_per_frame = PCM_HIGH_WORDS_PER_FRAME * PCM_WORD_LENGTH + samples_per_frame = int(bits_per_frame * SAMPLE_RATE_BASEBAND / PCM_HIGH_BIT_RATE) + n_frames = 8 + n_samples = n_frames * samples_per_frame + + tb = gr.top_block() + + # Transmit chain (clean, no noise) + tx = usb_signal_source( + sample_rate=SAMPLE_RATE_BASEBAND, + bit_rate=PCM_HIGH_BIT_RATE, + snr_db=None, + ) + head = blocks.head(gr.sizeof_gr_complex, n_samples) + + # Receive chain + rx = usb_downlink_receiver( + sample_rate=SAMPLE_RATE_BASEBAND, + bit_rate=PCM_HIGH_BIT_RATE, + ) + snk = blocks.message_debug() + + tb.connect(tx, head, rx) + tb.msg_connect(rx, "frames", snk, "store") + tb.run() + + n_recovered = snk.num_messages() + # The receiver needs ~1-2 frames for PLL settling, so we expect + # at least a few frames from 8 transmitted. + assert n_recovered >= 1, ( + f"Loopback recovered {n_recovered} frames from {n_frames} transmitted, " + f"expected >= 1" + ) + + def test_loopback_frame_structure(self): + """Recovered frames should have valid sync word structure.""" + from apollo.pcm_demux import DemuxEngine + from apollo.usb_downlink_receiver import usb_downlink_receiver + from apollo.usb_signal_source import usb_signal_source + + import pmt + + bits_per_frame = PCM_HIGH_WORDS_PER_FRAME * PCM_WORD_LENGTH + samples_per_frame = int(bits_per_frame * SAMPLE_RATE_BASEBAND / PCM_HIGH_BIT_RATE) + n_samples = 8 * samples_per_frame + + tb = gr.top_block() + tx = usb_signal_source(snr_db=None) + head = blocks.head(gr.sizeof_gr_complex, n_samples) + rx = usb_downlink_receiver(output_format="raw") + snk = blocks.message_debug() + + tb.connect(tx, head, rx) + tb.msg_connect(rx, "frames", snk, "store") + tb.run() + + n_recovered = snk.num_messages() + if n_recovered == 0: + pytest.skip("No frames recovered in loopback -- PLL may need tuning") + + # Validate first recovered frame through the demux engine + msg = snk.get_message(0) + if pmt.is_pair(msg): + payload = pmt.cdr(msg) + else: + payload = msg + + frame_bytes = bytes(pmt.u8vector_elements(payload)) + + demux = DemuxEngine(output_format="raw") + result = demux.process_frame(frame_bytes) + + assert "sync" in result + assert "words" in result + assert result["sync"]["frame_id"] >= 1 + assert result["sync"]["frame_id"] <= 50 + + def test_loopback_with_noise(self): + """Loopback at 30 dB SNR should still recover frames.""" + from apollo.usb_downlink_receiver import usb_downlink_receiver + from apollo.usb_signal_source import usb_signal_source + + bits_per_frame = PCM_HIGH_WORDS_PER_FRAME * PCM_WORD_LENGTH + samples_per_frame = int(bits_per_frame * SAMPLE_RATE_BASEBAND / PCM_HIGH_BIT_RATE) + n_samples = 10 * samples_per_frame # more frames for noisy recovery + + tb = gr.top_block() + tx = usb_signal_source(snr_db=30.0) + head = blocks.head(gr.sizeof_gr_complex, n_samples) + rx = usb_downlink_receiver() + snk = blocks.message_debug() + + tb.connect(tx, head, rx) + tb.msg_connect(rx, "frames", snk, "store") + tb.run() + + n_recovered = snk.num_messages() + # At 30 dB SNR with 10 frames, should get at least 1 + assert n_recovered >= 1, ( + f"Noisy loopback recovered {n_recovered} frames, expected >= 1" + ) diff --git a/tests/test_nrz_encoder.py b/tests/test_nrz_encoder.py new file mode 100644 index 0000000..b910638 --- /dev/null +++ b/tests/test_nrz_encoder.py @@ -0,0 +1,135 @@ +"""Tests for the NRZ encoder block.""" + +import numpy as np +import pytest + +try: + from gnuradio import blocks, gr + + HAS_GNURADIO = True +except ImportError: + HAS_GNURADIO = False + +from apollo.constants import PCM_HIGH_BIT_RATE, SAMPLE_RATE_BASEBAND + +pytestmark = pytest.mark.skipif(not HAS_GNURADIO, reason="GNU Radio not installed") + + +class TestNRZEncoder: + """Test NRZ encoding of bit streams to baseband waveforms.""" + + def test_bit_one_maps_to_positive(self): + """A single 1-bit should produce +1.0 repeated for samples_per_bit.""" + from apollo.nrz_encoder import nrz_encoder + + tb = gr.top_block() + samples_per_bit = int(SAMPLE_RATE_BASEBAND / PCM_HIGH_BIT_RATE) + + src = blocks.vector_source_b([1]) + enc = nrz_encoder(bit_rate=PCM_HIGH_BIT_RATE, sample_rate=SAMPLE_RATE_BASEBAND) + snk = blocks.vector_sink_f() + + tb.connect(src, enc, snk) + tb.run() + + output = np.array(snk.data()) + assert len(output) == samples_per_bit + np.testing.assert_allclose(output, 1.0, atol=1e-6) + + def test_bit_zero_maps_to_negative(self): + """A single 0-bit should produce -1.0 repeated for samples_per_bit.""" + from apollo.nrz_encoder import nrz_encoder + + tb = gr.top_block() + samples_per_bit = int(SAMPLE_RATE_BASEBAND / PCM_HIGH_BIT_RATE) + + src = blocks.vector_source_b([0]) + enc = nrz_encoder(bit_rate=PCM_HIGH_BIT_RATE, sample_rate=SAMPLE_RATE_BASEBAND) + snk = blocks.vector_sink_f() + + tb.connect(src, enc, snk) + tb.run() + + output = np.array(snk.data()) + assert len(output) == samples_per_bit + np.testing.assert_allclose(output, -1.0, atol=1e-6) + + def test_alternating_bits(self): + """Alternating [1,0,1,0] should produce +1*N, -1*N, +1*N, -1*N.""" + from apollo.nrz_encoder import nrz_encoder + + tb = gr.top_block() + samples_per_bit = int(SAMPLE_RATE_BASEBAND / PCM_HIGH_BIT_RATE) + bits = [1, 0, 1, 0] + + src = blocks.vector_source_b(bits) + enc = nrz_encoder(bit_rate=PCM_HIGH_BIT_RATE, sample_rate=SAMPLE_RATE_BASEBAND) + snk = blocks.vector_sink_f() + + tb.connect(src, enc, snk) + tb.run() + + output = np.array(snk.data()) + expected_levels = [1.0, -1.0, 1.0, -1.0] + + for i, level in enumerate(expected_levels): + start = i * samples_per_bit + end = (i + 1) * samples_per_bit + segment = output[start:end] + np.testing.assert_allclose( + segment, level, atol=1e-6, + err_msg=f"Bit {i} (value {bits[i]}): expected {level}", + ) + + def test_output_length(self): + """4 bits at 51200/5120000 (100 samp/bit) should produce 400 samples.""" + from apollo.nrz_encoder import nrz_encoder + + tb = gr.top_block() + n_bits = 4 + samples_per_bit = int(SAMPLE_RATE_BASEBAND / PCM_HIGH_BIT_RATE) # 100 + + src = blocks.vector_source_b([1, 0, 1, 1]) + enc = nrz_encoder(bit_rate=PCM_HIGH_BIT_RATE, sample_rate=SAMPLE_RATE_BASEBAND) + snk = blocks.vector_sink_f() + + tb.connect(src, enc, snk) + tb.run() + + output = np.array(snk.data()) + assert len(output) == n_bits * samples_per_bit + + def test_upsampling_ratio(self): + """Each NRZ level should be held for exactly samples_per_bit samples.""" + from apollo.nrz_encoder import nrz_encoder + + tb = gr.top_block() + # Use a different rate pair to verify generality: 1600 bps at 5.12 MHz + # gives 3200 samples per bit + bit_rate = 1600 + sample_rate = SAMPLE_RATE_BASEBAND + samples_per_bit = int(sample_rate / bit_rate) # 3200 + bits = [1, 0] + + src = blocks.vector_source_b(bits) + enc = nrz_encoder(bit_rate=bit_rate, sample_rate=sample_rate) + snk = blocks.vector_sink_f() + + tb.connect(src, enc, snk) + tb.run() + + output = np.array(snk.data()) + assert len(output) == len(bits) * samples_per_bit + + # First bit (1) -> +1.0 held for samples_per_bit + np.testing.assert_allclose(output[:samples_per_bit], 1.0, atol=1e-6) + # Second bit (0) -> -1.0 held for samples_per_bit + np.testing.assert_allclose(output[samples_per_bit:], -1.0, atol=1e-6) + + def test_block_instantiation(self): + """Block should instantiate with default parameters.""" + from apollo.nrz_encoder import nrz_encoder + + enc = nrz_encoder() + assert enc is not None + assert enc.samples_per_bit == int(SAMPLE_RATE_BASEBAND / PCM_HIGH_BIT_RATE) diff --git a/tests/test_pcm_frame_source.py b/tests/test_pcm_frame_source.py new file mode 100644 index 0000000..6780ee7 --- /dev/null +++ b/tests/test_pcm_frame_source.py @@ -0,0 +1,196 @@ +"""Tests for the PCM frame source block.""" + +import numpy as np +import pytest + +try: + from gnuradio import blocks, gr + + HAS_GNURADIO = True +except ImportError: + HAS_GNURADIO = False + +from apollo.constants import ( + PCM_HIGH_WORDS_PER_FRAME, + PCM_LOW_WORDS_PER_FRAME, + PCM_SYNC_WORD_LENGTH, + PCM_WORD_LENGTH, + SUBFRAME_FRAMES, +) +from apollo.pcm_frame_source import FrameSourceEngine +from apollo.protocol import bits_to_sync_word, parse_sync_word + + +class TestFrameSourceEngine: + """Test the pure-Python frame generation engine (no GR needed).""" + + def test_frame_length(self): + """High-rate frame should be 128 words * 8 bits = 1024 bits.""" + engine = FrameSourceEngine(bit_rate=51200) + bits = engine.next_frame() + assert len(bits) == PCM_HIGH_WORDS_PER_FRAME * PCM_WORD_LENGTH + + def test_frame_length_low_rate(self): + """Low-rate frame should be 200 words * 8 bits = 1600 bits.""" + engine = FrameSourceEngine(bit_rate=1600) + bits = engine.next_frame() + assert len(bits) == PCM_LOW_WORDS_PER_FRAME * PCM_WORD_LENGTH + + def test_bits_are_binary(self): + """Every output value should be 0 or 1.""" + engine = FrameSourceEngine() + bits = engine.next_frame() + assert all(b in (0, 1) for b in bits) + + def test_frame_counter_wraps(self): + """Frame counter should cycle 1 -> 50 -> 1.""" + engine = FrameSourceEngine() + assert engine.frame_counter == 1 + + # Generate 50 frames (one full subframe) + for expected_id in range(1, SUBFRAME_FRAMES + 1): + assert engine.frame_counter == expected_id + engine.next_frame() + + # Should wrap back to 1 + assert engine.frame_counter == 1 + + # One more frame to confirm it keeps going + engine.next_frame() + assert engine.frame_counter == 2 + + def test_frame_id_in_sync_word(self): + """The 6-bit frame ID field in the sync word should match the counter.""" + engine = FrameSourceEngine() + for expected_id in range(1, 6): + bits = engine.next_frame() + sync_word = bits_to_sync_word(bits[:PCM_SYNC_WORD_LENGTH]) + parsed = parse_sync_word(sync_word) + assert parsed["frame_id"] == expected_id + + def test_odd_even_sync(self): + """Odd frames should have complemented sync core vs even frames.""" + engine = FrameSourceEngine() + + # Frame 1 (odd) and frame 2 (even) should differ in the core field + bits_1 = engine.next_frame() + bits_2 = engine.next_frame() + + sync_1 = bits_to_sync_word(bits_1[:PCM_SYNC_WORD_LENGTH]) + sync_2 = bits_to_sync_word(bits_2[:PCM_SYNC_WORD_LENGTH]) + + parsed_1 = parse_sync_word(sync_1) + parsed_2 = parse_sync_word(sync_2) + + # Cores should be bitwise complements (within 15 bits) + assert (parsed_1["core"] ^ parsed_2["core"]) == 0x7FFF + + def test_custom_payload(self): + """Injected data bytes should appear in the data portion of the frame.""" + engine = FrameSourceEngine() + payload = bytes([0xAA, 0x55, 0xDE, 0xAD]) + bits = engine.next_frame(data=payload) + + # Data starts after the 32-bit sync word + data_start = PCM_SYNC_WORD_LENGTH + for byte_idx, expected_byte in enumerate(payload): + byte_bits = bits[data_start + byte_idx * 8 : data_start + (byte_idx + 1) * 8] + recovered = 0 + for b in byte_bits: + recovered = (recovered << 1) | b + assert recovered == expected_byte, ( + f"Byte {byte_idx}: expected 0x{expected_byte:02x}, got 0x{recovered:02x}" + ) + + def test_default_zero_fill(self): + """Without explicit data, payload should be zero-filled.""" + engine = FrameSourceEngine() + bits = engine.next_frame() + + # All data bits after sync should be zero + data_bits = bits[PCM_SYNC_WORD_LENGTH:] + assert all(b == 0 for b in data_bits) + + +@pytest.mark.skipif(not HAS_GNURADIO, reason="GNU Radio not installed") +class TestPCMFrameSourceBlock: + """Test the GNU Radio sync_block wrapper.""" + + def test_block_instantiation(self): + """Block should instantiate with default parameters.""" + from apollo.pcm_frame_source import pcm_frame_source + + src = pcm_frame_source() + assert src is not None + + def test_produces_output(self): + """Source should produce a stream of 0s and 1s.""" + from apollo.pcm_frame_source import pcm_frame_source + + tb = gr.top_block() + n_samples = 2048 + + src = pcm_frame_source(bit_rate=51200) + head = blocks.head(gr.sizeof_char, n_samples) + snk = blocks.vector_sink_b() + + tb.connect(src, head, snk) + tb.run() + + data = np.array(snk.data(), dtype=np.uint8) + assert len(data) == n_samples + # All values should be 0 or 1 + assert np.all((data == 0) | (data == 1)) + + def test_frame_boundary(self): + """Getting exactly one frame's worth of bits should work.""" + from apollo.pcm_frame_source import pcm_frame_source + + tb = gr.top_block() + frame_bits = PCM_HIGH_WORDS_PER_FRAME * PCM_WORD_LENGTH + + src = pcm_frame_source(bit_rate=51200) + head = blocks.head(gr.sizeof_char, frame_bits) + snk = blocks.vector_sink_b() + + tb.connect(src, head, snk) + tb.run() + + data = snk.data() + assert len(data) == frame_bits + + def test_continuous_stream(self): + """Multiple frames should produce the expected total length.""" + from apollo.pcm_frame_source import pcm_frame_source + + tb = gr.top_block() + n_frames = 5 + frame_bits = PCM_HIGH_WORDS_PER_FRAME * PCM_WORD_LENGTH + total_bits = n_frames * frame_bits + + src = pcm_frame_source(bit_rate=51200) + head = blocks.head(gr.sizeof_char, total_bits) + snk = blocks.vector_sink_b() + + tb.connect(src, head, snk) + tb.run() + + data = snk.data() + assert len(data) == total_bits + + def test_low_rate(self): + """Low-rate source should produce 200-word frames.""" + from apollo.pcm_frame_source import pcm_frame_source + + tb = gr.top_block() + frame_bits = PCM_LOW_WORDS_PER_FRAME * PCM_WORD_LENGTH + + src = pcm_frame_source(bit_rate=1600) + head = blocks.head(gr.sizeof_char, frame_bits) + snk = blocks.vector_sink_b() + + tb.connect(src, head, snk) + tb.run() + + data = snk.data() + assert len(data) == frame_bits diff --git a/tests/test_pm_mod.py b/tests/test_pm_mod.py new file mode 100644 index 0000000..15d23cd --- /dev/null +++ b/tests/test_pm_mod.py @@ -0,0 +1,146 @@ +"""Tests for the PM modulator block.""" + +import numpy as np +import pytest + +try: + from gnuradio import blocks, gr + + HAS_GNURADIO = True +except ImportError: + HAS_GNURADIO = False + +from apollo.constants import PM_PEAK_DEVIATION_RAD, SAMPLE_RATE_BASEBAND + +pytestmark = pytest.mark.skipif(not HAS_GNURADIO, reason="GNU Radio not installed") + + +class TestPMMod: + """Test PM modulation with synthetic signals.""" + + def test_zero_input_constant_envelope(self): + """Zero input should produce exp(j*0) = 1+0j (unit carrier).""" + from apollo.pm_mod import pm_mod + + tb = gr.top_block() + n_samples = 10000 + + data = [0.0] * n_samples + src = blocks.vector_source_f(data) + mod = pm_mod(pm_deviation=PM_PEAK_DEVIATION_RAD, sample_rate=SAMPLE_RATE_BASEBAND) + snk = blocks.vector_sink_c() + + tb.connect(src, mod, snk) + tb.run() + + output = np.array(snk.data()) + assert len(output) == n_samples + + # Magnitude should be 1.0 (constant envelope) + magnitudes = np.abs(output) + np.testing.assert_allclose(magnitudes, 1.0, atol=1e-6) + + # Phase should be 0 (no modulation) + phases = np.angle(output) + np.testing.assert_allclose(phases, 0.0, atol=1e-6) + + def test_sine_input_phase_deviation(self): + """Sine wave input should produce phase swinging +/- pm_deviation.""" + from apollo.pm_mod import pm_mod + + tb = gr.top_block() + n_samples = 100000 + sample_rate = SAMPLE_RATE_BASEBAND + + # Unit-amplitude sine at 10 kHz as modulating signal + t = np.arange(n_samples, dtype=np.float64) / sample_rate + tone_freq = 10000.0 + modulating = np.sin(2 * np.pi * tone_freq * t).astype(np.float32) + + src = blocks.vector_source_f(modulating.tolist()) + mod = pm_mod(pm_deviation=PM_PEAK_DEVIATION_RAD, sample_rate=sample_rate) + snk = blocks.vector_sink_c() + + tb.connect(src, mod, snk) + tb.run() + + output = np.array(snk.data()) + phases = np.angle(output) + + # Peak phase should be approximately pm_deviation + peak_phase = np.max(np.abs(phases)) + assert peak_phase == pytest.approx(PM_PEAK_DEVIATION_RAD, abs=0.01), ( + f"Peak phase {peak_phase} doesn't match deviation {PM_PEAK_DEVIATION_RAD}" + ) + + def test_constant_envelope(self): + """PM output should always have |s(t)| = 1.0 regardless of input.""" + from apollo.pm_mod import pm_mod + + tb = gr.top_block() + n_samples = 50000 + sample_rate = SAMPLE_RATE_BASEBAND + + # Arbitrary varying input: sum of two tones + t = np.arange(n_samples, dtype=np.float64) / sample_rate + modulating = ( + 0.7 * np.sin(2 * np.pi * 5000 * t) + 0.3 * np.cos(2 * np.pi * 20000 * t) + ).astype(np.float32) + + src = blocks.vector_source_f(modulating.tolist()) + mod = pm_mod(pm_deviation=PM_PEAK_DEVIATION_RAD, sample_rate=sample_rate) + snk = blocks.vector_sink_c() + + tb.connect(src, mod, snk) + tb.run() + + output = np.array(snk.data()) + magnitudes = np.abs(output) + np.testing.assert_allclose(magnitudes, 1.0, atol=1e-6) + + def test_custom_deviation(self): + """Custom pm_deviation should scale output phase accordingly.""" + from apollo.pm_mod import pm_mod + + tb = gr.top_block() + n_samples = 100000 + sample_rate = SAMPLE_RATE_BASEBAND + custom_dev = 0.5 + + # Unit-amplitude sine + t = np.arange(n_samples, dtype=np.float64) / sample_rate + tone_freq = 10000.0 + modulating = np.sin(2 * np.pi * tone_freq * t).astype(np.float32) + + src = blocks.vector_source_f(modulating.tolist()) + mod = pm_mod(pm_deviation=custom_dev, sample_rate=sample_rate) + snk = blocks.vector_sink_c() + + tb.connect(src, mod, snk) + tb.run() + + output = np.array(snk.data()) + phases = np.angle(output) + + peak_phase = np.max(np.abs(phases)) + assert peak_phase == pytest.approx(custom_dev, abs=0.02), ( + f"Peak phase {peak_phase} doesn't match custom deviation {custom_dev}" + ) + + def test_block_instantiation(self): + """Block should instantiate with default parameters.""" + from apollo.pm_mod import pm_mod + + mod = pm_mod() + assert mod is not None + assert mod.get_pm_deviation() == PM_PEAK_DEVIATION_RAD + + def test_set_pm_deviation(self): + """Runtime deviation update should take effect.""" + from apollo.pm_mod import pm_mod + + mod = pm_mod() + assert mod.get_pm_deviation() == PM_PEAK_DEVIATION_RAD + + mod.set_pm_deviation(0.25) + assert mod.get_pm_deviation() == 0.25 diff --git a/tests/test_usb_signal_source.py b/tests/test_usb_signal_source.py new file mode 100644 index 0000000..c55ca4b --- /dev/null +++ b/tests/test_usb_signal_source.py @@ -0,0 +1,113 @@ +"""Tests for the USB signal source (complete transmit chain).""" + +import numpy as np +import pytest + +try: + from gnuradio import blocks, gr + + HAS_GNURADIO = True +except ImportError: + HAS_GNURADIO = False + +from apollo.constants import ( + PCM_HIGH_BIT_RATE, + PCM_HIGH_WORDS_PER_FRAME, + PCM_WORD_LENGTH, + SAMPLE_RATE_BASEBAND, +) + +pytestmark = pytest.mark.skipif(not HAS_GNURADIO, reason="GNU Radio not installed") + + +class TestUSBSignalSource: + """Test the convenience transmit wrapper.""" + + def _get_samples(self, n_samples, **kwargs): + """Helper: run usb_signal_source and return complex samples.""" + from apollo.usb_signal_source import usb_signal_source + + tb = gr.top_block() + src = usb_signal_source(**kwargs) + head = blocks.head(gr.sizeof_gr_complex, n_samples) + snk = blocks.vector_sink_c() + + tb.connect(src, head, snk) + tb.run() + + return np.array(snk.data()) + + def test_block_instantiation(self): + """Block should instantiate with default parameters.""" + from apollo.usb_signal_source import usb_signal_source + + src = usb_signal_source() + assert src is not None + + def test_produces_complex_output(self): + """Output should be complex-valued samples.""" + n_samples = 51200 # ~10ms worth + data = self._get_samples(n_samples) + assert len(data) == n_samples + assert data.dtype == np.complex128 or data.dtype == np.complex64 + + def test_constant_envelope(self): + """PM signal without noise should have near-constant envelope.""" + n_samples = 102400 # 1 frame worth + data = self._get_samples(n_samples, snr_db=None) + envelope = np.abs(data) + # PM output: |exp(j*phi)| = 1.0 always + np.testing.assert_allclose(envelope, 1.0, atol=1e-4) + + def test_spectral_content_pcm(self): + """FFT of demodulated phase should show energy at 1.024 MHz.""" + n_samples = 102400 + data = self._get_samples(n_samples, snr_db=None) + + # Extract phase (equivalent to PM demod) + phase = np.angle(data) + fft = np.fft.fft(phase) + freqs = np.fft.fftfreq(len(phase), 1.0 / SAMPLE_RATE_BASEBAND) + + # Energy near 1.024 MHz + pcm_mask = (np.abs(freqs) > 950_000) & (np.abs(freqs) < 1_100_000) + pcm_power = np.mean(np.abs(fft[pcm_mask]) ** 2) + total_power = np.mean(np.abs(fft) ** 2) + assert pcm_power > total_power * 0.01 + + def test_with_voice(self): + """With voice enabled, output should still be constant envelope.""" + n_samples = 51200 + data = self._get_samples(n_samples, voice_enabled=True, snr_db=None) + envelope = np.abs(data) + np.testing.assert_allclose(envelope, 1.0, atol=1e-4) + + def test_with_noise(self): + """With noise, envelope should vary (not constant).""" + n_samples = 51200 + data = self._get_samples(n_samples, snr_db=10.0) + envelope = np.abs(data) + # With noise, std(envelope) should be > 0 + assert np.std(envelope) > 0.01 + + def test_voice_spectral_content(self): + """With voice, phase should contain 1.25 MHz energy.""" + n_samples = 102400 + data = self._get_samples(n_samples, voice_enabled=True, snr_db=None) + + phase = np.angle(data) + fft = np.fft.fft(phase) + freqs = np.fft.fftfreq(len(phase), 1.0 / SAMPLE_RATE_BASEBAND) + + # Energy near 1.25 MHz + voice_mask = (np.abs(freqs) > 1_200_000) & (np.abs(freqs) < 1_300_000) + voice_power = np.mean(np.abs(fft[voice_mask]) ** 2) + assert voice_power > 0 + + def test_frame_duration(self): + """One frame at 51.2 kbps should produce the right number of samples.""" + bits_per_frame = PCM_HIGH_WORDS_PER_FRAME * PCM_WORD_LENGTH + samples_per_frame = int(bits_per_frame * SAMPLE_RATE_BASEBAND / PCM_HIGH_BIT_RATE) + + data = self._get_samples(samples_per_frame) + assert len(data) == samples_per_frame