Firmware v3.02.0 adds three new vendor commands: - 0xB7 SIGNAL_MONITOR: fast 8-byte combined signal read - 0xB8 TUNE_MONITOR: tune + dwell + read in one round-trip - 0xB9 MULTI_REG_READ: batch read up to 64 indirect registers New tools/skywalker.py provides five modes that use the BCM4500's AGC registers as a crude power detector across 950-2150 MHz IF, even without demodulator lock: - spectrum: sweep analyzer with ASCII/waterfall/matplotlib display - scan: automated transponder scanner (sweep + peak detect + blind scan) - monitor: real-time signal strength for dish alignment - lband: direct input analyzer with L-band allocation annotations - track: carrier/beacon tracker with CSV/JSON logging and drift detection Extracts shared SkyWalker1 class and constants into skywalker_lib.py; tune.py now imports from the shared library.
541 lines
19 KiB
Python
Executable File
541 lines
19 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Genpix SkyWalker-1 DVB-S tuning and streaming tool.
|
|
|
|
Controls the SkyWalker-1 USB DVB-S satellite receiver via vendor USB
|
|
control transfers. Supports tuning, LNB control, DiSEqC switching,
|
|
MPEG-2 transport stream capture, and signal monitoring.
|
|
|
|
Hardware: Cypress FX2 (CY7C68013A) + Broadcom BCM4500 demodulator
|
|
USB: VID 0x09C0, PID 0x0203, EP2 bulk IN for TS data
|
|
"""
|
|
|
|
import sys
|
|
import struct
|
|
import argparse
|
|
import time
|
|
import json
|
|
import signal
|
|
import os
|
|
|
|
# Add tools directory to path for library import
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
from skywalker_lib import (
|
|
SkyWalker1, VENDOR_ID, PRODUCT_ID, EP2_URB_SIZE,
|
|
MODULATIONS, FEC_RATES, MOD_FEC_GROUP,
|
|
LNB_LO_LOW, LNB_LO_HIGH,
|
|
CONFIG_BITS,
|
|
signal_bar, format_config_bits,
|
|
)
|
|
|
|
import usb.core
|
|
|
|
|
|
# -- Subcommand handlers --
|
|
|
|
def cmd_status(sw: SkyWalker1, args: argparse.Namespace) -> None:
    """Show device status, firmware version and signal info.

    Prints the USB location, the firmware version (or a placeholder when the
    read fails with a USB error), the decoded config byte and the lock state.
    When the demodulator reports lock, the signal strength is read from the
    device exactly once and that single reading feeds both the text report
    and the optional JSON document, so the two always agree.

    Args:
        sw: Open SkyWalker1 device handle.
        args: Parsed CLI arguments; only ``args.json`` is consulted.
    """
    print("Genpix SkyWalker-1 Status")
    print(f"{'=' * 50}")

    print(f"\nUSB: Bus {sw.dev.bus} Addr {sw.dev.address} "
          f"(VID 0x{VENDOR_ID:04X}, PID 0x{PRODUCT_ID:04X})")

    # Firmware version -- a failed read is reported but is not fatal.
    fw = None
    try:
        fw = sw.get_fw_version()
        print(f"FW: {fw['version']} (built {fw['date']})")
    except usb.core.USBError:
        print("FW: (read failed)")

    # Config status
    status = sw.get_config()
    print(f"\nConfig: 0x{status:02X}")
    for name, is_set in format_config_bits(status):
        state = "ON" if is_set else "off"
        print(f" [{state:>3}] {name}")

    # Signal lock and strength
    locked = sw.get_signal_lock()
    print(f"\nSignal Lock: {'LOCKED' if locked else 'no lock'}")
    sig = None
    if locked:
        # Single device read, reused for the JSON output below.
        sig = sw.get_signal_strength()
        print(f"SNR: {sig['snr_db']:.1f} dB (raw 0x{sig['snr_raw']:04X})")
        print(f"Quality: {signal_bar(sig['snr_pct'])}")

    if args.json:
        out = {
            "usb": {"bus": sw.dev.bus, "address": sw.dev.address},
            "config": status,
            "config_bits": {field: bool(status & bit)
                            for bit, (_name, field) in CONFIG_BITS.items()},
            "locked": locked,
        }
        if fw:
            out["firmware"] = fw
        if sig is not None:
            out["signal"] = sig
        print(f"\n{json.dumps(out, indent=2)}")
|
|
|
|
|
|
def cmd_tune(sw: SkyWalker1, args: argparse.Namespace) -> None:
    """Tune to a transponder and wait for signal lock.

    Resolves the LNB local-oscillator frequency, converts the downlink
    frequency to an IF in kHz, validates modulation/FEC against the shared
    tables, then walks the eight-step bring-up sequence (config check,
    demodulator boot, LNB power, voltage, boost, 22 kHz tone, tune command,
    lock wait).  Exits with status 1 on a negative IF, an unknown
    modulation/FEC, or when the demodulator or LNB supply fails to start.

    Args:
        sw: Open SkyWalker1 device handle.
        args: Parsed CLI arguments (``freq``, ``sr``, ``mod``, ``fec``,
            ``pol``, ``band``, ``lnb_lo``, ``extra_volt``, ``timeout``,
            ``json``).
    """
    freq_mhz = args.freq
    sr_ksps = args.sr
    mod_name = args.mod
    fec_name = args.fec
    pol = args.pol.upper() if args.pol else None
    band = args.band

    # Resolve LNB LO.  Test against None rather than truthiness so that an
    # explicit "--lnb-lo 0" (no down-conversion, the given frequency IS the
    # IF) is honoured instead of silently falling back to the default LO.
    if args.lnb_lo is not None:
        lnb_lo = args.lnb_lo
    elif band == "high":
        lnb_lo = LNB_LO_HIGH
    else:
        lnb_lo = LNB_LO_LOW

    # Compute IF frequency.  round() instead of int(): the float subtraction
    # can land a hair below the intended value, and truncation would then
    # tune 1 kHz low.
    if_mhz = freq_mhz - lnb_lo
    if_khz = round(if_mhz * 1000)

    if if_khz < 950000 or if_khz > 2150000:
        # Out-of-range IF is only a warning; a negative IF is fatal.
        print(f"WARNING: IF frequency {if_mhz} MHz is outside 950-2150 MHz range")
        print(f" Downlink: {freq_mhz} MHz, LNB LO: {lnb_lo} MHz")
        if if_khz < 0:
            print(" IF is negative -- check your LNB LO frequency")
            sys.exit(1)

    # Resolve modulation
    if mod_name not in MODULATIONS:
        print(f"Unknown modulation: {mod_name}")
        print(f"Valid: {', '.join(MODULATIONS.keys())}")
        sys.exit(1)
    mod_index, mod_desc = MODULATIONS[mod_name]

    # Resolve FEC -- the valid set depends on the modulation's FEC group.
    fec_group = MOD_FEC_GROUP[mod_name]
    fec_table = FEC_RATES[fec_group]
    if fec_name not in fec_table:
        print(f"Invalid FEC '{fec_name}' for {mod_desc}")
        print(f"Valid: {', '.join(fec_table.keys())}")
        sys.exit(1)
    fec_index = fec_table[fec_name]

    sr_sps = sr_ksps * 1000

    print("Tuning SkyWalker-1")
    print(f"{'=' * 50}")
    print(f" Downlink: {freq_mhz} MHz")
    print(f" LNB LO: {lnb_lo} MHz")
    print(f" IF Frequency: {if_mhz} MHz ({if_khz} kHz)")
    print(f" Symbol Rate: {sr_ksps} ksps ({sr_sps} sps)")
    print(f" Modulation: {mod_desc} (index {mod_index})")
    print(f" FEC: {fec_name} (index {fec_index})")
    if pol:
        pol_desc = {"H": "Horizontal (18V)", "V": "Vertical (13V)",
                    "L": "Left circular (18V)", "R": "Right circular (13V)"}
        print(f" Polarization: {pol_desc.get(pol, pol)}")
    if band:
        print(f" Band: {band} ({'22kHz on' if band == 'high' else '22kHz off'})")
    print()

    # Step 1: Check device status
    status = sw.get_config()
    print(f"[1/8] Config status: 0x{status:02X}")

    # Step 2: Boot demodulator if needed (config bit 0x01)
    if not (status & 0x01):
        print("[2/8] Booting 8PSK demodulator...")
        sw.boot(on=True)
        time.sleep(0.5)
        status = sw.get_config()
        if not (status & 0x01):
            print(" FAILED: Device did not start")
            sys.exit(1)
        print(" OK")
    else:
        print("[2/8] Demodulator already running")

    # Step 3: Enable LNB power if needed (config bit 0x04)
    if not (status & 0x04):
        print("[3/8] Enabling LNB power supply...")
        sw.start_intersil(on=True)
        time.sleep(0.3)
        status = sw.get_config()
        if not (status & 0x04):
            print(" FAILED: LNB power did not enable")
            sys.exit(1)
        print(" OK")
    else:
        print("[3/8] LNB power already on")

    # Step 4: Set LNB voltage (polarization): H/L select 18V, V/R 13V
    if pol:
        high_voltage = pol in ("H", "L")
        print(f"[4/8] Setting LNB voltage: {'18V' if high_voltage else '13V'}")
        sw.set_lnb_voltage(high_voltage)
    else:
        print("[4/8] LNB voltage: not changed (no --pol specified)")

    # Step 5: Extra voltage if requested
    if args.extra_volt:
        print("[5/8] Enabling +1V LNB boost")
        sw.set_extra_voltage(True)
    else:
        print("[5/8] Extra voltage: off")

    # Step 6: Set 22 kHz tone (band selection)
    if band:
        tone_on = (band == "high")
        print(f"[6/8] 22 kHz tone: {'ON' if tone_on else 'OFF'}")
        sw.set_22khz_tone(tone_on)
    else:
        print("[6/8] 22 kHz tone: not changed (no --band specified)")

    # Step 7: Send tune command
    print("[7/8] Sending TUNE_8PSK...")
    if sw.verbose:
        payload_hex = struct.pack('<II', sr_sps, if_khz).hex(' ')
        print(f" Payload: {payload_hex} {mod_index:02x} {fec_index:02x}")
    sw.tune(sr_sps, if_khz, mod_index, fec_index)

    # Step 8: Wait for lock.  A monotonic clock keeps the timeout immune to
    # wall-clock adjustments while polling.
    timeout = args.timeout
    print(f"[8/8] Waiting for signal lock (timeout {timeout}s)...")
    deadline = time.monotonic() + timeout
    locked = False
    dots = 0
    while time.monotonic() < deadline:
        if sw.get_signal_lock():
            locked = True
            break
        print(".", end="", flush=True)
        dots += 1
        time.sleep(0.5)
    if dots:
        # Terminate the progress-dot line.
        print()

    if locked:
        sig = sw.get_signal_strength()
        print("\n LOCKED")
        print(f" SNR: {sig['snr_db']:.1f} dB (raw 0x{sig['snr_raw']:04X})")
        print(f" Quality: {signal_bar(sig['snr_pct'])}")
    else:
        print(f"\n NO LOCK after {timeout}s")
        print(" Check frequency, symbol rate, polarization, and dish alignment")

    if args.json:
        out = {
            "tuned": True,
            "locked": locked,
            "freq_mhz": freq_mhz,
            "if_khz": if_khz,
            "sr_ksps": sr_ksps,
            "modulation": mod_name,
            "fec": fec_name,
        }
        if locked:
            out["signal"] = sw.get_signal_strength()
        print(f"\n{json.dumps(out, indent=2)}")
|
|
|
|
|
|
def cmd_stream(sw: SkyWalker1, args: argparse.Namespace) -> None:
    """Stream MPEG-2 transport data to a file or to stdout.

    Requires an existing signal lock.  Data is read from the device in
    ``EP2_URB_SIZE`` chunks until ``args.duration`` elapses or SIGINT/SIGTERM
    is received.  Progress is reported once a second on stdout, or on stderr
    when the TS data itself is going to stdout.

    Cleanup is layered so that each step runs even if an earlier one fails:
    the transfer is disarmed, the previous signal handlers are restored, and
    an output file opened here is always closed.

    Args:
        sw: Open SkyWalker1 device handle.
        args: Parsed CLI arguments (``stdout``, ``output``, ``duration``).
    """
    # Verify signal lock
    if not sw.get_signal_lock():
        print("No signal lock -- tune to a transponder first")
        print(" Example: tune.py tune 12520 27500 --pol H --band high")
        sys.exit(1)

    output_file = None
    if args.stdout:
        output_fd = sys.stdout.buffer
        # Keep status text off stdout so it does not corrupt the piped TS.
        status_fd = sys.stderr
    elif args.output:
        output_file = args.output
        output_fd = open(output_file, 'wb')
        status_fd = sys.stdout
    else:
        print("Specify -o FILE or --stdout")
        sys.exit(1)

    duration = args.duration
    total_bytes = 0
    running = True

    def stop_handler(signum, frame):
        # Ask the read loop to finish; cleanup happens in the finally blocks.
        nonlocal running
        running = False

    previous_handlers = {}
    try:
        for signum in (signal.SIGINT, signal.SIGTERM):
            previous_handlers[signum] = signal.signal(signum, stop_handler)

        status_fd.write("Streaming TS data")
        if output_file:
            status_fd.write(f" to {output_file}")
        if duration:
            status_fd.write(f" for {duration}s")
        status_fd.write("\n")
        status_fd.flush()

        # Monotonic clock: duration and bitrate must not jump if the
        # wall clock is adjusted mid-capture.
        start_time = time.monotonic()
        last_report = start_time

        # Arm the transfer
        sw.arm_transfer(on=True)
        try:
            status_fd.write(" Armed. Reading EP2...\n")
            status_fd.flush()

            while running:
                if duration and (time.monotonic() - start_time) >= duration:
                    break

                chunk = sw.read_stream(EP2_URB_SIZE, timeout=2000)
                if chunk:
                    output_fd.write(chunk)
                    total_bytes += len(chunk)

                now = time.monotonic()
                if now - last_report >= 1.0:
                    elapsed = now - start_time
                    bitrate = (total_bytes * 8) / elapsed if elapsed > 0 else 0
                    if bitrate >= 1e6:
                        rate_str = f"{bitrate / 1e6:.2f} Mbps"
                    else:
                        rate_str = f"{bitrate / 1e3:.1f} kbps"
                    status_fd.write(f"\r {total_bytes:,} bytes {rate_str} "
                                    f"({elapsed:.0f}s) ")
                    status_fd.flush()
                    last_report = now
        finally:
            sw.arm_transfer(on=False)
            status_fd.write(f"\n Stopped. Total: {total_bytes:,} bytes\n")
    finally:
        # Restore whatever handlers were installed before this command ran.
        # signal.signal() returns None for handlers not installed from
        # Python; those cannot be re-installed, so they are skipped.
        for signum, previous in previous_handlers.items():
            if previous is not None:
                signal.signal(signum, previous)
        if output_file:
            output_fd.close()
            status_fd.write(f" Saved to: {output_file}\n")
|
|
|
|
|
|
def cmd_diseqc(sw: SkyWalker1, args: argparse.Namespace) -> None:
    """Send a DiSEqC command: tone burst, 1.0 port switch, or raw bytes.

    Exactly one action is performed, chosen in this priority order:
    ``--tone-burst``, ``--port``, ``--raw``.  Invalid input (unknown burst
    letter, port outside 1-4, malformed or wrongly sized raw message) prints
    a message and exits with status 1 before anything is sent.

    Args:
        sw: Open SkyWalker1 device handle.
        args: Parsed CLI arguments (``tone_burst``, ``port``, ``raw``).
    """
    if args.tone_burst is not None:
        burst_val = {"A": 0, "B": 1}.get(args.tone_burst.upper())
        if burst_val is None:
            print("Tone burst must be A or B")
            sys.exit(1)
        print(f"Sending tone burst: SEC_MINI_{args.tone_burst.upper()} (0x{burst_val:02X})")
        sw.send_diseqc_tone_burst(burst_val)
        print(" OK")

    elif args.port is not None:
        port = args.port
        if port < 1 or port > 4:
            print("DiSEqC 1.0 port must be 1-4")
            sys.exit(1)
        # DiSEqC 1.0 committed switch command:
        #   Framing=0xE0 (command from master, no reply, first tx)
        #   Address=0x10 (any switch)
        #   Command=0x38 (Write N0 - committed switches)
        #   Data=0xF0 | ((port-1) << 2)
        # Data bits: [7:4]=0xF (clear mask), [3]=option, [2]=position,
        # [1]=polarization, [0]=band.  Ports 1-4 map onto the two
        # option/position bits, hence the shift by 2.
        # For simplicity, just switch port and leave the pol/band bits 0.
        data_byte = 0xF0 | ((port - 1) << 2)
        msg = bytes([0xE0, 0x10, 0x38, data_byte])
        print(f"Sending DiSEqC 1.0: port {port}")
        print(f" Message: {msg.hex(' ')}")
        sw.send_diseqc_message(msg)
        print(" OK")

    elif args.raw:
        # int() rejects non-hex text and bytes() rejects values outside
        # 0-255; both raise ValueError, reported here instead of a traceback.
        try:
            raw_bytes = bytes(int(b, 16) for b in args.raw)
        except ValueError:
            print("Raw DiSEqC bytes must be hex values 00-FF")
            sys.exit(1)
        if len(raw_bytes) < 3 or len(raw_bytes) > 6:
            print("Raw DiSEqC message must be 3-6 bytes")
            sys.exit(1)
        print(f"Sending raw DiSEqC: {raw_bytes.hex(' ')}")
        sw.send_diseqc_message(raw_bytes)
        print(" OK")

    else:
        print("Specify --port, --tone-burst, or --raw")
        sys.exit(1)
|
|
|
|
|
|
def _parse_on_off(value: str, option: str) -> bool:
    """Translate an on/off style CLI word into a bool; exit 1 on bad input."""
    word = value.lower()
    if word in ("on", "1", "true", "yes"):
        return True
    if word in ("off", "0", "false", "no"):
        return False
    print(f"{option} must be on or off (got '{value}')")
    sys.exit(1)


def cmd_lnb(sw: SkyWalker1, args: argparse.Namespace) -> None:
    """Control LNB voltage, 22 kHz tone and supply power, or show LNB state.

    With no options the current state is decoded from the config byte and
    printed without touching the hardware.  When a voltage, boost or tone
    change is requested and the LNB supply is off (config bit 0x04 clear),
    the supply is switched on first.  A bare ``--power`` request only
    performs the requested power change.

    All option values are validated before any command is sent to the
    device; invalid values print a message and exit with status 1.

    Args:
        sw: Open SkyWalker1 device handle.
        args: Parsed CLI arguments (``voltage``, ``extra_volt``, ``tone``,
            ``power``).
    """
    # Validate everything up front so bad input never half-configures the LNB.
    tone_on = None if args.tone is None else _parse_on_off(args.tone, "--tone")
    power_on = None if args.power is None else _parse_on_off(args.power, "--power")
    if args.voltage is not None and args.voltage not in (13, 18):
        print("Voltage must be 13 or 18")
        sys.exit(1)

    changes_setting = (args.voltage is not None
                       or bool(args.extra_volt)
                       or tone_on is not None)
    did_something = changes_setting or power_on is not None

    # Only a settings change needs the supply running; a status query or a
    # plain power request must not power the LNB up as a side effect.
    if changes_setting:
        status = sw.get_config()
        if not (status & 0x04):
            print("Enabling LNB power supply...")
            sw.start_intersil(on=True)
            time.sleep(0.3)

    if args.voltage is not None:
        v = args.voltage
        print(f"Setting LNB voltage: {v}V")
        sw.set_lnb_voltage(v == 18)

    if args.extra_volt:
        print("Enabling +1V LNB boost")
        sw.set_extra_voltage(True)

    if tone_on is not None:
        print(f"22 kHz tone: {'ON' if tone_on else 'OFF'}")
        sw.set_22khz_tone(tone_on)

    if power_on is not None:
        if power_on:
            print("Enabling LNB power supply")
            sw.start_intersil(on=True)
        else:
            print("Disabling LNB power supply")
            sw.start_intersil(on=False)

    if not did_something:
        # Just show current LNB state
        status = sw.get_config()
        print("LNB Status:")
        print(f" Power: {'ON' if status & 0x04 else 'off'}")
        print(f" Voltage: {'18V' if status & 0x20 else '13V'}")
        print(f" 22 kHz: {'ON' if status & 0x10 else 'off'}")
        print(f" Armed: {'YES' if status & 0x80 else 'no'}")
    else:
        # Read back config to confirm
        time.sleep(0.1)
        status = sw.get_config()
        print(f"\nConfig: 0x{status:02X}")
        print(f" Power: {'ON' if status & 0x04 else 'off'} "
              f"Voltage: {'18V' if status & 0x20 else '13V'} "
              f"22kHz: {'ON' if status & 0x10 else 'off'}")
|
|
|
|
|
|
# -- CLI --
|
|
|
|
def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser with all sub-commands.

    ``--json`` is accepted both before the sub-command and, for ``status``
    and ``tune``, after it.  The sub-command copies use
    ``default=argparse.SUPPRESS``: argparse copies every attribute of the
    sub-command namespace over the top-level one, so a plain ``False``
    default there would silently cancel a ``--json`` given before the
    sub-command.

    Returns:
        The configured ``argparse.ArgumentParser``; the selected sub-command
        name is stored in ``command`` (``None`` when omitted).
    """
    parser = argparse.ArgumentParser(
        description="Genpix SkyWalker-1 DVB-S tuning and streaming tool",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""\
examples:
%(prog)s status
%(prog)s tune 12520 27500 --pol H --band high
%(prog)s tune 12520 27500 --pol H --band high --mod qpsk --fec auto
%(prog)s stream -o capture.ts --duration 60
%(prog)s stream --stdout | vlc -
%(prog)s diseqc --port 1
%(prog)s diseqc --tone-burst A
%(prog)s diseqc --raw E0 10 38 F0
%(prog)s lnb --voltage 18 --tone on
""")
    parser.add_argument('-v', '--verbose', action='store_true',
                        help="Show raw USB traffic")
    parser.add_argument('--json', action='store_true',
                        help="Output machine-readable JSON where supported")

    sub = parser.add_subparsers(dest='command')

    # status
    p_status = sub.add_parser('status', help="Show device config, FW version, signal status")
    p_status.add_argument('--json', action='store_true',
                          default=argparse.SUPPRESS,
                          help="Output machine-readable JSON")

    # tune
    p_tune = sub.add_parser('tune', help="Tune to a transponder")
    p_tune.add_argument('freq', type=float,
                        help="Transponder downlink frequency in MHz (e.g. 12520)")
    p_tune.add_argument('sr', type=int,
                        help="Symbol rate in ksps (e.g. 27500)")
    p_tune.add_argument('--pol', choices=['H', 'V', 'L', 'R', 'h', 'v', 'l', 'r'],
                        help="Polarization: H/V (linear) or L/R (circular)")
    p_tune.add_argument('--band', choices=['low', 'high'],
                        help="LNB band: low (tone off) or high (tone on)")
    p_tune.add_argument('--lnb-lo', type=float, default=None,
                        help="LNB LO frequency in MHz (default: 9750 low, 10600 high)")
    p_tune.add_argument('--mod', default='qpsk',
                        choices=list(MODULATIONS.keys()),
                        help="Modulation type (default: qpsk)")
    p_tune.add_argument('--fec', default='auto',
                        help="FEC rate (default: auto). Options depend on modulation.")
    p_tune.add_argument('--timeout', type=float, default=10,
                        help="Signal lock timeout in seconds (default: 10)")
    p_tune.add_argument('--extra-volt', action='store_true',
                        help="Enable +1V LNB voltage boost for long cables")
    p_tune.add_argument('--json', action='store_true',
                        default=argparse.SUPPRESS,
                        help="Output machine-readable JSON")

    # stream
    p_stream = sub.add_parser('stream', help="Stream MPEG-2 TS data")
    p_stream.add_argument('-o', '--output', help="Output file for TS data")
    p_stream.add_argument('--stdout', action='store_true',
                          help="Write TS stream to stdout (pipe to vlc, ffmpeg, etc)")
    p_stream.add_argument('--duration', type=float, default=None,
                          help="Capture duration in seconds (default: until CTRL-C)")

    # diseqc
    p_diseqc = sub.add_parser('diseqc', help="Send DiSEqC commands")
    p_diseqc.add_argument('--port', type=int,
                          help="DiSEqC 1.0 switch port (1-4)")
    p_diseqc.add_argument('--tone-burst', metavar='A|B',
                          help="Mini DiSEqC tone burst (A or B)")
    p_diseqc.add_argument('--raw', nargs='+', metavar='HH',
                          help="Raw DiSEqC bytes in hex (e.g. E0 10 38 F0)")

    # lnb
    p_lnb = sub.add_parser('lnb', help="LNB voltage and tone control")
    p_lnb.add_argument('--voltage', type=int, choices=[13, 18],
                       help="LNB voltage (13V or 18V)")
    p_lnb.add_argument('--tone', help="22 kHz tone (on/off)")
    p_lnb.add_argument('--extra-volt', action='store_true',
                       help="Enable +1V LNB voltage boost")
    p_lnb.add_argument('--power', help="LNB power supply (on/off)")

    return parser
|
|
|
|
|
|
def main():
    """Parse the command line and run the selected sub-command.

    A missing sub-command is treated as ``status``.  The device is opened
    through the ``SkyWalker1`` context manager for the duration of the
    handler call.
    """
    cli = build_parser()
    opts = cli.parse_args()

    # No sub-command given: fall back to the status report.
    if not opts.command:
        opts.command = 'status'
        opts.json = getattr(opts, 'json', False)

    handlers = {
        'status': cmd_status,
        'tune': cmd_tune,
        'stream': cmd_stream,
        'diseqc': cmd_diseqc,
        'lnb': cmd_lnb,
    }

    try:
        run = handlers[opts.command]
    except KeyError:
        # Unknown command name: show usage and fail.
        cli.print_help()
        sys.exit(1)

    with SkyWalker1(verbose=opts.verbose) as device:
        run(device, opts)
|
|
|
|
|
|
# Script entry point: main() returns None, so the exit status stays 0 on success.
if __name__ == "__main__":
    sys.exit(main())
|