gr-apollo/examples/voice_subcarrier_demo.py
Ryan Malloy 8728d36a90 Add voice subcarrier demo with Apollo 11 crew audio
FM-modulates real Apollo 11 onboard audio onto the 1.25 MHz voice
subcarrier (+/-29 kHz deviation) and demodulates it back, achieving
94.1% correlation with the original. Audio source: NASA/Internet
Archive public domain (Collins bidding farewell to Eagle crew).
2026-02-22 18:00:45 -07:00

161 lines
5.4 KiB
Python

#!/usr/bin/env python3
"""
Apollo Voice Subcarrier Demo -- modulate real audio onto 1.25 MHz FM subcarrier.
Takes an audio file (e.g., actual Apollo mission crew recordings), modulates
it onto the 1.25 MHz FM voice subcarrier with +/-29 kHz deviation, then
demodulates it back to audio. This is exactly what the spacecraft's
Pre-Modulation Processor and the ground station receiver did.
Signal path:
audio file (8 kHz)
-> upsample to 5.12 MHz
-> fm_voice_subcarrier_mod (audio_input=True)
[audio -> FM mod -> upconvert to 1.25 MHz -> float subcarrier]
-> voice_subcarrier_demod
[BPF 1.25 MHz -> FM discriminator -> BPF 300-3000 Hz -> decimate to 8 kHz]
-> recovered audio (8 kHz WAV)
Usage:
uv run python examples/voice_subcarrier_demo.py examples/audio/apollo11_crew.wav
uv run python examples/voice_subcarrier_demo.py input.wav --output recovered.wav
uv run python examples/voice_subcarrier_demo.py input.wav --play
"""
import argparse
import time
from math import gcd
import numpy as np
from gnuradio import blocks, gr
from scipy.io import wavfile
from scipy.signal import resample_poly
from apollo.constants import SAMPLE_RATE_BASEBAND
from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod
from apollo.voice_subcarrier_demod import voice_subcarrier_demod
def main():
parser = argparse.ArgumentParser(
description="Modulate audio onto Apollo 1.25 MHz FM voice subcarrier"
)
parser.add_argument("input", help="Input WAV file (any sample rate)")
parser.add_argument("--output", "-o", default=None,
help="Output WAV file (default: <input>_recovered.wav)")
parser.add_argument("--play", action="store_true",
help="Play recovered audio with aplay")
parser.add_argument("--sample-rate", type=float, default=SAMPLE_RATE_BASEBAND,
help="Baseband sample rate (default: 5.12 MHz)")
args = parser.parse_args()
if args.output is None:
stem = args.input.rsplit(".", 1)[0]
args.output = f"{stem}_recovered.wav"
# Load input audio
print("=" * 60)
print("Apollo Voice Subcarrier Demo")
print("=" * 60)
print()
input_rate, audio_data = wavfile.read(args.input)
if audio_data.ndim > 1:
audio_data = audio_data[:, 0] # mono
# Normalize to [-1, 1] float
if audio_data.dtype == np.int16:
audio_float = audio_data.astype(np.float32) / 32768.0
elif audio_data.dtype == np.int32:
audio_float = audio_data.astype(np.float32) / 2147483648.0
else:
audio_float = audio_data.astype(np.float32)
duration = len(audio_float) / input_rate
print(f" Input: {args.input}")
print(f" Sample rate: {input_rate} Hz")
print(f" Duration: {duration:.2f} s")
print(f" Samples: {len(audio_float):,}")
print()
# Upsample to baseband rate
sample_rate = int(args.sample_rate)
audio_rate = 8000 # output rate from voice demod
# Resample input to 8 kHz first (the standard Apollo voice bandwidth)
if input_rate != audio_rate:
g = gcd(audio_rate, input_rate)
audio_float = resample_poly(audio_float, audio_rate // g, input_rate // g)
audio_float = audio_float.astype(np.float32)
print(f" Resampled to {audio_rate} Hz: {len(audio_float):,} samples")
# Upsample from 8 kHz to baseband (5.12 MHz)
# Factor: 5120000 / 8000 = 640
g = gcd(sample_rate, audio_rate)
up = sample_rate // g
down = audio_rate // g
print(f" Upsampling {audio_rate} Hz -> {sample_rate / 1e6:.2f} MHz "
f"(ratio {up}:{down})...")
t0 = time.time()
upsampled = resample_poly(audio_float, up, down)
upsampled = upsampled.astype(np.float32)
t_resample = time.time() - t0
print(f" Upsampled: {len(upsampled):,} samples ({t_resample:.1f}s)")
print()
# Build GNU Radio flowgraph: voice mod -> voice demod
print("Building flowgraph: FM mod (1.25 MHz) -> FM demod...")
tb = gr.top_block()
src = blocks.vector_source_f(upsampled.tolist())
voice_mod = fm_voice_subcarrier_mod(
sample_rate=sample_rate,
audio_input=True,
)
voice_demod = voice_subcarrier_demod(
sample_rate=sample_rate,
audio_rate=audio_rate,
)
snk = blocks.vector_sink_f()
tb.connect(src, voice_mod, voice_demod, snk)
print("Running flowgraph...")
t0 = time.time()
tb.run()
t_run = time.time() - t0
recovered = np.array(snk.data(), dtype=np.float32)
print(f" Processed in {t_run:.1f}s")
print(f" Recovered: {len(recovered):,} samples at {audio_rate} Hz")
print(f" Duration: {len(recovered) / audio_rate:.2f} s")
print()
# Normalize recovered audio for WAV output
peak = np.max(np.abs(recovered))
if peak > 0:
recovered = recovered / peak * 0.9 # normalize to 90% to avoid clipping
# Save as 16-bit WAV
recovered_int16 = (recovered * 32767).astype(np.int16)
wavfile.write(args.output, audio_rate, recovered_int16)
print(f" Saved: {args.output}")
print(f" Peak amplitude: {peak:.4f}")
print()
# Play if requested
if args.play:
import subprocess
print("Playing recovered audio...")
subprocess.run(["aplay", args.output], check=False)
else:
print(f"Play with: aplay {args.output}")
print()
print("Done.")
if __name__ == "__main__":
main()