skywalker-1/tools/signal_analysis.py
Ryan Malloy cc3a0707a1 Add DiSEqC motor control, QO-100 DATV reception, and carrier survey
Firmware v3.03.0: DiSEqC Manchester encoder (cmd 0x8D extended),
parameterized spectrum sweep (0xBA), adaptive blind scan (0xBB),
error code reporting (0xBC). All new function locals moved to XDATA
to fit within FX2LP 256-byte internal RAM constraint.

Motor control: DiSEqC 1.2 positioner with USALS GotoX, stored
positions, interactive keyboard jog, 30-second safety auto-halt.

QO-100 DATV: Es'hail-2 wideband transponder tools — LNB IF
calculator, narrowband scan, tune, and TS-to-video pipe (ffplay/mpv).

Carrier survey: six-stage pipeline (coarse sweep → peak detection →
fine sweep → blind scan → TS sample → catalog). JSON catalog with
differential analysis, QO-100 optimized mode, CSV/text export.

TUI: F9 Motor screen (3-column layout with signal gauge), F10 Survey
screen (Full Band + QO-100 tabs). Bridge, demo, and theme updated.

Docs: motor.mdx, survey.mdx, qo100-datv.mdx guide, tui.mdx updated
for 10 screens. Site builds 41 pages, all links valid.
2026-02-15 17:01:11 -07:00

240 lines
8.1 KiB
Python

#!/usr/bin/env python3
"""
Enhanced signal analysis for carrier detection and characterization.
Goes beyond the basic detect_peaks() in skywalker_lib by using robust
noise floor estimation (median + MAD), peak width measurement at -3dB,
peak merging within estimated carrier bandwidth, and carrier classification.
"""
import math
import statistics
def adaptive_noise_floor(powers: list) -> tuple:
"""
Robust noise floor estimation using median + MAD.
The median is insensitive to strong carriers in the sweep, and the
Median Absolute Deviation (MAD) provides a robust spread measure
that won't be pulled by a few dominant peaks.
Returns (noise_floor_db, mad_db).
"""
if not powers:
return (0.0, 0.0)
median_power = statistics.median(powers)
deviations = [abs(p - median_power) for p in powers]
mad = statistics.median(deviations) if deviations else 0.0
# The noise floor sits at the median -- most bins are noise in a
# typical satellite IF sweep. MAD gives us an idea of how "bumpy"
# the noise is, useful for setting adaptive thresholds.
return (median_power, mad)
def detect_peaks_enhanced(freqs: list, powers: list,
threshold_db: float = 6.0) -> list:
"""
Enhanced peak detection with width estimation and merging.
Returns list of dicts, each containing:
freq - center frequency in MHz
power - peak power in dB (relative)
index - index into freqs/powers arrays
width_mhz - estimated carrier bandwidth at -3dB
prominence_db - peak power above noise floor
Steps:
1. Compute adaptive noise floor (median + MAD)
2. Find local maxima above noise_floor + threshold_db
3. Estimate -3dB width around each peak
4. Merge peaks whose -3dB extents overlap (same carrier)
"""
if len(powers) < 3 or len(freqs) != len(powers):
return []
noise_floor, mad = adaptive_noise_floor(powers)
# Effective threshold: user threshold, but never below 3x MAD to
# avoid chasing noise ripples.
effective_threshold = max(threshold_db, 3.0 * mad) if mad > 0 else threshold_db
min_power = noise_floor + effective_threshold
# Step 1: find raw local maxima
raw_peaks = []
for i in range(1, len(powers) - 1):
if powers[i] > powers[i - 1] and powers[i] > powers[i + 1]:
if powers[i] >= min_power:
raw_peaks.append(i)
# Also check endpoints if they are strong
if len(powers) >= 2:
if powers[0] > powers[1] and powers[0] >= min_power:
raw_peaks.insert(0, 0)
if powers[-1] > powers[-2] and powers[-1] >= min_power:
raw_peaks.append(len(powers) - 1)
if not raw_peaks:
return []
# Step 2: measure width and build peak dicts
peaks = []
for idx in raw_peaks:
bw = estimate_carrier_bw(freqs, powers, idx)
prominence = powers[idx] - noise_floor
peaks.append({
"freq": freqs[idx],
"power": powers[idx],
"index": idx,
"width_mhz": bw,
"prominence_db": prominence,
})
# Step 3: merge overlapping peaks (keep the stronger one)
merged = _merge_peaks(peaks)
return merged
def _merge_peaks(peaks: list) -> list:
"""
Merge peaks whose -3dB extents overlap.
When two peaks are closer together than the sum of their half-widths
they likely belong to the same carrier. Keep the stronger peak and
take the wider bandwidth.
"""
if len(peaks) <= 1:
return peaks
# Sort by frequency
peaks = sorted(peaks, key=lambda p: p["freq"])
merged = [peaks[0]]
for peak in peaks[1:]:
prev = merged[-1]
# Half-widths
prev_upper = prev["freq"] + prev["width_mhz"] / 2
peak_lower = peak["freq"] - peak["width_mhz"] / 2
if peak_lower <= prev_upper:
# Overlap: keep the stronger peak, widen the bandwidth
if peak["power"] > prev["power"]:
wider = max(prev["width_mhz"], peak["width_mhz"],
(peak["freq"] + peak["width_mhz"] / 2) -
(prev["freq"] - prev["width_mhz"] / 2))
peak["width_mhz"] = wider
merged[-1] = peak
else:
wider = max(prev["width_mhz"], peak["width_mhz"],
(peak["freq"] + peak["width_mhz"] / 2) -
(prev["freq"] - prev["width_mhz"] / 2))
prev["width_mhz"] = wider
else:
merged.append(peak)
return merged
def estimate_carrier_bw(freqs: list, powers: list,
peak_idx: int) -> float:
"""
Estimate carrier bandwidth by walking from peak until power drops
3 dB below the peak value (the -3dB bandwidth).
Walks left and right from the peak index, interpolating between
adjacent frequency bins when the -3dB crossing falls between them.
Returns estimated bandwidth in MHz. Minimum return is one frequency
step width (avoids zero-width artifacts on single-bin peaks).
"""
if peak_idx < 0 or peak_idx >= len(powers):
return 0.0
peak_power = powers[peak_idx]
cutoff = peak_power - 3.0
# Minimum step size for fallback
if len(freqs) >= 2:
step = abs(freqs[1] - freqs[0])
else:
return 0.0
# Walk left
left_freq = freqs[peak_idx]
for i in range(peak_idx - 1, -1, -1):
if powers[i] <= cutoff:
# Interpolate between bin i and bin i+1
if powers[i + 1] != powers[i]:
frac = (cutoff - powers[i]) / (powers[i + 1] - powers[i])
else:
frac = 0.5
left_freq = freqs[i] + frac * (freqs[i + 1] - freqs[i])
break
left_freq = freqs[i]
# Walk right
right_freq = freqs[peak_idx]
for i in range(peak_idx + 1, len(powers)):
if powers[i] <= cutoff:
if powers[i - 1] != powers[i]:
frac = (cutoff - powers[i]) / (powers[i - 1] - powers[i])
else:
frac = 0.5
right_freq = freqs[i] - frac * (freqs[i] - freqs[i - 1])
break
right_freq = freqs[i]
bw = right_freq - left_freq
return max(bw, step)
def classify_carrier(bw_mhz: float, power_db: float) -> dict:
"""
Classify a detected carrier based on bandwidth and power.
Uses empirical ranges for DVB-S symbol rates: SR (Msps) is roughly
BW (MHz) / 1.35 for QPSK with roll-off 0.35.
Returns dict with:
estimated_sr_range - (min_sps, max_sps) tuple
likely_modulation - list of plausible modulation names
signal_quality - 'strong', 'moderate', or 'weak'
"""
# Roll-off factor for DVB-S is typically 0.35, so BW ~ SR * 1.35.
# Allow some tolerance on both sides.
sr_center = bw_mhz / 1.35 # Msps
sr_min = int(max(256_000, (bw_mhz / 1.5) * 1_000_000))
sr_max = int(min(30_000_000, (bw_mhz / 1.2) * 1_000_000))
if sr_max < sr_min:
sr_max = sr_min
# Guess modulation based on bandwidth
likely_mods = []
if bw_mhz < 3.0:
# Narrow carrier: low-SR data channels, SCPC, DCII split
likely_mods = ["qpsk", "dcii-i", "dcii-q", "dss"]
elif bw_mhz < 10.0:
# Medium: typical SCPC, small MCPC
likely_mods = ["qpsk", "turbo-qpsk", "dcii-combo"]
elif bw_mhz < 20.0:
# Wide: MCPC transponders
likely_mods = ["qpsk", "turbo-qpsk", "turbo-8psk"]
else:
# Very wide: full transponder, high-SR
likely_mods = ["qpsk", "turbo-qpsk", "turbo-8psk", "dcii-combo"]
# Signal quality heuristic (relative power, device-dependent)
if power_db > -10.0:
quality = "strong"
elif power_db > -25.0:
quality = "moderate"
else:
quality = "weak"
return {
"estimated_sr_range": (sr_min, sr_max),
"likely_modulation": likely_mods,
"signal_quality": quality,
}