diff --git a/examples/ranging_demo.py b/examples/ranging_demo.py new file mode 100644 index 0000000..4e3b62a --- /dev/null +++ b/examples/ranging_demo.py @@ -0,0 +1,204 @@ +#!/usr/bin/env python3 +""" +Apollo PRN Ranging Demo -- generate, delay, correlate, measure. + +Demonstrates the Apollo ranging system by: +1. Verifying component code properties (lengths, periodicity, balance) +2. Generating the PRN ranging code +3. NRZ encoding to a bipolar waveform +4. Applying a known propagation delay (simulating spacecraft distance) +5. Optionally adding AWGN noise +6. Cross-correlating to recover the delay +7. Comparing measured range to true range + +The demo works at chip rate (1 sample per chip) for simplicity and speed. +No GNU Radio runtime is required. + +Usage: + uv run python examples/ranging_demo.py + uv run python examples/ranging_demo.py --range-km 100 + uv run python examples/ranging_demo.py --range-km 384400 # Moon distance + uv run python examples/ranging_demo.py --snr 20 + uv run python examples/ranging_demo.py --chips 200000 +""" + +import argparse +import time + +import numpy as np + +from apollo.ranging import ( + RANGING_A_LENGTH, + RANGING_B_LENGTH, + RANGING_C_LENGTH, + RANGING_CHIP_RATE_HZ, + RANGING_CL_LENGTH, + RANGING_X_LENGTH, + SPEED_OF_LIGHT_M_S, + RangingCodeGenerator, + RangingCorrelator, + range_m_to_chips, + verify_code_properties, +) + + +def main(): + parser = argparse.ArgumentParser(description="Apollo PRN ranging demo") + parser.add_argument( + "--range-km", + type=float, + default=100.0, + help="Target range in km (default: 100)", + ) + parser.add_argument( + "--snr", + type=float, + default=None, + help="SNR in dB (default: no noise)", + ) + parser.add_argument( + "--chips", + type=int, + default=50_000, + help="Number of chips for correlation (default: 50000)", + ) + args = parser.parse_args() + + print("=" * 60) + print("Apollo PRN Ranging Demo") + print("=" * 60) + + # ------------------------------------------------------------------ + # Step 1: Verify code properties + # ------------------------------------------------------------------ + print() + print("1. Component code verification") + print("-" * 40) + + props = verify_code_properties() + for name in ("cl", "x", "a", "b", "c"): + p = props[name] + status = "OK" if (p["length_correct"] and p["periodic"]) else "FAIL" + balance = "" + if "balance_correct" in p: + balance = f", balance={'OK' if p['balance_correct'] else 'FAIL'}" + print( + f" {name.upper():>2}: length={p['length']:>3} " + f"(1s={p['ones_count']}, 0s={p['zeros_count']}), " + f"periodic={p['periodic']}{balance} [{status}]" + ) + + lp = props["length_product"] + print( + f" Code length: {lp['expected']:,} " + f"= {RANGING_CL_LENGTH}*{RANGING_X_LENGTH}*{RANGING_A_LENGTH}" + f"*{RANGING_B_LENGTH}*{RANGING_C_LENGTH} " + f"[{'OK' if lp['matches_constant'] else 'FAIL'}]" + ) + + cs = props["composite_sample"] + print(f" Composite sample ({cs['length']:,} chips): balance={cs['balance']:.3f} (ideal ~0.5)") + + # ------------------------------------------------------------------ + # Step 2: Generate PRN code + # ------------------------------------------------------------------ + print() + print(f"2. Generating {args.chips:,} PRN chips...") + t0 = time.time() + gen = RangingCodeGenerator() + code = gen.generate_sequence(n_chips=args.chips) + t_gen = time.time() - t0 + print(f" Generated in {t_gen * 1000:.1f} ms") + print( + f" Ones: {int(np.sum(code)):,} / {args.chips:,} ({100 * np.sum(code) / args.chips:.1f}%)" + ) + + # ------------------------------------------------------------------ + # Step 3: NRZ encode + # ------------------------------------------------------------------ + nrz = code.astype(np.float32) * 2.0 - 1.0 + + # ------------------------------------------------------------------ + # Step 4: Apply delay + # ------------------------------------------------------------------ + true_range_m = args.range_km * 1000.0 + delay_chips = range_m_to_chips(true_range_m, two_way=True) + delay_samples = int(round(delay_chips)) # At chip rate, 1 sample = 1 chip + + # Wrap delay within sequence length + delay_samples = delay_samples % args.chips + + print() + print(f"3. Simulating range: {args.range_km:.1f} km") + print(f" Round-trip distance: {true_range_m * 2 / 1000:.1f} km") + print(f" Round-trip time: {2 * true_range_m / SPEED_OF_LIGHT_M_S * 1000:.4f} ms") + print(f" Delay: {delay_chips:.2f} chips ({delay_samples} samples)") + + received = np.roll(nrz, delay_samples) + + # ------------------------------------------------------------------ + # Step 5: Add noise + # ------------------------------------------------------------------ + if args.snr is not None: + noise_power = 1.0 / (10.0 ** (args.snr / 10.0)) + noise = np.random.default_rng(42).standard_normal(len(received)).astype(np.float32) + noise *= np.sqrt(noise_power) + received = received + noise + print(f" Added AWGN noise at {args.snr:.0f} dB SNR (noise power = {noise_power:.4f})") + + # ------------------------------------------------------------------ + # Step 6: Correlate + # ------------------------------------------------------------------ + print() + print("4. Cross-correlating...") + + correlator = RangingCorrelator( + chip_rate=RANGING_CHIP_RATE_HZ, + sample_rate=RANGING_CHIP_RATE_HZ, # 1 sample per chip + two_way=True, + ) + + t0 = time.time() + result = correlator.correlate(received) + t_corr = time.time() - t0 + + measured_range_m = result["range_m"] + error_m = abs(measured_range_m - true_range_m) + + # The quantization error at chip rate: + # one chip = c / (2 * chip_rate) meters (two-way) + quant_m = SPEED_OF_LIGHT_M_S / (2.0 * RANGING_CHIP_RATE_HZ) + + print(f" Correlation time: {t_corr * 1000:.1f} ms") + print(f" Peak-to-average ratio: {result['peak_to_avg_ratio']:.1f}") + + # ------------------------------------------------------------------ + # Step 7: Results + # ------------------------------------------------------------------ + print() + print("5. Range measurement results") + print("-" * 40) + print(f" True range: {args.range_km:>12.1f} km") + print(f" Measured range: {measured_range_m / 1000:>12.1f} km") + print(f" Error: {error_m:>12.1f} m") + print(f" Quantization step: {quant_m:>12.1f} m (1 chip, two-way)") + print(f" Delay (chips): {result['delay_chips']:>12.2f}") + print(f" Delay (samples): {result['delay_samples']:>12d}") + print(f" Correlation peak: {result['correlation_peak']:>12.0f}") + + if error_m <= quant_m: + print() + print(" Error is within one quantization step -- measurement is correct.") + elif error_m <= quant_m * 2: + print() + print(" Error is within two quantization steps -- acceptable.") + else: + print() + print(" Error exceeds quantization limit. Try more --chips or higher --snr.") + + print() + print("=" * 60) + + +if __name__ == "__main__": + main() diff --git a/grc/apollo_ranging_demod.block.yml b/grc/apollo_ranging_demod.block.yml new file mode 100644 index 0000000..16ae16d --- /dev/null +++ b/grc/apollo_ranging_demod.block.yml @@ -0,0 +1,64 @@ +id: apollo_ranging_demod +label: Apollo Ranging Demodulator +category: '[Apollo USB]' +flags: [python] + +parameters: +- id: chip_rate + label: Chip Rate (Hz) + dtype: int + default: '993963' +- id: sample_rate + label: Sample Rate (Hz) + dtype: real + default: '5120000' +- id: correlation_length + label: Correlation Length (samples) + dtype: int + default: '100000' +- id: two_way + label: Two-Way Ranging + dtype: bool + default: 'True' + options: ['True', 'False'] + option_labels: ['Two-way (ground-SC-ground)', 'One-way'] + +inputs: +- label: in + domain: stream + dtype: float + +outputs: +- label: range + domain: message + +templates: + imports: from apollo.ranging_demod import ranging_demod + make: >- + apollo.ranging_demod.ranging_demod( + chip_rate=${chip_rate}, + sample_rate=${sample_rate}, + correlation_length=${correlation_length}, + two_way=${two_way}) + +documentation: |- + Apollo Ranging Demodulator + + Correlates received signal against known PRN ranging code to measure + spacecraft range. Accumulates samples in batches, performs FFT-based + cross-correlation, and emits range measurement PDUs. + + Message output (range): + delay_chips: Measured delay in chip periods + range_m: Computed range in meters + correlation_peak: Peak correlation value + peak_to_avg_ratio: Peak-to-average ratio (signal quality) + timestamp: Measurement timestamp + + Parameters: + chip_rate: Expected PRN chip rate (default 993,963 Hz) + sample_rate: Input sample rate (default 5.12 MHz) + correlation_length: Samples per correlation batch (default 100,000) + two_way: If True, divide range by 2 for round-trip measurement + +file_format: 1 diff --git a/grc/apollo_ranging_mod.block.yml b/grc/apollo_ranging_mod.block.yml new file mode 100644 index 0000000..b88581d --- /dev/null +++ b/grc/apollo_ranging_mod.block.yml @@ -0,0 +1,46 @@ +id: apollo_ranging_mod +label: Apollo Ranging Modulator +category: '[Apollo USB]' +flags: [python] + +parameters: +- id: chip_rate + label: Chip Rate (Hz) + dtype: int + default: '993963' +- id: sample_rate + label: Sample Rate (Hz) + dtype: real + default: '5120000' + +inputs: +- label: in + domain: stream + dtype: byte + +outputs: +- label: out + domain: stream + dtype: float + +templates: + imports: from apollo.ranging_mod import ranging_mod + make: apollo.ranging_mod.ranging_mod(chip_rate=${chip_rate}, sample_rate=${sample_rate}) + +documentation: |- + Apollo Ranging Modulator + + NRZ-encodes PRN ranging chips (byte 0/1) to float (+1/-1) at the + specified sample rate. Output is suitable for summing with other + subcarriers before PM carrier modulation. + + At the default 5.12 MHz sample rate, each chip occupies ~5 samples. + + This is the same NRZ transform as the PCM NRZ Encoder but configured + for the ranging chip rate (~994 kchip/s) instead of the PCM bit rate. + + Parameters: + chip_rate: PRN chip rate (default 993,963 Hz) + sample_rate: Output sample rate (default 5.12 MHz) + +file_format: 1 diff --git a/grc/apollo_ranging_source.block.yml b/grc/apollo_ranging_source.block.yml new file mode 100644 index 0000000..89bd59d --- /dev/null +++ b/grc/apollo_ranging_source.block.yml @@ -0,0 +1,30 @@ +id: apollo_ranging_source +label: Apollo Ranging Source +category: '[Apollo USB]' +flags: [python] + +parameters: [] + +outputs: +- label: out + domain: stream + dtype: byte + +templates: + imports: from apollo.ranging_source import ranging_source + make: apollo.ranging_source.ranging_source() + +documentation: |- + Apollo PRN Ranging Source + + Generates a continuous stream of PRN ranging code chips (bytes 0 or 1). + The composite code combines CL, X, A, B, and C component sequences + using majority-vote logic: output = (NOT(X) AND maj(A,B,C)) XOR CL + + Code length: 5,456,682 chips (~5.49 seconds at 993,963 chips/sec) + The code repeats cyclically. + + The full code period is pre-generated at startup, so the first call + has a brief delay while the sequence is computed. + +file_format: 1 diff --git a/src/apollo/ranging.py b/src/apollo/ranging.py new file mode 100644 index 0000000..524e026 --- /dev/null +++ b/src/apollo/ranging.py @@ -0,0 +1,456 @@ +""" +Apollo PRN Ranging Code Generator. + +Generates the composite pseudo-random noise code used for spacecraft range +measurement. The code combines 5 component sequences (CL, X, A, B, C) using +majority-vote logic and XOR operations. + +Combined code length: 5,456,682 chips (~5.49 seconds at 993,963 chips/sec) + +The composite output is produced by Shirriff's algorithm: on even-numbered +output chips (ck=0), all shift registers advance and the output is computed +from the feedback bits via majority logic. On odd-numbered chips (ck=1), the +output is simply flipped (XOR with CL clock). + +Combination logic (per even chip): + out = (NOT(xnew) AND maj(anew, bnew, cnew)) XOR ck +where maj = (A&B) | (A&C) | (B&C) + +Component LFSR taps (from Shirriff's Teensy rangeGenerator.ino): + A: 5-bit, taps at bit positions 2 and 0 (polynomial x^5+x^2+1) + B: 6-bit, taps at bit positions 1 and 0 (polynomial x^6+x+1) + C: 7-bit, taps at bit positions 1 and 0 (polynomial x^7+x+1) + +Reference: Ken Shirriff's analysis of the Apollo ranging system + http://www.righto.com/2022/04/the-digital-ranging-system-that.html + https://github.com/shirriff/apollo-ranging +""" + +import numpy as np + +# --------------------------------------------------------------------------- +# Ranging constants (local to this module; will migrate to constants.py) +# --------------------------------------------------------------------------- +RANGING_CHIP_RATE_HZ = 993_963 +RANGING_CODE_LENGTH = 5_456_682 # 2 * 11 * 31 * 63 * 127 +RANGING_CL_LENGTH = 2 +RANGING_X_LENGTH = 11 +RANGING_A_LENGTH = 31 +RANGING_B_LENGTH = 63 +RANGING_C_LENGTH = 127 + +RANGING_X_INIT = 22 # 0b10110 +RANGING_A_INIT = 0x1F # all ones (5-bit) +RANGING_B_INIT = 0x3F # all ones (6-bit) +RANGING_C_INIT = 0x7F # all ones (7-bit) + +# LFSR taps: zero-indexed bit positions for XOR feedback, per Shirriff's code. +# These produce maximal-length sequences (period = 2^n - 1). +RANGING_A_TAPS = (2, 0) # 5-bit: x^5 + x^2 + 1 +RANGING_B_TAPS = (1, 0) # 6-bit: x^6 + x + 1 +RANGING_C_TAPS = (1, 0) # 7-bit: x^7 + x + 1 + +SPEED_OF_LIGHT_M_S = 299_792_458 + + +class RangingCodeGenerator: + """Generate Apollo PRN ranging code sequences. + + Each component code can be generated independently (useful for + sequential correlation in the receiver) or the full composite + sequence can be generated at once. + + The component generators produce *feedback bit* sequences, which is + what Shirriff's combination logic uses. These feedback sequences have + the same period and balance properties as the standard LFSR output + sequences. + """ + + def generate_cl(self, n_chips: int | None = None) -> np.ndarray: + """Generate CL component: alternating 0, 1, 0, 1, ... + + Args: + n_chips: Number of chips. Default: RANGING_CL_LENGTH (2). + + Returns: + uint8 array of 0/1 values. + """ + if n_chips is None: + n_chips = RANGING_CL_LENGTH + return np.array([i % 2 for i in range(n_chips)], dtype=np.uint8) + + def generate_x(self, n_chips: int | None = None) -> np.ndarray: + """Generate X component feedback sequence. + + The X code is an 11-chip sequence with non-LFSR feedback. + Register bits: [xg, xf, xf1, xf2, xf3] = [bit4..bit0] + Feedback: xnew = (!xg & !xf2) | (!xf & !xf3) | (!xf1 & xf2 & xf3) + + The output is the feedback bit (xnew), not the shift-out bit. + + Args: + n_chips: Number of chips. Default: RANGING_X_LENGTH (11). + + Returns: + uint8 array of 0/1 values. + """ + if n_chips is None: + n_chips = RANGING_X_LENGTH + + state = RANGING_X_INIT # 22 = 0b10110 + chips = np.zeros(n_chips, dtype=np.uint8) + + for i in range(n_chips): + # Extract individual bits + xg = (state >> 4) & 1 # bit 4 (MSB) + xf = (state >> 3) & 1 # bit 3 + xf1 = (state >> 2) & 1 # bit 2 + xf2 = (state >> 1) & 1 # bit 1 + xf3 = state & 1 # bit 0 + + # Custom feedback per Shirriff's Teensy rangeGenerator.ino + xnew = ((xg ^ 1) & (xf2 ^ 1)) | ((xf ^ 1) & (xf3 ^ 1)) | ((xf1 ^ 1) & xf2 & xf3) + + # Output is the feedback bit + chips[i] = xnew & 1 + + # Shift right, new bit enters at MSB (bit 4) + state = ((xnew & 1) << 4) | (state >> 1) + + return chips + + def _generate_lfsr_feedback( + self, + n_bits: int, + taps: tuple[int, int], + init: int, + n_chips: int, + ) -> np.ndarray: + """Generate LFSR feedback bit sequence. + + Shift direction: right (MSB in, LSB out). XOR feedback from the + two tap positions. Output is the feedback bit (which becomes the + new MSB after shift), matching Shirriff's combination logic. + + Args: + n_bits: Register width. + taps: Pair of zero-indexed bit positions for XOR feedback. + init: Initial register state. + n_chips: Number of output chips. + + Returns: + uint8 array of 0/1 values (feedback bits). + """ + state = init + mask = (1 << n_bits) - 1 + chips = np.zeros(n_chips, dtype=np.uint8) + + for i in range(n_chips): + # XOR feedback from tap positions + fb = ((state >> taps[0]) & 1) ^ ((state >> taps[1]) & 1) + + # Output is the feedback bit + chips[i] = fb + + # Shift right, feedback enters at MSB + state = ((fb << (n_bits - 1)) | (state >> 1)) & mask + + return chips + + def generate_a(self, n_chips: int | None = None) -> np.ndarray: + """Generate A component: 31-chip LFSR, 5 bits, taps [2,0].""" + if n_chips is None: + n_chips = RANGING_A_LENGTH + return self._generate_lfsr_feedback(5, RANGING_A_TAPS, RANGING_A_INIT, n_chips) + + def generate_b(self, n_chips: int | None = None) -> np.ndarray: + """Generate B component: 63-chip LFSR, 6 bits, taps [1,0].""" + if n_chips is None: + n_chips = RANGING_B_LENGTH + return self._generate_lfsr_feedback(6, RANGING_B_TAPS, RANGING_B_INIT, n_chips) + + def generate_c(self, n_chips: int | None = None) -> np.ndarray: + """Generate C component: 127-chip LFSR, 7 bits, taps [1,0].""" + if n_chips is None: + n_chips = RANGING_C_LENGTH + return self._generate_lfsr_feedback(7, RANGING_C_TAPS, RANGING_C_INIT, n_chips) + + def generate_component(self, name: str, n_chips: int | None = None) -> np.ndarray: + """Generate a named component sequence. + + Args: + name: One of "cl", "x", "a", "b", "c" (case-insensitive). + n_chips: Number of chips (default: one full period). + + Returns: + uint8 array of 0/1 values. + + Raises: + ValueError: If name is not a recognized component. + """ + generators = { + "cl": self.generate_cl, + "x": self.generate_x, + "a": self.generate_a, + "b": self.generate_b, + "c": self.generate_c, + } + gen = generators.get(name.lower()) + if gen is None: + raise ValueError(f"Unknown component: {name!r}. Valid: {list(generators)}") + return gen(n_chips) + + def generate_sequence(self, n_chips: int | None = None) -> np.ndarray: + """Generate the full composite PRN ranging code. + + Reproduces Shirriff's calc() function: on even output chips (ck=0), + all shift registers advance and the output is computed from feedback + bits. On odd chips (ck=1), the output is flipped (XOR with clock). + + The component feedback sequences are tiled to fill the requested + length. Each component advances once per 2 output chips (CL period), + so component index = output_chip_index // 2. + + Args: + n_chips: Total chips to generate. + Default: RANGING_CODE_LENGTH (one full period). + + Returns: + uint8 array of 0/1 values, length n_chips. + """ + if n_chips is None: + n_chips = RANGING_CODE_LENGTH + + # Number of register advance steps (one per CL period of 2 chips) + n_steps = (n_chips + 1) // 2 + + # Generate feedback bit sequences for each component, one per step + x_period = self.generate_x() + a_period = self.generate_a() + b_period = self.generate_b() + c_period = self.generate_c() + + def _tile(period: np.ndarray, length: int, total: int) -> np.ndarray: + reps = (total + length - 1) // length + return np.tile(period, reps)[:total] + + x_fb = _tile(x_period, RANGING_X_LENGTH, n_steps) + a_fb = _tile(a_period, RANGING_A_LENGTH, n_steps) + b_fb = _tile(b_period, RANGING_B_LENGTH, n_steps) + c_fb = _tile(c_period, RANGING_C_LENGTH, n_steps) + + # Majority vote on feedback bits: maj(A,B,C) = (A&B) | (A&C) | (B&C) + maj = (a_fb & b_fb) | (a_fb & c_fb) | (b_fb & c_fb) + + # Composite per step: (NOT(xnew) AND maj) -- before CL XOR + base = ((x_fb ^ 1) & maj).astype(np.uint8) + + # Expand to output chips: each step produces 2 chips. + # Chip 0 (ck=1 after advance): base XOR 1 + # Chip 1 (ck=0 before advance): base XOR 0 = base + # Wait -- re-examine Shirriff's calc(): + # On entry with ck=0: advance registers, set ck=1, compute out with XOR ck(=1) + # On entry with ck=1: set ck=0, flip out (out ^= 1) + # So the first chip of each pair uses ck=1 (XOR with 1), + # and the second uses the flip (equivalent to XOR with 0). + chip0 = base ^ 1 # ck=1 at time of computation + chip1 = base # flipped = (base ^ 1) ^ 1 = base + + # Interleave: [chip0[0], chip1[0], chip0[1], chip1[1], ...] + output = np.empty(n_steps * 2, dtype=np.uint8) + output[0::2] = chip0 + output[1::2] = chip1 + + return output[:n_chips] + + +class RangingCorrelator: + """Cross-correlate received NRZ samples with the known PRN code. + + Uses FFT-based cross-correlation for efficiency. Works at any sample + rate -- when sample_rate equals chip_rate, each chip is one sample. + """ + + def __init__( + self, + chip_rate: int = RANGING_CHIP_RATE_HZ, + sample_rate: float = RANGING_CHIP_RATE_HZ, + two_way: bool = True, + ): + self.chip_rate = chip_rate + self.sample_rate = sample_rate + self.two_way = two_way + self.samples_per_chip = sample_rate / chip_rate + + self._gen = RangingCodeGenerator() + + def correlate(self, received: np.ndarray, code_chips: int | None = None) -> dict: + """Cross-correlate received samples with PRN code. + + Args: + received: Float samples (NRZ-like, +1/-1 or similar). + code_chips: Number of PRN chips for reference. + Default: enough to cover the received samples. + + Returns: + Dict with delay_chips, delay_samples, range_m, + correlation_peak, peak_to_avg_ratio. + """ + if code_chips is None: + code_chips = int(len(received) / self.samples_per_chip) + 1 + + # Generate reference code (at most one full period) + ref_chips = self._gen.generate_sequence(min(code_chips, RANGING_CODE_LENGTH)) + + # NRZ encode reference: 0 -> -1, 1 -> +1 + ref_nrz = ref_chips.astype(np.float32) * 2.0 - 1.0 + + # Upsample reference to match sample rate + spc = int(self.samples_per_chip) + ref_samples = np.repeat(ref_nrz, spc) if spc > 1 else ref_nrz + + # Truncate to match lengths for correlation + min_len = min(len(received), len(ref_samples)) + rx = received[:min_len].astype(np.float32) + ref = ref_samples[:min_len] + + # Cross-correlate using FFT + n_fft = 1 + while n_fft < 2 * min_len: + n_fft *= 2 + + rx_fft = np.fft.rfft(rx, n_fft) + ref_fft = np.fft.rfft(ref, n_fft) + xcorr = np.fft.irfft(rx_fft * np.conj(ref_fft), n_fft) + + # Find peak in the valid region + peak_idx = int(np.argmax(np.abs(xcorr[:min_len]))) + peak_val = float(np.abs(xcorr[peak_idx])) + avg_val = float(np.mean(np.abs(xcorr[:min_len]))) + + # Convert sample delay to chip delay + delay_chips = peak_idx / self.samples_per_chip + range_m = chips_to_range_m(delay_chips, two_way=self.two_way) + + return { + "delay_samples": peak_idx, + "delay_chips": delay_chips, + "range_m": range_m, + "correlation_peak": peak_val, + "peak_to_avg_ratio": peak_val / avg_val if avg_val > 0 else 0.0, + } + + +# --------------------------------------------------------------------------- +# Utility functions +# --------------------------------------------------------------------------- + + +def chips_to_range_m(delay_chips: float, two_way: bool = True) -> float: + """Convert chip delay to range in meters. + + Args: + delay_chips: Delay measured in chip periods. + two_way: If True, signal traveled ground -> spacecraft -> ground. + + Returns: + Range in meters. + """ + chip_period_s = 1.0 / RANGING_CHIP_RATE_HZ + delay_s = delay_chips * chip_period_s + distance = SPEED_OF_LIGHT_M_S * delay_s + if two_way: + distance /= 2.0 + return distance + + +def range_m_to_chips(range_m: float, two_way: bool = True) -> float: + """Convert range in meters to chip delay. + + Args: + range_m: Distance in meters. + two_way: If True, compute round-trip delay. + + Returns: + Delay in chip periods. + """ + distance = range_m * 2.0 if two_way else range_m + delay_s = distance / SPEED_OF_LIGHT_M_S + return delay_s * RANGING_CHIP_RATE_HZ + + +def verify_code_properties() -> dict: + """Verify all component codes have correct properties. + + Checks length, periodicity, and balance for each component and the + full composite sequence. + + Returns: + Dict with component names as keys and property dicts as values. + """ + gen = RangingCodeGenerator() + results = {} + + components = { + "cl": {"length": RANGING_CL_LENGTH, "n_bits": None}, + "x": {"length": RANGING_X_LENGTH, "n_bits": 5}, + "a": {"length": RANGING_A_LENGTH, "n_bits": 5}, + "b": {"length": RANGING_B_LENGTH, "n_bits": 6}, + "c": {"length": RANGING_C_LENGTH, "n_bits": 7}, + } + + for name, props in components.items(): + code = gen.generate_component(name) + exp_len = props["length"] + + result = { + "length": len(code), + "length_correct": len(code) == exp_len, + "ones_count": int(np.sum(code)), + "zeros_count": int(exp_len - np.sum(code)), + } + + # Check periodicity: 2x generation should repeat exactly + code_2x = gen.generate_component(name, exp_len * 2) + result["periodic"] = bool(np.array_equal(code_2x[:exp_len], code_2x[exp_len:])) + + # For maximal-length LFSRs, verify balance: ones = 2^(n-1), zeros = 2^(n-1)-1 + if name in ("a", "b", "c") and props["n_bits"] is not None: + n = props["n_bits"] + expected_ones = 2 ** (n - 1) + expected_zeros = 2 ** (n - 1) - 1 + result["balance_correct"] = ( + result["ones_count"] == expected_ones and result["zeros_count"] == expected_zeros + ) + + # CL should be perfectly balanced + if name == "cl": + result["balance_correct"] = result["ones_count"] == 1 and result["zeros_count"] == 1 + + results[name] = result + + # Verify combined length is the product of all component lengths + product = ( + RANGING_CL_LENGTH + * RANGING_X_LENGTH + * RANGING_A_LENGTH + * RANGING_B_LENGTH + * RANGING_C_LENGTH + ) + results["length_product"] = { + "expected": product, + "matches_constant": product == RANGING_CODE_LENGTH, + } + + # Full composite -- generate a shorter segment for speed + # (full 5.4M chips is fine but we verify length math above) + short_len = 10_000 + short_seq = gen.generate_sequence(n_chips=short_len) + results["composite_sample"] = { + "length": len(short_seq), + "ones_count": int(np.sum(short_seq)), + "zeros_count": int(short_len - np.sum(short_seq)), + "balance": float(np.sum(short_seq)) / short_len, + } + + return results diff --git a/src/apollo/ranging_demod.py b/src/apollo/ranging_demod.py new file mode 100644 index 0000000..46eca8f --- /dev/null +++ b/src/apollo/ranging_demod.py @@ -0,0 +1,114 @@ +""" +Apollo Ranging Demodulator -- correlates received signal to measure range. + +Takes the PM-demodulated float signal (containing the ranging subcarrier), +cross-correlates with the known PRN code, and outputs range measurements +as PDU messages. + +The correlator accumulates a configurable number of samples per batch, +then performs FFT-based cross-correlation against the reference PRN code. +Each correlation batch produces one range measurement PDU. + +Reference: Ken Shirriff's Apollo ranging analysis + http://www.righto.com/2022/04/the-digital-ranging-system-that.html +""" + +import time + +import numpy as np + +from apollo.ranging import ( + RANGING_CHIP_RATE_HZ, + RangingCorrelator, +) + +SAMPLE_RATE_BASEBAND = 5_120_000 + +# --------------------------------------------------------------------------- +# GNU Radio block (optional -- only if gnuradio is available) +# --------------------------------------------------------------------------- + +try: + import pmt + from gnuradio import gr + + class ranging_demod(gr.basic_block): + """GNU Radio block: ranging demodulator with correlation. + + Accumulates samples, runs batch correlation, emits range PDUs. + + Input: float (PM demod output or filtered ranging signal) + Output: message PDUs with range measurements (port "range") + """ + + def __init__( + self, + chip_rate: int = RANGING_CHIP_RATE_HZ, + sample_rate: float = SAMPLE_RATE_BASEBAND, + correlation_length: int = 100_000, + two_way: bool = True, + ): + gr.basic_block.__init__( + self, + name="apollo_ranging_demod", + in_sig=[np.float32], + out_sig=None, + ) + + self.message_port_register_out(pmt.intern("range")) + + self._correlator = RangingCorrelator( + chip_rate=chip_rate, + sample_rate=sample_rate, + two_way=two_way, + ) + self._buffer = np.array([], dtype=np.float32) + self._correlation_length = correlation_length + + def general_work(self, input_items, output_items): + n = len(input_items[0]) + samples = np.array(input_items[0][:n], dtype=np.float32) + self.consume(0, n) + + self._buffer = np.concatenate([self._buffer, samples]) + + while len(self._buffer) >= self._correlation_length: + chunk = self._buffer[: self._correlation_length] + self._buffer = self._buffer[self._correlation_length :] + + result = self._correlator.correlate(chunk) + + meta = pmt.make_dict() + meta = pmt.dict_add( + meta, + pmt.intern("delay_chips"), + pmt.from_double(result["delay_chips"]), + ) + meta = pmt.dict_add( + meta, + pmt.intern("range_m"), + pmt.from_double(result["range_m"]), + ) + meta = pmt.dict_add( + meta, + pmt.intern("correlation_peak"), + pmt.from_double(result["correlation_peak"]), + ) + meta = pmt.dict_add( + meta, + pmt.intern("peak_to_avg_ratio"), + pmt.from_double(result["peak_to_avg_ratio"]), + ) + meta = pmt.dict_add( + meta, + pmt.intern("timestamp"), + pmt.from_double(time.time()), + ) + + pdu = pmt.cons(meta, pmt.PMT_NIL) + self.message_port_pub(pmt.intern("range"), pdu) + + return 0 + +except ImportError: + pass diff --git a/src/apollo/ranging_mod.py b/src/apollo/ranging_mod.py new file mode 100644 index 0000000..28642af --- /dev/null +++ b/src/apollo/ranging_mod.py @@ -0,0 +1,65 @@ +""" +Apollo Ranging Modulator -- NRZ-encodes PRN chips for carrier modulation. + +Converts the chip stream (bytes 0/1) to a float NRZ waveform (+1/-1) +at the baseband sample rate. This output is suitable for summing with +other subcarriers before PM modulation. + +The chip rate is ~994 kchip/s, so at 5.12 MHz sample rate there are +approximately 5.15 samples per chip. + +This is essentially the same transform as nrz_encoder but configured +for the ranging chip rate instead of the PCM bit rate. + +Reference: IMPLEMENTATION_SPEC.md ranging modulation path +""" + +from gnuradio import blocks, gr + +from apollo.ranging import RANGING_CHIP_RATE_HZ + +# Use the standard baseband sample rate +SAMPLE_RATE_BASEBAND = 5_120_000 + + +class ranging_mod(gr.hier_block2): + """Ranging chip NRZ modulator: byte (0/1) -> float (+1/-1) at sample_rate. + + Input: byte stream (chip values 0 or 1) + Output: float NRZ waveform at sample_rate + """ + + def __init__( + self, + chip_rate: int = RANGING_CHIP_RATE_HZ, + sample_rate: float = SAMPLE_RATE_BASEBAND, + ): + gr.hier_block2.__init__( + self, + "apollo_ranging_mod", + gr.io_signature(1, 1, gr.sizeof_char), + gr.io_signature(1, 1, gr.sizeof_float), + ) + + self._chip_rate = chip_rate + self._sample_rate = sample_rate + samples_per_chip = int(sample_rate / chip_rate) + + # byte (0/1) -> float (0.0/1.0) + self.to_float = blocks.char_to_float(1, 1) + + # float (0.0/1.0) -> float (0.0/2.0) + self.scale = blocks.multiply_const_ff(2.0) + + # float (0.0/2.0) -> float (-1.0/+1.0) + self.offset = blocks.add_const_ff(-1.0) + + # Upsample: repeat each value samples_per_chip times + self.upsample = blocks.repeat(gr.sizeof_float, samples_per_chip) + + # Connect chain + self.connect(self, self.to_float, self.scale, self.offset, self.upsample, self) + + @property + def samples_per_chip(self) -> int: + return int(self._sample_rate / self._chip_rate) diff --git a/src/apollo/ranging_source.py b/src/apollo/ranging_source.py new file mode 100644 index 0000000..9b9cb09 --- /dev/null +++ b/src/apollo/ranging_source.py @@ -0,0 +1,60 @@ +""" +Apollo Ranging Source -- streams PRN ranging chips. + +Outputs a continuous stream of bytes (0 or 1) representing the composite +PRN ranging code. The code repeats every 5,456,682 chips (~5.49 seconds +at the nominal 993,963 chip/s rate). + +The full code period is pre-generated and cycled through, so startup cost +is paid once and streaming is zero-allocation. + +Reference: Ken Shirriff's Apollo ranging analysis + http://www.righto.com/2022/04/the-digital-ranging-system-that.html +""" + +import numpy as np + +from apollo.ranging import RANGING_CODE_LENGTH, RangingCodeGenerator + +# --------------------------------------------------------------------------- +# GNU Radio block (optional -- only if gnuradio is available) +# --------------------------------------------------------------------------- + +try: + from gnuradio import gr + + class ranging_source(gr.sync_block): + """GNU Radio source: continuous PRN ranging chip stream. + + Outputs bytes (0 or 1) at the chip rate. Pre-generates the full + code period and cycles through it. + """ + + def __init__(self): + gr.sync_block.__init__( + self, + name="apollo_ranging_source", + in_sig=None, + out_sig=[np.byte], + ) + + self._gen = RangingCodeGenerator() + self._code = self._gen.generate_sequence() + self._pos = 0 + + def work(self, input_items, output_items): + out = output_items[0] + n = len(out) + produced = 0 + + while produced < n: + remaining = RANGING_CODE_LENGTH - self._pos + chunk = min(n - produced, remaining) + out[produced : produced + chunk] = self._code[self._pos : self._pos + chunk] + self._pos = (self._pos + chunk) % RANGING_CODE_LENGTH + produced += chunk + + return produced + +except ImportError: + pass