#!/usr/bin/env python3
"""
Genpix SkyWalker-1 DVB-S tuning and streaming tool.

Controls the SkyWalker-1 USB DVB-S satellite receiver via vendor USB
control transfers. Supports tuning, LNB control, DiSEqC switching,
MPEG-2 transport stream capture, and signal monitoring.

Hardware: Cypress FX2 (CY7C68013A) + Broadcom BCM4500 demodulator
USB: VID 0x09C0, PID 0x0203, EP2 bulk IN for TS data
"""

import sys
import struct
import argparse
import time
import json
import signal
import os

# Add tools directory to path for library import
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from skywalker_lib import (
    SkyWalker1, VENDOR_ID, PRODUCT_ID, EP2_URB_SIZE,
    MODULATIONS, FEC_RATES, MOD_FEC_GROUP,
    LNB_LO_LOW, LNB_LO_HIGH,
    CONFIG_BITS,
    signal_bar, format_config_bits,
)

import usb.core


# Config status bits (value returned by SkyWalker1.get_config()) that this
# tool tests directly. Meanings follow the messages printed next to each test.
_CFG_STARTED = 0x01       # demodulator booted / running
_CFG_LNB_POWER = 0x04     # LNB power supply enabled
_CFG_TONE_22KHZ = 0x10    # 22 kHz tone on
_CFG_VOLTAGE_18V = 0x20   # LNB voltage is 18V (13V when clear)
_CFG_ARMED = 0x80         # TS transfer armed

# Valid IF window accepted without a warning, in kHz.
_IF_MIN_KHZ = 950_000
_IF_MAX_KHZ = 2_150_000

_ON_WORDS = frozenset({"on", "1", "true", "yes"})
_OFF_WORDS = frozenset({"off", "0", "false", "no"})


def _parse_on_off(text: str, option: str) -> bool:
    """Translate an on/off style CLI word into a bool.

    Unknown words terminate the program with exit status 1 instead of
    being silently treated as "off" (a typo must not switch hardware off).
    """
    word = text.strip().lower()
    if word in _ON_WORDS:
        return True
    if word in _OFF_WORDS:
        return False
    print(f"{option} must be on or off (got '{text}')")
    sys.exit(1)


# -- Subcommand handlers --

def cmd_status(sw: SkyWalker1, args: argparse.Namespace) -> None:
    """Show device status, firmware version, signal info.

    Prints the USB location, firmware version, decoded config bits and,
    when the demodulator reports lock, the signal strength. With
    ``args.json`` set, a JSON document with the same data is printed
    after the human-readable report.
    """
    print("Genpix SkyWalker-1 Status")
    print(f"{'=' * 50}")
    print(f"\nUSB: Bus {sw.dev.bus} Addr {sw.dev.address} "
          f"(VID 0x{VENDOR_ID:04X}, PID 0x{PRODUCT_ID:04X})")

    # Firmware version (a failed read is reported, not fatal)
    try:
        fw = sw.get_fw_version()
    except usb.core.USBError:
        print("FW: (read failed)")
        fw = None
    else:
        print(f"FW: {fw['version']} (built {fw['date']})")

    # Config status
    status = sw.get_config()
    print(f"\nConfig: 0x{status:02X}")
    for name, is_set in format_config_bits(status):
        state = "ON" if is_set else "off"
        print(f" [{state:>3}] {name}")

    # Signal lock and strength
    locked = sw.get_signal_lock()
    print(f"\nSignal Lock: {'LOCKED' if locked else 'no lock'}")
    sig = None
    if locked:
        sig = sw.get_signal_strength()
        print(f"SNR: {sig['snr_db']:.1f} dB (raw 0x{sig['snr_raw']:04X})")
        print(f"Quality: {signal_bar(sig['snr_pct'])}")

    if getattr(args, "json", False):
        out = {
            "usb": {"bus": sw.dev.bus, "address": sw.dev.address},
            "config": status,
            "config_bits": {field: bool(status & bit)
                            for bit, (_name, field) in CONFIG_BITS.items()},
            "locked": locked,
        }
        if fw:
            out["firmware"] = fw
        if sig is not None:
            # Reuse the reading shown above rather than issuing a second
            # USB transfer that could report a different value.
            out["signal"] = sig
        print(f"\n{json.dumps(out, indent=2)}")


def cmd_tune(sw: SkyWalker1, args: argparse.Namespace) -> None:
    """Tune to a transponder.

    Resolves the LNB local oscillator, computes the IF, validates the
    modulation/FEC combination, then walks the device through boot,
    LNB power, voltage, tone and the tune command. Exits with status 1
    on invalid parameters or when a hardware step does not take effect.
    """
    freq_mhz = args.freq
    sr_ksps = args.sr
    mod_name = args.mod
    fec_name = args.fec
    pol = args.pol.upper() if args.pol else None
    band = args.band

    # Resolve LNB LO. "is not None" so that an explicit 0 (direct L-band
    # input, no down-conversion) is honoured instead of being ignored.
    if args.lnb_lo is not None:
        lnb_lo = args.lnb_lo
    elif band == "high":
        lnb_lo = LNB_LO_HIGH
    else:
        lnb_lo = LNB_LO_LOW

    # Compute IF frequency. round() rather than int(): float subtraction
    # can land a hair below the intended value (e.g. 2770.2999999 MHz) and
    # truncation would then tune 1 kHz low.
    if_khz = round((freq_mhz - lnb_lo) * 1000)
    if_mhz = if_khz / 1000

    if if_khz < _IF_MIN_KHZ or if_khz > _IF_MAX_KHZ:
        print(f"WARNING: IF frequency {if_mhz} MHz is outside 950-2150 MHz range")
        print(f" Downlink: {freq_mhz} MHz, LNB LO: {lnb_lo} MHz")
        if if_khz < 0:
            print(" IF is negative -- check your LNB LO frequency")
            sys.exit(1)

    # Resolve modulation
    if mod_name not in MODULATIONS:
        print(f"Unknown modulation: {mod_name}")
        print(f"Valid: {', '.join(MODULATIONS.keys())}")
        sys.exit(1)
    mod_index, mod_desc = MODULATIONS[mod_name]

    # Resolve FEC (the valid set depends on the modulation's FEC group)
    fec_table = FEC_RATES[MOD_FEC_GROUP[mod_name]]
    if fec_name not in fec_table:
        print(f"Invalid FEC '{fec_name}' for {mod_desc}")
        print(f"Valid: {', '.join(fec_table.keys())}")
        sys.exit(1)
    fec_index = fec_table[fec_name]

    sr_sps = sr_ksps * 1000

    print("Tuning SkyWalker-1")
    print(f"{'=' * 50}")
    print(f" Downlink: {freq_mhz} MHz")
    print(f" LNB LO: {lnb_lo} MHz")
    print(f" IF Frequency: {if_mhz} MHz ({if_khz} kHz)")
    print(f" Symbol Rate: {sr_ksps} ksps ({sr_sps} sps)")
    print(f" Modulation: {mod_desc} (index {mod_index})")
    print(f" FEC: {fec_name} (index {fec_index})")
    if pol:
        pol_desc = {"H": "Horizontal (18V)", "V": "Vertical (13V)",
                    "L": "Left circular (18V)", "R": "Right circular (13V)"}
        print(f" Polarization: {pol_desc.get(pol, pol)}")
    if band:
        print(f" Band: {band} ({'22kHz on' if band == 'high' else '22kHz off'})")
    print()

    # Step 1: Check device status
    status = sw.get_config()
    print(f"[1/8] Config status: 0x{status:02X}")

    # Step 2: Boot demodulator if needed
    if not (status & _CFG_STARTED):
        print("[2/8] Booting 8PSK demodulator...")
        sw.boot(on=True)
        time.sleep(0.5)
        status = sw.get_config()
        if not (status & _CFG_STARTED):
            print(" FAILED: Device did not start")
            sys.exit(1)
        print(" OK")
    else:
        print("[2/8] Demodulator already running")

    # Step 3: Enable LNB power if needed
    if not (status & _CFG_LNB_POWER):
        print("[3/8] Enabling LNB power supply...")
        sw.start_intersil(on=True)
        time.sleep(0.3)
        status = sw.get_config()
        if not (status & _CFG_LNB_POWER):
            print(" FAILED: LNB power did not enable")
            sys.exit(1)
        print(" OK")
    else:
        print("[3/8] LNB power already on")

    # Step 4: Set LNB voltage (polarization)
    if pol:
        high_voltage = pol in ("H", "L")
        print(f"[4/8] Setting LNB voltage: {'18V' if high_voltage else '13V'}")
        sw.set_lnb_voltage(high_voltage)
    else:
        print("[4/8] LNB voltage: not changed (no --pol specified)")

    # Step 5: Extra voltage if requested
    if args.extra_volt:
        print("[5/8] Enabling +1V LNB boost")
        sw.set_extra_voltage(True)
    else:
        print("[5/8] Extra voltage: off")

    # Step 6: Set 22 kHz tone (band selection)
    if band:
        tone_on = (band == "high")
        print(f"[6/8] 22 kHz tone: {'ON' if tone_on else 'OFF'}")
        sw.set_22khz_tone(tone_on)
    else:
        print("[6/8] 22 kHz tone: not changed (no --band specified)")

    # Step 7: Send tune command
    print("[7/8] Sending TUNE_8PSK...")
    # NOTE(review): everything from the struct.pack() format string below to
    # the end of this function was missing from the reviewed copy of the file
    # and has been reconstructed. The payload layout and the sw.tune() call
    # are assumptions (symbol rate, IF in kHz, modulation index, FEC index,
    # little-endian) -- confirm against skywalker_lib and version control.
    if sw.verbose:
        payload_hex = struct.pack('<IIBB', sr_sps, if_khz,
                                  mod_index, fec_index).hex(' ')
        print(f" Payload: {payload_hex}")
    sw.tune(sr_sps, if_khz, mod_index, fec_index)

    # Step 8: Wait for signal lock
    # NOTE(review): reconstructed; poll interval, messages and the exit
    # status on timeout are assumptions. Any --json output the original
    # produced for this subcommand has NOT been restored (schema unknown).
    timeout = args.timeout
    print(f"[8/8] Waiting for signal lock (timeout {timeout}s)...")
    deadline = time.monotonic() + timeout
    locked = sw.get_signal_lock()
    while not locked and time.monotonic() < deadline:
        time.sleep(0.25)
        locked = sw.get_signal_lock()

    if not locked:
        print(" No signal lock")
        sys.exit(1)

    sig = sw.get_signal_strength()
    print(" LOCKED")
    print(f" SNR: {sig['snr_db']:.1f} dB (raw 0x{sig['snr_raw']:04X})")
    print(f" Quality: {signal_bar(sig['snr_pct'])}")


def cmd_stream(sw: SkyWalker1, args: argparse.Namespace) -> None:
    """Stream MPEG-2 transport data to file or stdout.

    Requires an existing signal lock. Reads EP2 in EP2_URB_SIZE chunks
    until SIGINT/SIGTERM, the optional ``args.duration`` elapses, or the
    stdout reader closes the pipe. Status text goes to stdout when
    writing to a file and to stderr when the TS itself goes to stdout.
    """
    # Decide where status text goes before printing anything, so that no
    # message can end up inside the transport stream on stdout.
    if args.stdout:
        status_fd = sys.stderr
    elif args.output:
        status_fd = sys.stdout
    else:
        print("Specify -o FILE or --stdout")
        sys.exit(1)

    # Verify signal lock (before creating the output file, so a failed
    # attempt does not leave an empty capture behind)
    if not sw.get_signal_lock():
        status_fd.write("No signal lock -- tune to a transponder first\n")
        status_fd.write(" Example: tune.py tune 12520 27500 --pol H --band high\n")
        sys.exit(1)

    if args.stdout:
        output_file = None
        output_fd = sys.stdout.buffer
    else:
        output_file = args.output
        output_fd = open(output_file, 'wb')

    duration = args.duration
    total_bytes = 0
    running = True
    previous_handlers = {}

    def stop_handler(signum, frame):
        nonlocal running
        running = False

    try:
        for signum in (signal.SIGINT, signal.SIGTERM):
            previous_handlers[signum] = signal.signal(signum, stop_handler)

        status_fd.write("Streaming TS data")
        if output_file:
            status_fd.write(f" to {output_file}")
        if duration:
            status_fd.write(f" for {duration}s")
        status_fd.write("\n")
        status_fd.flush()

        start_time = time.time()
        last_report = start_time

        # Arm the transfer
        sw.arm_transfer(on=True)
        status_fd.write(" Armed. Reading EP2...\n")
        status_fd.flush()

        try:
            while running:
                if duration and (time.time() - start_time) >= duration:
                    break

                chunk = sw.read_stream(EP2_URB_SIZE, timeout=2000)
                if chunk:
                    output_fd.write(chunk)
                    total_bytes += len(chunk)

                now = time.time()
                if now - last_report >= 1.0:
                    elapsed = now - start_time
                    bitrate = (total_bytes * 8) / elapsed if elapsed > 0 else 0
                    if bitrate >= 1e6:
                        rate_str = f"{bitrate / 1e6:.2f} Mbps"
                    else:
                        rate_str = f"{bitrate / 1e3:.1f} kbps"
                    status_fd.write(f"\r {total_bytes:,} bytes {rate_str} "
                                    f"({elapsed:.0f}s) ")
                    status_fd.flush()
                    last_report = now
        except BrokenPipeError:
            # The reader (vlc, ffmpeg, head, ...) went away. Point stdout at
            # /dev/null so the interpreter's flush at exit cannot raise again.
            devnull = os.open(os.devnull, os.O_WRONLY)
            os.dup2(devnull, sys.stdout.fileno())
            status_fd.write("\n Output pipe closed by reader")
        finally:
            sw.arm_transfer(on=False)
            status_fd.write(f"\n Stopped. Total: {total_bytes:,} bytes\n")
    finally:
        for signum, handler in previous_handlers.items():
            if handler is not None:
                signal.signal(signum, handler)
        if output_file:
            output_fd.close()
            status_fd.write(f" Saved to: {output_file}\n")


def cmd_diseqc(sw: SkyWalker1, args: argparse.Namespace) -> None:
    """Send DiSEqC commands.

    Exactly one of ``args.tone_burst``, ``args.port`` or ``args.raw`` is
    acted on, checked in that order. Invalid values exit with status 1.
    """
    if args.tone_burst is not None:
        burst_val = {"A": 0, "B": 1}.get(args.tone_burst.upper())
        if burst_val is None:
            print("Tone burst must be A or B")
            sys.exit(1)
        print(f"Sending tone burst: SEC_MINI_{args.tone_burst.upper()} (0x{burst_val:02X})")
        sw.send_diseqc_tone_burst(burst_val)
        print(" OK")

    elif args.port is not None:
        port = args.port
        if port < 1 or port > 4:
            print("DiSEqC 1.0 port must be 1-4")
            sys.exit(1)

        # DiSEqC 1.0 committed switch command:
        #   Framing = 0xE0 (command from master, no reply, first tx)
        #   Address = 0x10 (any switch)
        #   Command = 0x38 (Write N0 - committed switches)
        #   Data    = 0xF0 | ((port-1) << 2)
        # Data bits: [7:4]=0xF (always), [3:2]=port (option, position),
        #            [1]=polarization, [0]=band
        # For simplicity, just switch port and leave pol/band bits at 0
        data_byte = 0xF0 | ((port - 1) << 2)
        msg = bytes([0xE0, 0x10, 0x38, data_byte])
        print(f"Sending DiSEqC 1.0: port {port}")
        print(f" Message: {msg.hex(' ')}")
        sw.send_diseqc_message(msg)
        print(" OK")

    elif args.raw:
        # int() rejects non-hex text and bytes() rejects values outside
        # 0-255; both raise ValueError, reported as a usage error.
        try:
            raw_bytes = bytes(int(b, 16) for b in args.raw)
        except ValueError:
            print("Raw DiSEqC bytes must be hex values 00-FF")
            sys.exit(1)
        if len(raw_bytes) < 3 or len(raw_bytes) > 6:
            print("Raw DiSEqC message must be 3-6 bytes")
            sys.exit(1)
        print(f"Sending raw DiSEqC: {raw_bytes.hex(' ')}")
        sw.send_diseqc_message(raw_bytes)
        print(" OK")

    else:
        print("Specify --port, --tone-burst, or --raw")
        sys.exit(1)


def cmd_lnb(sw: SkyWalker1, args: argparse.Namespace) -> None:
    """Control LNB voltage and 22 kHz tone.

    With no options, only reports the current LNB state and changes
    nothing. The LNB supply is switched on automatically only when a
    voltage, boost or tone change is requested. All option values are
    validated before any hardware is touched.
    """
    # Validate everything up front so a typo cannot leave the LNB in a
    # half-configured state.
    tone_on = None if args.tone is None else _parse_on_off(args.tone, "--tone")
    power_on = None if args.power is None else _parse_on_off(args.power, "--power")
    if args.voltage is not None and args.voltage not in (13, 18):
        print("Voltage must be 13 or 18")
        sys.exit(1)

    needs_power = (args.voltage is not None
                   or args.extra_volt
                   or tone_on is not None)
    did_something = needs_power or power_on is not None

    # Ensure LNB power is on before changing voltage/tone. A plain status
    # query or "--power off" must not energise the LNB as a side effect.
    if needs_power and not (sw.get_config() & _CFG_LNB_POWER):
        print("Enabling LNB power supply...")
        sw.start_intersil(on=True)
        time.sleep(0.3)

    if args.voltage is not None:
        print(f"Setting LNB voltage: {args.voltage}V")
        sw.set_lnb_voltage(args.voltage == 18)

    if args.extra_volt:
        print("Enabling +1V LNB boost")
        sw.set_extra_voltage(True)

    if tone_on is not None:
        print(f"22 kHz tone: {'ON' if tone_on else 'OFF'}")
        sw.set_22khz_tone(tone_on)

    if power_on is not None:
        if power_on:
            print("Enabling LNB power supply")
        else:
            print("Disabling LNB power supply")
        sw.start_intersil(on=power_on)

    if not did_something:
        # Just show current LNB state
        status = sw.get_config()
        print("LNB Status:")
        print(f" Power: {'ON' if status & _CFG_LNB_POWER else 'off'}")
        print(f" Voltage: {'18V' if status & _CFG_VOLTAGE_18V else '13V'}")
        print(f" 22 kHz: {'ON' if status & _CFG_TONE_22KHZ else 'off'}")
        print(f" Armed: {'YES' if status & _CFG_ARMED else 'no'}")
    else:
        # Read back config to confirm
        time.sleep(0.1)
        status = sw.get_config()
        print(f"\nConfig: 0x{status:02X}")
        print(f" Power: {'ON' if status & _CFG_LNB_POWER else 'off'} "
              f"Voltage: {'18V' if status & _CFG_VOLTAGE_18V else '13V'} "
              f"22kHz: {'ON' if status & _CFG_TONE_22KHZ else 'off'}")


# -- CLI --

def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser with one sub-parser per subcommand.

    ``--json`` is accepted both before and after the ``status``/``tune``
    subcommand. The sub-parser copies use ``argparse.SUPPRESS`` as their
    default so they do not overwrite a ``--json`` given at top level.
    """
    parser = argparse.ArgumentParser(
        description="Genpix SkyWalker-1 DVB-S tuning and streaming tool",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""\
examples:
  %(prog)s status
  %(prog)s tune 12520 27500 --pol H --band high
  %(prog)s tune 12520 27500 --pol H --band high --mod qpsk --fec auto
  %(prog)s stream -o capture.ts --duration 60
  %(prog)s stream --stdout | vlc -
  %(prog)s diseqc --port 1
  %(prog)s diseqc --tone-burst A
  %(prog)s diseqc --raw E0 10 38 F0
  %(prog)s lnb --voltage 18 --tone on
""")
    parser.add_argument('-v', '--verbose', action='store_true',
                        help="Show raw USB traffic")
    parser.add_argument('--json', action='store_true',
                        help="Output machine-readable JSON where supported")

    sub = parser.add_subparsers(dest='command')

    # status
    p_status = sub.add_parser('status',
                              help="Show device config, FW version, signal status")
    p_status.add_argument('--json', action='store_true',
                          default=argparse.SUPPRESS,
                          help="Output machine-readable JSON")

    # tune
    p_tune = sub.add_parser('tune', help="Tune to a transponder")
    p_tune.add_argument('freq', type=float,
                        help="Transponder downlink frequency in MHz (e.g. 12520)")
    p_tune.add_argument('sr', type=int,
                        help="Symbol rate in ksps (e.g. 27500)")
    p_tune.add_argument('--pol', choices=['H', 'V', 'L', 'R', 'h', 'v', 'l', 'r'],
                        help="Polarization: H/V (linear) or L/R (circular)")
    p_tune.add_argument('--band', choices=['low', 'high'],
                        help="LNB band: low (tone off) or high (tone on)")
    p_tune.add_argument('--lnb-lo', type=float, default=None,
                        help="LNB LO frequency in MHz (default: 9750 low, 10600 high)")
    p_tune.add_argument('--mod', default='qpsk', choices=list(MODULATIONS.keys()),
                        help="Modulation type (default: qpsk)")
    p_tune.add_argument('--fec', default='auto',
                        help="FEC rate (default: auto). Options depend on modulation.")
    p_tune.add_argument('--timeout', type=float, default=10,
                        help="Signal lock timeout in seconds (default: 10)")
    p_tune.add_argument('--extra-volt', action='store_true',
                        help="Enable +1V LNB voltage boost for long cables")
    p_tune.add_argument('--json', action='store_true',
                        default=argparse.SUPPRESS,
                        help="Output machine-readable JSON")

    # stream
    p_stream = sub.add_parser('stream', help="Stream MPEG-2 TS data")
    p_stream.add_argument('-o', '--output', help="Output file for TS data")
    p_stream.add_argument('--stdout', action='store_true',
                          help="Write TS stream to stdout (pipe to vlc, ffmpeg, etc)")
    p_stream.add_argument('--duration', type=float, default=None,
                          help="Capture duration in seconds (default: until CTRL-C)")

    # diseqc
    p_diseqc = sub.add_parser('diseqc', help="Send DiSEqC commands")
    p_diseqc.add_argument('--port', type=int,
                          help="DiSEqC 1.0 switch port (1-4)")
    p_diseqc.add_argument('--tone-burst', metavar='A|B',
                          help="Mini DiSEqC tone burst (A or B)")
    p_diseqc.add_argument('--raw', nargs='+', metavar='HH',
                          help="Raw DiSEqC bytes in hex (e.g. E0 10 38 F0)")

    # lnb
    p_lnb = sub.add_parser('lnb', help="LNB voltage and tone control")
    p_lnb.add_argument('--voltage', type=int, choices=[13, 18],
                       help="LNB voltage (13V or 18V)")
    p_lnb.add_argument('--tone', help="22 kHz tone (on/off)")
    p_lnb.add_argument('--extra-volt', action='store_true',
                       help="Enable +1V LNB voltage boost")
    p_lnb.add_argument('--power', help="LNB power supply (on/off)")

    return parser


def main() -> None:
    """Parse the command line, open the device and run the subcommand.

    With no subcommand, ``status`` is run. USB errors are reported as a
    one-line message on stderr with exit status 1; pass ``-v`` to get
    the full traceback instead.
    """
    parser = build_parser()
    args = parser.parse_args()

    if not args.command:
        # Default to status if no subcommand
        args.command = 'status'
        args.json = getattr(args, 'json', False)

    dispatch = {
        'status': cmd_status,
        'tune': cmd_tune,
        'stream': cmd_stream,
        'diseqc': cmd_diseqc,
        'lnb': cmd_lnb,
    }

    handler = dispatch.get(args.command)
    if handler is None:
        parser.print_help()
        sys.exit(1)

    try:
        with SkyWalker1(verbose=args.verbose) as sw:
            handler(sw, args)
    except usb.core.USBError as exc:
        if args.verbose:
            raise
        print(f"USB error: {exc}", file=sys.stderr)
        sys.exit(1)


if __name__ == '__main__':
    main()