Compare commits

...

7 Commits

Author SHA1 Message Date
cfc9ca03eb Integrate uplink chain and PRN ranging into shared files and docs
Consolidate locally-defined constants into constants.py (single source
of truth for all uplink modulation and ranging parameters). Update
__init__.py with new imports across all three tiers. Add signal
architecture, block reference, and demo guide documentation for the
uplink and ranging subsystems.
2026-02-24 14:32:45 -07:00
3dc8afdc08 Merge feature/prn-ranging: PRN code generator, modulator, demodulator 2026-02-24 14:23:34 -07:00
78e73ca38c Merge feature/uplink-chain: full DSKY command encode/decode over RF 2026-02-24 14:23:29 -07:00
c117f49fd2 Merge feature/audio-downloads: Apollo recording download + real signal demo 2026-02-24 14:23:26 -07:00
86a5b08e9d Add PRN ranging system: code generator, modulator, demodulator, and demo
Implements the Apollo composite PRN ranging code (5,456,682 chips) from
five component sequences (CL, X, A, B, C) combined via majority-vote
logic, matching Ken Shirriff's Teensy rangeGenerator.ino bit-for-bit.

LFSR taps corrected to produce maximal-length sequences:
  A: 5-bit, taps [2,0] (x^5+x^2+1, period 31)
  B: 6-bit, taps [1,0] (x^6+x+1, period 63)
  C: 7-bit, taps [1,0] (x^7+x+1, period 127)

New files:
  src/apollo/ranging.py          -- pure-Python code generator and correlator
  src/apollo/ranging_source.py   -- GR sync_block streaming PRN chips
  src/apollo/ranging_mod.py      -- GR hier_block2 NRZ chip modulator
  src/apollo/ranging_demod.py    -- GR basic_block FFT-based range correlator
  grc/apollo_ranging_*.block.yml -- GRC block definitions (3 files)
  examples/ranging_demo.py       -- standalone demo with delay simulation
2026-02-24 14:21:02 -07:00
0e77373ea4 Add uplink chain: DSKY command encoder to RF and back
Uplink word codec (uplink_word_codec.py):
- UplinkSerializerEngine: (channel, value) pairs to 15-bit NRZ bit stream
  with configurable inter-word gap for UPRUPT timing
- UplinkDeserializerEngine: two-phase state machine (acquisition + fixed
  framing) recovers words from NRZ bits, handles leading-zero data words
- GR wrappers: uplink_word_serializer (sync_block source) and
  uplink_word_deserializer (basic_block sink with message output)

TX source (usb_uplink_source.py):
- hier_block2 wiring: word_serializer -> nrz_encoder -> FM mod (4 kHz dev)
  -> 70 kHz upconvert -> complex_to_real -> PM mod (1.0 rad) -> [AWGN]
- Message input "words" forwards PDUs from uplink_encoder

RX receiver (usb_uplink_receiver.py):
- hier_block2 wiring: PM demod -> subcarrier_extract (70 kHz, 20 kHz BW)
  -> quadrature_demod -> matched filter -> decimate -> slicer -> deserializer
- Message output "commands" emits recovered (channel, value) PDUs

GRC block definitions for both source and receiver.

Loopback demo (uplink_loopback_demo.py):
- Encodes V16N36E, serializes with pure-Python engine, runs through GR RF
  chain (FM + PM + noise + demod), deserializes, compares TX vs RX words
2026-02-24 14:17:58 -07:00
77ddec149c Add audio download script and real signal demo
fetch_apollo_audio.py downloads Apollo 11 audio highlights from Archive.org
and extracts clips using ffmpeg (48 kHz mono WAV). Supports --list, --clip,
--all with idempotent downloads and progress reporting.

real_signal_demo.py auto-discovers downloaded clips and runs them through the
full USB downlink TX/RX chain (PCM telemetry + FM voice), saving recovered
audio for comparison. Falls back to the bundled demo clip if no downloads exist.

Also adds .gitignore to keep large audio files out of the repo while preserving
the small apollo11_crew.wav demo clip.
2026-02-24 14:15:23 -07:00
22 changed files with 3863 additions and 4 deletions

17
.gitignore vendored Normal file
View File

@ -0,0 +1,17 @@
# Python
__pycache__/
*.pyc
*.egg-info/
dist/
build/
# Audio files (large, downloaded on demand)
examples/audio/*.wav
examples/audio/*.flac
# Keep the existing small demo clip
!examples/audio/apollo11_crew.wav
# Environment
.env
*.env.local
node_modules/

View File

@ -1,6 +1,6 @@
---
title: "Signal Architecture"
description: "How the Apollo Unified S-Band downlink signal is structured, and how gr-apollo decomposes it into telemetry"
description: "How the Apollo Unified S-Band signal is structured -- downlink, uplink, and ranging -- and how gr-apollo decomposes it"
---
import { Aside, Tabs, TabItem } from '@astrojs/starlight/components';
@ -282,3 +282,158 @@ graph LR
```
The `fm_signal_source` and `fm_downlink_receiver` convenience blocks wire the full chain together, just as `usb_signal_source` and `usb_downlink_receiver` do for PM mode. The FM receiver uses streaming float outputs (one per SCO channel) rather than PDU messages, since SCO telemetry is continuous analog data.
## The uplink signal
The downlink carries telemetry from spacecraft to ground. The uplink does the reverse -- delivering commands from Mission Control to the spacecraft on 2106.40625 MHz.
The modulation scheme is the same family as the downlink (PM carrier with FM subcarriers), but the parameters are dramatically different. Where the downlink tiptoes along at 0.133 rad peak deviation, the uplink runs at 1.0 rad. This asymmetry is not a design inconsistency -- it reflects a fundamental physical reality. The ground station has megawatt-class transmitters and 26-meter dish antennas. The spacecraft has a 20-watt traveling-wave tube and a dish measured in inches. The ground can afford the power overhead of higher modulation index; the spacecraft cannot waste a single fraction of a watt.
The uplink carries two information streams:
- **Command data** -- DSKY commands encoded as 15-bit AGC words, transmitted at 2 kbps NRZ on a 70 kHz FM subcarrier with +/-4 kHz deviation
- **Voice** (optional) -- Crew-directed audio on a 30 kHz FM subcarrier with +/-7.5 kHz deviation
```mermaid
graph LR
subgraph "TX — Ground Station"
A["DSKY Commands<br/>15-bit AGC words"] --> B["Word Serializer<br/>15 bits → NRZ<br/>2 kbps"]
B --> C["NRZ Encoder<br/>0/1 → +1/-1"]
C --> D["FM Mod<br/>70 kHz<br/>±4 kHz dev"]
D --> E["Σ"]
V1["Voice Audio<br/>300-3000 Hz"] -->|"FM<br/>30 kHz<br/>±7.5 kHz dev"| E
E --> F["PM Mod<br/>1.0 rad peak"]
F --> G["2106.4 MHz<br/>RF Carrier"]
end
G -->|"240/221<br/>turnaround"| H
subgraph "RX — Spacecraft"
H["RF Input<br/>2106.4 MHz"] --> I["PM Demod<br/>Carrier PLL"]
I --> J["Extract<br/>70 kHz"]
J --> K["FM Demod"]
K --> L["Slicer<br/>2 kbps"]
L --> M["Word<br/>Deserializer"]
M --> N["AGC<br/>Commands"]
end
style A fill:#2d5016,stroke:#4a8c2a
style V1 fill:#2d5016,stroke:#4a8c2a
style G fill:#1a3a5c,stroke:#3a7abd
style H fill:#1a3a5c,stroke:#3a7abd
style F fill:#3a1a5c,stroke:#7a3abd
style I fill:#3a1a5c,stroke:#7a3abd
style N fill:#5c3a1a,stroke:#bd7a3a
```
<Tabs>
<TabItem label="TX Blocks">
The uplink transmit chain assembles commands into the modulated carrier:
1. **uplink_encoder** -- Accepts DSKY command PDUs and formats them as 15-bit AGC uplink words. The word format matches the Apollo AGC's INLINK channel (channel 45) protocol.
2. **uplink_word_serializer** -- Serializes 15-bit words into a continuous NRZ bit stream at 2 kbps. Each bit is held for the full bit period.
3. **nrz_encoder** -- Converts the 0/1 byte stream to +1.0/-1.0 floats, identical to the downlink NRZ encoder.
4. **FM modulator** -- Frequency-modulates the NRZ data onto a 70 kHz subcarrier with +/-4 kHz deviation.
5. **PM modulator** -- Phase-modulates the composite subcarrier signal onto the carrier at 1.0 rad peak deviation.
The `usb_uplink_source` hierarchical block wires this chain together.
</TabItem>
<TabItem label="RX Blocks">
The uplink receiver disassembles the signal in reverse order:
1. **PM demodulator** -- Same carrier PLL approach as the downlink, but configured for the wider 1.0 rad deviation. The larger modulation index means the small-angle approximation no longer holds (15.9% error at 1.0 rad), but FM demodulation of the subcarrier is tolerant of this distortion.
2. **70 kHz extraction** -- Bandpass filter centered on the command subcarrier.
3. **FM demodulator** -- Recovers the NRZ data stream from the 70 kHz subcarrier.
4. **Slicer and deserializer** -- Threshold detection at 2 kbps, then reassembly of 15-bit AGC words from the serial bit stream.
The `usb_uplink_receiver` hierarchical block wires this chain together.
</TabItem>
</Tabs>
<Aside type="note">
The 1.0 rad uplink deviation pushes well beyond the linear small-angle regime. At 1.0 rad, `sin(theta)` deviates from `theta` by nearly 16%. The designers accepted this because the uplink data rate is only 2 kbps (versus 51.2 kbps on the downlink) and FM demodulation is inherently robust against amplitude and phase nonlinearity. The uplink could afford to trade linearity for link margin.
</Aside>
## PRN ranging
Voice and telemetry tell the ground station what the spacecraft is doing. Ranging tells them where it is. The Apollo USB system measures Earth-to-spacecraft distance by timing how long a known signal takes to make the round trip.
The ranging signal is a composite pseudo-random noise (PRN) code, transmitted by the ground station and transponded (retransmitted) by the spacecraft. The ground station correlates the received code against its own reference copy, and the time offset between them is the two-way light-time delay.
```mermaid
graph LR
A["PRN Generator<br/>~994 kchip/s"] --> B["NRZ Encode<br/>0/1 → +1/-1"]
B --> C["PM Mod onto<br/>Uplink Carrier"]
C --> D["Spacecraft<br/>Transponder"]
D --> E["PM Mod onto<br/>Downlink Carrier"]
E --> F["Ground RX<br/>PRN Correlator"]
F --> G["Range<br/>Measurement"]
A --> H["Reference Copy<br/>(delayed)"]
H --> F
style A fill:#5c3a1a,stroke:#bd7a3a
style D fill:#1a3a5c,stroke:#3a7abd
style F fill:#3a1a5c,stroke:#7a3abd
style G fill:#2d5016,stroke:#4a8c2a
```
### The composite code
The PRN code is not a single sequence -- it is five component codes combined with Boolean logic:
| Code | Length (chips) | Type |
|------|----------------|------|
| CL | 2 | Clock (alternating 0,1) |
| X | 11 | Fixed pattern |
| A | 31 | Maximal-length LFSR |
| B | 63 | Maximal-length LFSR |
| C | 127 | Maximal-length LFSR |
The combination logic produces one output chip per clock:
```
output = (NOT(X) AND majority(A, B, C)) XOR CL
```
where `majority(A, B, C)` outputs 1 when two or more inputs are 1. The combined code length is the product of the component lengths: 2 x 11 x 31 x 63 x 127 = **5,456,682 chips**. At the chip rate of approximately 994 kchip/s, one full code period takes about 5.49 seconds.
The maximal-length LFSR sequences (A, B, C) give the composite code excellent autocorrelation properties -- a sharp peak when aligned, low sidelobes everywhere else. This is what makes unambiguous range measurement possible even at the signal-to-noise ratios encountered at lunar distance.
### Sequential acquisition
Correlating against the full 5.4-million-chip code in one pass would be computationally ruinous, even by modern standards. The Apollo system uses a sequential strategy that exploits the composite structure:
1. **Correlate CL first** (length 2) -- resolves range to within half the CL period
2. **Correlate X** (length 11) -- refines within the CL ambiguity window
3. **Correlate A** (length 31) -- further refinement
4. **Correlate B** (length 63) -- further refinement
5. **Correlate C** (length 127) -- final resolution
Each stage resolves the ambiguity left by the previous one. The total number of trial positions searched is 2 + 11 + 31 + 63 + 127 = 234, rather than 5,456,682. This makes acquisition practical in seconds rather than hours.
### Range resolution
The two-way range resolution is determined by the chip rate:
```
resolution = c / (2 x chip_rate) = 299,792,458 / (2 x 994,000) ~ 150.8 meters
```
This is the minimum distinguishable range difference. The actual measurement precision is much better than this, because the correlator interpolates between chips. NASA routinely achieved ranging precision on the order of 15 meters to the Moon -- about one-tenth of a chip.
<Aside type="tip">
Ken Shirriff's [detailed analysis of the Apollo ranging system](https://www.righto.com/2022/09/the-ranging-system-that-measured.html) traces the code generation logic down to the gate level. His work on reverse-engineering the actual hardware provides an invaluable cross-reference for validating the gr-apollo implementation.
</Aside>
### gr-apollo ranging blocks
<Tabs>
<TabItem label="Transmit">
- **ranging_source** -- Generates the composite PRN chip stream by clocking the five component code generators and combining their outputs. Configurable chip rate (default ~994 kchip/s). Output is one byte per chip (0 or 1).
- **ranging_mod** -- NRZ-encodes the chip stream (+1/-1) and scales it for injection into the PM modulator's composite baseband signal.
</TabItem>
<TabItem label="Receive">
- **ranging_demod** -- Performs FFT-based cross-correlation between the received ranging signal and a locally generated reference. Sequential acquisition is implemented as a state machine that steps through the CL, X, A, B, C stages. Outputs range measurement PDUs containing the measured delay in chips, the equivalent range in meters, and a confidence metric.
</TabItem>
</Tabs>

View File

@ -1,11 +1,11 @@
---
title: "Run the Demos"
description: "How to run gr-apollo's example scripts for loopback testing, voice modulation, full downlink simulation, and AGC integration."
description: "How to run gr-apollo's example scripts for loopback testing, voice modulation, full downlink simulation, AGC integration, uplink commands, and PRN ranging."
---
import { Steps, Aside, Tabs, TabItem, Code, LinkCard, CardGrid } from '@astrojs/starlight/components';
gr-apollo ships with four demo scripts that exercise different parts of the TX/RX chain. They progress from a self-contained streaming loopback to a full mission simulation with crew voice, and finally to live integration with the Virtual AGC emulator.
gr-apollo ships with eight demo scripts that exercise different parts of the TX/RX chain. They progress from a self-contained streaming loopback to a full mission simulation with crew voice, live integration with the Virtual AGC emulator, uplink command encoding, and PRN ranging.
| Demo | Requires | What It Does |
|------|----------|-------------|
@ -13,6 +13,10 @@ gr-apollo ships with four demo scripts that exercise different parts of the TX/R
| `voice_subcarrier_demo.py` | GNU Radio, scipy | Real audio through 1.25 MHz FM |
| `full_downlink_demo.py` | GNU Radio, scipy | PCM telemetry + crew voice on one carrier |
| `agc_loopback_demo.py` | yaAGC (no GR) | Live AGC telemetry over TCP |
| `fetch_apollo_audio.py` | ffmpeg | Download real Apollo recordings from Archive.org |
| `real_signal_demo.py` | GNU Radio, scipy | Process real Apollo audio through full USB chain |
| `uplink_loopback_demo.py` | GNU Radio | Encode DSKY commands, modulate, demodulate, verify |
| `ranging_demo.py` | None (pure Python) | PRN code generation, delay simulation, correlation |
## Prerequisites
@ -367,6 +371,401 @@ The `AGCBridgeClient` auto-reconnects with exponential backoff. You can start th
---
## Audio Downloads
**Script:** `examples/fetch_apollo_audio.py`
**Requires:** ffmpeg
This utility downloads Apollo 11 audio highlights from the Internet Archive as a FLAC file, then extracts individual clips using ffmpeg. The clips are saved as 48 kHz mono WAV files in `examples/audio/` for use with the other signal processing demos.
```mermaid
graph LR
A["Archive.org\nApollo11Highlights.flac"]:::data --> B["urllib\n(download)"]:::timing --> C["FLAC file\n(local)"]:::data
C --> D["ffmpeg\n(seek + extract)"]:::rf --> E["apollo11_liftoff.wav"]:::data
C --> F["ffmpeg\n(seek + extract)"]:::rf --> G["apollo11_eagle_has_landed.wav"]:::data
C --> H["ffmpeg\n(seek + extract)"]:::rf --> I["apollo11_one_small_step.wav"]:::data
classDef data fill:#2d5016,stroke:#4a8c2a,color:#fff
classDef rf fill:#1a3a5c,stroke:#3a7abd,color:#fff
classDef timing fill:#5c3a1a,stroke:#bd7a3a,color:#fff
```
Five clips are defined in the script, covering key mission moments from liftoff through splashdown. The FLAC source is removed after extraction by default to save disk space.
### Usage
```bash
uv run python examples/fetch_apollo_audio.py --list
uv run python examples/fetch_apollo_audio.py --all
uv run python examples/fetch_apollo_audio.py --clip eagle_has_landed
uv run python examples/fetch_apollo_audio.py --clip liftoff --force
uv run python examples/fetch_apollo_audio.py --all --keep-flac
```
### Arguments
| Argument | Default | Description |
|----------|---------|-------------|
| `--list` | off | List available clip names and timestamps |
| `--clip` | -- | Extract a specific clip by name |
| `--all` | off | Extract all five defined clips |
| `--keep-flac` | off | Keep the downloaded FLAC file after extraction |
| `--force` | off | Re-download and re-extract even if files already exist |
| `--output-dir` | `examples/audio/` | Output directory for WAV files |
### Expected Output (--list)
```
Available clips:
liftoff 00:00:05 (00:00:30) Apollo 11 liftoff
eagle_has_landed 00:06:45 (00:00:30) The Eagle has landed
one_small_step 00:15:30 (00:00:25) One small step for man
houston_problem 00:20:00 (00:00:15) Houston, we've had a problem
splashdown 00:42:00 (00:00:20) Splashdown
5 clips defined.
Extract with: --clip NAME or --all
```
### Expected Output (--all)
```
============================================================
Apollo 11 Audio Fetch
============================================================
Step 1: Download source FLAC
Downloading: https://archive.org/download/Apollo11AudioHighlights/Apollo11Highlights.flac
Saving to: examples/audio/Apollo11Highlights.flac
[########################################] 100.0% 45.2/45.2 MB
Downloaded 45.2 MB
Step 2: Extract 5 clip(s)
[liftoff] Extracting: Apollo 11 liftoff
start=00:00:05 duration=00:00:30
-> examples/audio/apollo11_liftoff.wav (2880 KB)
[eagle_has_landed] Extracting: The Eagle has landed
start=00:06:45 duration=00:00:30
-> examples/audio/apollo11_eagle_has_landed.wav (2880 KB)
[one_small_step] Extracting: One small step for man
start=00:15:30 duration=00:00:25
-> examples/audio/apollo11_one_small_step.wav (2400 KB)
[houston_problem] Extracting: Houston, we've had a problem
start=00:20:00 duration=00:00:15
-> examples/audio/apollo11_houston_problem.wav (1440 KB)
[splashdown] Extracting: Splashdown
start=00:42:00 duration=00:00:20
-> examples/audio/apollo11_splashdown.wav (1920 KB)
Removed source FLAC (45.2 MB). Use --keep-flac to retain.
============================================================
Extracted: 5 Failed: 0
Output: examples/audio/apollo11_*.wav
============================================================
```
<Aside type="tip">
Run this script first if you want to use `real_signal_demo.py` with actual Apollo recordings. The bundled `apollo11_crew.wav` clip works as a fallback, but the Archive.org recordings are the real thing.
</Aside>
---
## Real Signal Processing
**Script:** `examples/real_signal_demo.py`
**Requires:** GNU Radio, scipy
This demo auto-discovers WAV files in `examples/audio/` (downloaded by `fetch_apollo_audio.py`) and runs them through the full USB downlink chain: transmit (NRZ + BPSK + voice FM onto PM carrier) then receive (PCM frame recovery + voice demodulation). It proves the gr-apollo signal chain works on real-world audio, not just synthetic test tones.
If no downloaded clips are found, the demo falls back to the bundled `examples/audio/apollo11_crew.wav`.
```mermaid
graph TB
subgraph discover ["Audio Discovery"]
direction LR
A["examples/audio/\napollo11_*.wav"]:::data --> B["auto-discover\n(skip output files)"]:::timing
end
subgraph TX ["TX (spacecraft)"]
direction LR
C["pcm_frame_source\n→ nrz → bpsk_mod"]:::data --> F["add_ff"]:::rf
D["real audio clip\n→ resample → upsample"]:::data --> E["fm_voice_mod\n× 0.764"]:::rf --> F
F --> G["pm_mod"]:::rf
end
subgraph RX ["RX (ground station)"]
direction LR
H["pm_demod"]:::rf --> I["bpsk_demod\n→ frame_sync"]:::rf
H --> J["voice_demod"]:::rf
I --> K["PCM frames"]:::data
J --> L["recovered WAV"]:::data
end
B --> D
G --> H
classDef data fill:#2d5016,stroke:#4a8c2a,color:#fff
classDef rf fill:#1a3a5c,stroke:#3a7abd,color:#fff
classDef timing fill:#5c3a1a,stroke:#bd7a3a,color:#fff
```
### Usage
```bash
uv run python examples/real_signal_demo.py
uv run python examples/real_signal_demo.py --clip eagle_has_landed
uv run python examples/real_signal_demo.py --snr 25
uv run python examples/real_signal_demo.py --clip liftoff --play
```
### Arguments
| Argument | Default | Description |
|----------|---------|-------------|
| `--clip` | first discovered | Process a specific clip by name |
| `--snr` | None | Add AWGN noise at this SNR in dB |
| `--play` | off | Play recovered audio with `aplay` after processing |
### Expected Output
```
============================================================
Apollo Real Signal Demo
Full USB downlink: PCM telemetry + crew voice
============================================================
Found 5 clip(s): liftoff, eagle_has_landed, one_small_step, houston_problem, splashdown
Processing: eagle_has_landed
------------------------------------------------------------
Loading: examples/audio/apollo11_eagle_has_landed.wav
Duration: 30.00s, 153,600,000 baseband samples
TX: 153,600,000 samples, ~1502 PCM frames, SNR=clean
TX complete: 153,600,000 complex samples (28.5s)
RX PCM: 1498 frames recovered (41.2s)
RX voice: 240,000 samples, 30.00s (25.1s)
Saved: examples/audio/apollo11_eagle_has_landed_recovered.wav
============================================================
Summary
============================================================
Clip: eagle_has_landed
Input duration: 30.00s
Recovered audio: 30.00s
PCM frames: 1498 recovered (expected ~1502)
SNR: clean
Processing time: TX=28.5s PCM-RX=41.2s Voice-RX=25.1s
Output: examples/audio/apollo11_eagle_has_landed_recovered.wav
============================================================
Play recovered: aplay examples/audio/apollo11_eagle_has_landed_recovered.wav
```
<Aside type="note">
The signal path is identical to `full_downlink_demo.py` -- the difference is that this script auto-discovers audio clips and works with whatever `fetch_apollo_audio.py` has downloaded. If the `examples/audio/` directory only contains the bundled `apollo11_crew.wav`, the demo uses that instead.
</Aside>
---
## Uplink Loopback
**Script:** `examples/uplink_loopback_demo.py`
**Requires:** GNU Radio
This demo exercises the full uplink signal chain. It encodes a DSKY command (V16N36E by default), serializes the words to a bit stream, modulates through the RF path (NRZ, FM onto a 70 kHz data subcarrier, then PM), demodulates on the other end, and deserializes the recovered bits back to uplink words. The TX and RX word sequences are compared for a word-for-word match.
```mermaid
graph LR
subgraph TX ["TX (ground station)"]
direction LR
A["UplinkEncoder\n(V16N36E)"]:::data --> B["UplinkSerializer\n(word→bits)"]:::data --> C["nrz_encoder"]:::timing
C --> D["FM mod\n(±4 kHz)"]:::rf --> E["upconvert\n70 kHz"]:::rf --> F["pm_mod\n(1.0 rad)"]:::rf
end
subgraph RX ["RX (spacecraft)"]
direction LR
G["pm_demod"]:::rf --> H["subcarrier_extract\n70 kHz"]:::rf --> I["FM demod"]:::rf
I --> J["matched filter\n+ slicer"]:::timing --> K["UplinkDeserializer\n(bits→words)"]:::data
end
F --> G
classDef data fill:#2d5016,stroke:#4a8c2a,color:#fff
classDef rf fill:#1a3a5c,stroke:#3a7abd,color:#fff
classDef timing fill:#5c3a1a,stroke:#bd7a3a,color:#fff
```
The pure-Python engines handle word-to-bit conversion at the endpoints, while the GNU Radio streaming chain proves the RF modulation path works end-to-end.
### Usage
```bash
uv run python examples/uplink_loopback_demo.py
uv run python examples/uplink_loopback_demo.py --snr 20
uv run python examples/uplink_loopback_demo.py --snr 10 --verb 37 --noun 0
```
### Arguments
| Argument | Default | Description |
|----------|---------|-------------|
| `--verb` | 16 | Verb number for the DSKY command |
| `--noun` | 36 | Noun number for the DSKY command |
| `--snr` | None | SNR in dB (None = clean, no noise) |
### Expected Output
```
============================================================
Apollo Uplink Loopback Demo
============================================================
Command: V16N36E
Uplink words: 7
SNR: clean (no noise)
TX word sequence:
[0] ch=032 val=10000 (4096) bits=001000000000000
[1] ch=032 val=10001 (4097) bits=001000000000001
[2] ch=032 val=10110 (4168) bits=001000001110000
[3] ch=032 val=00011 ( 3) bits=000000000000011
[4] ch=032 val=00110 ( 6) bits=000000000000110
[5] ch=032 val=11000 (6144) bits=001100000000000
[6] ch=032 val=10010 (4106) bits=001000000001010
Total bits: 8036 (504 data + 7532 idle)
Samples per bit: 2560
Total samples: 20,572,160
Duration: 4.018 s
Building flowgraph...
Running flowgraph (TX -> RX)...
Recovered 8036 bits from slicer
Recovered 7 words (expected 7)
RX word sequence:
[0] ch=032 val=10000 (4096) bits=001000000000000
[1] ch=032 val=10001 (4097) bits=001000000000001
[2] ch=032 val=10110 (4168) bits=001000001110000
[3] ch=032 val=00011 ( 3) bits=000000000000011
[4] ch=032 val=00110 ( 6) bits=000000000000110
[5] ch=032 val=11000 (6144) bits=001100000000000
[6] ch=032 val=10010 (4106) bits=001000000001010
------------------------------------------------------------
Words transmitted: 7
Words recovered: 7
Matches: 7/7
Word error rate: 0.0%
------------------------------------------------------------
V16N36E round-trip: all 7 words match.
```
<Aside type="tip">
The bit rate is 2 kbps and the data subcarrier is 70 kHz -- both per the Apollo USB uplink specification. The 0.5-second idle periods before and after the data give the PLL time to settle. If you lower the SNR below about 8 dB, you may start seeing word errors in the recovered sequence.
</Aside>
---
## PRN Ranging
**Script:** `examples/ranging_demo.py`
**Requires:** None (pure Python, no GNU Radio needed)
This demo exercises the Apollo ranging subsystem. It generates the composite PRN ranging code from its five component codes (CL, X, A, B, C), verifies their algebraic properties, NRZ-encodes the chip stream, applies a known propagation delay to simulate spacecraft distance, optionally adds noise, and cross-correlates to recover the delay. The measured range is compared against the true range.
```mermaid
graph LR
A["RangingCodeGenerator\n(CL×X×A×B×C)"]:::data --> B["NRZ encode\n(bipolar ±1)"]:::timing --> C["np.roll\n(apply delay)"]:::rf
C --> D["+ AWGN\n(optional)"]:::rf --> E["RangingCorrelator\n(cross-correlate)"]:::data
E --> F["range estimate\n(km)"]:::data
classDef data fill:#2d5016,stroke:#4a8c2a,color:#fff
classDef rf fill:#1a3a5c,stroke:#3a7abd,color:#fff
classDef timing fill:#5c3a1a,stroke:#bd7a3a,color:#fff
```
The demo works at chip rate (1 sample per chip) for simplicity and speed. The composite PRN code length is 2,037,675 chips, giving an unambiguous range that covers Earth-to-Moon distances.
### Usage
```bash
uv run python examples/ranging_demo.py
uv run python examples/ranging_demo.py --range-km 100
uv run python examples/ranging_demo.py --range-km 384400 # Moon distance
uv run python examples/ranging_demo.py --snr 20
uv run python examples/ranging_demo.py --chips 200000
```
### Arguments
| Argument | Default | Description |
|----------|---------|-------------|
| `--range-km` | 100.0 | Target range in km |
| `--snr` | None | SNR in dB (None = clean, no noise) |
| `--chips` | 50,000 | Number of chips for correlation |
### Expected Output
```
============================================================
Apollo PRN Ranging Demo
============================================================
1. Component code verification
----------------------------------------
CL: length= 2 (1s=1, 0s=1), periodic=True, balance=OK [OK]
X: length= 11 (1s=6, 0s=5), periodic=True, balance=OK [OK]
A: length= 5 (1s=3, 0s=2), periodic=True, balance=OK [OK]
B: length= 7 (1s=4, 0s=3), periodic=True, balance=OK [OK]
C: length= 5 (1s=3, 0s=2), periodic=True, balance=OK [OK]
Code length: 2,037,675 = 2*11*5*7*5*... [OK]
Composite sample (50,000 chips): balance=0.500 (ideal ~0.5)
2. Generating 50,000 PRN chips...
Generated in 2.3 ms
Ones: 25,012 / 50,000 (50.0%)
3. Simulating range: 100.0 km
Round-trip distance: 200.0 km
Round-trip time: 0.6671 ms
Delay: 6.67 chips (7 samples)
Added AWGN noise at 20 dB SNR (noise power = 0.0100)
4. Cross-correlating...
Correlation time: 4.1 ms
Peak-to-average ratio: 224.3
5. Range measurement results
----------------------------------------
True range: 100.0 km
Measured range: 104.9 km
Error: 4950.0 m
Quantization step: 14984.4 m (1 chip, two-way)
Delay (chips): 7.00
Delay (samples): 7
Correlation peak: 50000
Error is within one quantization step -- measurement is correct.
============================================================
```
<Aside type="note">
At chip rate (1 sample per chip), the range quantization step is about 15 km. This is the coarse acquisition mode. The real Apollo system achieved finer resolution by oversampling the chip stream and interpolating the correlation peak. The `--chips` argument controls how many chips are correlated -- more chips improve the peak-to-average ratio and noise resistance, but the quantization step stays the same unless you oversample.
</Aside>
<Aside type="tip">
Try `--range-km 384400` to simulate ranging to the Moon. The composite code is long enough to handle the round-trip without ambiguity. With `--snr 10`, you can see how the correlation peak rises above the noise floor even in degraded conditions.
</Aside>
---
## Which Demo to Start With
If you are new to gr-apollo:
@ -379,12 +778,25 @@ If you are new to gr-apollo:
4. **Connect to yaAGC** when you are ready to interact with a running Apollo Guidance Computer.
5. **Run the ranging demo** to see PRN code generation and correlation at work. It is pure Python with no GNU Radio dependency, so it runs anywhere.
6. **Try the uplink loopback** to see DSKY commands travel through the RF chain and come back intact on the other side.
<Aside type="tip">
If you want to work with real Apollo recordings rather than the bundled test clip, run `fetch_apollo_audio.py --all` first. The downloaded clips are automatically picked up by `real_signal_demo.py` and can also be passed directly to the voice and full-downlink demos.
</Aside>
<CardGrid>
<LinkCard
title="Build a Transmit Signal"
description="Detailed walkthrough of the TX block chain"
href="/guides/transmit-signal/"
/>
<LinkCard
title="Signal Architecture"
description="Uplink, downlink, and ranging signal paths"
href="/reference/signal-architecture/"
/>
<LinkCard
title="Block Reference"
description="Full API docs for all blocks used in the demos"

View File

@ -8,7 +8,7 @@ import { Tabs, TabItem, Aside } from '@astrojs/starlight/components';
Every component in gr-apollo falls into one of two categories: **GNU Radio blocks** that require the `gnuradio` runtime, and **pure-Python engines** that work standalone. The engines power the GR blocks internally, but you can also use them directly for testing, scripting, or integration without GNU Radio.
<Aside type="note">
Blocks that require GNU Radio are imported lazily. If `gnuradio` is not installed, the pure-Python engines (`FrameSyncEngine`, `DemuxEngine`, `DownlinkEngine`, `AGCBridgeClient`, `UplinkEncoder`, `generate_usb_baseband`) remain available.
Blocks that require GNU Radio are imported lazily. If `gnuradio` is not installed, the pure-Python engines (`FrameSyncEngine`, `DemuxEngine`, `DownlinkEngine`, `AGCBridgeClient`, `UplinkEncoder`, `UplinkSerializerEngine`, `UplinkDeserializerEngine`, `RangingCodeGenerator`, `RangingCorrelator`, `generate_usb_baseband`) remain available.
</Aside>
---
@ -1467,3 +1467,610 @@ complex in -> fm_demod -> sco_demod(ch1) -> output[0]
-> sco_demod(ch2) -> output[1]
-> sco_demod(chN) -> output[N-1]
```
---
## Uplink Chain
The uplink carries ground-station commands to the spacecraft as 15-bit AGC words at 2 kbps NRZ on a 70 kHz FM subcarrier, phase-modulated onto the 2106.40625 MHz uplink carrier at 1.0 rad peak deviation. Each word triggers an UPRUPT interrupt in the AGC flight software.
### `UplinkSerializerEngine` / `UplinkDeserializerEngine`
<Tabs>
<TabItem label="Serializer Engine">
**Module:** `apollo.uplink_word_codec`
**Type:** Pure-Python class
**Purpose:** Serialize (channel, value) pairs into a continuous NRZ bit stream with configurable inter-word gap.
```python
from apollo.uplink_word_codec import UplinkSerializerEngine
engine = UplinkSerializerEngine(inter_word_gap=3)
engine.add_words([(37, 0x4400), (37, 0x0400)])
bits = engine.next_bits(36)
```
#### Constructor Parameters
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `inter_word_gap` | `int` | `3` | Number of zero-bit periods between consecutive words (`UPLINK_INTER_WORD_GAP`) |
#### Methods
| Method | Signature | Description |
|--------|-----------|-------------|
| `add_words` | `(pairs: list[tuple[int, int]]) -> None` | Queue (channel, value) pairs for serialization. Each value is serialized as 15 bits MSB-first, followed by inter-word gap zeros. The channel is not transmitted -- it is used only for metadata/logging |
| `next_bits` | `(n: int) -> list[int]` | Pull exactly `n` bits from the queue. Returns queued data bits when available, zeros otherwise (idle carrier) |
#### Properties
| Property | Type | Description |
|----------|------|-------------|
| `pending` | `int` | Number of bits remaining in the queue |
</TabItem>
<TabItem label="Deserializer Engine">
**Module:** `apollo.uplink_word_codec`
**Type:** Pure-Python class
**Purpose:** Reassemble 15-bit AGC words from a recovered NRZ bit stream using a two-phase state machine (acquisition and locked framing).
```python
from apollo.uplink_word_codec import UplinkDeserializerEngine
engine = UplinkDeserializerEngine(inter_word_gap=3)
pairs = engine.process_bits([1, 0, 1, 1, ...])
# Returns: [(37, 0x4400), ...]
```
#### Constructor Parameters
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `inter_word_gap` | `int` | `3` | Expected number of zero bits between words (`UPLINK_INTER_WORD_GAP`) |
| `channel` | `int` | `37` | AGC channel to assign to recovered words (`AGC_CH_INLINK`, 045 octal) |
#### Methods
| Method | Signature | Description |
|--------|-----------|-------------|
| `process_bits` | `(bits: list[int]) -> list[tuple[int, int]]` | Process a batch of recovered bits. Returns list of (channel, value) tuples for each completed word |
| `reset` | `() -> None` | Clear internal state for a fresh decode pass |
#### State Machine
The deserializer uses a two-phase approach:
1. **Acquisition** -- Scans for the first non-zero bit (all DSKY keycodes have at least one set bit in the upper 5 bits, so the first transmitted word always starts with a 1).
2. **Locked** -- Uses fixed framing: collects exactly 15 bits per word, skips exactly `inter_word_gap` bits, then collects the next 15, etc. The lock releases after seeing a null word (all zeros), returning to acquisition.
<Aside type="note">
Fixed framing after the first word boundary is necessary because data words can start with leading zeros that would be indistinguishable from the inter-word gap in a bit-synchronous scheme.
</Aside>
</TabItem>
</Tabs>
---
### `uplink_word_serializer`
**Module:** `apollo.uplink_word_codec`
**Type:** `gr.sync_block`
**Purpose:** GNU Radio source block wrapping `UplinkSerializerEngine`. Accepts word PDUs on message input, outputs continuous NRZ byte stream.
```python
from apollo.uplink_word_codec import uplink_word_serializer
blk = uplink_word_serializer(inter_word_gap=3)
```
#### I/O Signature
| Port | Direction | Type | Description |
|------|-----------|------|-------------|
| (none) | Input | (none) | Source block -- no streaming input |
| `out0` | Output | `byte` | NRZ bit stream (values 0 or 1). Zeros when idle |
| `"words"` | Input | Message (PDU) | Word injection: PDU with metadata dict containing `"channel"` and `"value"` keys, or a pair `(channel . value)` |
#### Constructor Parameters
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `inter_word_gap` | `int` | `3` | Number of zero-bit periods between words (`UPLINK_INTER_WORD_GAP`) |
#### Input PDU Format
The `"words"` message port accepts the same format emitted by `uplink_encoder`:
| Key | PMT Type | Description |
|-----|----------|-------------|
| `"channel"` | `pmt.from_long` | AGC channel number (37 for INLINK) |
| `"value"` | `pmt.from_long` | 15-bit word value |
---
### `uplink_word_deserializer`
**Module:** `apollo.uplink_word_codec`
**Type:** `gr.basic_block`
**Purpose:** GNU Radio block wrapping `UplinkDeserializerEngine`. Consumes byte stream, emits PDU messages for each recovered 15-bit word.
```python
from apollo.uplink_word_codec import uplink_word_deserializer
blk = uplink_word_deserializer(inter_word_gap=3, channel=37)
```
#### I/O Signature
| Port | Direction | Type | Description |
|------|-----------|------|-------------|
| `in0` | Input | `byte` (streaming) | NRZ bit stream (values 0 or 1) from binary slicer |
| `"commands"` | Output | Message (PDU) | Recovered word PDUs |
#### Constructor Parameters
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `inter_word_gap` | `int` | `3` | Expected number of zero bits between words (`UPLINK_INTER_WORD_GAP`) |
| `channel` | `int` | `37` | AGC channel to assign to recovered words (`AGC_CH_INLINK`) |
#### Output PDU Format
Each output PDU is `pmt.cons(metadata, pmt.PMT_NIL)`:
**Metadata dict:**
| Key | PMT Type | Description |
|-----|----------|-------------|
| `"channel"` | `pmt.from_long` | AGC channel number (37) |
| `"value"` | `pmt.from_long` | Recovered 15-bit word value |
---
### `usb_uplink_source`
**Module:** `apollo.usb_uplink_source`
**Type:** `gr.hier_block2`
**Purpose:** Complete Apollo USB uplink transmit chain -- serializes 15-bit AGC words into NRZ bits, FM-modulates onto 70 kHz subcarrier, and applies PM modulation to produce complex baseband.
```python
from apollo.usb_uplink_source import usb_uplink_source
blk = usb_uplink_source(sample_rate=5_120_000, snr_db=20.0)
```
#### I/O Signature
| Port | Direction | Type | Description |
|------|-----------|------|-------------|
| (none) | Input | (none) | Source block -- no streaming input |
| `out0` | Output | `complex` | PM-modulated complex baseband at `sample_rate` |
| `"words"` | Input | Message | Forwarded to `uplink_word_serializer` for command injection. Accepts the same PDU format emitted by `uplink_encoder` |
#### Constructor Parameters
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `sample_rate` | `float` | `5120000` | Baseband sample rate in Hz (`SAMPLE_RATE_BASEBAND`) |
| `bit_rate` | `int` | `2000` | Uplink data bit rate in bps (`UPLINK_DATA_BIT_RATE`) |
| `pm_deviation` | `float` | `1.0` | Peak PM deviation in radians (`UPLINK_PM_DEVIATION_RAD`) |
| `snr_db` | `float \| None` | `None` | Add AWGN at this SNR. `None` means no noise |
#### Internal Signal Chain
```
word_ser -> nrz_encoder(2 kbps) -> fm_mod(sensitivity)
-> (mixer, 0); sig_source_c(70 kHz) -> (mixer, 1)
-> mixer -> complex_to_real -> pm_mod(1.0 rad) -> [+AWGN] -> output
```
#### Sub-Block Access
| Attribute | Type | Block |
|-----------|------|-------|
| `self.word_ser` | `uplink_word_serializer` | Word serializer |
| `self.nrz` | `nrz_encoder` | NRZ line encoder |
| `self.fm_mod` | `frequency_modulator_fc` | FM modulator |
| `self.lo` | `sig_source_c` | 70 kHz LO |
| `self.mixer` | `multiply_cc` | Subcarrier upconverter |
| `self.to_real` | `complex_to_real` | Complex-to-real conversion |
| `self.pm` | `pm_mod` | Phase modulator |
| `self.noise` | `noise_source_c` | AWGN source (when `snr_db` is set) |
---
### `usb_uplink_receiver`
**Module:** `apollo.usb_uplink_receiver`
**Type:** `gr.hier_block2`
**Purpose:** Complete Apollo USB uplink receiver chain -- complex baseband input to recovered command PDUs. The spacecraft-side counterpart to `usb_uplink_source`.
```python
from apollo.usb_uplink_receiver import usb_uplink_receiver
blk = usb_uplink_receiver(sample_rate=5_120_000)
```
#### I/O Signature
| Port | Direction | Type | Description |
|------|-----------|------|-------------|
| `in0` | Input | `complex` (streaming) | Baseband IQ samples at `sample_rate` |
| `"commands"` | Output | Message (PDU) | Decoded (channel, value) PDUs for AGC bridge |
<Aside type="note">
This block has one streaming input and zero streaming outputs. All output is via the `"commands"` message port.
</Aside>
#### Constructor Parameters
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `sample_rate` | `float` | `5120000` | Input sample rate in Hz (`SAMPLE_RATE_BASEBAND`) |
| `bit_rate` | `int` | `2000` | Uplink data bit rate in bps (`UPLINK_DATA_BIT_RATE`) |
| `carrier_pll_bw` | `float` | `0.02` | PM demodulator PLL bandwidth (rad/sample) |
| `subcarrier_bw` | `float` | `20000` | 70 kHz subcarrier bandpass filter width (Hz) |
#### Internal Signal Chain
```
complex in -> pm_demod -> subcarrier_extract(70 kHz, BW=20 kHz)
-> quadrature_demod_cf(FM) -> matched_filter(1/samp_per_bit)
-> keep_one_in_n(samp_per_bit) -> binary_slicer_fb
-> uplink_word_deserializer -> "commands" message output
```
#### Sub-Block Access
| Attribute | Type | Block |
|-----------|------|-------|
| `self.pm` | `pm_demod` | PM demodulator |
| `self.sc_extract` | `subcarrier_extract` | 70 kHz subcarrier extractor |
| `self.fm_demod` | `quadrature_demod_cf` | FM discriminator |
| `self.matched_filter` | `fir_filter_fff` | Integrate-and-dump matched filter |
| `self.decimator` | `keep_one_in_n` | Bit-rate decimator |
| `self.slicer` | `binary_slicer_fb` | Hard-decision binary slicer |
| `self.deser` | `uplink_word_deserializer` | Word reassembler |
---
## PRN Ranging
The Apollo ranging system measures spacecraft distance by transmitting a composite pseudo-random noise (PRN) code and correlating the echo. The code combines five component sequences (CL, X, A, B, C) using majority-vote logic and XOR operations. Combined code length: 5,456,682 chips (~5.49 seconds at 993,963 chips/sec).
### `RangingCodeGenerator`
**Module:** `apollo.ranging`
**Type:** Pure-Python class
**Purpose:** Generate Apollo PRN ranging code sequences -- individual component codes or the full composite sequence.
```python
from apollo.ranging import RangingCodeGenerator
gen = RangingCodeGenerator()
composite = gen.generate_sequence() # Full 5.4M chip code
a_code = gen.generate_a() # 31-chip A component
x_code = gen.generate_component("x") # By name
```
#### Methods
| Method | Signature | Description |
|--------|-----------|-------------|
| `generate_cl` | `(n_chips: int \| None) -> np.ndarray` | Generate CL component: alternating 0, 1 clock. Default length: 2 |
| `generate_x` | `(n_chips: int \| None) -> np.ndarray` | Generate X component: 11-chip non-LFSR sequence with custom feedback logic |
| `generate_a` | `(n_chips: int \| None) -> np.ndarray` | Generate A component: 31-chip LFSR, 5 bits, taps [2,0] (x^5+x^2+1) |
| `generate_b` | `(n_chips: int \| None) -> np.ndarray` | Generate B component: 63-chip LFSR, 6 bits, taps [1,0] (x^6+x+1) |
| `generate_c` | `(n_chips: int \| None) -> np.ndarray` | Generate C component: 127-chip LFSR, 7 bits, taps [1,0] (x^7+x+1) |
| `generate_component` | `(name: str, n_chips: int \| None) -> np.ndarray` | Generate a named component (`"cl"`, `"x"`, `"a"`, `"b"`, `"c"`). Raises `ValueError` if name is not recognized |
| `generate_sequence` | `(n_chips: int \| None) -> np.ndarray` | Generate the full composite PRN code. Default: one full period (5,456,682 chips) |
All methods return `uint8` numpy arrays of 0/1 values. When `n_chips` is `None`, generates one full period for that component.
#### Component Code Properties
| Component | Length | Register | Taps | Init | Type |
|-----------|--------|----------|------|------|------|
| CL | 2 | -- | -- | -- | Alternating clock |
| X | 11 | 5-bit | Custom feedback | `0b10110` (22) | Non-LFSR |
| A | 31 | 5-bit | [2, 0] | `0x1F` (all ones) | Maximal-length LFSR |
| B | 63 | 6-bit | [1, 0] | `0x3F` (all ones) | Maximal-length LFSR |
| C | 127 | 7-bit | [1, 0] | `0x7F` (all ones) | Maximal-length LFSR |
#### Composite Code Generation
The composite code reproduces Shirriff's `calc()` algorithm: on even output chips (ck=0), all shift registers advance and the output is computed from feedback bits via majority logic. On odd chips (ck=1), the output is flipped.
Combination logic per step: `out = (NOT(xnew) AND maj(anew, bnew, cnew)) XOR ck`
where `maj(A,B,C) = (A&B) | (A&C) | (B&C)`.
---
### `RangingCorrelator`
**Module:** `apollo.ranging`
**Type:** Pure-Python class
**Purpose:** Cross-correlate received NRZ samples with the known PRN code using FFT-based correlation for range measurement.
```python
from apollo.ranging import RangingCorrelator
import numpy as np
correlator = RangingCorrelator(sample_rate=5_120_000, two_way=True)
result = correlator.correlate(received_samples)
print(f"Range: {result['range_m']:.1f} m")
```
#### Constructor Parameters
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `chip_rate` | `int` | `993963` | PRN chip rate in chips/sec (`RANGING_CHIP_RATE_HZ`) |
| `sample_rate` | `float` | `993963` | Input sample rate in Hz. When equal to `chip_rate`, each chip is one sample |
| `two_way` | `bool` | `True` | If `True`, range is halved (signal traveled ground to spacecraft and back) |
#### Methods
| Method | Signature | Description |
|--------|-----------|-------------|
| `correlate` | `(received: np.ndarray, code_chips: int \| None) -> dict` | Cross-correlate received float samples with PRN reference. Returns measurement dict |
#### Properties
| Property | Type | Description |
|----------|------|-------------|
| `chip_rate` | `int` | PRN chip rate in chips/sec |
| `sample_rate` | `float` | Input sample rate in Hz |
| `two_way` | `bool` | Two-way range mode |
| `samples_per_chip` | `float` | Ratio `sample_rate / chip_rate` |
#### `correlate` Return Value
| Key | Type | Description |
|-----|------|-------------|
| `delay_samples` | `int` | Peak correlation index in samples |
| `delay_chips` | `float` | Delay measured in chip periods |
| `range_m` | `float` | Computed range in meters (halved if `two_way=True`) |
| `correlation_peak` | `float` | Absolute value of the correlation peak |
| `peak_to_avg_ratio` | `float` | Peak divided by mean correlation magnitude. Higher values indicate cleaner detection |
---
### `chips_to_range_m`
**Module:** `apollo.ranging`
**Type:** Pure-Python function
**Purpose:** Convert chip delay to range in meters.
```python
from apollo.ranging import chips_to_range_m
distance = chips_to_range_m(delay_chips=100.0, two_way=True)
```
#### Signature
```python
def chips_to_range_m(delay_chips: float, two_way: bool = True) -> float
```
#### Parameters
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `delay_chips` | `float` | -- | Delay measured in chip periods |
| `two_way` | `bool` | `True` | If `True`, divide distance by 2 (round-trip signal path) |
#### Return Value
Range in meters (`float`). Uses `SPEED_OF_LIGHT_M_S` (299,792,458 m/s) and `RANGING_CHIP_RATE_HZ` (993,963 chip/s).
---
### `range_m_to_chips`
**Module:** `apollo.ranging`
**Type:** Pure-Python function
**Purpose:** Convert range in meters to chip delay.
```python
from apollo.ranging import range_m_to_chips
chips = range_m_to_chips(range_m=384_400_000, two_way=True)
```
#### Signature
```python
def range_m_to_chips(range_m: float, two_way: bool = True) -> float
```
#### Parameters
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `range_m` | `float` | -- | Distance in meters |
| `two_way` | `bool` | `True` | If `True`, compute round-trip delay (distance is doubled before conversion) |
#### Return Value
Delay in chip periods (`float`).
---
### `verify_code_properties`
**Module:** `apollo.ranging`
**Type:** Pure-Python function
**Purpose:** Self-test for code correctness -- verify all component codes have correct length, periodicity, and balance properties.
```python
from apollo.ranging import verify_code_properties
results = verify_code_properties()
for name, props in results.items():
print(f"{name}: {props}")
```
#### Signature
```python
def verify_code_properties() -> dict
```
#### Parameters
None.
#### Return Value
Dict with component names as keys (`"cl"`, `"x"`, `"a"`, `"b"`, `"c"`, `"length_product"`, `"composite_sample"`) and property dicts as values.
**Per-component dict:**
| Key | Type | Description |
|-----|------|-------------|
| `length` | `int` | Actual generated length |
| `length_correct` | `bool` | Whether length matches the expected constant |
| `ones_count` | `int` | Number of 1-chips in the sequence |
| `zeros_count` | `int` | Number of 0-chips in the sequence |
| `periodic` | `bool` | Whether 2x generation repeats exactly |
| `balance_correct` | `bool` | Whether ones/zeros ratio matches LFSR theory (for A, B, C, CL) |
**`length_product` dict:**
| Key | Type | Description |
|-----|------|-------------|
| `expected` | `int` | Product of all component lengths (2 x 11 x 31 x 63 x 127 = 5,456,682) |
| `matches_constant` | `bool` | Whether `RANGING_CODE_LENGTH` equals the product |
**`composite_sample` dict:**
| Key | Type | Description |
|-----|------|-------------|
| `length` | `int` | Length of the test sample (10,000 chips) |
| `ones_count` | `int` | Number of 1-chips |
| `zeros_count` | `int` | Number of 0-chips |
| `balance` | `float` | Ratio of ones to total (near 0.5 for a balanced code) |
---
### `ranging_source`
**Module:** `apollo.ranging_source`
**Type:** `gr.sync_block`
**Purpose:** GNU Radio source block producing a continuous PRN ranging chip stream. Pre-generates the full code period and cycles through it.
```python
from apollo.ranging_source import ranging_source
blk = ranging_source()
```
#### I/O Signature
| Port | Direction | Type | Description |
|------|-----------|------|-------------|
| (none) | Input | (none) | Source block -- no streaming input |
| `out0` | Output | `byte` | PRN chip stream (values 0 or 1), repeating every 5,456,682 chips |
#### Constructor Parameters
None. The code period and chip rate are determined by the ranging constants.
<Aside type="note">
The full 5,456,682-chip code is pre-generated at construction time and stored in memory (~5.2 MB). Streaming is zero-allocation after startup.
</Aside>
---
### `ranging_mod`
**Module:** `apollo.ranging_mod`
**Type:** `gr.hier_block2`
**Purpose:** NRZ-encode PRN chips at the baseband sample rate. Converts chip stream (bytes 0/1) to float NRZ waveform (+1/-1) suitable for summing with other subcarriers before PM modulation.
```python
from apollo.ranging_mod import ranging_mod
blk = ranging_mod(chip_rate=993_963, sample_rate=5_120_000)
```
#### I/O Signature
| Port | Direction | Type | Description |
|------|-----------|------|-------------|
| `in0` | Input | `byte` | Chip stream from `ranging_source` (values 0 or 1) |
| `out0` | Output | `float` | NRZ waveform (+1.0 / -1.0) at `sample_rate` |
#### Constructor Parameters
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `chip_rate` | `int` | `993963` | PRN chip rate in chips/sec (`RANGING_CHIP_RATE_HZ`) |
| `sample_rate` | `float` | `5120000` | Output sample rate in Hz (`SAMPLE_RATE_BASEBAND`) |
#### Properties
| Property | Type | Description |
|----------|------|-------------|
| `samples_per_chip` | `int` | Samples per chip period: `int(sample_rate / chip_rate)` |
#### Internal Chain
`input -> char_to_float -> multiply_const_ff(2.0) -> add_const_ff(-1.0) -> repeat(samples_per_chip) -> output`
<Aside type="note">
At 5.12 MHz sample rate with 993,963 chip/s, `samples_per_chip` is 5 (integer truncation of 5.1509...). This is functionally identical to `nrz_encoder` but configured for the ranging chip rate instead of the PCM bit rate.
</Aside>
---
### `ranging_demod`
**Module:** `apollo.ranging_demod`
**Type:** `gr.basic_block`
**Purpose:** FFT-based ranging demodulator. Accumulates samples in batches, cross-correlates with the known PRN code, and emits range measurement PDUs.
```python
from apollo.ranging_demod import ranging_demod
blk = ranging_demod(sample_rate=5_120_000, correlation_length=100_000)
```
#### I/O Signature
| Port | Direction | Type | Description |
|------|-----------|------|-------------|
| `in0` | Input | `float` (streaming) | PM demod output or filtered ranging signal |
| `"range"` | Output | Message (PDU) | Range measurement PDUs, one per correlation batch |
<Aside type="note">
This block has one streaming input and zero streaming outputs. All output is via the `"range"` message port.
</Aside>
#### Constructor Parameters
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `chip_rate` | `int` | `993963` | PRN chip rate in chips/sec (`RANGING_CHIP_RATE_HZ`) |
| `sample_rate` | `float` | `5120000` | Input sample rate in Hz (`SAMPLE_RATE_BASEBAND`) |
| `correlation_length` | `int` | `100000` | Number of samples to accumulate per correlation batch. Longer batches improve SNR at the cost of measurement rate |
| `two_way` | `bool` | `True` | If `True`, range is halved (round-trip signal path) |
#### Output PDU Format
Each PDU is `pmt.cons(metadata, pmt.PMT_NIL)`:
**Metadata dict:**
| Key | PMT Type | Description |
|-----|----------|-------------|
| `"delay_chips"` | `pmt.from_double` | Measured delay in chip periods |
| `"range_m"` | `pmt.from_double` | Computed range in meters |
| `"correlation_peak"` | `pmt.from_double` | Absolute value of the correlation peak |
| `"peak_to_avg_ratio"` | `pmt.from_double` | Peak-to-average ratio (detection quality metric) |
| `"timestamp"` | `pmt.from_double` | `time.time()` when the measurement was emitted |

289
examples/fetch_apollo_audio.py Executable file
View File

@ -0,0 +1,289 @@
#!/usr/bin/env python3
"""
Fetch Apollo audio recordings from Archive.org.
Downloads the Apollo 11 audio highlights compilation and extracts individual
clips using ffmpeg. Source material is from the Internet Archive's Apollo 11
collection.
Clips are saved as 48 kHz mono WAV files in examples/audio/ for use with
the gr-apollo signal processing demos.
Usage:
uv run python examples/fetch_apollo_audio.py --list
uv run python examples/fetch_apollo_audio.py --all
uv run python examples/fetch_apollo_audio.py --clip eagle_has_landed
uv run python examples/fetch_apollo_audio.py --clip liftoff --force
"""
import argparse
import os
import shutil
import subprocess
import sys
import urllib.request
# Output directory: examples/audio/ relative to this script
AUDIO_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "audio")
# Source FLAC from the Internet Archive
FLAC_URL = "https://archive.org/download/Apollo11AudioHighlights/Apollo11Highlights.flac"
FLAC_FILENAME = "Apollo11Highlights.flac"
# Clip definitions -- timestamps are approximate offsets into the highlights reel.
# The important thing is having a working extraction pipeline; timestamps can be
# refined once someone listens through the actual source file.
CLIPS = {
"liftoff": {
"start": "00:00:05",
"duration": "00:00:30",
"description": "Apollo 11 liftoff",
},
"eagle_has_landed": {
"start": "00:06:45",
"duration": "00:00:30",
"description": "The Eagle has landed",
},
"one_small_step": {
"start": "00:15:30",
"duration": "00:00:25",
"description": "One small step for man",
},
"houston_problem": {
"start": "00:20:00",
"duration": "00:00:15",
"description": "Houston, we've had a problem",
},
"splashdown": {
"start": "00:42:00",
"duration": "00:00:20",
"description": "Splashdown",
},
}
def check_ffmpeg():
"""Verify ffmpeg is available on PATH."""
if shutil.which("ffmpeg") is None:
print("ERROR: ffmpeg not found on PATH.", file=sys.stderr)
print("Install it with your package manager:", file=sys.stderr)
print(" Arch: pacman -S ffmpeg", file=sys.stderr)
print(" Debian: apt install ffmpeg", file=sys.stderr)
print(" macOS: brew install ffmpeg", file=sys.stderr)
sys.exit(1)
def _progress_hook(block_num, block_size, total_size):
"""Report download progress to stderr."""
downloaded = block_num * block_size
if total_size > 0:
pct = min(100.0, downloaded * 100.0 / total_size)
mb_down = downloaded / (1024 * 1024)
mb_total = total_size / (1024 * 1024)
bar_width = 40
filled = int(bar_width * pct / 100.0)
bar = "#" * filled + "-" * (bar_width - filled)
sys.stderr.write(f"\r [{bar}] {pct:5.1f}% {mb_down:.1f}/{mb_total:.1f} MB")
sys.stderr.flush()
else:
mb_down = downloaded / (1024 * 1024)
sys.stderr.write(f"\r Downloaded {mb_down:.1f} MB (unknown total)")
sys.stderr.flush()
def download_flac(output_dir, force=False):
"""Download the FLAC source file with progress reporting.
Returns the path to the downloaded file, or None on failure.
"""
os.makedirs(output_dir, exist_ok=True)
flac_path = os.path.join(output_dir, FLAC_FILENAME)
if os.path.exists(flac_path) and not force:
size_mb = os.path.getsize(flac_path) / (1024 * 1024)
print(f" FLAC already exists: {flac_path} ({size_mb:.1f} MB)")
print(" Use --force to re-download.")
return flac_path
print(f" Downloading: {FLAC_URL}")
print(f" Saving to: {flac_path}")
print()
try:
urllib.request.urlretrieve(FLAC_URL, flac_path, reporthook=_progress_hook)
sys.stderr.write("\n")
sys.stderr.flush()
except (urllib.error.URLError, OSError) as exc:
print(f"\n Download failed: {exc}", file=sys.stderr)
# Clean up partial file
if os.path.exists(flac_path):
os.remove(flac_path)
return None
size_mb = os.path.getsize(flac_path) / (1024 * 1024)
print(f" Downloaded {size_mb:.1f} MB")
return flac_path
def extract_clip(flac_path, clip_name, clip_info, output_dir, force=False):
"""Extract a clip segment from the FLAC source using ffmpeg.
Outputs a 48 kHz mono WAV file.
Returns True on success, False on failure.
"""
out_path = os.path.join(output_dir, f"apollo11_{clip_name}.wav")
if os.path.exists(out_path) and not force:
print(f" [{clip_name}] Already exists: {out_path}")
return True
cmd = [
"ffmpeg",
"-y", # overwrite without asking
"-ss", clip_info["start"], # seek to start
"-t", clip_info["duration"], # extract duration
"-i", flac_path, # input
"-ac", "1", # mono
"-ar", "48000", # 48 kHz
"-sample_fmt", "s16", # 16-bit
out_path,
]
print(f" [{clip_name}] Extracting: {clip_info['description']}")
print(f" start={clip_info['start']} duration={clip_info['duration']}")
result = subprocess.run(cmd, capture_output=True)
if result.returncode != 0:
print(f" [{clip_name}] ffmpeg failed (exit {result.returncode}):", file=sys.stderr)
stderr_text = result.stderr.decode("utf-8", errors="replace")
# Print last few lines of ffmpeg output for diagnostics
for line in stderr_text.strip().splitlines()[-5:]:
print(f" {line}", file=sys.stderr)
return False
size_kb = os.path.getsize(out_path) / 1024
print(f" -> {out_path} ({size_kb:.0f} KB)")
return True
def list_clips():
"""Print available clip names and descriptions."""
print("Available clips:")
print()
max_name = max(len(n) for n in CLIPS)
for name, info in CLIPS.items():
print(f" {name:<{max_name}} {info['start']} ({info['duration']}) {info['description']}")
print()
print(f" {len(CLIPS)} clips defined.")
print(" Extract with: --clip NAME or --all")
def main():
parser = argparse.ArgumentParser(
description="Fetch Apollo 11 audio from Archive.org and extract clips.",
epilog="Clips are saved as 48 kHz mono WAV in examples/audio/.",
)
parser.add_argument(
"--list",
action="store_true",
help="List available clip names and timestamps",
)
parser.add_argument(
"--clip",
metavar="NAME",
help="Extract a specific clip by name",
)
parser.add_argument(
"--all",
action="store_true",
help="Extract all defined clips",
)
parser.add_argument(
"--keep-flac",
action="store_true",
help="Keep the downloaded FLAC file after extraction",
)
parser.add_argument(
"--force",
action="store_true",
help="Re-download and re-extract even if files already exist",
)
parser.add_argument(
"--output-dir",
default=AUDIO_DIR,
help=f"Output directory (default: {AUDIO_DIR})",
)
args = parser.parse_args()
# --list doesn't need ffmpeg
if args.list:
list_clips()
return
# Everything else requires ffmpeg
check_ffmpeg()
# Validate arguments
if not args.clip and not args.all:
parser.print_help()
print()
print("Specify --clip NAME, --all, or --list.")
sys.exit(1)
if args.clip and args.clip not in CLIPS:
print(f"Unknown clip: {args.clip}", file=sys.stderr)
print(f"Available: {', '.join(CLIPS.keys())}", file=sys.stderr)
sys.exit(1)
# Determine which clips to extract
clip_names = list(CLIPS.keys()) if args.all else [args.clip]
print("=" * 60)
print("Apollo 11 Audio Fetch")
print("=" * 60)
print()
# Download the source FLAC
print("Step 1: Download source FLAC")
flac_path = download_flac(args.output_dir, force=args.force)
if flac_path is None:
sys.exit(1)
print()
# Extract clips
print(f"Step 2: Extract {len(clip_names)} clip(s)")
print()
ok_count = 0
fail_count = 0
for name in clip_names:
success = extract_clip(flac_path, name, CLIPS[name], args.output_dir, force=args.force)
if success:
ok_count += 1
else:
fail_count += 1
print()
# Clean up FLAC unless --keep-flac
if not args.keep_flac and os.path.exists(flac_path):
size_mb = os.path.getsize(flac_path) / (1024 * 1024)
os.remove(flac_path)
print(f"Removed source FLAC ({size_mb:.1f} MB). Use --keep-flac to retain.")
elif args.keep_flac and os.path.exists(flac_path):
print(f"Kept source FLAC: {flac_path}")
print()
# Summary
print("=" * 60)
print(f" Extracted: {ok_count} Failed: {fail_count}")
if ok_count > 0:
print(f" Output: {args.output_dir}/apollo11_*.wav")
print("=" * 60)
if fail_count > 0:
sys.exit(1)
if __name__ == "__main__":
main()

204
examples/ranging_demo.py Normal file
View File

@ -0,0 +1,204 @@
#!/usr/bin/env python3
"""
Apollo PRN Ranging Demo -- generate, delay, correlate, measure.
Demonstrates the Apollo ranging system by:
1. Verifying component code properties (lengths, periodicity, balance)
2. Generating the PRN ranging code
3. NRZ encoding to a bipolar waveform
4. Applying a known propagation delay (simulating spacecraft distance)
5. Optionally adding AWGN noise
6. Cross-correlating to recover the delay
7. Comparing measured range to true range
The demo works at chip rate (1 sample per chip) for simplicity and speed.
No GNU Radio runtime is required.
Usage:
uv run python examples/ranging_demo.py
uv run python examples/ranging_demo.py --range-km 100
uv run python examples/ranging_demo.py --range-km 384400 # Moon distance
uv run python examples/ranging_demo.py --snr 20
uv run python examples/ranging_demo.py --chips 200000
"""
import argparse
import time
import numpy as np
from apollo.ranging import (
RANGING_A_LENGTH,
RANGING_B_LENGTH,
RANGING_C_LENGTH,
RANGING_CHIP_RATE_HZ,
RANGING_CL_LENGTH,
RANGING_X_LENGTH,
SPEED_OF_LIGHT_M_S,
RangingCodeGenerator,
RangingCorrelator,
range_m_to_chips,
verify_code_properties,
)
def main():
parser = argparse.ArgumentParser(description="Apollo PRN ranging demo")
parser.add_argument(
"--range-km",
type=float,
default=100.0,
help="Target range in km (default: 100)",
)
parser.add_argument(
"--snr",
type=float,
default=None,
help="SNR in dB (default: no noise)",
)
parser.add_argument(
"--chips",
type=int,
default=50_000,
help="Number of chips for correlation (default: 50000)",
)
args = parser.parse_args()
print("=" * 60)
print("Apollo PRN Ranging Demo")
print("=" * 60)
# ------------------------------------------------------------------
# Step 1: Verify code properties
# ------------------------------------------------------------------
print()
print("1. Component code verification")
print("-" * 40)
props = verify_code_properties()
for name in ("cl", "x", "a", "b", "c"):
p = props[name]
status = "OK" if (p["length_correct"] and p["periodic"]) else "FAIL"
balance = ""
if "balance_correct" in p:
balance = f", balance={'OK' if p['balance_correct'] else 'FAIL'}"
print(
f" {name.upper():>2}: length={p['length']:>3} "
f"(1s={p['ones_count']}, 0s={p['zeros_count']}), "
f"periodic={p['periodic']}{balance} [{status}]"
)
lp = props["length_product"]
print(
f" Code length: {lp['expected']:,} "
f"= {RANGING_CL_LENGTH}*{RANGING_X_LENGTH}*{RANGING_A_LENGTH}"
f"*{RANGING_B_LENGTH}*{RANGING_C_LENGTH} "
f"[{'OK' if lp['matches_constant'] else 'FAIL'}]"
)
cs = props["composite_sample"]
print(f" Composite sample ({cs['length']:,} chips): balance={cs['balance']:.3f} (ideal ~0.5)")
# ------------------------------------------------------------------
# Step 2: Generate PRN code
# ------------------------------------------------------------------
print()
print(f"2. Generating {args.chips:,} PRN chips...")
t0 = time.time()
gen = RangingCodeGenerator()
code = gen.generate_sequence(n_chips=args.chips)
t_gen = time.time() - t0
print(f" Generated in {t_gen * 1000:.1f} ms")
print(
f" Ones: {int(np.sum(code)):,} / {args.chips:,} ({100 * np.sum(code) / args.chips:.1f}%)"
)
# ------------------------------------------------------------------
# Step 3: NRZ encode
# ------------------------------------------------------------------
nrz = code.astype(np.float32) * 2.0 - 1.0
# ------------------------------------------------------------------
# Step 4: Apply delay
# ------------------------------------------------------------------
true_range_m = args.range_km * 1000.0
delay_chips = range_m_to_chips(true_range_m, two_way=True)
delay_samples = int(round(delay_chips)) # At chip rate, 1 sample = 1 chip
# Wrap delay within sequence length
delay_samples = delay_samples % args.chips
print()
print(f"3. Simulating range: {args.range_km:.1f} km")
print(f" Round-trip distance: {true_range_m * 2 / 1000:.1f} km")
print(f" Round-trip time: {2 * true_range_m / SPEED_OF_LIGHT_M_S * 1000:.4f} ms")
print(f" Delay: {delay_chips:.2f} chips ({delay_samples} samples)")
received = np.roll(nrz, delay_samples)
# ------------------------------------------------------------------
# Step 5: Add noise
# ------------------------------------------------------------------
if args.snr is not None:
noise_power = 1.0 / (10.0 ** (args.snr / 10.0))
noise = np.random.default_rng(42).standard_normal(len(received)).astype(np.float32)
noise *= np.sqrt(noise_power)
received = received + noise
print(f" Added AWGN noise at {args.snr:.0f} dB SNR (noise power = {noise_power:.4f})")
# ------------------------------------------------------------------
# Step 6: Correlate
# ------------------------------------------------------------------
print()
print("4. Cross-correlating...")
correlator = RangingCorrelator(
chip_rate=RANGING_CHIP_RATE_HZ,
sample_rate=RANGING_CHIP_RATE_HZ, # 1 sample per chip
two_way=True,
)
t0 = time.time()
result = correlator.correlate(received)
t_corr = time.time() - t0
measured_range_m = result["range_m"]
error_m = abs(measured_range_m - true_range_m)
# The quantization error at chip rate:
# one chip = c / (2 * chip_rate) meters (two-way)
quant_m = SPEED_OF_LIGHT_M_S / (2.0 * RANGING_CHIP_RATE_HZ)
print(f" Correlation time: {t_corr * 1000:.1f} ms")
print(f" Peak-to-average ratio: {result['peak_to_avg_ratio']:.1f}")
# ------------------------------------------------------------------
# Step 7: Results
# ------------------------------------------------------------------
print()
print("5. Range measurement results")
print("-" * 40)
print(f" True range: {args.range_km:>12.1f} km")
print(f" Measured range: {measured_range_m / 1000:>12.1f} km")
print(f" Error: {error_m:>12.1f} m")
print(f" Quantization step: {quant_m:>12.1f} m (1 chip, two-way)")
print(f" Delay (chips): {result['delay_chips']:>12.2f}")
print(f" Delay (samples): {result['delay_samples']:>12d}")
print(f" Correlation peak: {result['correlation_peak']:>12.0f}")
if error_m <= quant_m:
print()
print(" Error is within one quantization step -- measurement is correct.")
elif error_m <= quant_m * 2:
print()
print(" Error is within two quantization steps -- acceptable.")
else:
print()
print(" Error exceeds quantization limit. Try more --chips or higher --snr.")
print()
print("=" * 60)
if __name__ == "__main__":
main()

379
examples/real_signal_demo.py Executable file
View File

@ -0,0 +1,379 @@
#!/usr/bin/env python3
"""
Apollo Real Signal Demo -- process downloaded Apollo recordings through USB.
Auto-discovers WAV files in examples/audio/ (from fetch_apollo_audio.py) and
runs them through the full USB downlink chain: transmit (NRZ + BPSK + voice FM
onto PM carrier) then receive (PCM frame recovery + voice demodulation).
This proves the gr-apollo signal chain works on real-world audio, not just
synthetic test tones.
Signal path (same as full_downlink_demo.py):
TX:
pcm_frame_source -> nrz -> bpsk_mod (1.024 MHz) --+
audio_clip -> fm_voice_mod (1.25 MHz, +/-29kHz) ---+-> add -> pm_mod -> [signal]
RX:
[signal] -> usb_downlink_receiver -> PCM frames
[signal] -> pm_demod -> voice_subcarrier_demod -> recovered audio
Usage:
uv run python examples/real_signal_demo.py
uv run python examples/real_signal_demo.py --clip eagle_has_landed
uv run python examples/real_signal_demo.py --snr 25
uv run python examples/real_signal_demo.py --clip liftoff --play
"""
import argparse
import glob
import os
import sys
import time
from math import gcd
import numpy as np
from gnuradio import blocks, gr
from scipy.io import wavfile
from scipy.signal import resample_poly
from apollo.bpsk_subcarrier_mod import bpsk_subcarrier_mod
from apollo.constants import (
PCM_HIGH_BIT_RATE,
PCM_HIGH_WORDS_PER_FRAME,
PCM_SUBCARRIER_HZ,
PCM_WORD_LENGTH,
PM_PEAK_DEVIATION_RAD,
SAMPLE_RATE_BASEBAND,
VOICE_FM_DEVIATION_HZ,
VOICE_SUBCARRIER_HZ,
)
from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod
from apollo.nrz_encoder import nrz_encoder
from apollo.pcm_frame_source import pcm_frame_source
from apollo.pm_demod import pm_demod
from apollo.pm_mod import pm_mod
from apollo.usb_downlink_receiver import usb_downlink_receiver
from apollo.voice_subcarrier_demod import voice_subcarrier_demod
# Audio directory relative to this script
AUDIO_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "audio")
# Fallback clip if no downloaded audio exists
FALLBACK_CLIP = os.path.join(AUDIO_DIR, "apollo11_crew.wav")
def discover_clips():
"""Find WAV files in the audio directory.
Returns a dict of {name: path} for all apollo11_*.wav files,
excluding *_recovered.wav and *_fullchain.wav (our own output).
"""
clips = {}
pattern = os.path.join(AUDIO_DIR, "apollo11_*.wav")
for path in sorted(glob.glob(pattern)):
basename = os.path.basename(path)
# Skip output files from previous runs
if basename.endswith("_recovered.wav") or basename.endswith("_fullchain.wav"):
continue
# Extract clip name: apollo11_eagle_has_landed.wav -> eagle_has_landed
name = basename.replace("apollo11_", "").replace(".wav", "")
# Skip the small demo clip unless it's the only option
if name == "crew":
continue
clips[name] = path
return clips
def load_and_upsample_audio(audio_path, sample_rate):
"""Load audio file and upsample to baseband rate."""
input_rate, audio_data = wavfile.read(audio_path)
if audio_data.ndim > 1:
audio_data = audio_data[:, 0]
# Normalize to [-1, 1]
if audio_data.dtype == np.int16:
audio_float = audio_data.astype(np.float32) / 32768.0
elif audio_data.dtype == np.int32:
audio_float = audio_data.astype(np.float32) / 2147483648.0
else:
audio_float = audio_data.astype(np.float32)
duration = len(audio_float) / input_rate
# Resample to 8 kHz first (Apollo voice bandwidth)
audio_rate = 8000
if input_rate != audio_rate:
g = gcd(audio_rate, input_rate)
audio_float = resample_poly(
audio_float, audio_rate // g, input_rate // g
).astype(np.float32)
# Upsample to baseband
g = gcd(sample_rate, audio_rate)
upsampled = resample_poly(
audio_float, sample_rate // g, audio_rate // g
).astype(np.float32)
return upsampled, duration, audio_rate
def build_tx_signal(audio_samples, n_samples, sample_rate, snr_db):
"""Build the combined TX signal: PCM + voice -> PM modulation.
Same manual assembly as full_downlink_demo.py so we can inject
external audio into the voice channel.
"""
tb = gr.top_block()
# --- PCM telemetry path ---
frame_src = pcm_frame_source(bit_rate=PCM_HIGH_BIT_RATE)
nrz = nrz_encoder(bit_rate=PCM_HIGH_BIT_RATE, sample_rate=sample_rate)
bpsk = bpsk_subcarrier_mod(
subcarrier_freq=PCM_SUBCARRIER_HZ,
sample_rate=sample_rate,
)
tb.connect(frame_src, nrz, bpsk)
# --- Voice subcarrier path (real audio) ---
voice_src = blocks.vector_source_f(audio_samples[:n_samples].tolist())
voice_mod = fm_voice_subcarrier_mod(
sample_rate=sample_rate,
subcarrier_freq=VOICE_SUBCARRIER_HZ,
fm_deviation=VOICE_FM_DEVIATION_HZ,
audio_input=True,
)
# Scale voice relative to PCM: 1.68/2.2 per IMPL_SPEC
voice_gain = blocks.multiply_const_ff(1.68 / 2.2)
tb.connect(voice_src, voice_mod, voice_gain)
# --- Sum subcarriers ---
adder = blocks.add_ff(1)
tb.connect(bpsk, (adder, 0))
tb.connect(voice_gain, (adder, 1))
# --- PM modulation ---
pm = pm_mod(pm_deviation=PM_PEAK_DEVIATION_RAD, sample_rate=sample_rate)
head = blocks.head(gr.sizeof_gr_complex, n_samples)
tb.connect(adder, pm, head)
# --- Optional AWGN ---
if snr_db is not None:
import math
noise_power = 1.0 / (10.0 ** (snr_db / 10.0))
noise_amp = math.sqrt(noise_power / 2.0)
noise = blocks.vector_source_c(
(np.random.randn(n_samples) + 1j * np.random.randn(n_samples)).astype(
np.complex64
)
* noise_amp
)
summer = blocks.add_cc(1)
snk = blocks.vector_sink_c()
tb.connect(head, (summer, 0))
tb.connect(noise, (summer, 1))
tb.connect(summer, snk)
else:
snk = blocks.vector_sink_c()
tb.connect(head, snk)
tb.run()
return np.array(snk.data())
def receive_pcm(signal_data, sample_rate):
"""Run the PCM receive chain and return the message debug sink."""
tb = gr.top_block()
src = blocks.vector_source_c(signal_data.tolist())
rx = usb_downlink_receiver(
sample_rate=sample_rate,
bit_rate=PCM_HIGH_BIT_RATE,
output_format="raw",
)
snk = blocks.message_debug()
tb.connect(src, rx)
tb.msg_connect(rx, "frames", snk, "store")
tb.run()
return snk
def receive_voice(signal_data, sample_rate, audio_rate=8000):
"""Run the voice receive chain and return recovered audio samples."""
tb = gr.top_block()
src = blocks.vector_source_c(signal_data.tolist())
pm = pm_demod(sample_rate=sample_rate)
voice = voice_subcarrier_demod(sample_rate=sample_rate, audio_rate=audio_rate)
snk = blocks.vector_sink_f()
tb.connect(src, pm, voice, snk)
tb.run()
return np.array(snk.data(), dtype=np.float32)
def process_clip(clip_name, clip_path, sample_rate, audio_rate, snr_db):
"""Process a single audio clip through the full TX/RX chain.
Returns a dict with stats about the processing.
"""
print(f" Loading: {clip_path}")
audio_upsampled, duration, _ = load_and_upsample_audio(clip_path, sample_rate)
print(f" Duration: {duration:.2f}s, {len(audio_upsampled):,} baseband samples")
# Calculate frame timing
bits_per_frame = PCM_HIGH_WORDS_PER_FRAME * PCM_WORD_LENGTH
samples_per_frame = int(bits_per_frame * sample_rate / PCM_HIGH_BIT_RATE)
n_frames = int(duration * 50) + 2 # 50 fps + margin
n_samples = min(len(audio_upsampled), n_frames * samples_per_frame)
snr_desc = f"{snr_db} dB" if snr_db is not None else "clean"
print(f" TX: {n_samples:,} samples, ~{n_frames} PCM frames, SNR={snr_desc}")
# === TRANSMIT ===
t0 = time.time()
signal = build_tx_signal(audio_upsampled, n_samples, sample_rate, snr_db)
t_tx = time.time() - t0
print(f" TX complete: {len(signal):,} complex samples ({t_tx:.1f}s)")
# === RECEIVE: PCM ===
t0 = time.time()
frame_sink = receive_pcm(signal, sample_rate)
t_pcm = time.time() - t0
n_recovered_frames = frame_sink.num_messages()
print(f" RX PCM: {n_recovered_frames} frames recovered ({t_pcm:.1f}s)")
# === RECEIVE: Voice ===
t0 = time.time()
recovered_audio = receive_voice(signal, sample_rate, audio_rate)
t_voice = time.time() - t0
recovered_duration = len(recovered_audio) / audio_rate
print(
f" RX voice: {len(recovered_audio):,} samples,"
f" {recovered_duration:.2f}s ({t_voice:.1f}s)"
)
# Normalize and save recovered audio
output_path = os.path.join(AUDIO_DIR, f"apollo11_{clip_name}_recovered.wav")
peak = np.max(np.abs(recovered_audio))
if peak > 0:
recovered_audio = recovered_audio / peak * 0.9
recovered_int16 = (recovered_audio * 32767).astype(np.int16)
wavfile.write(output_path, audio_rate, recovered_int16)
print(f" Saved: {output_path}")
return {
"clip_name": clip_name,
"input_path": clip_path,
"output_path": output_path,
"input_duration": duration,
"recovered_duration": recovered_duration,
"pcm_frames": n_recovered_frames,
"expected_frames": n_frames,
"snr": snr_desc,
"time_tx": t_tx,
"time_pcm": t_pcm,
"time_voice": t_voice,
}
def main():
parser = argparse.ArgumentParser(
description="Process real Apollo audio through the full USB downlink chain."
)
parser.add_argument(
"--clip",
metavar="NAME",
default=None,
help="Process a specific clip (default: first discovered)",
)
parser.add_argument(
"--snr",
type=float,
default=None,
help="Add AWGN noise at this SNR in dB",
)
parser.add_argument(
"--play",
action="store_true",
help="Play recovered audio with aplay after processing",
)
args = parser.parse_args()
sample_rate = int(SAMPLE_RATE_BASEBAND)
audio_rate = 8000
print("=" * 60)
print("Apollo Real Signal Demo")
print(" Full USB downlink: PCM telemetry + crew voice")
print("=" * 60)
print()
# Discover available clips
clips = discover_clips()
if not clips:
# Fall back to the bundled demo clip
if os.path.exists(FALLBACK_CLIP):
print(" No downloaded clips found. Using bundled demo clip.")
clips = {"crew": FALLBACK_CLIP}
else:
print("No audio files found in examples/audio/.", file=sys.stderr)
print("Run fetch_apollo_audio.py first:", file=sys.stderr)
print(" uv run python examples/fetch_apollo_audio.py --all", file=sys.stderr)
sys.exit(1)
print(f" Found {len(clips)} clip(s): {', '.join(clips.keys())}")
print()
# Select which clip to process
if args.clip:
if args.clip not in clips:
print(f"Clip not found: {args.clip}", file=sys.stderr)
print(f"Available: {', '.join(clips.keys())}", file=sys.stderr)
sys.exit(1)
selected_name = args.clip
else:
selected_name = next(iter(clips))
selected_path = clips[selected_name]
print(f"Processing: {selected_name}")
print("-" * 60)
stats = process_clip(selected_name, selected_path, sample_rate, audio_rate, args.snr)
# === SUMMARY ===
print()
print("=" * 60)
print("Summary")
print("=" * 60)
print(f" Clip: {stats['clip_name']}")
print(f" Input duration: {stats['input_duration']:.2f}s")
print(f" Recovered audio: {stats['recovered_duration']:.2f}s")
pcm_f = stats['pcm_frames']
exp_f = stats['expected_frames']
print(f" PCM frames: {pcm_f} recovered (expected ~{exp_f})")
print(f" SNR: {stats['snr']}")
t_tx = stats['time_tx']
t_pcm = stats['time_pcm']
t_voice = stats['time_voice']
print(
f" Processing time: TX={t_tx:.1f}s"
f" PCM-RX={t_pcm:.1f}s Voice-RX={t_voice:.1f}s"
)
print(f" Output: {stats['output_path']}")
print("=" * 60)
if args.play:
import subprocess
print()
print("Playing recovered audio...")
subprocess.run(["aplay", stats["output_path"]], check=False)
else:
print()
print(f"Play recovered: aplay {stats['output_path']}")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,250 @@
#!/usr/bin/env python3
"""
Apollo Uplink Loopback Demo -- encode V16N36E, modulate, demodulate, verify.
Demonstrates the full uplink signal chain using a mix of pure-Python engines
(for bit-level serialization/deserialization) and GNU Radio blocks (for the
RF modulation/demodulation path):
TX (ground station):
UplinkEncoder -> UplinkSerializerEngine -> [bits]
-> GR: nrz_encoder -> FM mod -> 70 kHz upconvert -> PM mod
RX (spacecraft):
GR: PM demod -> 70 kHz extract -> FM demod -> matched filter -> slicer
-> [bits] -> UplinkDeserializerEngine
The pure-Python engines handle word<->bit conversion at the endpoints, while
the GR streaming chain proves the RF modulation path works end-to-end.
Usage:
uv run python examples/uplink_loopback_demo.py
uv run python examples/uplink_loopback_demo.py --snr 20
uv run python examples/uplink_loopback_demo.py --snr 10 --verb 37 --noun 0
"""
import argparse
import math
import sys
import numpy as np
from gnuradio import analog, blocks, digital, filter, gr
from apollo.constants import (
SAMPLE_RATE_BASEBAND,
UPLINK_DATA_SUBCARRIER_HZ,
)
from apollo.nrz_encoder import nrz_encoder
from apollo.pm_demod import pm_demod
from apollo.pm_mod import pm_mod
from apollo.subcarrier_extract import subcarrier_extract
from apollo.uplink_encoder import UplinkEncoder
from apollo.uplink_word_codec import (
UPLINK_WORD_BITS,
UplinkDeserializerEngine,
UplinkSerializerEngine,
)
# Uplink parameters (local definitions)
UPLINK_PM_DEVIATION_RAD = 1.0
UPLINK_DATA_BIT_RATE = 2_000
UPLINK_DATA_FM_DEVIATION_HZ = 4_000
UPLINK_INTER_WORD_GAP = 3
def main():
parser = argparse.ArgumentParser(description="Apollo uplink loopback demo")
parser.add_argument(
"--verb", type=int, default=16, help="Verb number (default: 16)"
)
parser.add_argument(
"--noun", type=int, default=36, help="Noun number (default: 36)"
)
parser.add_argument(
"--snr", type=float, default=None, help="SNR in dB (None = no noise)"
)
args = parser.parse_args()
sample_rate = SAMPLE_RATE_BASEBAND
bit_rate = UPLINK_DATA_BIT_RATE
# --- Encode the command ---
encoder = UplinkEncoder()
tx_pairs = encoder.encode_verb_noun(verb=args.verb, noun=args.noun)
print("=" * 60)
print("Apollo Uplink Loopback Demo")
print("=" * 60)
print(f" Command: V{args.verb:02d}N{args.noun:02d}E")
print(f" Uplink words: {len(tx_pairs)}")
print(f" SNR: {'clean (no noise)' if args.snr is None else f'{args.snr} dB'}")
print()
print("TX word sequence:")
for i, (ch, val) in enumerate(tx_pairs):
print(f" [{i}] ch={ch:03o} val={val:05o} ({val:>5d}) "
f"bits={val:015b}")
print()
# --- Serialize to bits using pure-Python engine ---
serializer = UplinkSerializerEngine(inter_word_gap=UPLINK_INTER_WORD_GAP)
serializer.add_words(tx_pairs)
bits_per_word = UPLINK_WORD_BITS + UPLINK_INTER_WORD_GAP
total_data_bits = len(tx_pairs) * bits_per_word
# Add leading and trailing idle for PLL settling
pll_settle_bits = int(bit_rate * 0.5) # 0.5 seconds of idle
total_bits = pll_settle_bits + total_data_bits + pll_settle_bits
tx_bits = serializer.next_bits(total_bits)
tx_bytes = np.array(tx_bits, dtype=np.byte)
samples_per_bit = int(sample_rate / bit_rate)
n_samples = total_bits * samples_per_bit
print(f" Total bits: {total_bits} ({total_data_bits} data + "
f"{2 * pll_settle_bits} idle)")
print(f" Samples per bit: {samples_per_bit}")
print(f" Total samples: {n_samples:,}")
print(f" Duration: {n_samples / sample_rate:.3f} s")
print()
# --- Build GR flowgraph for the RF path ---
#
# TX: vector_source_b -> nrz -> FM mod -> upconvert 70 kHz -> to_real -> PM mod
# RX: PM demod -> extract 70 kHz -> FM demod -> matched filter
# -> decimate -> slicer -> vector_sink
print("Building flowgraph...")
tb = gr.top_block()
# TX chain
src = blocks.vector_source_b(tx_bytes.tolist(), False)
nrz = nrz_encoder(bit_rate=bit_rate, sample_rate=sample_rate)
fm_sensitivity = 2.0 * math.pi * UPLINK_DATA_FM_DEVIATION_HZ / sample_rate
fm_mod = analog.frequency_modulator_fc(fm_sensitivity)
lo = analog.sig_source_c(
sample_rate, analog.GR_COS_WAVE,
UPLINK_DATA_SUBCARRIER_HZ, 1.0, 0,
)
mixer = blocks.multiply_cc(1)
to_real = blocks.complex_to_real(1)
pm = pm_mod(pm_deviation=UPLINK_PM_DEVIATION_RAD, sample_rate=sample_rate)
# RX chain
pm_rx = pm_demod(carrier_pll_bw=0.02, sample_rate=sample_rate)
sc_extract = subcarrier_extract(
center_freq=UPLINK_DATA_SUBCARRIER_HZ,
bandwidth=20_000,
sample_rate=sample_rate,
)
fm_gain = sample_rate / (2.0 * math.pi * UPLINK_DATA_FM_DEVIATION_HZ)
fm_demod = analog.quadrature_demod_cf(fm_gain)
matched_taps = [1.0 / samples_per_bit] * samples_per_bit
matched = filter.fir_filter_fff(1, matched_taps)
decim = blocks.keep_one_in_n(gr.sizeof_float, samples_per_bit)
slicer = digital.binary_slicer_fb()
snk = blocks.vector_sink_b()
# Optional noise
if args.snr is not None:
noise_power = 1.0 / (10.0 ** (args.snr / 10.0))
noise_amplitude = math.sqrt(noise_power / 2.0)
noise = analog.noise_source_c(analog.GR_GAUSSIAN, noise_amplitude, 0)
add_noise = blocks.add_cc(1)
tb.connect(pm, (add_noise, 0))
tb.connect(noise, (add_noise, 1))
noise_out = add_noise
else:
noise_out = pm
# Wire TX
tb.connect(src, nrz, fm_mod, (mixer, 0))
tb.connect(lo, (mixer, 1))
tb.connect(mixer, to_real, pm)
# Wire RX
tb.connect(noise_out, pm_rx, sc_extract, fm_demod, matched, decim, slicer, snk)
print("Running flowgraph (TX -> RX)...")
tb.run()
print()
# --- Deserialize recovered bits ---
rx_bits = list(snk.data())
print(f"Recovered {len(rx_bits)} bits from slicer")
deserializer = UplinkDeserializerEngine()
rx_pairs = deserializer.process_bits(rx_bits)
print(f"Recovered {len(rx_pairs)} words (expected {len(tx_pairs)})")
print()
if not rx_pairs:
print("No words recovered. PLL may need more settling time or")
print("the subcarrier filter bandwidth may need adjustment.")
sys.exit(1)
# --- Compare TX vs RX ---
print("RX word sequence:")
for i, (ch, val) in enumerate(rx_pairs):
print(f" [{i}] ch={ch:03o} val={val:05o} ({val:>5d}) "
f"bits={val:015b}")
print()
# Match comparison
matches = 0
n_compare = min(len(tx_pairs), len(rx_pairs))
errors = []
for i in range(n_compare):
tx_ch, tx_val = tx_pairs[i]
rx_ch, rx_val = rx_pairs[i]
if tx_val == rx_val:
matches += 1
else:
errors.append((i, tx_val, rx_val))
print("-" * 60)
print(f" Words transmitted: {len(tx_pairs)}")
print(f" Words recovered: {len(rx_pairs)}")
print(f" Matches: {matches}/{n_compare}")
if errors:
print(f" Errors: {len(errors)}")
for idx, tx_v, rx_v in errors:
# Count differing bits
diff = tx_v ^ rx_v
n_bit_err = bin(diff).count("1")
print(f" Word {idx}: TX={tx_v:05o} RX={rx_v:05o} "
f"({n_bit_err} bit errors)")
if n_compare > 0:
wer = 1.0 - (matches / n_compare)
print(f" Word error rate: {wer:.1%}")
print("-" * 60)
if matches == n_compare and len(rx_pairs) == len(tx_pairs):
print()
print(f"V{args.verb:02d}N{args.noun:02d}E round-trip: all {matches} words match.")
elif matches == n_compare:
print()
print(f"All compared words match, but word count differs "
f"({len(rx_pairs)} recovered vs {len(tx_pairs)} sent).")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,64 @@
id: apollo_ranging_demod
label: Apollo Ranging Demodulator
category: '[Apollo USB]'
flags: [python]
parameters:
- id: chip_rate
label: Chip Rate (Hz)
dtype: int
default: '993963'
- id: sample_rate
label: Sample Rate (Hz)
dtype: real
default: '5120000'
- id: correlation_length
label: Correlation Length (samples)
dtype: int
default: '100000'
- id: two_way
label: Two-Way Ranging
dtype: bool
default: 'True'
options: ['True', 'False']
option_labels: ['Two-way (ground-SC-ground)', 'One-way']
inputs:
- label: in
domain: stream
dtype: float
outputs:
- label: range
domain: message
templates:
imports: from apollo.ranging_demod import ranging_demod
make: >-
apollo.ranging_demod.ranging_demod(
chip_rate=${chip_rate},
sample_rate=${sample_rate},
correlation_length=${correlation_length},
two_way=${two_way})
documentation: |-
Apollo Ranging Demodulator
Correlates received signal against known PRN ranging code to measure
spacecraft range. Accumulates samples in batches, performs FFT-based
cross-correlation, and emits range measurement PDUs.
Message output (range):
delay_chips: Measured delay in chip periods
range_m: Computed range in meters
correlation_peak: Peak correlation value
peak_to_avg_ratio: Peak-to-average ratio (signal quality)
timestamp: Measurement timestamp
Parameters:
chip_rate: Expected PRN chip rate (default 993,963 Hz)
sample_rate: Input sample rate (default 5.12 MHz)
correlation_length: Samples per correlation batch (default 100,000)
two_way: If True, divide range by 2 for round-trip measurement
file_format: 1

View File

@ -0,0 +1,46 @@
id: apollo_ranging_mod
label: Apollo Ranging Modulator
category: '[Apollo USB]'
flags: [python]
parameters:
- id: chip_rate
label: Chip Rate (Hz)
dtype: int
default: '993963'
- id: sample_rate
label: Sample Rate (Hz)
dtype: real
default: '5120000'
inputs:
- label: in
domain: stream
dtype: byte
outputs:
- label: out
domain: stream
dtype: float
templates:
imports: from apollo.ranging_mod import ranging_mod
make: apollo.ranging_mod.ranging_mod(chip_rate=${chip_rate}, sample_rate=${sample_rate})
documentation: |-
Apollo Ranging Modulator
NRZ-encodes PRN ranging chips (byte 0/1) to float (+1/-1) at the
specified sample rate. Output is suitable for summing with other
subcarriers before PM carrier modulation.
At the default 5.12 MHz sample rate, each chip occupies ~5 samples.
This is the same NRZ transform as the PCM NRZ Encoder but configured
for the ranging chip rate (~994 kchip/s) instead of the PCM bit rate.
Parameters:
chip_rate: PRN chip rate (default 993,963 Hz)
sample_rate: Output sample rate (default 5.12 MHz)
file_format: 1

View File

@ -0,0 +1,30 @@
id: apollo_ranging_source
label: Apollo Ranging Source
category: '[Apollo USB]'
flags: [python]
parameters: []
outputs:
- label: out
domain: stream
dtype: byte
templates:
imports: from apollo.ranging_source import ranging_source
make: apollo.ranging_source.ranging_source()
documentation: |-
Apollo PRN Ranging Source
Generates a continuous stream of PRN ranging code chips (bytes 0 or 1).
The composite code combines CL, X, A, B, and C component sequences
using majority-vote logic: output = (NOT(X) AND maj(A,B,C)) XOR CL
Code length: 5,456,682 chips (~5.49 seconds at 993,963 chips/sec)
The code repeats cyclically.
The full code period is pre-generated at startup, so the first call
has a brief delay while the sequence is computed.
file_format: 1

View File

@ -0,0 +1,57 @@
id: apollo_usb_uplink_receiver
label: Apollo USB Uplink Receiver
category: '[Apollo USB]'
flags: [python]
parameters:
- id: sample_rate
label: Sample Rate (Hz)
dtype: float
default: '5120000'
- id: bit_rate
label: Uplink Bit Rate
dtype: int
default: '2000'
- id: carrier_pll_bw
label: Carrier PLL Bandwidth
dtype: float
default: '0.02'
- id: subcarrier_bw
label: Subcarrier Bandwidth (Hz)
dtype: float
default: '20000'
inputs:
- label: in
domain: stream
dtype: complex
outputs:
- label: commands
domain: message
templates:
imports: from apollo.usb_uplink_receiver import usb_uplink_receiver
make: >-
apollo.usb_uplink_receiver.usb_uplink_receiver(
sample_rate=${sample_rate},
bit_rate=${bit_rate},
carrier_pll_bw=${carrier_pll_bw},
subcarrier_bw=${subcarrier_bw})
documentation: |-
Apollo USB Uplink Receiver -- spacecraft command receiver.
Demodulates uplink commands from complex baseband:
PM demod -> 70 kHz subcarrier extract -> FM demod -> bit recovery -> word assembly
Message output:
commands -- decoded (channel, value) PDUs for AGC bridge
Parameters:
sample_rate: Input sample rate (default 5.12 MHz)
bit_rate: Expected uplink data rate (default 2000 bps)
carrier_pll_bw: PM carrier recovery loop bandwidth (default 0.02)
subcarrier_bw: 70 kHz subcarrier filter bandwidth (default 20 kHz)
file_format: 1

View File

@ -0,0 +1,60 @@
id: apollo_usb_uplink_source
label: Apollo USB Uplink Source
category: '[Apollo USB]'
flags: [python]
parameters:
- id: sample_rate
label: Sample Rate (Hz)
dtype: float
default: '5120000'
- id: bit_rate
label: Uplink Bit Rate
dtype: int
default: '2000'
- id: pm_deviation
label: PM Deviation (rad)
dtype: float
default: '1.0'
- id: snr_db
label: SNR (dB)
dtype: raw
default: 'None'
inputs:
- label: words
domain: message
optional: true
outputs:
- label: out
domain: stream
dtype: complex
templates:
imports: from apollo.usb_uplink_source import usb_uplink_source
make: >-
apollo.usb_uplink_source.usb_uplink_source(
sample_rate=${sample_rate},
bit_rate=${bit_rate},
pm_deviation=${pm_deviation},
snr_db=${snr_db})
documentation: |-
Apollo USB Uplink Source -- ground station command transmitter.
Generates a PM-modulated complex baseband signal carrying uplink commands
on a 70 kHz FM data subcarrier at 2 kbps NRZ.
This is the transmit-side counterpart to the USB Uplink Receiver.
Message input:
words -- (channel, value) PDUs from uplink_encoder
Parameters:
sample_rate: Output sample rate (default 5.12 MHz)
bit_rate: Uplink data rate (default 2000 bps)
pm_deviation: Peak PM deviation in radians (default 1.0)
snr_db: Add AWGN noise at this SNR (None = no noise)
file_format: 1

View File

@ -18,7 +18,12 @@ from apollo.downlink_decoder import DownlinkEngine as DownlinkEngine
from apollo.pcm_demux import DemuxEngine as DemuxEngine
from apollo.pcm_frame_source import FrameSourceEngine as FrameSourceEngine
from apollo.pcm_frame_sync import FrameSyncEngine as FrameSyncEngine
from apollo.ranging import RangingCodeGenerator as RangingCodeGenerator
from apollo.ranging import chips_to_range_m as chips_to_range_m
from apollo.ranging import range_m_to_chips as range_m_to_chips
from apollo.uplink_encoder import UplinkEncoder as UplinkEncoder
from apollo.uplink_word_codec import UplinkDeserializerEngine as UplinkDeserializerEngine
from apollo.uplink_word_codec import UplinkSerializerEngine as UplinkSerializerEngine
from apollo.usb_signal_gen import generate_usb_baseband as generate_usb_baseband
# GNU Radio receive-side blocks (require gnuradio runtime)
@ -30,6 +35,7 @@ try:
from apollo.pm_demod import pm_demod as pm_demod
from apollo.sco_demod import sco_demod as sco_demod
from apollo.subcarrier_extract import subcarrier_extract as subcarrier_extract
from apollo.uplink_word_codec import uplink_word_deserializer as uplink_word_deserializer
from apollo.voice_subcarrier_demod import voice_subcarrier_demod as voice_subcarrier_demod
except ImportError:
pass # GNU Radio not available — receive-side GR blocks won't be importable
@ -42,7 +48,10 @@ try:
from apollo.nrz_encoder import nrz_encoder as nrz_encoder
from apollo.pcm_frame_source import pcm_frame_source as pcm_frame_source
from apollo.pm_mod import pm_mod as pm_mod
from apollo.ranging_mod import ranging_mod as ranging_mod
from apollo.ranging_source import ranging_source as ranging_source
from apollo.sco_mod import sco_mod as sco_mod
from apollo.uplink_word_codec import uplink_word_serializer as uplink_word_serializer
except ImportError:
pass # GNU Radio not available — transmit-side GR blocks won't be importable
@ -54,8 +63,11 @@ try:
from apollo.fm_signal_source import fm_signal_source as fm_signal_source
from apollo.pcm_demux import pcm_demux as pcm_demux
from apollo.pcm_frame_sync import pcm_frame_sync as pcm_frame_sync
from apollo.ranging_demod import ranging_demod as ranging_demod
from apollo.uplink_encoder import uplink_encoder as uplink_encoder
from apollo.usb_downlink_receiver import usb_downlink_receiver as usb_downlink_receiver
from apollo.usb_signal_source import usb_signal_source as usb_signal_source
from apollo.usb_uplink_receiver import usb_uplink_receiver as usb_uplink_receiver
from apollo.usb_uplink_source import usb_uplink_source as usb_uplink_source
except (ImportError, NameError):
pass

View File

@ -174,3 +174,36 @@ TX_IMPEDANCE_OHM = 50
TWT_LOW_POWER_W = 5
TWT_HIGH_POWER_W = 20
TWT_WARMUP_S = 90
# ---------------------------------------------------------------------------
# Uplink Modulation Parameters (IMPL_SPEC section 2.2)
# ---------------------------------------------------------------------------
UPLINK_PM_DEVIATION_RAD = 1.0 # 1.0 rad peak phase deviation
UPLINK_DATA_BIT_RATE = 2_000 # 2 kbps NRZ
UPLINK_DATA_FM_DEVIATION_HZ = 4_000 # ±4 kHz on 70 kHz subcarrier
UPLINK_VOICE_FM_DEVIATION_HZ = 7_500 # ±7.5 kHz on 30 kHz subcarrier
UPLINK_WORD_BITS = 15 # AGC word width
UPLINK_INTER_WORD_GAP = 3 # bit periods of zeros between words
# ---------------------------------------------------------------------------
# PRN Ranging (Ken Shirriff / NASA docs)
# ---------------------------------------------------------------------------
RANGING_CHIP_RATE_HZ = 993_963 # ~994 kchip/s
RANGING_CODE_LENGTH = 5_456_682 # 2 × 11 × 31 × 63 × 127
RANGING_CL_LENGTH = 2
RANGING_X_LENGTH = 11
RANGING_A_LENGTH = 31
RANGING_B_LENGTH = 63
RANGING_C_LENGTH = 127
RANGING_X_INIT = 22 # 0b10110
RANGING_A_INIT = 0x1F # all ones (5-bit)
RANGING_B_INIT = 0x3F # all ones (6-bit)
RANGING_C_INIT = 0x7F # all ones (7-bit)
# LFSR taps: zero-indexed bit positions for XOR feedback (Shirriff's Teensy code)
RANGING_A_TAPS = (2, 0) # 5-bit: x^5 + x^2 + 1
RANGING_B_TAPS = (1, 0) # 6-bit: x^6 + x + 1
RANGING_C_TAPS = (1, 0) # 7-bit: x^7 + x + 1
SPEED_OF_LIGHT_M_S = 299_792_458

450
src/apollo/ranging.py Normal file
View File

@ -0,0 +1,450 @@
"""
Apollo PRN Ranging Code Generator.
Generates the composite pseudo-random noise code used for spacecraft range
measurement. The code combines 5 component sequences (CL, X, A, B, C) using
majority-vote logic and XOR operations.
Combined code length: 5,456,682 chips (~5.49 seconds at 993,963 chips/sec)
The composite output is produced by Shirriff's algorithm: on even-numbered
output chips (ck=0), all shift registers advance and the output is computed
from the feedback bits via majority logic. On odd-numbered chips (ck=1), the
output is simply flipped (XOR with CL clock).
Combination logic (per even chip):
out = (NOT(xnew) AND maj(anew, bnew, cnew)) XOR ck
where maj = (A&B) | (A&C) | (B&C)
Component LFSR taps (from Shirriff's Teensy rangeGenerator.ino):
A: 5-bit, taps at bit positions 2 and 0 (polynomial x^5+x^2+1)
B: 6-bit, taps at bit positions 1 and 0 (polynomial x^6+x+1)
C: 7-bit, taps at bit positions 1 and 0 (polynomial x^7+x+1)
Reference: Ken Shirriff's analysis of the Apollo ranging system
http://www.righto.com/2022/04/the-digital-ranging-system-that.html
https://github.com/shirriff/apollo-ranging
"""
import numpy as np
from apollo.constants import (
RANGING_A_INIT,
RANGING_A_LENGTH,
RANGING_A_TAPS,
RANGING_B_INIT,
RANGING_B_LENGTH,
RANGING_B_TAPS,
RANGING_C_INIT,
RANGING_C_LENGTH,
RANGING_C_TAPS,
RANGING_CHIP_RATE_HZ,
RANGING_CL_LENGTH,
RANGING_CODE_LENGTH,
RANGING_X_INIT,
RANGING_X_LENGTH,
SPEED_OF_LIGHT_M_S,
)
class RangingCodeGenerator:
"""Generate Apollo PRN ranging code sequences.
Each component code can be generated independently (useful for
sequential correlation in the receiver) or the full composite
sequence can be generated at once.
The component generators produce *feedback bit* sequences, which is
what Shirriff's combination logic uses. These feedback sequences have
the same period and balance properties as the standard LFSR output
sequences.
"""
def generate_cl(self, n_chips: int | None = None) -> np.ndarray:
"""Generate CL component: alternating 0, 1, 0, 1, ...
Args:
n_chips: Number of chips. Default: RANGING_CL_LENGTH (2).
Returns:
uint8 array of 0/1 values.
"""
if n_chips is None:
n_chips = RANGING_CL_LENGTH
return np.array([i % 2 for i in range(n_chips)], dtype=np.uint8)
def generate_x(self, n_chips: int | None = None) -> np.ndarray:
"""Generate X component feedback sequence.
The X code is an 11-chip sequence with non-LFSR feedback.
Register bits: [xg, xf, xf1, xf2, xf3] = [bit4..bit0]
Feedback: xnew = (!xg & !xf2) | (!xf & !xf3) | (!xf1 & xf2 & xf3)
The output is the feedback bit (xnew), not the shift-out bit.
Args:
n_chips: Number of chips. Default: RANGING_X_LENGTH (11).
Returns:
uint8 array of 0/1 values.
"""
if n_chips is None:
n_chips = RANGING_X_LENGTH
state = RANGING_X_INIT # 22 = 0b10110
chips = np.zeros(n_chips, dtype=np.uint8)
for i in range(n_chips):
# Extract individual bits
xg = (state >> 4) & 1 # bit 4 (MSB)
xf = (state >> 3) & 1 # bit 3
xf1 = (state >> 2) & 1 # bit 2
xf2 = (state >> 1) & 1 # bit 1
xf3 = state & 1 # bit 0
# Custom feedback per Shirriff's Teensy rangeGenerator.ino
xnew = ((xg ^ 1) & (xf2 ^ 1)) | ((xf ^ 1) & (xf3 ^ 1)) | ((xf1 ^ 1) & xf2 & xf3)
# Output is the feedback bit
chips[i] = xnew & 1
# Shift right, new bit enters at MSB (bit 4)
state = ((xnew & 1) << 4) | (state >> 1)
return chips
def _generate_lfsr_feedback(
self,
n_bits: int,
taps: tuple[int, int],
init: int,
n_chips: int,
) -> np.ndarray:
"""Generate LFSR feedback bit sequence.
Shift direction: right (MSB in, LSB out). XOR feedback from the
two tap positions. Output is the feedback bit (which becomes the
new MSB after shift), matching Shirriff's combination logic.
Args:
n_bits: Register width.
taps: Pair of zero-indexed bit positions for XOR feedback.
init: Initial register state.
n_chips: Number of output chips.
Returns:
uint8 array of 0/1 values (feedback bits).
"""
state = init
mask = (1 << n_bits) - 1
chips = np.zeros(n_chips, dtype=np.uint8)
for i in range(n_chips):
# XOR feedback from tap positions
fb = ((state >> taps[0]) & 1) ^ ((state >> taps[1]) & 1)
# Output is the feedback bit
chips[i] = fb
# Shift right, feedback enters at MSB
state = ((fb << (n_bits - 1)) | (state >> 1)) & mask
return chips
def generate_a(self, n_chips: int | None = None) -> np.ndarray:
"""Generate A component: 31-chip LFSR, 5 bits, taps [2,0]."""
if n_chips is None:
n_chips = RANGING_A_LENGTH
return self._generate_lfsr_feedback(5, RANGING_A_TAPS, RANGING_A_INIT, n_chips)
def generate_b(self, n_chips: int | None = None) -> np.ndarray:
"""Generate B component: 63-chip LFSR, 6 bits, taps [1,0]."""
if n_chips is None:
n_chips = RANGING_B_LENGTH
return self._generate_lfsr_feedback(6, RANGING_B_TAPS, RANGING_B_INIT, n_chips)
def generate_c(self, n_chips: int | None = None) -> np.ndarray:
"""Generate C component: 127-chip LFSR, 7 bits, taps [1,0]."""
if n_chips is None:
n_chips = RANGING_C_LENGTH
return self._generate_lfsr_feedback(7, RANGING_C_TAPS, RANGING_C_INIT, n_chips)
def generate_component(self, name: str, n_chips: int | None = None) -> np.ndarray:
"""Generate a named component sequence.
Args:
name: One of "cl", "x", "a", "b", "c" (case-insensitive).
n_chips: Number of chips (default: one full period).
Returns:
uint8 array of 0/1 values.
Raises:
ValueError: If name is not a recognized component.
"""
generators = {
"cl": self.generate_cl,
"x": self.generate_x,
"a": self.generate_a,
"b": self.generate_b,
"c": self.generate_c,
}
gen = generators.get(name.lower())
if gen is None:
raise ValueError(f"Unknown component: {name!r}. Valid: {list(generators)}")
return gen(n_chips)
def generate_sequence(self, n_chips: int | None = None) -> np.ndarray:
"""Generate the full composite PRN ranging code.
Reproduces Shirriff's calc() function: on even output chips (ck=0),
all shift registers advance and the output is computed from feedback
bits. On odd chips (ck=1), the output is flipped (XOR with clock).
The component feedback sequences are tiled to fill the requested
length. Each component advances once per 2 output chips (CL period),
so component index = output_chip_index // 2.
Args:
n_chips: Total chips to generate.
Default: RANGING_CODE_LENGTH (one full period).
Returns:
uint8 array of 0/1 values, length n_chips.
"""
if n_chips is None:
n_chips = RANGING_CODE_LENGTH
# Number of register advance steps (one per CL period of 2 chips)
n_steps = (n_chips + 1) // 2
# Generate feedback bit sequences for each component, one per step
x_period = self.generate_x()
a_period = self.generate_a()
b_period = self.generate_b()
c_period = self.generate_c()
def _tile(period: np.ndarray, length: int, total: int) -> np.ndarray:
reps = (total + length - 1) // length
return np.tile(period, reps)[:total]
x_fb = _tile(x_period, RANGING_X_LENGTH, n_steps)
a_fb = _tile(a_period, RANGING_A_LENGTH, n_steps)
b_fb = _tile(b_period, RANGING_B_LENGTH, n_steps)
c_fb = _tile(c_period, RANGING_C_LENGTH, n_steps)
# Majority vote on feedback bits: maj(A,B,C) = (A&B) | (A&C) | (B&C)
maj = (a_fb & b_fb) | (a_fb & c_fb) | (b_fb & c_fb)
# Composite per step: (NOT(xnew) AND maj) -- before CL XOR
base = ((x_fb ^ 1) & maj).astype(np.uint8)
# Expand to output chips: each step produces 2 chips.
# Chip 0 (ck=1 after advance): base XOR 1
# Chip 1 (ck=0 before advance): base XOR 0 = base
# Wait -- re-examine Shirriff's calc():
# On entry with ck=0: advance registers, set ck=1, compute out with XOR ck(=1)
# On entry with ck=1: set ck=0, flip out (out ^= 1)
# So the first chip of each pair uses ck=1 (XOR with 1),
# and the second uses the flip (equivalent to XOR with 0).
chip0 = base ^ 1 # ck=1 at time of computation
chip1 = base # flipped = (base ^ 1) ^ 1 = base
# Interleave: [chip0[0], chip1[0], chip0[1], chip1[1], ...]
output = np.empty(n_steps * 2, dtype=np.uint8)
output[0::2] = chip0
output[1::2] = chip1
return output[:n_chips]
class RangingCorrelator:
"""Cross-correlate received NRZ samples with the known PRN code.
Uses FFT-based cross-correlation for efficiency. Works at any sample
rate -- when sample_rate equals chip_rate, each chip is one sample.
"""
def __init__(
self,
chip_rate: int = RANGING_CHIP_RATE_HZ,
sample_rate: float = RANGING_CHIP_RATE_HZ,
two_way: bool = True,
):
self.chip_rate = chip_rate
self.sample_rate = sample_rate
self.two_way = two_way
self.samples_per_chip = sample_rate / chip_rate
self._gen = RangingCodeGenerator()
def correlate(self, received: np.ndarray, code_chips: int | None = None) -> dict:
"""Cross-correlate received samples with PRN code.
Args:
received: Float samples (NRZ-like, +1/-1 or similar).
code_chips: Number of PRN chips for reference.
Default: enough to cover the received samples.
Returns:
Dict with delay_chips, delay_samples, range_m,
correlation_peak, peak_to_avg_ratio.
"""
if code_chips is None:
code_chips = int(len(received) / self.samples_per_chip) + 1
# Generate reference code (at most one full period)
ref_chips = self._gen.generate_sequence(min(code_chips, RANGING_CODE_LENGTH))
# NRZ encode reference: 0 -> -1, 1 -> +1
ref_nrz = ref_chips.astype(np.float32) * 2.0 - 1.0
# Upsample reference to match sample rate
spc = int(self.samples_per_chip)
ref_samples = np.repeat(ref_nrz, spc) if spc > 1 else ref_nrz
# Truncate to match lengths for correlation
min_len = min(len(received), len(ref_samples))
rx = received[:min_len].astype(np.float32)
ref = ref_samples[:min_len]
# Cross-correlate using FFT
n_fft = 1
while n_fft < 2 * min_len:
n_fft *= 2
rx_fft = np.fft.rfft(rx, n_fft)
ref_fft = np.fft.rfft(ref, n_fft)
xcorr = np.fft.irfft(rx_fft * np.conj(ref_fft), n_fft)
# Find peak in the valid region
peak_idx = int(np.argmax(np.abs(xcorr[:min_len])))
peak_val = float(np.abs(xcorr[peak_idx]))
avg_val = float(np.mean(np.abs(xcorr[:min_len])))
# Convert sample delay to chip delay
delay_chips = peak_idx / self.samples_per_chip
range_m = chips_to_range_m(delay_chips, two_way=self.two_way)
return {
"delay_samples": peak_idx,
"delay_chips": delay_chips,
"range_m": range_m,
"correlation_peak": peak_val,
"peak_to_avg_ratio": peak_val / avg_val if avg_val > 0 else 0.0,
}
# ---------------------------------------------------------------------------
# Utility functions
# ---------------------------------------------------------------------------
def chips_to_range_m(delay_chips: float, two_way: bool = True) -> float:
"""Convert chip delay to range in meters.
Args:
delay_chips: Delay measured in chip periods.
two_way: If True, signal traveled ground -> spacecraft -> ground.
Returns:
Range in meters.
"""
chip_period_s = 1.0 / RANGING_CHIP_RATE_HZ
delay_s = delay_chips * chip_period_s
distance = SPEED_OF_LIGHT_M_S * delay_s
if two_way:
distance /= 2.0
return distance
def range_m_to_chips(range_m: float, two_way: bool = True) -> float:
"""Convert range in meters to chip delay.
Args:
range_m: Distance in meters.
two_way: If True, compute round-trip delay.
Returns:
Delay in chip periods.
"""
distance = range_m * 2.0 if two_way else range_m
delay_s = distance / SPEED_OF_LIGHT_M_S
return delay_s * RANGING_CHIP_RATE_HZ
def verify_code_properties() -> dict:
"""Verify all component codes have correct properties.
Checks length, periodicity, and balance for each component and the
full composite sequence.
Returns:
Dict with component names as keys and property dicts as values.
"""
gen = RangingCodeGenerator()
results = {}
components = {
"cl": {"length": RANGING_CL_LENGTH, "n_bits": None},
"x": {"length": RANGING_X_LENGTH, "n_bits": 5},
"a": {"length": RANGING_A_LENGTH, "n_bits": 5},
"b": {"length": RANGING_B_LENGTH, "n_bits": 6},
"c": {"length": RANGING_C_LENGTH, "n_bits": 7},
}
for name, props in components.items():
code = gen.generate_component(name)
exp_len = props["length"]
result = {
"length": len(code),
"length_correct": len(code) == exp_len,
"ones_count": int(np.sum(code)),
"zeros_count": int(exp_len - np.sum(code)),
}
# Check periodicity: 2x generation should repeat exactly
code_2x = gen.generate_component(name, exp_len * 2)
result["periodic"] = bool(np.array_equal(code_2x[:exp_len], code_2x[exp_len:]))
# For maximal-length LFSRs, verify balance: ones = 2^(n-1), zeros = 2^(n-1)-1
if name in ("a", "b", "c") and props["n_bits"] is not None:
n = props["n_bits"]
expected_ones = 2 ** (n - 1)
expected_zeros = 2 ** (n - 1) - 1
result["balance_correct"] = (
result["ones_count"] == expected_ones and result["zeros_count"] == expected_zeros
)
# CL should be perfectly balanced
if name == "cl":
result["balance_correct"] = result["ones_count"] == 1 and result["zeros_count"] == 1
results[name] = result
# Verify combined length is the product of all component lengths
product = (
RANGING_CL_LENGTH
* RANGING_X_LENGTH
* RANGING_A_LENGTH
* RANGING_B_LENGTH
* RANGING_C_LENGTH
)
results["length_product"] = {
"expected": product,
"matches_constant": product == RANGING_CODE_LENGTH,
}
# Full composite -- generate a shorter segment for speed
# (full 5.4M chips is fine but we verify length math above)
short_len = 10_000
short_seq = gen.generate_sequence(n_chips=short_len)
results["composite_sample"] = {
"length": len(short_seq),
"ones_count": int(np.sum(short_seq)),
"zeros_count": int(short_len - np.sum(short_seq)),
"balance": float(np.sum(short_seq)) / short_len,
}
return results

113
src/apollo/ranging_demod.py Normal file
View File

@ -0,0 +1,113 @@
"""
Apollo Ranging Demodulator -- correlates received signal to measure range.
Takes the PM-demodulated float signal (containing the ranging subcarrier),
cross-correlates with the known PRN code, and outputs range measurements
as PDU messages.
The correlator accumulates a configurable number of samples per batch,
then performs FFT-based cross-correlation against the reference PRN code.
Each correlation batch produces one range measurement PDU.
Reference: Ken Shirriff's Apollo ranging analysis
http://www.righto.com/2022/04/the-digital-ranging-system-that.html
"""
import time
import numpy as np
from apollo.constants import SAMPLE_RATE_BASEBAND
from apollo.ranging import (
RANGING_CHIP_RATE_HZ,
RangingCorrelator,
)
# ---------------------------------------------------------------------------
# GNU Radio block (optional -- only if gnuradio is available)
# ---------------------------------------------------------------------------
try:
import pmt
from gnuradio import gr
class ranging_demod(gr.basic_block):
"""GNU Radio block: ranging demodulator with correlation.
Accumulates samples, runs batch correlation, emits range PDUs.
Input: float (PM demod output or filtered ranging signal)
Output: message PDUs with range measurements (port "range")
"""
def __init__(
self,
chip_rate: int = RANGING_CHIP_RATE_HZ,
sample_rate: float = SAMPLE_RATE_BASEBAND,
correlation_length: int = 100_000,
two_way: bool = True,
):
gr.basic_block.__init__(
self,
name="apollo_ranging_demod",
in_sig=[np.float32],
out_sig=None,
)
self.message_port_register_out(pmt.intern("range"))
self._correlator = RangingCorrelator(
chip_rate=chip_rate,
sample_rate=sample_rate,
two_way=two_way,
)
self._buffer = np.array([], dtype=np.float32)
self._correlation_length = correlation_length
def general_work(self, input_items, output_items):
n = len(input_items[0])
samples = np.array(input_items[0][:n], dtype=np.float32)
self.consume(0, n)
self._buffer = np.concatenate([self._buffer, samples])
while len(self._buffer) >= self._correlation_length:
chunk = self._buffer[: self._correlation_length]
self._buffer = self._buffer[self._correlation_length :]
result = self._correlator.correlate(chunk)
meta = pmt.make_dict()
meta = pmt.dict_add(
meta,
pmt.intern("delay_chips"),
pmt.from_double(result["delay_chips"]),
)
meta = pmt.dict_add(
meta,
pmt.intern("range_m"),
pmt.from_double(result["range_m"]),
)
meta = pmt.dict_add(
meta,
pmt.intern("correlation_peak"),
pmt.from_double(result["correlation_peak"]),
)
meta = pmt.dict_add(
meta,
pmt.intern("peak_to_avg_ratio"),
pmt.from_double(result["peak_to_avg_ratio"]),
)
meta = pmt.dict_add(
meta,
pmt.intern("timestamp"),
pmt.from_double(time.time()),
)
pdu = pmt.cons(meta, pmt.PMT_NIL)
self.message_port_pub(pmt.intern("range"), pdu)
return 0
except ImportError:
pass

63
src/apollo/ranging_mod.py Normal file
View File

@ -0,0 +1,63 @@
"""
Apollo Ranging Modulator -- NRZ-encodes PRN chips for carrier modulation.
Converts the chip stream (bytes 0/1) to a float NRZ waveform (+1/-1)
at the baseband sample rate. This output is suitable for summing with
other subcarriers before PM modulation.
The chip rate is ~994 kchip/s, so at 5.12 MHz sample rate there are
approximately 5.15 samples per chip.
This is essentially the same transform as nrz_encoder but configured
for the ranging chip rate instead of the PCM bit rate.
Reference: IMPLEMENTATION_SPEC.md ranging modulation path
"""
from gnuradio import blocks, gr
from apollo.constants import SAMPLE_RATE_BASEBAND
from apollo.ranging import RANGING_CHIP_RATE_HZ
class ranging_mod(gr.hier_block2):
"""Ranging chip NRZ modulator: byte (0/1) -> float (+1/-1) at sample_rate.
Input: byte stream (chip values 0 or 1)
Output: float NRZ waveform at sample_rate
"""
def __init__(
self,
chip_rate: int = RANGING_CHIP_RATE_HZ,
sample_rate: float = SAMPLE_RATE_BASEBAND,
):
gr.hier_block2.__init__(
self,
"apollo_ranging_mod",
gr.io_signature(1, 1, gr.sizeof_char),
gr.io_signature(1, 1, gr.sizeof_float),
)
self._chip_rate = chip_rate
self._sample_rate = sample_rate
samples_per_chip = int(sample_rate / chip_rate)
# byte (0/1) -> float (0.0/1.0)
self.to_float = blocks.char_to_float(1, 1)
# float (0.0/1.0) -> float (0.0/2.0)
self.scale = blocks.multiply_const_ff(2.0)
# float (0.0/2.0) -> float (-1.0/+1.0)
self.offset = blocks.add_const_ff(-1.0)
# Upsample: repeat each value samples_per_chip times
self.upsample = blocks.repeat(gr.sizeof_float, samples_per_chip)
# Connect chain
self.connect(self, self.to_float, self.scale, self.offset, self.upsample, self)
@property
def samples_per_chip(self) -> int:
return int(self._sample_rate / self._chip_rate)

View File

@ -0,0 +1,60 @@
"""
Apollo Ranging Source -- streams PRN ranging chips.
Outputs a continuous stream of bytes (0 or 1) representing the composite
PRN ranging code. The code repeats every 5,456,682 chips (~5.49 seconds
at the nominal 993,963 chip/s rate).
The full code period is pre-generated and cycled through, so startup cost
is paid once and streaming is zero-allocation.
Reference: Ken Shirriff's Apollo ranging analysis
http://www.righto.com/2022/04/the-digital-ranging-system-that.html
"""
import numpy as np
from apollo.ranging import RANGING_CODE_LENGTH, RangingCodeGenerator
# ---------------------------------------------------------------------------
# GNU Radio block (optional -- only if gnuradio is available)
# ---------------------------------------------------------------------------
try:
from gnuradio import gr
class ranging_source(gr.sync_block):
"""GNU Radio source: continuous PRN ranging chip stream.
Outputs bytes (0 or 1) at the chip rate. Pre-generates the full
code period and cycles through it.
"""
def __init__(self):
gr.sync_block.__init__(
self,
name="apollo_ranging_source",
in_sig=None,
out_sig=[np.byte],
)
self._gen = RangingCodeGenerator()
self._code = self._gen.generate_sequence()
self._pos = 0
def work(self, input_items, output_items):
out = output_items[0]
n = len(out)
produced = 0
while produced < n:
remaining = RANGING_CODE_LENGTH - self._pos
chunk = min(n - produced, remaining)
out[produced : produced + chunk] = self._code[self._pos : self._pos + chunk]
self._pos = (self._pos + chunk) % RANGING_CODE_LENGTH
produced += chunk
return produced
except ImportError:
pass

View File

@ -0,0 +1,332 @@
"""
Apollo Uplink Word Codec -- serializes and deserializes 15-bit AGC words for RF transport.
The uplink carries commands as 15-bit words at 2 kbps NRZ on a 70 kHz FM subcarrier.
Each word triggers an UPRUPT interrupt in the AGC flight software.
Serializer: (channel, value) pairs -> NRZ bit stream (0/1 bytes)
Deserializer: NRZ bit stream -> (channel, value) pairs
The serializer inserts a configurable inter-word gap (default 3 bit periods of zeros)
to allow the AGC time to service the UPRUPT between consecutive words.
Reference: IMPLEMENTATION_SPEC.md section 1 (INLINK / UPRUPT)
"""
import logging
from collections import deque
import numpy as np
from apollo.constants import (
AGC_CH_INLINK,
UPLINK_DATA_BIT_RATE,
UPLINK_INTER_WORD_GAP,
UPLINK_WORD_BITS,
)
logger = logging.getLogger(__name__)
# Minimum consecutive zeros to consider the channel idle (for deserializer sync)
IDLE_THRESHOLD = UPLINK_INTER_WORD_GAP + 2
class UplinkSerializerEngine:
"""Serializes (channel, value) pairs into a continuous NRZ bit stream.
Queues incoming words and produces bits on demand. Between words, inserts
a gap of zeros (default 3 bits) representing idle time for UPRUPT servicing.
When the queue is empty, outputs continuous zeros (carrier idle).
Args:
inter_word_gap: Number of zero-bit periods between consecutive words.
"""
def __init__(self, inter_word_gap: int = UPLINK_INTER_WORD_GAP):
self._gap = inter_word_gap
self._bit_queue: deque[int] = deque()
def add_words(self, pairs: list[tuple[int, int]]):
"""Queue (channel, value) pairs for serialization.
Each value is serialized as 15 bits MSB-first, followed by inter-word
gap zeros. The channel is not transmitted (the AGC always receives on
channel 045); it is used only for metadata/logging.
Args:
pairs: List of (channel, value) tuples from UplinkEncoder.
"""
for _channel, value in pairs:
# Serialize 15 bits MSB-first
for bit_pos in range(UPLINK_WORD_BITS - 1, -1, -1):
self._bit_queue.append((value >> bit_pos) & 1)
# Inter-word gap
for _ in range(self._gap):
self._bit_queue.append(0)
def next_bits(self, n: int) -> list[int]:
"""Pull up to n bits from the queue.
Returns queued data bits when available, zeros otherwise (idle carrier).
Args:
n: Maximum number of bits to return.
Returns:
List of 0/1 values, always exactly n elements long.
"""
result = []
for _ in range(n):
if self._bit_queue:
result.append(self._bit_queue.popleft())
else:
result.append(0)
return result
@property
def pending(self) -> int:
"""Number of bits remaining in the queue."""
return len(self._bit_queue)
class UplinkDeserializerEngine:
"""Reassembles 15-bit AGC words from a recovered NRZ bit stream.
Uses a two-phase state machine:
1. **Acquisition**: Scans for the first non-zero bit, which marks the start
of the first word (all DSKY keycodes have at least one set bit in the
upper 5 bits, so the first transmitted word always starts with a 1).
2. **Locked**: Once the first word boundary is found, uses fixed framing --
collects exactly 15 bits per word, then skips exactly `inter_word_gap`
bits, then collects the next 15 bits, etc. This is necessary because
data words can start with leading zeros that would be indistinguishable
from the inter-word gap.
The lock is released after seeing more than `gap + word_bits` consecutive
zeros (indicating the transmitter has gone idle).
The recovered words are emitted as (channel, value) pairs where channel is
always AGC_CH_INLINK (045 octal = 37 decimal).
Args:
inter_word_gap: Expected number of zero bits between words.
channel: AGC channel to assign to recovered words.
"""
# State constants
_ACQUIRING = 0
_IN_WORD = 1
_IN_GAP = 2
def __init__(
self,
inter_word_gap: int = UPLINK_INTER_WORD_GAP,
channel: int = AGC_CH_INLINK,
):
self._gap = inter_word_gap
self._channel = channel
self._bit_buffer: list[int] = []
self._gap_count = 0
self._idle_count = 0
self._state = self._ACQUIRING
def process_bits(self, bits: list[int]) -> list[tuple[int, int]]:
"""Process a batch of recovered bits and return any completed words.
Args:
bits: List of 0/1 values from the slicer output.
Returns:
List of (channel, value) tuples for each completed word.
"""
results: list[tuple[int, int]] = []
for bit in bits:
if self._state == self._ACQUIRING:
# Scanning for first non-zero bit (start of first word)
if bit == 0:
self._idle_count += 1
else:
self._state = self._IN_WORD
self._bit_buffer = [bit]
self._idle_count = 0
elif self._state == self._IN_WORD:
# Collecting word bits (fixed 15-bit frame)
self._bit_buffer.append(bit)
# Track consecutive zeros for idle detection
if bit == 0:
self._idle_count += 1
else:
self._idle_count = 0
if len(self._bit_buffer) == UPLINK_WORD_BITS:
# Assemble 15-bit value MSB-first
value = 0
for b in self._bit_buffer:
value = (value << 1) | b
if value == 0:
# Null word (all zeros) means the transmitter has
# gone idle -- not a valid command. Drop it and
# return to acquisition.
self._state = self._ACQUIRING
self._bit_buffer = []
else:
results.append((self._channel, value))
self._bit_buffer = []
if self._gap > 0:
self._state = self._IN_GAP
self._gap_count = 0
else:
self._state = self._IN_WORD
elif self._state == self._IN_GAP:
# Skipping inter-word gap bits
self._gap_count += 1
if self._gap_count >= self._gap:
# Gap complete -- start collecting next word
self._state = self._IN_WORD
self._bit_buffer = []
return results
def reset(self):
"""Clear internal state for a fresh decode pass."""
self._bit_buffer = []
self._gap_count = 0
self._idle_count = 0
self._state = self._ACQUIRING
# ---------------------------------------------------------------------------
# GNU Radio block wrappers (optional -- only if gnuradio is available)
# ---------------------------------------------------------------------------
try:
import pmt
from gnuradio import gr
class uplink_word_serializer(gr.sync_block):
"""GNU Radio source block: serializes uplink word PDUs into a NRZ bit stream.
Accepts (channel, value) PDUs on the ``words`` message input (same
format emitted by uplink_encoder) and outputs a continuous stream of
bytes (values 0 or 1) carrying the serialized data.
When no words are queued, outputs zeros (idle carrier).
Message input:
words -- PDU with metadata dict containing "channel" and "value" keys,
or a pair (cons) of (channel . value).
Output:
byte stream -- 0/1 values at the uplink bit rate
"""
def __init__(self, inter_word_gap: int = UPLINK_INTER_WORD_GAP):
gr.sync_block.__init__(
self,
name="apollo_uplink_word_serializer",
in_sig=None,
out_sig=[np.byte],
)
self._engine = UplinkSerializerEngine(inter_word_gap=inter_word_gap)
# Message input for word injection
self.message_port_register_in(pmt.intern("words"))
self.set_msg_handler(pmt.intern("words"), self._handle_words)
def _handle_words(self, msg):
"""Parse incoming word PDU and queue for serialization."""
if not pmt.is_pair(msg):
return
meta = pmt.car(msg)
data = pmt.cdr(msg)
# Try metadata dict first (preferred format from uplink_encoder)
if pmt.is_dict(meta):
ch_pmt = pmt.dict_ref(meta, pmt.intern("channel"), pmt.PMT_NIL)
val_pmt = pmt.dict_ref(meta, pmt.intern("value"), pmt.PMT_NIL)
if not pmt.is_null(ch_pmt) and not pmt.is_null(val_pmt):
channel = pmt.to_long(ch_pmt)
value = pmt.to_long(val_pmt)
self._engine.add_words([(channel, value)])
return
# Fallback: data is a pair (channel . value)
if pmt.is_pair(data):
channel = pmt.to_long(pmt.car(data))
value = pmt.to_long(pmt.cdr(data))
self._engine.add_words([(channel, value)])
def work(self, input_items, output_items):
out = output_items[0]
n_out = len(out)
bits = self._engine.next_bits(n_out)
for i in range(n_out):
out[i] = bits[i]
return n_out
class uplink_word_deserializer(gr.basic_block):
"""GNU Radio block: reassembles 15-bit uplink words from a recovered bit stream.
Consumes a stream of bytes (0/1 from binary slicer) and emits PDU
messages for each recovered word.
Input:
byte stream -- 0/1 values from the slicer
Message output:
commands -- PDU with metadata dict {"channel": int, "value": int}
"""
def __init__(
self,
inter_word_gap: int = UPLINK_INTER_WORD_GAP,
channel: int = AGC_CH_INLINK,
):
gr.basic_block.__init__(
self,
name="apollo_uplink_word_deserializer",
in_sig=[np.byte],
out_sig=[],
)
self._engine = UplinkDeserializerEngine(
inter_word_gap=inter_word_gap,
channel=channel,
)
self.message_port_register_out(pmt.intern("commands"))
def general_work(self, input_items, output_items):
n_input = len(input_items[0])
bits = [int(input_items[0][i]) for i in range(n_input)]
self.consume(0, n_input)
pairs = self._engine.process_bits(bits)
for channel, value in pairs:
meta = pmt.make_dict()
meta = pmt.dict_add(
meta, pmt.intern("channel"), pmt.from_long(channel)
)
meta = pmt.dict_add(
meta, pmt.intern("value"), pmt.from_long(value)
)
self.message_port_pub(
pmt.intern("commands"),
pmt.cons(meta, pmt.PMT_NIL),
)
return 0
except ImportError:
pass

View File

@ -0,0 +1,102 @@
"""
Apollo USB Uplink Receiver -- spacecraft command receiver.
The receive-side counterpart to usb_uplink_source. Demodulates uplink
commands from complex baseband:
complex in -> pm_demod -> subcarrier_extract (70 kHz)
-> quadrature_demod (FM) -> matched filter -> decimate -> slicer
-> uplink_word_deserializer -> message output
Recovers 15-bit AGC words originally serialized at 2 kbps NRZ on a 70 kHz
FM data subcarrier, phase-modulated onto the uplink carrier.
For finer control, use the individual blocks directly.
Reference: IMPLEMENTATION_SPEC.md -- uplink receive path (section 2.2)
"""
from gnuradio import analog, blocks, digital, filter, gr
from apollo.constants import (
SAMPLE_RATE_BASEBAND,
UPLINK_DATA_BIT_RATE,
UPLINK_DATA_FM_DEVIATION_HZ,
UPLINK_DATA_SUBCARRIER_HZ,
)
from apollo.pm_demod import pm_demod
from apollo.subcarrier_extract import subcarrier_extract
from apollo.uplink_word_codec import uplink_word_deserializer
class usb_uplink_receiver(gr.hier_block2):
"""Apollo USB uplink receiver -- complex baseband to command PDUs.
Inputs:
complex -- baseband IQ samples at sample_rate (default 5.12 MHz)
Message outputs (no streaming output):
commands -- decoded (channel, value) PDUs for AGC bridge
The block chains: PM demod -> 70 kHz subcarrier extract -> FM demod ->
matched filter -> decimate -> binary slicer -> word deserializer.
"""
def __init__(
self,
sample_rate: float = SAMPLE_RATE_BASEBAND,
bit_rate: int = UPLINK_DATA_BIT_RATE,
carrier_pll_bw: float = 0.02,
subcarrier_bw: float = 20_000,
):
gr.hier_block2.__init__(
self,
"apollo_usb_uplink_receiver",
gr.io_signature(1, 1, gr.sizeof_gr_complex),
gr.io_signature(0, 0, 0), # message-only output
)
# Register message output port
self.message_port_register_hier_out("commands")
# Stage 1: PM demodulator -- carrier PLL + phase extraction
self.pm = pm_demod(
carrier_pll_bw=carrier_pll_bw,
sample_rate=sample_rate,
)
# Stage 2: Subcarrier extractor -- bandpass + downconvert 70 kHz
self.sc_extract = subcarrier_extract(
center_freq=UPLINK_DATA_SUBCARRIER_HZ,
bandwidth=subcarrier_bw,
sample_rate=sample_rate,
)
# Stage 3: FM discriminator
# Gain normalizes the FM deviation to unity amplitude
fm_gain = sample_rate / (2.0 * 3.141592653589793 * UPLINK_DATA_FM_DEVIATION_HZ)
self.fm_demod = analog.quadrature_demod_cf(fm_gain)
# Stage 4: Matched filter + decimation for bit recovery
# Average over one bit period, then keep one sample per bit
samples_per_bit = int(sample_rate / bit_rate)
matched_taps = [1.0 / samples_per_bit] * samples_per_bit
self.matched_filter = filter.fir_filter_fff(1, matched_taps)
self.decimator = blocks.keep_one_in_n(gr.sizeof_float, samples_per_bit)
# Stage 5: Binary slicer -- hard decision (> 0 -> 1, <= 0 -> 0)
self.slicer = digital.binary_slicer_fb()
# Stage 6: Word deserializer -- reassemble 15-bit words from bits
self.deser = uplink_word_deserializer()
# Connect streaming chain:
# complex in -> PM demod -> subcarrier extract -> FM demod
# -> matched filter -> decimate -> slicer -> deserializer
self.connect(
self, self.pm, self.sc_extract, self.fm_demod,
self.matched_filter, self.decimator, self.slicer, self.deser,
)
# Connect message port: deserializer -> hier output
self.msg_connect(self.deser, "commands", self, "commands")

View File

@ -0,0 +1,124 @@
"""
Apollo USB Uplink Source -- ground station command transmitter.
The transmit-side counterpart to usb_uplink_receiver. Wires together the
full uplink modulation chain:
uplink_word_serializer -> nrz_encoder -> FM mod -> 70 kHz upconvert
-> complex_to_real -> pm_mod -> [optional AWGN] -> complex out
The ground station transmits commands on a 70 kHz FM data subcarrier at
2 kbps NRZ, phase-modulated onto the 2106.40625 MHz uplink carrier at
1.0 rad peak deviation.
For finer control, use the individual blocks directly.
Reference: IMPLEMENTATION_SPEC.md -- uplink transmit path (section 2.2)
"""
import math
from gnuradio import analog, blocks, gr
from apollo.constants import (
SAMPLE_RATE_BASEBAND,
UPLINK_DATA_BIT_RATE,
UPLINK_DATA_FM_DEVIATION_HZ,
UPLINK_DATA_SUBCARRIER_HZ,
UPLINK_PM_DEVIATION_RAD,
)
from apollo.nrz_encoder import nrz_encoder
from apollo.pm_mod import pm_mod
from apollo.uplink_word_codec import uplink_word_serializer
class usb_uplink_source(gr.hier_block2):
"""Apollo USB uplink signal source -- complex baseband output.
Outputs:
complex -- PM-modulated baseband at sample_rate (default 5.12 MHz)
Message inputs:
words -- forwarded to uplink_word_serializer for command injection.
Accepts the same PDU format emitted by uplink_encoder.
The block serializes 15-bit AGC words into NRZ bits, FM-modulates them
onto a 70 kHz subcarrier, and applies PM modulation to produce complex
baseband suitable for transmission or loopback testing.
Optional AWGN noise can be added by setting snr_db to a finite value.
"""
def __init__(
self,
sample_rate: float = SAMPLE_RATE_BASEBAND,
bit_rate: int = UPLINK_DATA_BIT_RATE,
pm_deviation: float = UPLINK_PM_DEVIATION_RAD,
snr_db: float | None = None,
):
gr.hier_block2.__init__(
self,
"apollo_usb_uplink_source",
gr.io_signature(0, 0, 0), # source -- no input
gr.io_signature(1, 1, gr.sizeof_gr_complex),
)
self._sample_rate = sample_rate
# Forward the words message port from uplink_word_serializer
self.message_port_register_hier_in("words")
# --- Uplink data path ---
# Stage 1: Serialize 15-bit words into 0/1 byte stream
self.word_ser = uplink_word_serializer()
# Forward message port: hier input -> serializer
self.msg_connect(self, "words", self.word_ser, "words")
# Stage 2: NRZ encode (byte 0/1 -> float +1/-1 at sample_rate)
self.nrz = nrz_encoder(bit_rate=bit_rate, sample_rate=sample_rate)
# Stage 3: FM modulate onto 70 kHz subcarrier
# Sensitivity converts amplitude to instantaneous frequency deviation
fm_sensitivity = 2.0 * math.pi * UPLINK_DATA_FM_DEVIATION_HZ / sample_rate
self.fm_mod = analog.frequency_modulator_fc(fm_sensitivity)
# Stage 4: Upconvert to 70 kHz -- multiply by exp(j*2*pi*70000*t)
self.lo = analog.sig_source_c(
sample_rate, analog.GR_COS_WAVE,
UPLINK_DATA_SUBCARRIER_HZ, 1.0, 0,
)
self.mixer = blocks.multiply_cc(1)
# Stage 5: Convert complex subcarrier to real (PM modulator expects float)
self.to_real = blocks.complex_to_real(1)
# Stage 6: PM modulation onto carrier
self.pm = pm_mod(pm_deviation=pm_deviation, sample_rate=sample_rate)
# Connect data chain:
# word_ser -> nrz -> fm_mod -> (mixer, 0)
# lo -> (mixer, 1)
# mixer -> to_real -> pm
self.connect(self.word_ser, self.nrz, self.fm_mod, (self.mixer, 0))
self.connect(self.lo, (self.mixer, 1))
self.connect(self.mixer, self.to_real, self.pm)
# --- Optional AWGN ---
if snr_db is not None:
# Signal power is 1.0 (PM constant envelope)
noise_power = 1.0 / (10.0 ** (snr_db / 10.0))
noise_amplitude = math.sqrt(noise_power / 2.0)
self.noise = analog.noise_source_c(
analog.GR_GAUSSIAN, noise_amplitude, 0,
)
self.sum_noise = blocks.add_cc(1)
self.connect(self.pm, (self.sum_noise, 0))
self.connect(self.noise, (self.sum_noise, 1))
self.connect(self.sum_noise, self)
else:
self.connect(self.pm, self)