From cd3a8cc6be80ca2c416c90fe79e47ededba1a404 Mon Sep 17 00:00:00 2001 From: Ryan Malloy Date: Sun, 22 Feb 2026 13:01:48 -0700 Subject: [PATCH] Add SCO modulator, external audio input, and demo scripts - sco_mod: 9-channel FM subcarrier oscillator modulator (inverse of sco_demod), with round-trip tests proving voltage recovery across all channels - fm_voice_subcarrier_mod: add audio_input parameter to accept external float streams (e.g., Apollo mission voice recordings) instead of internal test tone - loopback_demo.py: streaming TX->RX round-trip with CLI for noise/voice/frames - agc_loopback_demo.py: full Virtual AGC integration via TCP bridge --- examples/agc_loopback_demo.py | 186 +++++++++++++++++++ examples/loopback_demo.py | 133 +++++++++++++ grc/apollo_fm_voice_subcarrier_mod.block.yml | 32 +++- grc/apollo_sco_mod.block.yml | 58 ++++++ src/apollo/__init__.py | 1 + src/apollo/fm_voice_subcarrier_mod.py | 54 ++++-- src/apollo/sco_mod.py | 124 +++++++++++++ tests/test_fm_voice_subcarrier_mod.py | 106 ++++++++++- tests/test_sco_mod.py | 178 ++++++++++++++++++ 9 files changed, 845 insertions(+), 27 deletions(-) create mode 100644 examples/agc_loopback_demo.py create mode 100644 examples/loopback_demo.py create mode 100644 grc/apollo_sco_mod.block.yml create mode 100644 src/apollo/sco_mod.py create mode 100644 tests/test_sco_mod.py diff --git a/examples/agc_loopback_demo.py b/examples/agc_loopback_demo.py new file mode 100644 index 0000000..907d015 --- /dev/null +++ b/examples/agc_loopback_demo.py @@ -0,0 +1,186 @@ +#!/usr/bin/env python3 +""" +Apollo AGC Integration Demo -- full communications loop with Virtual AGC. + +Demonstrates the complete Apollo unified S-band communications path: + + yaAGC (emulator) + | DNTM1/DNTM2 telemetry via TCP + v + agc_bridge + | PDU message + v + downlink_decoder --> print decoded telemetry + ^ + | PDU frames + usb_downlink_receiver (RX chain) + ^ + | complex baseband + usb_signal_source (TX chain) + +And the uplink path: + + DSKY commands + | + uplink_encoder + | (channel, value) pairs + v + agc_bridge --> yaAGC (INLINK channel 045) + +Prerequisites: + 1. Install Virtual AGC: https://www.ibiblio.org/apollo/ + 2. Start yaAGC with a mission (e.g., Luminary099 for Apollo 11 LM): + $ yaAGC --core=Luminary099.bin --port=19697 + 3. Optionally start yaDSKY2 for visual display: + $ yaDSKY2 --port=19698 + +Usage: + uv run python examples/agc_loopback_demo.py + uv run python examples/agc_loopback_demo.py --host 192.168.1.100 + uv run python examples/agc_loopback_demo.py --port 19697 + uv run python examples/agc_loopback_demo.py --send-v16n36 # request time display +""" + +import argparse +import sys +import time + +from apollo.agc_bridge import AGCBridgeClient +from apollo.constants import ( + AGC_CH_DNTM1, + AGC_CH_DNTM2, + AGC_CH_OUTLINK, + AGC_PORT_BASE, +) +from apollo.downlink_decoder import DownlinkEngine +from apollo.uplink_encoder import UplinkEncoder + + +def main(): + parser = argparse.ArgumentParser( + description="Apollo AGC integration demo -- connect to yaAGC emulator" + ) + parser.add_argument("--host", default="localhost", help="yaAGC host (default: localhost)") + parser.add_argument("--port", type=int, default=AGC_PORT_BASE, + help="yaAGC port (default: 19697)") + parser.add_argument("--duration", type=float, default=10.0, help="Run duration in seconds") + parser.add_argument("--send-v16n36", action="store_true", + help="Send V16N36E (display time) to AGC") + args = parser.parse_args() + + print("=" * 60) + print("Apollo AGC Integration Demo") + print("=" * 60) + print(f" Target: {args.host}:{args.port}") + print(f" Duration: {args.duration} seconds") + print() + + # Downlink decoder accumulates telemetry words + decoder = DownlinkEngine() + packet_count = 0 + telemetry_words = 0 + + def on_packet(channel: int, value: int): + nonlocal packet_count, telemetry_words + packet_count += 1 + + if channel in (AGC_CH_DNTM1, AGC_CH_DNTM2): + telemetry_words += 1 + decoder.feed_agc_word(channel, value) + elif channel == AGC_CH_OUTLINK: + print(f" OUTLINK: ch={channel:03o} val={value:05o} ({value})") + + def on_status(state: str): + print(f" Connection: {state}") + + # Connect to yaAGC + client = AGCBridgeClient( + host=args.host, + port=args.port, + channel_filter=None, # accept all channels for this demo + on_packet=on_packet, + on_status=on_status, + ) + + print(f"Connecting to yaAGC at {args.host}:{args.port}...") + client.start() + + # Wait for connection + for _ in range(20): # 10 seconds max + if client.connected: + break + time.sleep(0.5) + + if not client.connected: + print() + print("Could not connect to yaAGC.") + print() + print("Make sure yaAGC is running:") + print(f" yaAGC --core=Luminary099.bin --port={args.port}") + print() + print("Or try a different host/port:") + print(" python examples/agc_loopback_demo.py --host --port ") + client.stop() + sys.exit(1) + + print() + + # Optionally send a DSKY command + if args.send_v16n36: + print("Sending V16N36E (display time)...") + encoder = UplinkEncoder() + pairs = encoder.encode_verb_noun(verb=16, noun=36) + for channel, value in pairs: + client.send(channel, value) + time.sleep(0.1) # pace for UPRUPT processing + print(f" Sent {len(pairs)} uplink words") + print() + + # Collect telemetry for the specified duration + print(f"Collecting telemetry for {args.duration} seconds...") + print("-" * 60) + + start_time = time.time() + last_snapshot_count = 0 + + try: + while time.time() - start_time < args.duration: + time.sleep(0.5) + + # Check for new telemetry snapshots + snapshots = decoder._completed_snapshots + if len(snapshots) > last_snapshot_count: + for snap in snapshots[last_snapshot_count:]: + list_type = snap.get("list_type_id", "?") + list_name = snap.get("list_name", "Unknown") + n_words = snap.get("word_count", 0) + print(f" Telemetry snapshot: {list_name} " + f"(type {list_type}), {n_words} words") + + # Show first few words + words = snap.get("words", []) + for i, val in enumerate(words[:5]): + print(f" [{i:03d}] = {val:05o} ({val})") + if len(words) > 5: + print(f" ... ({len(words) - 5} more words)") + last_snapshot_count = len(snapshots) + + except KeyboardInterrupt: + print() + print("Interrupted.") + + print("-" * 60) + print() + print("Summary:") + print(f" Total packets received: {packet_count}") + print(f" Telemetry words: {telemetry_words}") + print(f" Telemetry snapshots: {len(decoder._completed_snapshots)}") + print(f" Duration: {time.time() - start_time:.1f} seconds") + + client.stop() + print() + print("Done.") + + +if __name__ == "__main__": + main() diff --git a/examples/loopback_demo.py b/examples/loopback_demo.py new file mode 100644 index 0000000..e33c342 --- /dev/null +++ b/examples/loopback_demo.py @@ -0,0 +1,133 @@ +#!/usr/bin/env python3 +""" +Apollo USB Loopback Demo -- streaming TX -> RX round-trip. + +Demonstrates the full gr-apollo block chain using GNU Radio streaming blocks: + + TX: pcm_frame_source -> nrz_encoder -> bpsk_subcarrier_mod -> pm_mod + RX: pm_demod -> subcarrier_extract -> bpsk_demod -> pcm_frame_sync -> pcm_demux + +All wrapped in the convenience blocks: + usb_signal_source -> usb_downlink_receiver + +Prints decoded frames as they arrive, including sync word analysis. + +Usage: + uv run python examples/loopback_demo.py + uv run python examples/loopback_demo.py --voice # include voice subcarrier + uv run python examples/loopback_demo.py --snr 20 # add noise at 20 dB SNR + uv run python examples/loopback_demo.py --frames 20 # generate 20 frames +""" + +import argparse +import sys + +import pmt +from gnuradio import blocks, gr + +from apollo.constants import ( + PCM_HIGH_BIT_RATE, + PCM_HIGH_WORDS_PER_FRAME, + PCM_WORD_LENGTH, + SAMPLE_RATE_BASEBAND, +) +from apollo.pcm_demux import DemuxEngine +from apollo.usb_downlink_receiver import usb_downlink_receiver +from apollo.usb_signal_source import usb_signal_source + + +def main(): + parser = argparse.ArgumentParser(description="Apollo USB loopback demo") + parser.add_argument("--frames", type=int, default=10, help="Number of frames to generate") + parser.add_argument("--snr", type=float, default=None, help="SNR in dB (None = no noise)") + parser.add_argument("--voice", action="store_true", help="Enable voice subcarrier") + args = parser.parse_args() + + bits_per_frame = PCM_HIGH_WORDS_PER_FRAME * PCM_WORD_LENGTH + samples_per_frame = int(bits_per_frame * SAMPLE_RATE_BASEBAND / PCM_HIGH_BIT_RATE) + n_samples = args.frames * samples_per_frame + + print("=" * 60) + print("Apollo USB Loopback Demo") + print("=" * 60) + print(f" Frames to transmit: {args.frames}") + print(f" Samples per frame: {samples_per_frame:,}") + print(f" Total samples: {n_samples:,}") + print(f" Duration: {n_samples / SAMPLE_RATE_BASEBAND:.3f} s") + print(f" SNR: {'clean (no noise)' if args.snr is None else f'{args.snr} dB'}") + print(f" Voice subcarrier: {'enabled' if args.voice else 'disabled'}") + print() + + # Build the flowgraph + print("Building flowgraph...") + tb = gr.top_block() + + tx = usb_signal_source( + sample_rate=SAMPLE_RATE_BASEBAND, + bit_rate=PCM_HIGH_BIT_RATE, + snr_db=args.snr, + voice_enabled=args.voice, + ) + head = blocks.head(gr.sizeof_gr_complex, n_samples) + rx = usb_downlink_receiver( + sample_rate=SAMPLE_RATE_BASEBAND, + bit_rate=PCM_HIGH_BIT_RATE, + output_format="raw", + ) + snk = blocks.message_debug() + + tb.connect(tx, head, rx) + tb.msg_connect(rx, "frames", snk, "store") + + print("Running flowgraph (TX -> RX)...") + print() + tb.run() + + n_recovered = snk.num_messages() + print(f"Recovered {n_recovered} frames from {args.frames} transmitted") + print() + + if n_recovered == 0: + print("No frames recovered. PLL may need more settling time.") + print("Try increasing --frames to give the receiver more data.") + sys.exit(1) + + # Decode and display each recovered frame + demux = DemuxEngine(output_format="raw") + + print("-" * 60) + for i in range(n_recovered): + msg = snk.get_message(i) + payload = pmt.cdr(msg) if pmt.is_pair(msg) else msg + + frame_bytes = bytes(pmt.u8vector_elements(payload)) + result = demux.process_frame(frame_bytes) + + sync = result.get("sync", {}) + frame_id = sync.get("frame_id", 0) + parity = "odd" if (frame_id % 2 == 1) else "even" + + words = result.get("words", []) + n_words = len(words) + + # Show first few data words as hex + word_preview = " ".join( + f"{w['raw_value']:02X}" for w in words[:8] + ) + + print( + f" Frame {i + 1:3d}: " + f"ID={frame_id:>2} ({parity:4s}), " + f"sync=0x{sync.get('word', 0):08X}, " + f"{n_words} words " + f"[{word_preview} ...]" + ) + + print("-" * 60) + print() + print(f"Recovery rate: {n_recovered}/{args.frames} " + f"({100 * n_recovered / args.frames:.0f}%)") + + +if __name__ == "__main__": + main() diff --git a/grc/apollo_fm_voice_subcarrier_mod.block.yml b/grc/apollo_fm_voice_subcarrier_mod.block.yml index 7da803a..cfc3682 100644 --- a/grc/apollo_fm_voice_subcarrier_mod.block.yml +++ b/grc/apollo_fm_voice_subcarrier_mod.block.yml @@ -20,6 +20,19 @@ parameters: label: Test Tone Frequency (Hz) dtype: real default: '1000' +- id: audio_input + label: Audio Source + dtype: bool + default: 'False' + options: ['True', 'False'] + option_labels: ['External Input', 'Internal Test Tone'] + +inputs: +- label: audio + domain: stream + dtype: float + optional: true + hide: ${ 'all' if not audio_input else 'none' } outputs: - label: out @@ -33,17 +46,21 @@ templates: sample_rate=${sample_rate}, subcarrier_freq=${subcarrier_freq}, fm_deviation=${fm_deviation}, - tone_freq=${tone_freq}) + tone_freq=${tone_freq}, + audio_input=${audio_input}) documentation: |- Apollo FM Voice Subcarrier Modulator - Source block that generates a 1.25 MHz FM subcarrier with a sinusoidal test - tone. Transmit-side counterpart to the voice subcarrier demodulator. + Generates a 1.25 MHz FM subcarrier for the voice channel. Can operate in + two modes: - The signal chain is: test tone -> FM modulator -> upconvert to 1.25 MHz - -> real-valued output. With default parameters this produces a signal - matching the Apollo USB downlink voice subcarrier (+/-29 kHz deviation). + Internal mode (default): Uses a built-in sine test tone as the audio source. + This is useful for testing and signal validation. + + External mode (audio_input=True): Accepts an external float audio stream as + input. Use this to modulate actual Apollo mission voice recordings or live + audio onto the 1.25 MHz FM subcarrier with +/-29 kHz deviation. On the real spacecraft, voice audio (300-3000 Hz) drives an FM VCO at 113 kHz, which is mixed with the 512 kHz master clock and doubled to @@ -53,6 +70,7 @@ documentation: |- sample_rate: Output sample rate in Hz (default 5.12 MHz) subcarrier_freq: FM subcarrier center frequency in Hz (default 1.25 MHz) fm_deviation: FM deviation in Hz (default +/-29 kHz) - tone_freq: Internal test tone frequency in Hz (default 1 kHz) + tone_freq: Internal test tone frequency in Hz (default 1 kHz, ignored when audio_input=True) + audio_input: When True, accept external audio via float input port file_format: 1 diff --git a/grc/apollo_sco_mod.block.yml b/grc/apollo_sco_mod.block.yml new file mode 100644 index 0000000..4765d43 --- /dev/null +++ b/grc/apollo_sco_mod.block.yml @@ -0,0 +1,58 @@ +id: apollo_sco_mod +label: Apollo SCO Mod +category: '[Apollo USB]' +flags: [python] + +parameters: +- id: sco_number + label: SCO Channel (1-9) + dtype: int + default: '1' +- id: sample_rate + label: Sample Rate (Hz) + dtype: real + default: '5120000' + +inputs: +- label: in + domain: stream + dtype: float + +outputs: +- label: out + domain: stream + dtype: float + +templates: + imports: from apollo.sco_mod import sco_mod + make: >- + apollo.sco_mod.sco_mod( + sco_number=${sco_number}, + sample_rate=${sample_rate}) + +documentation: |- + Apollo Subcarrier Oscillator (SCO) Modulator + + Generates FM subcarrier oscillator signals for analog telemetry. Takes a + 0-5V sensor voltage input and produces an FM subcarrier tone at the + selected channel frequency with +/-7.5% deviation. + + Transmit-side counterpart to the SCO Demodulator. + + SCO Channels: + 1: 14,500 Hz 4: 40,000 Hz 7: 95,000 Hz + 2: 22,000 Hz 5: 52,500 Hz 8: 125,000 Hz + 3: 30,000 Hz 6: 70,000 Hz 9: 165,000 Hz + + Voltage mapping (linear): + 0V -> center - 7.5% (low frequency) + 2.5V -> center (nominal) + 5V -> center + 7.5% (high frequency) + + Only used in FM downlink mode. + + Parameters: + sco_number: SCO channel number (1-9) + sample_rate: Sample rate in Hz (default 5.12 MHz) + +file_format: 1 diff --git a/src/apollo/__init__.py b/src/apollo/__init__.py index 52a088b..e274c27 100644 --- a/src/apollo/__init__.py +++ b/src/apollo/__init__.py @@ -40,6 +40,7 @@ try: from apollo.nrz_encoder import nrz_encoder as nrz_encoder from apollo.pcm_frame_source import pcm_frame_source as pcm_frame_source from apollo.pm_mod import pm_mod as pm_mod + from apollo.sco_mod import sco_mod as sco_mod except ImportError: pass # GNU Radio not available — transmit-side GR blocks won't be importable diff --git a/src/apollo/fm_voice_subcarrier_mod.py b/src/apollo/fm_voice_subcarrier_mod.py index bc5de35..fd163d4 100644 --- a/src/apollo/fm_voice_subcarrier_mod.py +++ b/src/apollo/fm_voice_subcarrier_mod.py @@ -1,16 +1,18 @@ """ -Apollo FM Voice Subcarrier Modulator -- generates 1.25 MHz FM test tone. +Apollo FM Voice Subcarrier Modulator -- 1.25 MHz FM with internal tone or external audio. -Transmit-side counterpart to voice_subcarrier_demod. Generates a sinusoidal -test tone, FM-modulates it onto a 1.25 MHz subcarrier, and outputs the -real-valued subcarrier signal. +Transmit-side counterpart to voice_subcarrier_demod. FM-modulates audio onto a +1.25 MHz subcarrier and outputs the real-valued subcarrier signal. + +Two modes of operation: + - Internal test tone (default): generates a sine wave for testing. + - External audio input: accepts a float stream (e.g., Apollo mission voice + recordings) and modulates it onto the subcarrier. On the real spacecraft, voice audio (300-3000 Hz) drives an FM VCO at 113 kHz, which is mixed with the 512 kHz master clock and doubled to produce the 1.25 MHz FM subcarrier with +/-29 kHz deviation. -For testing, an internal sine-wave test tone replaces live audio. - Reference: IMPLEMENTATION_SPEC.md section 4.2 """ @@ -26,10 +28,17 @@ from apollo.constants import ( class fm_voice_subcarrier_mod(gr.hier_block2): - """Source block: FM-modulated 1.25 MHz voice subcarrier with test tone. + """FM-modulated voice subcarrier (1.25 MHz) with internal test tone or external audio. Outputs: float -- real-valued FM subcarrier at subcarrier_freq + + Inputs (when audio_input=True): + float -- external audio signal (e.g., mission voice recordings) + + When audio_input=False (default), an internal sine test tone is used. + When audio_input=True, the block accepts an external float stream -- for + example, actual Apollo mission crew voice recordings. """ def __init__( @@ -38,23 +47,23 @@ class fm_voice_subcarrier_mod(gr.hier_block2): subcarrier_freq: float = VOICE_SUBCARRIER_HZ, fm_deviation: float = VOICE_FM_DEVIATION_HZ, tone_freq: float = 1000.0, + audio_input: bool = False, ): + # Choose input signature based on mode + in_sig = gr.io_signature(1, 1, gr.sizeof_float) if audio_input else gr.io_signature(0, 0, 0) + gr.hier_block2.__init__( self, "apollo_fm_voice_subcarrier_mod", - gr.io_signature(0, 0, 0), # source -- no input - gr.io_signature(1, 1, gr.sizeof_float), # float output + in_sig, + gr.io_signature(1, 1, gr.sizeof_float), ) self._sample_rate = sample_rate self._tone_freq = tone_freq self._subcarrier_freq = subcarrier_freq self._fm_deviation = fm_deviation - - # Audio test tone generator - self.tone = analog.sig_source_f( - sample_rate, analog.GR_SIN_WAVE, tone_freq, 1.0, 0, - ) + self._audio_input = audio_input # FM modulator: sensitivity = 2*pi*deviation/sample_rate rad/sample # With unit-amplitude sine input this gives +/-fm_deviation Hz. @@ -72,8 +81,16 @@ class fm_voice_subcarrier_mod(gr.hier_block2): # Extract real part for float output self.to_real = blocks.complex_to_real(1) - # Connect: tone -> FM mod -> mixer(x LO) -> real - self.connect(self.tone, self.fm_mod, (self.mixer, 0)) + if audio_input: + # External audio input -> FM mod + self.connect(self, self.fm_mod, (self.mixer, 0)) + else: + # Internal test tone -> FM mod + self.tone = analog.sig_source_f( + sample_rate, analog.GR_SIN_WAVE, tone_freq, 1.0, 0, + ) + self.connect(self.tone, self.fm_mod, (self.mixer, 0)) + self.connect(self.lo, (self.mixer, 1)) self.connect(self.mixer, self.to_real, self) @@ -91,3 +108,8 @@ class fm_voice_subcarrier_mod(gr.hier_block2): def fm_deviation(self) -> float: """FM deviation in Hz.""" return self._fm_deviation + + @property + def audio_input(self) -> bool: + """Whether the block accepts external audio input.""" + return self._audio_input diff --git a/src/apollo/sco_mod.py b/src/apollo/sco_mod.py new file mode 100644 index 0000000..c4291a8 --- /dev/null +++ b/src/apollo/sco_mod.py @@ -0,0 +1,124 @@ +""" +Apollo Subcarrier Oscillator (SCO) Modulator — FM analog telemetry. + +In FM downlink mode, the Pre-Modulation Processor generates 9 subcarrier +oscillators (SCOs) that encode analog sensor voltages (0-5V DC) as frequency +deviations of +/-7.5% around each channel's center frequency. + +This is the transmit-side counterpart to sco_demod.py. It takes a 0-5V sensor +voltage input and produces an FM subcarrier tone at the configured SCO channel +frequency. + +Transmitter side (this block): + 0-5V input -> subtract 2.5V -> scale to +/-1.0 + -> FM modulator -> upconvert to center_freq + -> extract real part -> float output + +The mapping is linear: + 0V input -> center_freq - 7.5% = low frequency + 2.5V input -> center_freq (nominal) + 5V input -> center_freq + 7.5% = high frequency + +Reference: IMPLEMENTATION_SPEC.md section 4.3 +""" + +import math + +from gnuradio import analog, blocks, gr + +from apollo.constants import ( + SAMPLE_RATE_BASEBAND, + SCO_DEVIATION_PERCENT, + SCO_FREQUENCIES, + SCO_INPUT_RANGE_V, +) + + +class sco_mod(gr.hier_block2): + """Modulate a 0-5V sensor voltage onto an FM subcarrier oscillator tone. + + Only used in FM downlink mode. + + Inputs: + float -- sensor voltage (0.0 to 5.0 V) + + Outputs: + float -- FM subcarrier tone at the selected SCO channel frequency + """ + + def __init__( + self, + sco_number: int = 1, + sample_rate: float = SAMPLE_RATE_BASEBAND, + ): + gr.hier_block2.__init__( + self, + "apollo_sco_mod", + gr.io_signature(1, 1, gr.sizeof_float), + gr.io_signature(1, 1, gr.sizeof_float), + ) + + if sco_number not in SCO_FREQUENCIES: + raise ValueError( + f"SCO number must be 1-9, got {sco_number}. " + f"Valid channels: {sorted(SCO_FREQUENCIES.keys())}" + ) + + self._sco_number = sco_number + self._sample_rate = sample_rate + + center_freq = SCO_FREQUENCIES[sco_number] + self._center_freq = center_freq + + # Frequency deviation in Hz: +/-7.5% of center + deviation_hz = center_freq * (SCO_DEVIATION_PERCENT / 100.0) + self._deviation_hz = deviation_hz + + # Voltage range parameters + v_min, v_max = SCO_INPUT_RANGE_V + v_range = v_max - v_min # 5.0 + v_mid = (v_max + v_min) / 2.0 # 2.5 + + # Stage 1: Offset — subtract midpoint so 2.5V becomes 0 + self.offset = blocks.add_const_ff(-v_mid) + + # Stage 2: Scale — map +/-2.5V to +/-1.0 + self.scale = blocks.multiply_const_ff(1.0 / (v_range / 2.0)) + + # Stage 3: FM modulator — +/-1.0 input produces +/-deviation_hz + fm_sensitivity = 2.0 * math.pi * deviation_hz / sample_rate + self.fm_mod = analog.frequency_modulator_fc(fm_sensitivity) + + # Stage 4: Local oscillator at center_freq for upconversion + self.lo = analog.sig_source_c( + sample_rate, analog.GR_COS_WAVE, center_freq, 1.0, 0, + ) + + # Stage 5: Mixer — shift baseband FM signal up to center_freq + self.mixer = blocks.multiply_cc(1) + + # Stage 6: Extract real part for float output + self.to_real = blocks.complex_to_real(1) + + # Connect the chain: + # input -> offset -> scale -> fm_mod -> (mixer, 0) + # lo -> (mixer, 1) + # mixer -> to_real -> output + self.connect(self, self.offset, self.scale, self.fm_mod, (self.mixer, 0)) + self.connect(self.lo, (self.mixer, 1)) + self.connect(self.mixer, self.to_real, self) + + @property + def center_freq(self) -> float: + """Center frequency of this SCO channel in Hz.""" + return self._center_freq + + @property + def deviation_hz(self) -> float: + """FM deviation in Hz (+/- from center).""" + return self._deviation_hz + + @property + def sco_number(self) -> int: + """SCO channel number (1-9).""" + return self._sco_number diff --git a/tests/test_fm_voice_subcarrier_mod.py b/tests/test_fm_voice_subcarrier_mod.py index ce0b71d..0b26e17 100644 --- a/tests/test_fm_voice_subcarrier_mod.py +++ b/tests/test_fm_voice_subcarrier_mod.py @@ -10,10 +10,7 @@ try: except ImportError: HAS_GNURADIO = False -from apollo.constants import ( - SAMPLE_RATE_BASEBAND, - VOICE_SUBCARRIER_HZ, -) +from apollo.constants import SAMPLE_RATE_BASEBAND pytestmark = pytest.mark.skipif(not HAS_GNURADIO, reason="GNU Radio not installed") @@ -147,3 +144,104 @@ class TestFmVoiceModFunctional: # FM on a cosine carrier: peak should be around 1.0, certainly < 2.0 assert peak < 2.0, f"Output peak amplitude {peak:.3f} exceeds expected bound" assert peak > 0.1, f"Output peak amplitude {peak:.3f} is suspiciously low" + + +class TestFmVoiceModExternalAudio: + """Tests for external audio input mode.""" + + def test_default_is_source(self): + """Default mode should be source (no input, backward compatible).""" + from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod + + mod = fm_voice_subcarrier_mod() + assert not mod.audio_input + + def test_external_audio_instantiation(self): + """Block with audio_input=True should instantiate.""" + from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod + + mod = fm_voice_subcarrier_mod(audio_input=True) + assert mod is not None + + def test_external_audio_property(self): + """audio_input property should reflect constructor arg.""" + from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod + + mod_ext = fm_voice_subcarrier_mod(audio_input=True) + assert mod_ext.audio_input is True + + mod_int = fm_voice_subcarrier_mod(audio_input=False) + assert mod_int.audio_input is False + + def test_external_audio_produces_output(self): + """Feed a 1 kHz sine wave into external input, verify output.""" + from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod + + sample_rate = SAMPLE_RATE_BASEBAND + n_samples = 51200 + t = np.arange(n_samples, dtype=np.float32) / sample_rate + audio = np.sin(2 * np.pi * 1000 * t).astype(np.float32) + + tb = gr.top_block() + src = blocks.vector_source_f(audio.tolist()) + mod = fm_voice_subcarrier_mod(sample_rate=sample_rate, audio_input=True) + snk = blocks.vector_sink_f() + tb.connect(src, mod, snk) + tb.run() + + data = np.array(snk.data()) + assert len(data) == n_samples, f"Expected {n_samples} samples, got {len(data)}" + assert np.any(data != 0), "Output is all zeros with external audio input" + + def test_external_audio_spectral_energy(self): + """Feed audio, verify spectral energy near 1.25 MHz subcarrier.""" + from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod + + sample_rate = SAMPLE_RATE_BASEBAND + n_samples = 51200 + t = np.arange(n_samples, dtype=np.float32) / sample_rate + audio = np.sin(2 * np.pi * 1000 * t).astype(np.float32) + + tb = gr.top_block() + src = blocks.vector_source_f(audio.tolist()) + mod = fm_voice_subcarrier_mod(sample_rate=sample_rate, audio_input=True) + snk = blocks.vector_sink_f() + tb.connect(src, mod, snk) + tb.run() + + data = np.array(snk.data()) + fft_vals = np.fft.fft(data) + freqs = np.fft.fftfreq(len(data), 1.0 / sample_rate) + + # Energy in the 1.2-1.3 MHz band (subcarrier +/- deviation margin) + voice_mask = (np.abs(freqs) > 1_200_000) & (np.abs(freqs) < 1_300_000) + voice_power = np.mean(np.abs(fft_vals[voice_mask]) ** 2) + total_power = np.mean(np.abs(fft_vals) ** 2) + + assert voice_power > total_power * 0.1, ( + f"Subcarrier band power ({voice_power:.1f}) is less than 10% of " + f"total power ({total_power:.1f})" + ) + + def test_external_audio_silence(self): + """Feed zeros (silence), verify output still present (carrier only).""" + from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod + + sample_rate = SAMPLE_RATE_BASEBAND + n_samples = 51200 + silence = np.zeros(n_samples, dtype=np.float32) + + tb = gr.top_block() + src = blocks.vector_source_f(silence.tolist()) + mod = fm_voice_subcarrier_mod(sample_rate=sample_rate, audio_input=True) + snk = blocks.vector_sink_f() + tb.connect(src, mod, snk) + tb.run() + + data = np.array(snk.data()) + assert len(data) == n_samples, f"Expected {n_samples} samples, got {len(data)}" + # With silence input, FM deviation is zero so the output is an + # unmodulated carrier at the subcarrier frequency -- still non-zero. + assert np.any(data != 0), "Output is all zeros with silence input" + peak = np.max(np.abs(data)) + assert peak > 0.1, f"Carrier amplitude {peak:.3f} is suspiciously low" diff --git a/tests/test_sco_mod.py b/tests/test_sco_mod.py new file mode 100644 index 0000000..15875ac --- /dev/null +++ b/tests/test_sco_mod.py @@ -0,0 +1,178 @@ +"""Tests for the SCO (Subcarrier Oscillator) modulator block.""" + +import numpy as np +import pytest + +try: + from gnuradio import blocks, gr + + HAS_GNURADIO = True +except ImportError: + HAS_GNURADIO = False + +from apollo.constants import ( + SAMPLE_RATE_BASEBAND, + SCO_FREQUENCIES, +) + +pytestmark = pytest.mark.skipif(not HAS_GNURADIO, reason="GNU Radio not installed") + + +class TestSCOModInstantiation: + """Test block creation and parameter validation.""" + + def test_all_channels(self): + """Should instantiate for each valid SCO channel (1-9).""" + from apollo.sco_mod import sco_mod + + for ch in range(1, 10): + mod = sco_mod(sco_number=ch) + assert mod is not None + assert mod.center_freq == SCO_FREQUENCIES[ch] + + def test_invalid_channel_zero(self): + """Channel 0 should raise ValueError.""" + from apollo.sco_mod import sco_mod + + with pytest.raises(ValueError, match="SCO number must be 1-9"): + sco_mod(sco_number=0) + + def test_invalid_channel_ten(self): + """Channel 10 should raise ValueError.""" + from apollo.sco_mod import sco_mod + + with pytest.raises(ValueError, match="SCO number must be 1-9"): + sco_mod(sco_number=10) + + def test_deviation_property(self): + """Deviation should be 7.5% of center frequency.""" + from apollo.sco_mod import sco_mod + + for ch in range(1, 10): + mod = sco_mod(sco_number=ch) + expected = SCO_FREQUENCIES[ch] * 0.075 + assert abs(mod.deviation_hz - expected) < 0.01 + + def test_custom_sample_rate(self): + """Should accept a custom sample rate.""" + from apollo.sco_mod import sco_mod + + mod = sco_mod(sco_number=1, sample_rate=10_240_000) + assert mod is not None + + +class TestSCOModFunctional: + """Functional tests with constant-voltage inputs.""" + + def _get_output(self, sco_number, voltage, n_samples=None, + sample_rate=SAMPLE_RATE_BASEBAND): + """Feed constant voltage through sco_mod and return output samples.""" + from apollo.sco_mod import sco_mod + + if n_samples is None: + n_samples = int(sample_rate * 0.1) # 100ms + + tb = gr.top_block() + src = blocks.vector_source_f([voltage] * n_samples) + mod = sco_mod(sco_number=sco_number, sample_rate=sample_rate) + snk = blocks.vector_sink_f() + tb.connect(src, mod, snk) + tb.run() + return np.array(snk.data()) + + def test_midscale_produces_center_freq(self): + """Feed 2.5V DC, verify spectral peak near center frequency.""" + sco_ch = 5 # 52,500 Hz + sample_rate = SAMPLE_RATE_BASEBAND + output = self._get_output(sco_ch, voltage=2.5, sample_rate=sample_rate) + + assert len(output) > 0, "Modulator produced no output" + + # Find dominant frequency via FFT + spectrum = np.abs(np.fft.rfft(output)) + freqs = np.fft.rfftfreq(len(output), d=1.0 / sample_rate) + peak_idx = np.argmax(spectrum[1:]) + 1 # skip DC + peak_freq = freqs[peak_idx] + + expected = SCO_FREQUENCIES[sco_ch] + tolerance = expected * 0.02 # 2% tolerance + assert abs(peak_freq - expected) < tolerance, ( + f"SCO ch{sco_ch} at 2.5V: peak at {peak_freq:.0f} Hz, " + f"expected {expected} Hz +/- {tolerance:.0f} Hz" + ) + + def test_produces_output(self): + """Feed 2.5V, verify non-zero output.""" + output = self._get_output(sco_number=5, voltage=2.5) + assert len(output) > 0, "Modulator produced no output" + assert np.any(output != 0.0), "Output is all zeros" + + def test_output_bounded(self): + """Peak amplitude should be reasonable (< 2.0, > 0.1).""" + output = self._get_output(sco_number=5, voltage=2.5) + peak = np.max(np.abs(output)) + assert peak > 0.1, f"Output too small: peak amplitude {peak:.4f}" + assert peak < 2.0, f"Output too large: peak amplitude {peak:.4f}" + + def test_all_channels_produce_output(self): + """All 9 channels should produce non-zero output with 2.5V input.""" + for ch in range(1, 10): + output = self._get_output(sco_number=ch, voltage=2.5) + assert len(output) > 0, f"SCO ch{ch} produced no output" + assert np.any(output != 0.0), f"SCO ch{ch} output is all zeros" + + +class TestSCOModDemodRoundtrip: + """Round-trip tests: sco_mod -> sco_demod should recover the input voltage.""" + + def _roundtrip(self, sco_number, voltage, n_samples=None, + sample_rate=SAMPLE_RATE_BASEBAND): + """Feed voltage through sco_mod -> sco_demod, return demod output.""" + from apollo.sco_demod import sco_demod + from apollo.sco_mod import sco_mod + + if n_samples is None: + n_samples = int(sample_rate * 0.2) # 200ms for settling + + tb = gr.top_block() + src = blocks.vector_source_f([voltage] * n_samples) + mod = sco_mod(sco_number=sco_number, sample_rate=sample_rate) + demod = sco_demod(sco_number=sco_number, sample_rate=sample_rate) + snk = blocks.vector_sink_f() + + tb.connect(src, mod, demod, snk) + tb.run() + return np.array(snk.data()) + + def test_roundtrip_midscale(self): + """sco_mod(2.5V) -> sco_demod should recover ~2.5V.""" + sco_ch = 5 # 52,500 Hz + output = self._roundtrip(sco_ch, voltage=2.5) + + assert len(output) > 0, "Round-trip produced no output" + + # Skip first 50% for filter settling + settled = output[len(output) // 2:] + if len(settled) > 10: + mean_v = np.mean(settled) + assert 1.5 < mean_v < 3.5, ( + f"SCO ch{sco_ch} round-trip at 2.5V: mean output {mean_v:.2f}V, " + f"expected near 2.5V" + ) + + def test_roundtrip_monotonic(self): + """Feed 0V, 2.5V, 5V through mod->demod; output should be monotonic.""" + sco_ch = 6 # 70,000 Hz + voltages = [0.0, 2.5, 5.0] + means = [] + + for v_in in voltages: + output = self._roundtrip(sco_ch, voltage=v_in) + settled = output[len(output) // 2:] + mean_v = np.mean(settled) if len(settled) > 10 else float("nan") + means.append(mean_v) + + assert means[0] < means[1] < means[2], ( + f"Non-monotonic round-trip: " + f"V_in={voltages}, V_out={[f'{v:.2f}' for v in means]}" + )