From c7b5932cc04ebb2e2f538441fdeb01dff6ae4a31 Mon Sep 17 00:00:00 2001 From: Ryan Malloy Date: Wed, 11 Feb 2026 14:46:20 -0700 Subject: [PATCH] Add EEPROM flash tool, TS analyzer, DVB-S2 investigation, and tune.py bugfix New tools: - tools/eeprom_write.py: EEPROM firmware flash with backup, verify, dry-run - tools/ts_analyze.py: MPEG-2 transport stream analyzer with PAT/PMT parsing DVB-S2 investigation confirms BCM4500 hardware limitation (no LDPC/BCH silicon). Fix --json flag on tune.py subcommands (argparse parent/child scoping). All tools verified against live SkyWalker-1 hardware. --- dvb-s2-investigation.md | 255 ++++++++++++ tools/eeprom_write.py | 575 ++++++++++++++++++++++++++ tools/ts_analyze.py | 897 ++++++++++++++++++++++++++++++++++++++++ tools/tune.py | 6 +- 4 files changed, 1732 insertions(+), 1 deletion(-) create mode 100644 dvb-s2-investigation.md create mode 100755 tools/eeprom_write.py create mode 100755 tools/ts_analyze.py diff --git a/dvb-s2-investigation.md b/dvb-s2-investigation.md new file mode 100644 index 0000000..474c54d --- /dev/null +++ b/dvb-s2-investigation.md @@ -0,0 +1,255 @@ +# DVB-S2 Incompatibility Investigation: Genpix SkyWalker-1 + +## Definitive Conclusion + +**The SkyWalker-1's inability to receive DVB-S2 is a fundamental hardware limitation of the Broadcom BCM4500 demodulator silicon, not a firmware limitation.** The BCM4500 was designed and fabricated before the DVB-S2 standard was ratified (2005) and contains no LDPC or BCH decoder hardware. DVB-S2 requires LDPC (Low-Density Parity-Check) and BCH (Bose-Chaudhuri-Hocquenghem) forward error correction -- entirely different decoder architectures from the Viterbi/turbo/Reed-Solomon decoders present in the BCM4500. No firmware update could add DVB-S2 support to this hardware. + +Genpix eventually addressed this by releasing the SkyWalker-3, which replaced the entire demodulator subsystem (likely switching from Broadcom BCM4500 to STMicroelectronics STV0903), trading turbo-FEC support for DVB-S2 LDPC/BCH capability. + +--- + +## 1. Does the BCM4500 Silicon Support DVB-S2? + +**No. The BCM4500 has no LDPC or BCH decoder hardware.** + +### BCM4500 FEC Architecture (from datasheet) + +The BCM4500 contains exactly two FEC decoder paths: + +1. **Advanced Modulation Turbo FEC Decoder** -- an iterative turbo code decoder supporting: + - QPSK: rates 1/4, 1/2, 3/4 + - 8PSK: rates 2/3, 3/4, 5/6, 8/9 + - 16QAM: rate 3/4 + - Reed-Solomon outer code (t=10) after turbo decoding + +2. **Legacy DVB/DIRECTV/DCII-Compliant FEC Decoder** -- a concatenated coding chain: + - Inner: Viterbi decoder (convolutional code, rates 1/2 through 7/8) + - Outer: Reed-Solomon decoder + +The datasheet describes the signal path explicitly: "Optimized soft decisions are then fed into either a DVB/DIRECTV/DCII-compliant FEC decoder, or an advanced modulation turbo decoder." These are the only two paths. There is no third path for LDPC/BCH. + +### DVB-S2 FEC Architecture (for comparison) + +DVB-S2 (EN 302 307, ratified March 2005) mandates: + +- **Inner code**: LDPC (Low-Density Parity-Check) -- block lengths of 64,800 or 16,200 bits +- **Outer code**: BCH (Bose-Chaudhuri-Hocquenghem) +- **Code rates**: 1/4, 1/3, 2/5, 1/2, 3/5, 2/3, 3/4, 4/5, 5/6, 8/9, 9/10 + +LDPC decoding requires dedicated hardware: large block RAM for message passing (the LDPC block size is 64,800 bits, requiring significant on-chip storage), iterative belief propagation logic, and a fundamentally different decoder architecture from both Viterbi and turbo decoders. This cannot be emulated in firmware on the BCM4500's simple 8-bit on-chip microcontroller (used only for configuration, acquisition, and monitoring -- not data-path processing). + +### Evidence from the BCM4500 Datasheet + +Source: [BCM4500 Datasheet (DatasheetQ)](https://html.datasheetq.com/pdf-html/885700/Broadcom/2page/BCM4500.html), [BCM4500 Datasheet (Elcodis)](https://elcodis.com/parts/5786421/BCM4500.html) + +Key specifications confirming no DVB-S2 capability: +- Modulation: BPSK, QPSK, 8PSK, 16QAM (no mention of DVB-S2-specific constellations) +- FEC: "advanced modulation turbo FEC decoder" and "DVB/DIRECTV/DCII-compliant FEC decoder" +- Symbol rate: 256 Ksps to 30 Msps +- Package: 128-pin MQFP +- Supply: 3.3V I/O, 1.8V digital +- No mention of LDPC, BCH, or DVB-S2 anywhere in the datasheet + +--- + +## 2. What FEC Types Does the BCM4500 Actually Support? + +The BCM4500 supports three distinct FEC coding families, none of which are DVB-S2 compatible: + +### 2.1 Viterbi + Reed-Solomon (Legacy DVB-S / DSS / DCII) + +Used for standard DVB-S QPSK, DSS QPSK, DVB-S BPSK, and Digicipher II modes. + +**Firmware evidence** (from `skywalker1-hardware-reference.md`, Section 6.3): +- FEC lookup table at XRAM 0xE0F9, maximum index 7 +- Modulation dispatch sets XRAM 0xE0F6 = 0x00 (turbo flag OFF) +- XRAM 0xE0F5 = 0x10 (standard demod mode) +- The firmware FEC table supports rates: 1/2, 2/3, 3/4, 5/6, 7/8, auto, none + +**Windows driver evidence** (`SkyWalker1Control.h`, line 59): +```c +m_CurResource.ulInnerFecType = BDA_FEC_VITERBI; +``` + +**Windows driver evidence** (`SkyWalker1TunerFilter.cpp`, lines 1069-1070): +```c +//Only supported FEC VITERBI Type Error Correction +else if(ulNewInnerFecType == BDA_FEC_VITERBI) +``` + +The Windows BDA driver explicitly rejects any FEC type other than `BDA_FEC_VITERBI` and restricts code rates to 1/2, 2/3, 3/4, 5/6, 7/8 (lines 1112-1116). There is no `BDA_FEC_LDPC` handling. + +### 2.2 Turbo Codes (Proprietary 8PSK/QPSK/16QAM) + +Used for Turbo QPSK, Turbo 8PSK, and Turbo 16QAM -- the proprietary "advanced modulation" modes developed by Broadcom for EchoStar/Dish Network. + +**Firmware evidence** (from `tuning-protocol-analysis.md`, Section 3): +- Turbo QPSK: FEC table at XRAM 0xE0B7, max index 5 +- Turbo 8PSK: FEC table at XRAM 0xE0B1, max index 5 +- Turbo 16QAM: FEC table at XRAM 0xE0BC, max index 1 +- All turbo modes set XRAM 0xE0F6 = 0x01 (turbo flag ON) + +These turbo codes are proprietary to EchoStar/Broadcom. They are NOT the same as DVB-S2's LDPC codes, despite both being "advanced" coding schemes. The turbo decoder uses a fundamentally different iterative decoding algorithm (parallel concatenated convolutional codes) compared to LDPC (sparse parity-check matrix belief propagation). + +### 2.3 Digicipher II (Motorola/GI Proprietary) + +Used for DCII combo, split I/Q, and offset QPSK modes. + +**Firmware evidence**: FEC table at XRAM 0xE0BD, max index 9, with a fixed FEC code of 0xFC written to XRAM 0xE0EB. + +### Summary: FEC Architecture Comparison + +| Feature | BCM4500 (SkyWalker-1) | DVB-S2 Requirement | +|---------|----------------------|-------------------| +| Inner FEC | Viterbi (DVB-S) or Turbo (proprietary) | LDPC | +| Outer FEC | Reed-Solomon (t=10) | BCH | +| Block size | Convolutional (streaming) / Turbo (short blocks) | 64,800 or 16,200 bits | +| Decoder type | Trellis-based (Viterbi) or iterative turbo | Iterative belief propagation | +| Hardware IP | Hardwired Viterbi + turbo silicon | Requires dedicated LDPC engine | +| Standardization | DVB-S (ETSI EN 300 421) + proprietary turbo | DVB-S2 (ETSI EN 302 307) | + +--- + +## 3. What Would a DVB-S2-Capable Replacement Look Like? + +### Broadcom's Own DVB-S2 Chip Timeline + +Broadcom addressed DVB-S2 by designing entirely new silicon: + +| Chip | Year | DVB-S2? | Key Feature | Source | +|------|------|---------|-------------|--------| +| **BCM4500** | ~2003 | No | Turbo FEC + legacy Viterbi/RS | [Datasheet](https://elcodis.com/parts/5786421/BCM4500.html) | +| **BCM4501** | 2006 | **Yes** | First dual-tuner DVB-S2 receiver; LDPC/BCH decoder | [Broadcom](https://www.broadcom.com/products/broadband/set-top-box/bcm4501), [EDN](https://www.edn.com/bcm4501-dual-dvb-s2-advanced-modulation-satellite-receiver/) | +| **BCM4505** | 2007 | **Yes** | Single-channel, 65nm, LDPC/BCH + legacy | [Broadcom](https://www.broadcom.com/products/broadband/set-top-box/bcm4505) | +| **BCM4506** | 2007 | **Yes** | Dual-channel, 65nm, LDPC/BCH + legacy | [Broadcom](https://www.broadcom.com/products/broadband/set-top-box/bcm4506) | + +The BCM4501 datasheet explicitly states it includes "four 8-bit ADCs, all-digital variable rate QPSK/8PSK receivers, advanced modulation LDPC/BCH, and DVB-S-compliant forward error correction decoder." The addition of LDPC/BCH required new silicon -- it was not a firmware upgrade to the BCM4500. + +However, Broadcom restricted sales of these chips to set-top box manufacturers (EchoStar, DIRECTV) and did not sell to PC peripheral makers. This is why Genpix could not simply drop in a BCM4501. + +### What Genpix Actually Did: The SkyWalker-3 + +Genpix released the SkyWalker-3 as a DVB-S2-capable successor, using a completely different demodulator: + +| Feature | SkyWalker-1 (BCM4500) | SkyWalker-3 (likely STV0903) | +|---------|----------------------|---------------------------| +| DVB-S QPSK | Yes | Yes | +| DVB-S2 QPSK | No | Yes (rates 1/2 through 9/10) | +| DVB-S2 8PSK | No | Yes (rates 3/5 through 9/10) | +| Turbo QPSK | Yes | **No** | +| Turbo 8PSK | Yes | **No** | +| Turbo 16QAM | Yes | **No** | +| DCII | Yes | Yes | +| DSS | Yes | Yes | +| Symbol rate (DVB-S) | 256 Ksps - 30 Msps | 1 - 45 Msps | +| Symbol rate (DVB-S2) | N/A | 5 - 33 Msps | +| FEC inner (DVB-S) | Viterbi | Viterbi | +| FEC inner (DVB-S2) | N/A | LDPC | +| FEC outer (DVB-S2) | N/A | BCH | +| Demodulator | Broadcom BCM4500 | STMicroelectronics STV0903 (probable) | +| Tuner | Broadcom BCM3440 | STMicroelectronics STV6110 (probable) | + +Source: [Genpix SkyWalker-3 specifications](https://www.genpix-electronics.com/what-is-skywalker-3.html) + +The trade-off is visible: the SkyWalker-3 gained DVB-S2 but lost turbo-FEC support entirely. The turbo codes were proprietary to Broadcom/EchoStar, and the STMicroelectronics STV0903 demodulator does not implement them. This means the SkyWalker-3 cannot receive Dish Network's legacy turbo-coded 8PSK transmissions. + +--- + +## 4. Are There Any Hints of DVB-S2 Awareness in the Firmware? + +**No. There are zero references to DVB-S2, LDPC, or BCH in any firmware version or in the Windows driver source.** + +### Firmware Search Results + +Searched all three firmware versions (v2.06, Rev.2 v2.10, v2.13) via Ghidra disassembly and the following source files: + +- `SkyWalker1Control.h` -- defines modulation constants 0-9, none related to DVB-S2 +- `SkyWalker1CommonDef.h` -- device parameter structure uses `BDA_FEC_VITERBI` only +- `SkyWalker1TunerFilter.cpp` -- explicitly rejects non-QPSK modulation types and non-Viterbi FEC +- `SkyWalker1Control.cpp` -- hardcodes `ADV_MOD_DVB_QPSK` (value 0) in tune command byte 8 + +**Specific evidence of no DVB-S2 awareness:** + +1. **Modulation enum caps at 9** (`SkyWalker1Control.h`, lines 64-74): The modulation constants are `ADV_MOD_DVB_QPSK` (0) through `ADV_MOD_DVB_BPSK` (9). No value 10+ exists for DVB-S2 modes. + +2. **Firmware dispatch table has exactly 10 entries** (`tuning-protocol-analysis.md`, Section 3.1): The jump table at CODE:0873 contains 20 bytes (10 entries x 2 bytes). Modulation values >= 10 are rejected by the bounds check at CODE:0866. + +3. **FEC type is hardcoded to Viterbi** (`SkyWalker1TunerFilter.cpp`, line 1070): `else if(ulNewInnerFecType == BDA_FEC_VITERBI)` -- only Viterbi is accepted; any other FEC type returns `STATUS_INVALID_PARAMETER`. + +4. **Tune command hardcodes DVB-S QPSK** (`SkyWalker1Control.cpp`, line 292): `ucCommand[8] = ADV_MOD_DVB_QPSK;` -- the Windows driver always sends modulation type 0 (DVB-S QPSK) regardless of what the application requests. + +5. **No LDPC/BCH code rate values** exist in any FEC lookup table. The firmware's XRAM tables at 0xE0B1, 0xE0B7, 0xE0BC, 0xE0BD, and 0xE0F9 contain only Viterbi rates (1/2 through 7/8), turbo rates, and DCII combined codes. + +6. **No DVB-S2-specific register addresses** appear in the I2C traffic. The BCM4500 is programmed exclusively through indirect registers 0xA6/0xA7/0xA8 with page 0x00 -- a protocol specific to the BCM4500. DVB-S2 demodulators like the STV0903 use entirely different register maps. + +--- + +## 5. Could the GPIF Streaming Path Handle DVB-S2 Data Rates? + +**Yes -- the USB data path is not the bottleneck. The GPIF/USB 2.0 streaming architecture could handle DVB-S2 data rates if the demodulator supported them.** + +### Data Rate Analysis + +**DVB-S2 maximum useful bit rate** (from ETSI EN 302 307): +- Highest configuration: 8PSK, rate 9/10, 30 Msps = ~72 Mbps raw, ~58 Mbps net after FEC +- Typical HD transponder: 8PSK, rate 3/4, 27.5 Msps = ~44 Mbps net + +**GPIF/USB 2.0 throughput capacity:** +- USB 2.0 High Speed bulk: 480 Mbps theoretical, ~35 MB/s (~280 Mbps) practical +- GPIF engine: 48 MHz clock, 8-bit data path = 48 MB/s (384 Mbps) theoretical +- EP2 FIFO: 4x buffer with AUTOIN, 7 URBs x 8KB on host side + +**Current DVB-S usage** (from `gpif-streaming-analysis.md`, Section 15): +- "Transport stream rate: BCM4500 outputs at the satellite symbol rate (up to 30 Msps), but the effective byte rate depends on modulation and coding. USB 2.0 High Speed bulk bandwidth (480 Mbps theoretical, ~35 MB/s practical) is more than sufficient for DVB-S transport streams (typically 1-5 MB/s)." + +**Assessment**: Even at the theoretical maximum DVB-S2 data rate of ~58 Mbps (~7.25 MB/s), the USB 2.0 bulk streaming path has approximately 5x headroom. The GPIF engine configuration (IFCONFIG=0xEE, EP2FIFOCFG=0x0C, FLOWSTATEA with FSEN enabled) is identical across all firmware versions and provides a fully hardware-managed pipeline that would not require modification. + +The 8-bit parallel transport stream interface between the demodulator and FX2 is also sufficient -- DVB-S2 uses the same MPEG-TS output format (188-byte packets) as DVB-S. The GPIF waveform and AUTOIN configuration would work unchanged. + +**However**, this is a moot point. The bottleneck is the demodulator silicon, not the data path. Even if you physically replaced the BCM4500 with a DVB-S2-capable chip, you would need to rewrite the entire FX2 firmware (I2C register protocol, tuning sequence, modulation dispatch, FEC configuration) since every DVB-S2 demodulator uses a completely different register interface. + +--- + +## Summary + +| Question | Answer | +|----------|--------| +| Is DVB-S2 a hardware or firmware limitation? | **Hardware** -- the BCM4500 has no LDPC/BCH decoder logic | +| Could a firmware update add DVB-S2? | **No** -- LDPC decoding requires dedicated silicon | +| Which Broadcom chip first added LDPC? | **BCM4501** (2006), followed by BCM4505/BCM4506 (2007) | +| Any DVB-S2 hints in firmware/driver? | **None** -- zero references to LDPC, BCH, or DVB-S2 | +| Is the USB data path a bottleneck? | **No** -- USB 2.0 bulk has ~5x headroom for DVB-S2 rates | +| What did Genpix do for DVB-S2? | Released SkyWalker-3 with a different demodulator (likely STV0903) | +| What was lost in the transition? | Turbo-FEC support (proprietary to Broadcom/EchoStar) | + +--- + +## Sources + +### Datasheets and Product Pages +- [BCM4500 Datasheet (DatasheetQ)](https://html.datasheetq.com/pdf-html/885700/Broadcom/2page/BCM4500.html) +- [BCM4500 Datasheet (Elcodis)](https://elcodis.com/parts/5786421/BCM4500.html) +- [BCM4500 Datasheet (AllDatasheet)](https://www.alldatasheet.com/datasheet-pdf/pdf/85246/BOARDCOM/BCM4500.html) +- [BCM4501 Product Page (Broadcom)](https://www.broadcom.com/products/broadband/set-top-box/bcm4501) +- [BCM4501 (EDN)](https://www.edn.com/bcm4501-dual-dvb-s2-advanced-modulation-satellite-receiver/) +- [BCM4505 Product Page (Broadcom)](https://www.broadcom.com/products/broadband/set-top-box/bcm4505) +- [BCM4506 Product Page (Broadcom)](https://www.broadcom.com/products/broadband/set-top-box/bcm4506) +- [BCM4505/BCM4506 Announcement (RTTNews)](https://www.rttnews.com/380136/broadcom-launches-bcm4505-and-bcm4506-two-fully-integrated-single-chip-single-and-dual-channel-multi-format-satellite-receivers-quick-facts.aspx) + +### Genpix Products +- [Genpix SkyWalker-3 Specifications](https://www.genpix-electronics.com/what-is-skywalker-3.html) +- [Genpix Official Site](https://www.genpix-electronics.com/index.php?act=viewDoc&docId=9) + +### Community and Technical Discussions +- [LinuxTV mailing list: BCM4500 and DVB-S2 distinction](https://www.mail-archive.com/linux-dvb@linuxtv.org/msg24808.html) +- [SatelliteGuys: Turbo 8PSK card reverse engineering](https://www.satelliteguys.us/xen/threads/another-turbo-8psk-card.246879/) +- [SatelliteGuys: Genpix SkyWalker-1 discussion](https://www.satelliteguys.us/xen/threads/genpix-skywalker-1.214196/) + +### Reverse Engineering Analysis (This Project) +- `skywalker1-hardware-reference.md` -- Sections 1, 6, 7: Overview, tuning protocol, BCM4500 interface +- `tuning-protocol-analysis.md` -- Section 3: Modulation dispatch table, FEC lookup tables +- `gpif-streaming-analysis.md` -- Sections 13-15: GPIF throughput and data path analysis +- `rev2-deep-analysis.md` -- Complete Rev.2 function inventory +- `SkyWalker1Control.h` -- Modulation mode constants (lines 63-74), FEC/command definitions +- `SkyWalker1TunerFilter.cpp` -- SetInnerFecType() Viterbi-only restriction (lines 1058-1086) +- `SkyWalker1Control.cpp` -- TuneDevice() hardcoded ADV_MOD_DVB_QPSK (line 292) diff --git a/tools/eeprom_write.py b/tools/eeprom_write.py new file mode 100755 index 0000000..e307f38 --- /dev/null +++ b/tools/eeprom_write.py @@ -0,0 +1,575 @@ +#!/usr/bin/env python3 +""" +Genpix SkyWalker-1 EEPROM firmware flash tool. + +Writes C2-format firmware images to the Cypress FX2 boot EEPROM via +the I2C_WRITE vendor command. + +Protocol: + I2C_WRITE (0x83): wValue=0x51, wIndex=offset, data=bytes + I2C_READ (0x84): wValue=0x51, wIndex=offset, length=chunk_size + +The EEPROM uses Cypress C2 IIC boot format: + - Header: C2 VID_L VID_H PID_L PID_H DID_L DID_H CONFIG + - Records: LEN_H LEN_L ADDR_H ADDR_L DATA[LEN] + - End: 80 01 ENTRY_H ENTRY_L (reset vector) + +WARNING: Flashing incorrect firmware can brick the device. The FX2 +boots from this EEPROM on power-up -- a corrupted image means the +device will not enumerate on USB until the EEPROM is reprogrammed +with an external programmer or the FX2 boot ROM's A0 vendor request. +""" +import usb.core, usb.util, sys, struct, time, os + +VENDOR_ID = 0x09C0 +PRODUCT_ID = 0x0203 +I2C_WRITE = 0x83 +I2C_READ = 0x84 +EEPROM_SLAVE = 0x51 + +# EEPROM page write parameters +PAGE_SIZE = 16 # Conservative page size for 24Cxx EEPROMs +WRITE_CYCLE_MS = 10 # Max internal write cycle time per page +MAX_EEPROM_SIZE = 16384 # 128Kbit / 16KB max addressable via wIndex + + +def find_device(): + dev = usb.core.find(idVendor=VENDOR_ID, idProduct=PRODUCT_ID) + if dev is None: + print("SkyWalker-1 not found") + sys.exit(1) + return dev + + +def detach_driver(dev): + intf_num = None + for cfg in dev: + for intf in cfg: + if dev.is_kernel_driver_active(intf.bInterfaceNumber): + try: + dev.detach_kernel_driver(intf.bInterfaceNumber) + intf_num = intf.bInterfaceNumber + except usb.core.USBError as e: + print(f"Cannot detach driver: {e}") + print("Try: sudo modprobe -r dvb_usb_gp8psk") + sys.exit(1) + try: + dev.set_configuration() + except: + pass + return intf_num + + +def eeprom_read(dev, offset, length=64): + """Read from EEPROM at given offset.""" + return dev.ctrl_transfer( + usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_IN, + I2C_READ, EEPROM_SLAVE, offset, length, 2000) + + +def eeprom_write(dev, offset, data): + """Write data to EEPROM at given offset. Caller handles page alignment.""" + return dev.ctrl_transfer( + usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_OUT, + I2C_WRITE, EEPROM_SLAVE, offset, data, 2000) + + +def eeprom_read_all(dev, size, label="Reading"): + """Read entire EEPROM contents up to size bytes.""" + chunk_size = 64 + data = bytearray() + for offset in range(0, size, chunk_size): + remaining = min(chunk_size, size - offset) + chunk = eeprom_read(dev, offset, remaining) + if chunk is None: + print(f"\n Read failed at offset 0x{offset:04X}") + return None + data.extend(bytes(chunk)) + if offset % 1024 == 0: + pct = offset * 100 // size + print(f"\r {label}: 0x{offset:04X} / 0x{size:04X} [{pct:3d}%]", + end="", flush=True) + print(f"\r {label}: 0x{size:04X} / 0x{size:04X} [100%] ") + return data + + +def parse_c2_header(data): + """Parse Cypress C2 boot EEPROM header. Returns dict or None.""" + if len(data) < 8: + return None + if data[0] != 0xC2: + return None + + vid = data[2] << 8 | data[1] + pid = data[4] << 8 | data[3] + did = data[6] << 8 | data[5] + config = data[7] + + return {"vid": vid, "pid": pid, "did": did, "config": config} + + +def parse_records(data, offset=8): + """Parse C2 load records from EEPROM data.""" + records = [] + while offset < len(data) - 4: + rec_len = (data[offset] << 8) | data[offset + 1] + rec_addr = (data[offset + 2] << 8) | data[offset + 3] + + if rec_len == 0x8001: + records.append({ + "type": "end", + "entry_point": rec_addr, + "offset": offset + }) + break + elif rec_len == 0 or rec_len > 0x4000: + records.append({ + "type": "invalid", + "raw_len": rec_len, + "offset": offset + }) + break + + rec_data = data[offset + 4:offset + 4 + rec_len] + records.append({ + "type": "data", + "length": rec_len, + "load_addr": rec_addr, + "data": bytes(rec_data), + "offset": offset + }) + offset += 4 + rec_len + + return records + + +def print_c2_header(header, prefix=" "): + """Display parsed C2 header fields.""" + print(f"{prefix}Format: C2 (Large EEPROM, code loads to internal RAM)") + print(f"{prefix}VID: 0x{header['vid']:04X}" + f" {'(Genpix)' if header['vid'] == 0x09C0 else ''}") + print(f"{prefix}PID: 0x{header['pid']:04X}" + f" {'(SkyWalker-1)' if header['pid'] == 0x0203 else ''}") + print(f"{prefix}DID: 0x{header['did']:04X}") + print(f"{prefix}Config: 0x{header['config']:02X}", end="") + + config_flags = [] + if header["config"] & 0x40: + config_flags.append("400kHz I2C") + if header["config"] & 0x04: + config_flags.append("disconnect") + if config_flags: + print(f" ({', '.join(config_flags)})") + else: + print() + + +def print_c2_records(records, prefix=" "): + """Display parsed C2 load records.""" + total_code = 0 + for i, rec in enumerate(records): + if rec["type"] == "data": + end_addr = rec["load_addr"] + rec["length"] - 1 + preview = rec["data"][:8].hex(' ') + print(f"{prefix}[{i}] {rec['length']:5d} bytes -> " + f"0x{rec['load_addr']:04X}-0x{end_addr:04X} " + f"[{preview}...]") + total_code += rec["length"] + elif rec["type"] == "end": + print(f"{prefix}[{i}] END MARKER -> entry point: " + f"0x{rec['entry_point']:04X}") + else: + print(f"{prefix}[{i}] INVALID (raw_len=0x{rec['raw_len']:04X}) " + f"at EEPROM offset 0x{rec['offset']:04X}") + + data_recs = [r for r in records if r["type"] == "data"] + print(f"\n{prefix}Total firmware: {total_code} bytes in " + f"{len(data_recs)} segments") + end_recs = [r for r in records if r["type"] == "end"] + if end_recs: + print(f"{prefix}Entry point: 0x{end_recs[0]['entry_point']:04X} " + f"(LJMP target after boot)") + + +def validate_c2_image(data, label="image"): + """Validate a C2 firmware image. Returns (header, records) or exits.""" + if len(data) < 12: + print(f" {label}: too small ({len(data)} bytes, need at least 12)") + return None, None + + if data[0] != 0xC2: + print(f" {label}: not a C2 image (first byte: 0x{data[0]:02X}, " + f"expected 0xC2)") + return None, None + + header = parse_c2_header(data) + if header is None: + print(f" {label}: failed to parse C2 header") + return None, None + + records = parse_records(data) + if not records: + print(f" {label}: no load records found") + return None, None + + end_recs = [r for r in records if r["type"] == "end"] + invalid_recs = [r for r in records if r["type"] == "invalid"] + if not end_recs: + print(f" {label}: WARNING -- no end marker found") + if invalid_recs: + print(f" {label}: WARNING -- {len(invalid_recs)} invalid record(s)") + + return header, records + + +def cmd_info(args): + """Parse and display C2 header info from a .bin file.""" + if not os.path.exists(args.file): + print(f"File not found: {args.file}") + sys.exit(1) + + with open(args.file, 'rb') as f: + data = f.read() + + print(f"C2 Image: {args.file}") + print(f"File size: {len(data)} bytes") + print("=" * 40) + + header, records = validate_c2_image(data, args.file) + if header is None: + sys.exit(1) + + print("\nHeader:") + print_c2_header(header) + + print("\nLoad Records:") + print_c2_records(records) + + # Compute EEPROM usage (header + record headers + data + end marker) + if records: + last = records[-1] + if last["type"] == "end": + eeprom_end = last["offset"] + 4 + elif last["type"] == "data": + eeprom_end = last["offset"] + 4 + last["length"] + else: + eeprom_end = last["offset"] + print(f"\n EEPROM footprint: {eeprom_end} bytes " + f"(0x{eeprom_end:04X})") + + +def cmd_backup(args): + """Dump current EEPROM contents to a file.""" + print("Genpix SkyWalker-1 EEPROM Backup") + print("=" * 40) + + dev = find_device() + print(f"Found device: Bus {dev.bus} Addr {dev.address}") + intf = detach_driver(dev) + + try: + size = args.max_size + print(f"\nReading EEPROM ({size} bytes)...") + data = eeprom_read_all(dev, size) + if data is None: + print("Backup failed: read error") + sys.exit(1) + + with open(args.output, 'wb') as f: + f.write(data) + print(f" Saved to: {args.output}") + + # Show header info + header = parse_c2_header(data) + if header: + print("\nHeader:") + print_c2_header(header) + finally: + if intf is not None: + try: + usb.util.release_interface(dev, intf) + dev.attach_kernel_driver(intf) + print("\nRe-attached kernel driver") + except: + print("\nNote: run 'sudo modprobe dvb_usb_gp8psk' to reload") + + +def cmd_verify(args): + """Compare a .bin file against current EEPROM contents.""" + if not os.path.exists(args.file): + print(f"File not found: {args.file}") + sys.exit(1) + + with open(args.file, 'rb') as f: + image = f.read() + + print("Genpix SkyWalker-1 EEPROM Verify") + print("=" * 40) + + # Validate the image first + header, records = validate_c2_image(image, args.file) + if header is None: + sys.exit(1) + + print(f"\nImage: {args.file} ({len(image)} bytes)") + print_c2_header(header) + + dev = find_device() + print(f"\nFound device: Bus {dev.bus} Addr {dev.address}") + intf = detach_driver(dev) + + try: + print(f"\nReading EEPROM ({len(image)} bytes)...") + eeprom = eeprom_read_all(dev, len(image), label="Verify") + if eeprom is None: + print("Verify failed: read error") + sys.exit(1) + + # Compare byte-by-byte + mismatches = [] + for i in range(len(image)): + if i < len(eeprom) and image[i] != eeprom[i]: + mismatches.append(i) + + if not mismatches: + print(f"\n MATCH -- EEPROM contents match {args.file}") + else: + print(f"\n MISMATCH -- {len(mismatches)} byte(s) differ:") + for off in mismatches[:32]: + exp = image[off] + got = eeprom[off] if off < len(eeprom) else 0xFF + print(f" 0x{off:04X}: expected 0x{exp:02X}, " + f"got 0x{got:02X}") + if len(mismatches) > 32: + print(f" ... and {len(mismatches) - 32} more") + sys.exit(1) + finally: + if intf is not None: + try: + usb.util.release_interface(dev, intf) + dev.attach_kernel_driver(intf) + print("\nRe-attached kernel driver") + except: + print("\nNote: run 'sudo modprobe dvb_usb_gp8psk' to reload") + + +def cmd_flash(args): + """Write a C2-format .bin file to the EEPROM.""" + if not os.path.exists(args.file): + print(f"File not found: {args.file}") + sys.exit(1) + + with open(args.file, 'rb') as f: + image = f.read() + + print("Genpix SkyWalker-1 EEPROM Flash") + print("=" * 40) + print() + print(" *** FIRMWARE FLASH -- READ CAREFULLY ***") + print(" Writing bad firmware will brick the device.") + print(" The SkyWalker-1 boots from this EEPROM on power-up.") + print(" A corrupted image = no USB enumeration.") + print() + + # Validate input image + img_header, img_records = validate_c2_image(image, args.file) + if img_header is None: + sys.exit(1) + + print(f"Image: {args.file} ({len(image)} bytes)") + print_c2_header(img_header) + + # Size sanity check + if len(image) > MAX_EEPROM_SIZE: + print(f"\n Image too large: {len(image)} bytes " + f"(max {MAX_EEPROM_SIZE})") + sys.exit(1) + + if len(image) < 12: + print(f"\n Image too small: {len(image)} bytes") + sys.exit(1) + + # Connect to device + dev = find_device() + print(f"\nFound device: Bus {dev.bus} Addr {dev.address}") + intf = detach_driver(dev) + + try: + # Check VID/PID against the connected device + if not args.force: + if img_header["vid"] != VENDOR_ID: + print(f"\n VID mismatch: image has 0x{img_header['vid']:04X}," + f" device is 0x{VENDOR_ID:04X}") + print(" Use --force to override") + sys.exit(1) + if img_header["pid"] != PRODUCT_ID: + print(f"\n PID mismatch: image has 0x{img_header['pid']:04X}," + f" device is 0x{PRODUCT_ID:04X}") + print(" Use --force to override") + sys.exit(1) + elif img_header["vid"] != VENDOR_ID or img_header["pid"] != PRODUCT_ID: + print(f"\n WARNING: VID/PID mismatch (--force active)") + print(f" Image: VID=0x{img_header['vid']:04X} " + f"PID=0x{img_header['pid']:04X}") + print(f" Device: VID=0x{VENDOR_ID:04X} PID=0x{PRODUCT_ID:04X}") + + # Backup current EEPROM + if not args.no_backup: + ts = time.strftime("%Y%m%d_%H%M%S") + backup_file = f"eeprom_backup_{ts}.bin" + print(f"\nBacking up current EEPROM to {backup_file}...") + backup = eeprom_read_all(dev, MAX_EEPROM_SIZE, label="Backup") + if backup is None: + print(" Backup failed: read error. Aborting.") + sys.exit(1) + with open(backup_file, 'wb') as f: + f.write(backup) + print(f" Backup saved: {backup_file} ({len(backup)} bytes)") + + # Show what's currently on the EEPROM + cur_header = parse_c2_header(backup) + if cur_header: + print("\n Current EEPROM:") + print_c2_header(cur_header, prefix=" ") + else: + print("\n Skipping backup (--no-backup)") + + # Dry-run stops here + if args.dry_run: + print("\n DRY RUN -- would write {0} bytes in {1} pages".format( + len(image), (len(image) + PAGE_SIZE - 1) // PAGE_SIZE)) + print(" No changes made.") + return + + # Final confirmation + print(f"\nAbout to write {len(image)} bytes to EEPROM...") + print(" Press Ctrl+C within 3 seconds to abort.") + try: + for i in range(3, 0, -1): + print(f"\r Writing in {i}... ", end="", flush=True) + time.sleep(1) + print("\r Writing now... ") + except KeyboardInterrupt: + print("\n Aborted.") + return + + # Write in page-sized chunks + total_pages = (len(image) + PAGE_SIZE - 1) // PAGE_SIZE + write_errors = 0 + + for page_num in range(total_pages): + offset = page_num * PAGE_SIZE + end = min(offset + PAGE_SIZE, len(image)) + chunk = image[offset:end] + + pct = (page_num + 1) * 100 // total_pages + print(f"\r Write: 0x{offset:04X} / 0x{len(image):04X} " + f"[{pct:3d}%]", end="", flush=True) + + try: + written = eeprom_write(dev, offset, chunk) + if written != len(chunk): + print(f"\n Short write at 0x{offset:04X}: " + f"sent {len(chunk)}, wrote {written}") + write_errors += 1 + except usb.core.USBError as e: + print(f"\n Write error at 0x{offset:04X}: {e}") + write_errors += 1 + + # Wait for EEPROM internal write cycle + time.sleep(WRITE_CYCLE_MS / 1000.0) + + print(f"\r Write: 0x{len(image):04X} / 0x{len(image):04X} " + f"[100%] ") + + if write_errors: + print(f"\n WARNING: {write_errors} write error(s) occurred") + + # Verify by reading back + print(f"\nVerifying ({len(image)} bytes)...") + verify = eeprom_read_all(dev, len(image), label="Verify") + if verify is None: + print(" Verify failed: read error") + print(" *** EEPROM STATE UNKNOWN -- check before power cycling ***") + sys.exit(1) + + mismatches = [] + for i in range(len(image)): + if i < len(verify) and image[i] != verify[i]: + mismatches.append(i) + + if not mismatches: + print(f"\n VERIFIED -- all {len(image)} bytes match") + print(" Flash complete. Power cycle the device to boot new firmware.") + else: + print(f"\n VERIFY FAILED -- {len(mismatches)} byte(s) differ:") + for off in mismatches[:16]: + exp = image[off] + got = verify[off] if off < len(verify) else 0xFF + print(f" 0x{off:04X}: wrote 0x{exp:02X}, " + f"read 0x{got:02X}") + if len(mismatches) > 16: + print(f" ... and {len(mismatches) - 16} more") + print("\n *** EEPROM CONTENTS DO NOT MATCH IMAGE ***") + print(" Do NOT power cycle until this is resolved.") + sys.exit(1) + + finally: + if intf is not None: + try: + usb.util.release_interface(dev, intf) + dev.attach_kernel_driver(intf) + print("\nRe-attached kernel driver") + except: + print("\nNote: run 'sudo modprobe dvb_usb_gp8psk' to reload") + + +def main(): + import argparse + parser = argparse.ArgumentParser( + description="SkyWalker-1 EEPROM firmware flash tool") + sub = parser.add_subparsers(dest='command', required=True) + + # info + p_info = sub.add_parser('info', + help='Parse and display C2 header from a .bin file') + p_info.add_argument('file', help='C2 firmware image (.bin)') + + # backup + p_backup = sub.add_parser('backup', + help='Dump current EEPROM to a file') + p_backup.add_argument('-o', '--output', default='skywalker1_eeprom.bin', + help='Output file (default: skywalker1_eeprom.bin)') + p_backup.add_argument('--max-size', type=int, default=MAX_EEPROM_SIZE, + help=f'Bytes to read (default: {MAX_EEPROM_SIZE})') + + # verify + p_verify = sub.add_parser('verify', + help='Compare .bin file against EEPROM') + p_verify.add_argument('file', help='C2 firmware image (.bin)') + + # flash + p_flash = sub.add_parser('flash', + help='Write C2 firmware image to EEPROM') + p_flash.add_argument('file', help='C2 firmware image (.bin)') + p_flash.add_argument('--dry-run', action='store_true', + help='Show what would happen without writing') + p_flash.add_argument('--no-backup', action='store_true', + help='Skip pre-flash EEPROM backup') + p_flash.add_argument('--force', action='store_true', + help='Override VID/PID mismatch check') + + args = parser.parse_args() + + if args.command == 'info': + cmd_info(args) + elif args.command == 'backup': + cmd_backup(args) + elif args.command == 'verify': + cmd_verify(args) + elif args.command == 'flash': + cmd_flash(args) + + +if __name__ == '__main__': + main() diff --git a/tools/ts_analyze.py b/tools/ts_analyze.py new file mode 100755 index 0000000..004c50d --- /dev/null +++ b/tools/ts_analyze.py @@ -0,0 +1,897 @@ +#!/usr/bin/env python3 +""" +Genpix SkyWalker-1 MPEG-2 Transport Stream analyzer. + +Parses and analyzes 188-byte MPEG-2 TS packets from .ts files captured +by tune.py, stdin pipes, or any standard transport stream source. + +Supports PID analysis, PAT/PMT parsing, continuity counter checking, +scrambling detection, hex packet dumps, and live stream monitoring. + +Reference: ISO/IEC 13818-1 (MPEG-2 Systems) +TS packet: 188 bytes, sync byte 0x47 +""" + +import sys +import struct +import argparse +import time +import os + +TS_PACKET_SIZE = 188 +TS_SYNC_BYTE = 0x47 + +# Well-known PID assignments (ISO 13818-1 Table 2-3) +KNOWN_PIDS = { + 0x0000: "PAT", + 0x0001: "CAT", + 0x0002: "TSDT", + 0x0010: "NIT/ST", + 0x0011: "SDT/BAT/ST", + 0x0012: "EIT/ST", + 0x0013: "RST/ST", + 0x0014: "TDT/TOT/ST", + 0x001E: "DIT", + 0x001F: "SIT", + 0x1FFF: "Null", +} + +# Stream type identifiers (ISO 13818-1 Table 2-36) +STREAM_TYPES = { + 0x00: "Reserved", + 0x01: "MPEG-1 Video (11172-2)", + 0x02: "MPEG-2 Video (13818-2)", + 0x03: "MPEG-1 Audio (11172-3)", + 0x04: "MPEG-2 Audio (13818-3)", + 0x05: "Private Sections (13818-1)", + 0x06: "PES Private Data", + 0x07: "MHEG", + 0x08: "DSM-CC", + 0x09: "H.222.1", + 0x0A: "DSM-CC Type A", + 0x0B: "DSM-CC Type B", + 0x0C: "DSM-CC Type C", + 0x0D: "DSM-CC Type D", + 0x0E: "Auxiliary", + 0x0F: "MPEG-2 AAC Audio", + 0x10: "MPEG-4 Visual", + 0x11: "MPEG-4 AAC Audio (LATM)", + 0x15: "Metadata in PES", + 0x1B: "H.264/AVC Video", + 0x24: "H.265/HEVC Video", + 0x42: "AVS Video", + 0x81: "AC-3 Audio (ATSC)", + 0x82: "DTS Audio", + 0x83: "Dolby TrueHD", + 0x84: "Dolby Digital Plus (EAC-3)", + 0x85: "DTS-HD", + 0x86: "DTS-HD Master Audio", + 0x87: "EAC-3 Audio (ATSC)", + 0xEA: "VC-1 Video", +} + + +class TSPacket: + """Parsed MPEG-2 transport stream packet header.""" + + __slots__ = ( + 'sync', 'tei', 'pusi', 'priority', 'pid', + 'scrambling', 'adaptation', 'continuity', + 'adaptation_field', 'payload', 'raw', + ) + + def __init__(self, data: bytes): + if len(data) != TS_PACKET_SIZE: + raise ValueError(f"Packet must be {TS_PACKET_SIZE} bytes, got {len(data)}") + + self.raw = data + self.sync = data[0] + self.tei = bool(data[1] & 0x80) + self.pusi = bool(data[1] & 0x40) + self.priority = bool(data[1] & 0x20) + self.pid = ((data[1] & 0x1F) << 8) | data[2] + self.scrambling = (data[3] >> 6) & 0x03 + self.adaptation = (data[3] >> 4) & 0x03 + self.continuity = data[3] & 0x0F + + # Parse adaptation field and payload boundaries + offset = 4 + self.adaptation_field = None + self.payload = None + + if self.adaptation & 0x02: + # Adaptation field present + if offset < TS_PACKET_SIZE: + af_len = data[offset] + af_end = offset + 1 + af_len + if af_end <= TS_PACKET_SIZE: + self.adaptation_field = data[offset:af_end] + offset = af_end + + if self.adaptation & 0x01: + # Payload present + if offset < TS_PACKET_SIZE: + self.payload = data[offset:] + + def has_pcr(self) -> bool: + """Check if adaptation field contains a PCR.""" + if self.adaptation_field is None or len(self.adaptation_field) < 7: + return False + af_flags = self.adaptation_field[1] if len(self.adaptation_field) > 1 else 0 + return bool(af_flags & 0x10) + + def get_pcr(self) -> int: + """Extract PCR value (in 27 MHz clock ticks). Returns -1 if no PCR.""" + if not self.has_pcr(): + return -1 + # PCR is 6 bytes starting at adaptation_field[2] + af = self.adaptation_field + pcr_base = (af[2] << 25) | (af[3] << 17) | (af[4] << 9) | \ + (af[5] << 1) | ((af[6] >> 7) & 0x01) + pcr_ext = ((af[6] & 0x01) << 8) | af[7] + return pcr_base * 300 + pcr_ext + + +class TSReader: + """Reads TS packets from a file or stream, handling sync alignment.""" + + def __init__(self, source, verbose: bool = False): + self.source = source + self.verbose = verbose + self.offset = 0 + self._sync_offset = -1 + + def find_sync(self, data: bytes) -> int: + """Find sync byte alignment in raw data. Returns byte offset or -1.""" + # Need at least 3 consecutive sync bytes to confirm alignment + for i in range(min(len(data), TS_PACKET_SIZE)): + if data[i] != TS_SYNC_BYTE: + continue + # Check for consecutive sync bytes at 188-byte intervals + ok = True + for check in range(1, 4): + pos = i + check * TS_PACKET_SIZE + if pos >= len(data): + # Not enough data to confirm, accept if at least one more matches + if check >= 2: + break + ok = False + break + if data[pos] != TS_SYNC_BYTE: + ok = False + break + if ok: + return i + return -1 + + def iter_packets(self, max_packets: int = 0): + """Yield TSPacket objects from the source.""" + buf = b'' + synced = False + count = 0 + + while True: + chunk = self.source.read(65536) + if not chunk: + break + buf += chunk + + if not synced: + sync_off = self.find_sync(buf) + if sync_off < 0: + # Keep last 187 bytes in case sync straddles chunk boundary + if len(buf) > TS_PACKET_SIZE * 4: + buf = buf[-(TS_PACKET_SIZE - 1):] + continue + self._sync_offset = sync_off + self.offset + if self.verbose and sync_off > 0: + print(f" Sync found at byte offset {sync_off}", file=sys.stderr) + buf = buf[sync_off:] + synced = True + + while len(buf) >= TS_PACKET_SIZE: + pkt_data = buf[:TS_PACKET_SIZE] + buf = buf[TS_PACKET_SIZE:] + + if pkt_data[0] != TS_SYNC_BYTE: + # Lost sync, try to re-acquire + synced = False + if self.verbose: + print(f" Sync lost, re-scanning...", file=sys.stderr) + break + + count += 1 + yield TSPacket(pkt_data) + + if max_packets and count >= max_packets: + return + + self.offset += len(chunk) + + @property + def sync_offset(self) -> int: + return self._sync_offset + + +class PSIParser: + """Parse PSI sections from TS packet payloads.""" + + def __init__(self): + self._section_bufs = {} # pid -> accumulated bytes + + def feed(self, pkt: TSPacket) -> dict: + """Feed a packet, return parsed section dict or None.""" + if pkt.payload is None: + return None + + pid = pkt.pid + payload = pkt.payload + + if pkt.pusi: + # Payload Unit Start Indicator set + if len(payload) < 1: + return None + pointer = payload[0] + payload = payload[1 + pointer:] + self._section_bufs[pid] = payload + elif pid in self._section_bufs: + self._section_bufs[pid] += payload + else: + return None + + return self._try_parse(pid) + + def _try_parse(self, pid: int) -> dict: + """Try to parse a complete section from the buffer.""" + buf = self._section_bufs.get(pid, b'') + if len(buf) < 3: + return None + + table_id = buf[0] + section_length = ((buf[1] & 0x0F) << 8) | buf[2] + total_len = 3 + section_length + + if len(buf) < total_len: + return None # Incomplete, wait for more data + + section = buf[:total_len] + # Clear buffer for next section + self._section_bufs[pid] = buf[total_len:] + + if section_length < 5: + return None + + result = { + "table_id": table_id, + "section_syntax": bool(buf[1] & 0x80), + "section_length": section_length, + "raw": section, + } + + if result["section_syntax"]: + result["table_id_ext"] = (section[3] << 8) | section[4] + result["version"] = (section[5] >> 1) & 0x1F + result["current_next"] = section[5] & 0x01 + result["section_number"] = section[6] + result["last_section_number"] = section[7] + result["data"] = section[8:-4] + result["crc32"] = struct.unpack_from('>I', section, total_len - 4)[0] + + return result + + +def parse_pat(section: dict) -> dict: + """Parse a Program Association Table section.""" + if section is None or section["table_id"] != 0x00: + return None + + transport_stream_id = section["table_id_ext"] + data = section["data"] + programs = {} + + for i in range(0, len(data), 4): + if i + 4 > len(data): + break + prog_num = (data[i] << 8) | data[i + 1] + pmt_pid = ((data[i + 2] & 0x1F) << 8) | data[i + 3] + programs[prog_num] = pmt_pid + + return { + "transport_stream_id": transport_stream_id, + "version": section["version"], + "programs": programs, + } + + +def parse_pmt(section: dict) -> dict: + """Parse a Program Map Table section.""" + if section is None or section["table_id"] != 0x02: + return None + + program_number = section["table_id_ext"] + data = section["raw"] + + if len(data) < 12: + return None + + pcr_pid = ((data[8] & 0x1F) << 8) | data[9] + prog_info_len = ((data[10] & 0x0F) << 8) | data[11] + + offset = 12 + prog_info_len + streams = [] + + while offset + 5 <= len(data) - 4: # -4 for CRC + stream_type = data[offset] + elementary_pid = ((data[offset + 1] & 0x1F) << 8) | data[offset + 2] + es_info_len = ((data[offset + 3] & 0x0F) << 8) | data[offset + 4] + + streams.append({ + "stream_type": stream_type, + "elementary_pid": elementary_pid, + "es_info_length": es_info_len, + "type_name": STREAM_TYPES.get(stream_type, f"Unknown (0x{stream_type:02X})"), + }) + offset += 5 + es_info_len + + return { + "program_number": program_number, + "version": section["version"], + "pcr_pid": pcr_pid, + "streams": streams, + } + + +def open_input(path: str): + """Open TS input from a file path or stdin ('-').""" + if path == '-': + return sys.stdin.buffer + if not os.path.exists(path): + print(f"File not found: {path}") + sys.exit(1) + return open(path, 'rb') + + +def format_pid(pid: int, known: dict = None) -> str: + """Format a PID with its known name if available.""" + if known is None: + known = KNOWN_PIDS + name = known.get(pid, "") + if name: + return f"0x{pid:04X} ({name})" + return f"0x{pid:04X}" + + +# -- Subcommand handlers -- + +def cmd_analyze(args: argparse.Namespace) -> None: + """Full transport stream analysis.""" + source = open_input(args.input) + reader = TSReader(source, verbose=args.verbose) + + pid_counts = {} + pid_cc = {} # Last continuity counter per PID + cc_errors = {} + tei_count = 0 + scrambled_count = 0 + total_packets = 0 + first_pcr = None + first_pcr_pkt = 0 + last_pcr = None + last_pcr_pkt = 0 + + print(f"MPEG-2 Transport Stream Analysis") + print(f"{'=' * 60}") + if args.input != '-': + file_size = os.path.getsize(args.input) + print(f"File: {args.input} ({file_size:,} bytes)") + print() + + try: + for pkt in reader.iter_packets(max_packets=args.max_packets): + total_packets += 1 + pid = pkt.pid + + # PID counting + pid_counts[pid] = pid_counts.get(pid, 0) + 1 + + # TEI + if pkt.tei: + tei_count += 1 + + # Scrambling + if pkt.scrambling != 0: + scrambled_count += 1 + + # Continuity counter check (only for PIDs carrying payload) + if pkt.adaptation & 0x01 and pid != 0x1FFF: + if pid in pid_cc: + expected = (pid_cc[pid] + 1) & 0x0F + if pkt.continuity != expected and pkt.continuity != pid_cc[pid]: + cc_errors[pid] = cc_errors.get(pid, 0) + 1 + pid_cc[pid] = pkt.continuity + + # PCR extraction for bitrate calculation + if pkt.has_pcr(): + pcr = pkt.get_pcr() + if pcr >= 0: + if first_pcr is None: + first_pcr = pcr + first_pcr_pkt = total_packets + last_pcr = pcr + last_pcr_pkt = total_packets + except KeyboardInterrupt: + print("\n (interrupted)") + finally: + if source is not sys.stdin.buffer: + source.close() + + if total_packets == 0: + print("No valid TS packets found.") + return + + # Summary + if reader.sync_offset > 0: + print(f"Sync offset: {reader.sync_offset} bytes (skipped leading garbage)") + print(f"Total packets: {total_packets:,}") + print(f"Total bytes: {total_packets * TS_PACKET_SIZE:,}") + print(f"Unique PIDs: {len(pid_counts)}") + print(f"TEI errors: {tei_count}") + print(f"Scrambled: {scrambled_count}") + + # Bitrate + if first_pcr is not None and last_pcr is not None and last_pcr != first_pcr: + pcr_delta = last_pcr - first_pcr + pkt_delta = last_pcr_pkt - first_pcr_pkt + if pcr_delta > 0 and pkt_delta > 0: + duration = pcr_delta / 27_000_000.0 # PCR is 27 MHz clock + byte_count = pkt_delta * TS_PACKET_SIZE + bitrate = (byte_count * 8) / duration + if bitrate >= 1e6: + rate_str = f"{bitrate / 1e6:.2f} Mbps" + else: + rate_str = f"{bitrate / 1e3:.1f} kbps" + print(f"Duration: {duration:.2f}s (from PCR)") + print(f"Bitrate: {rate_str} (PCR-based)") + elif args.input != '-': + # File size estimate + file_size = os.path.getsize(args.input) + print(f"Bitrate: (no PCR found, cannot calculate from timing)") + + # PID table + print(f"\n{'=' * 60}") + print(f"PID Distribution") + print(f"{'=' * 60}") + print(f" {'PID':>6} {'Count':>10} {'%':>7} {'CC Err':>6} Name") + print(f" {'---':>6} {'-----':>10} {'--':>7} {'------':>6} ----") + + for pid in sorted(pid_counts.keys()): + count = pid_counts[pid] + pct = (count / total_packets) * 100 + cc_err = cc_errors.get(pid, 0) + name = KNOWN_PIDS.get(pid, "") + cc_str = str(cc_err) if cc_err > 0 else "-" + print(f" 0x{pid:04X} {count:>10,} {pct:>6.2f}% {cc_str:>6} {name}") + + # CC error summary + total_cc_errors = sum(cc_errors.values()) + if total_cc_errors > 0: + print(f"\nContinuity errors: {total_cc_errors} total across " + f"{len(cc_errors)} PID(s)") + + +def cmd_pids(args: argparse.Namespace) -> None: + """Quick PID summary table.""" + source = open_input(args.input) + reader = TSReader(source, verbose=args.verbose) + + pid_counts = {} + total = 0 + + try: + for pkt in reader.iter_packets(max_packets=args.max_packets): + total += 1 + pid_counts[pkt.pid] = pid_counts.get(pkt.pid, 0) + 1 + except KeyboardInterrupt: + pass + finally: + if source is not sys.stdin.buffer: + source.close() + + if total == 0: + print("No TS packets found.") + return + + print(f"PID Table ({total:,} packets)") + print(f"{'=' * 50}") + print(f" {'PID':>6} {'Count':>10} {'%':>7} Name") + print(f" {'---':>6} {'-----':>10} {'--':>7} ----") + + for pid in sorted(pid_counts.keys()): + count = pid_counts[pid] + pct = (count / total) * 100 + name = KNOWN_PIDS.get(pid, "") + print(f" 0x{pid:04X} {count:>10,} {pct:>6.2f}% {name}") + + +def cmd_pat(args: argparse.Namespace) -> None: + """Parse and display the Program Association Table.""" + source = open_input(args.input) + reader = TSReader(source, verbose=args.verbose) + psi = PSIParser() + pat_found = False + + try: + for pkt in reader.iter_packets(): + if pkt.pid != 0x0000: + continue + section = psi.feed(pkt) + if section is None: + continue + pat = parse_pat(section) + if pat is None: + continue + + pat_found = True + print(f"Program Association Table (PAT)") + print(f"{'=' * 50}") + print(f" Transport Stream ID: 0x{pat['transport_stream_id']:04X} " + f"({pat['transport_stream_id']})") + print(f" Version: {pat['version']}") + print(f" Programs: {len(pat['programs'])}") + print() + print(f" {'Program':>10} {'PMT PID':>10} Note") + print(f" {'-------':>10} {'-------':>10} ----") + for prog, pmt_pid in sorted(pat['programs'].items()): + note = "NIT" if prog == 0 else "" + print(f" {prog:>10} 0x{pmt_pid:04X} {note}") + break + + except KeyboardInterrupt: + pass + finally: + if source is not sys.stdin.buffer: + source.close() + + if not pat_found: + print("No PAT (PID 0x0000) found in stream.") + + +def cmd_pmt(args: argparse.Namespace) -> None: + """Parse and display Program Map Tables.""" + source = open_input(args.input) + reader = TSReader(source, verbose=args.verbose) + psi_pat = PSIParser() + psi_pmt = PSIParser() + + pat = None + pmt_pids = set() + pmts_found = {} + + try: + for pkt in reader.iter_packets(): + # First, collect PAT to learn PMT PIDs + if pkt.pid == 0x0000 and pat is None: + section = psi_pat.feed(pkt) + if section is not None: + pat = parse_pat(section) + if pat is not None: + for prog, pid in pat['programs'].items(): + if prog != 0: # Skip NIT reference + pmt_pids.add(pid) + + # Then collect PMT sections + if pkt.pid in pmt_pids and pkt.pid not in pmts_found: + section = psi_pmt.feed(pkt) + if section is not None: + pmt = parse_pmt(section) + if pmt is not None: + pmts_found[pkt.pid] = pmt + + # Done when we have all PMTs + if pat is not None and len(pmts_found) >= len(pmt_pids): + break + + except KeyboardInterrupt: + pass + finally: + if source is not sys.stdin.buffer: + source.close() + + if pat is None: + print("No PAT found -- cannot locate PMTs.") + return + + if not pmts_found: + print("PAT found but no PMT sections could be parsed.") + return + + print(f"Program Map Tables") + print(f"{'=' * 60}") + print(f"Transport Stream ID: 0x{pat['transport_stream_id']:04X}") + print() + + for pmt_pid in sorted(pmts_found.keys()): + pmt = pmts_found[pmt_pid] + print(f" Program {pmt['program_number']} (PMT PID 0x{pmt_pid:04X}, " + f"version {pmt['version']})") + print(f" PCR PID: 0x{pmt['pcr_pid']:04X}") + print(f" Streams:") + print(f" {'Type':>6} {'PID':>6} Description") + print(f" {'----':>6} {'---':>6} -----------") + for s in pmt['streams']: + print(f" 0x{s['stream_type']:02X} 0x{s['elementary_pid']:04X} " + f"{s['type_name']}") + print() + + +def cmd_dump(args: argparse.Namespace) -> None: + """Hex dump of individual TS packets.""" + source = open_input(args.input) + reader = TSReader(source, verbose=args.verbose) + filter_pid = args.pid + max_count = args.count + shown = 0 + + try: + for pkt in reader.iter_packets(): + if filter_pid is not None and pkt.pid != filter_pid: + continue + + shown += 1 + scrambling_str = ["none", "reserved", "even key", "odd key"][pkt.scrambling] + adapt_str = ["reserved", "payload only", "adapt only", "adapt+payload"][pkt.adaptation] + + print(f"Packet #{shown}") + print(f" PID: 0x{pkt.pid:04X} ({KNOWN_PIDS.get(pkt.pid, '')})") + print(f" TEI: {int(pkt.tei)} PUSI: {int(pkt.pusi)} " + f"Priority: {int(pkt.priority)}") + print(f" Scrambling: {scrambling_str} " + f"Adaptation: {adapt_str} CC: {pkt.continuity}") + + if pkt.adaptation_field is not None and len(pkt.adaptation_field) > 1: + af_len = pkt.adaptation_field[0] + af_flags = pkt.adaptation_field[1] if len(pkt.adaptation_field) > 1 else 0 + flags = [] + if af_flags & 0x80: flags.append("discontinuity") + if af_flags & 0x40: flags.append("random_access") + if af_flags & 0x20: flags.append("ES_priority") + if af_flags & 0x10: flags.append("PCR") + if af_flags & 0x08: flags.append("OPCR") + if af_flags & 0x04: flags.append("splice_point") + if af_flags & 0x02: flags.append("private_data") + if af_flags & 0x01: flags.append("extension") + print(f" Adaptation field: {af_len} bytes, " + f"flags=[{', '.join(flags) if flags else 'none'}]") + if pkt.has_pcr(): + pcr = pkt.get_pcr() + pcr_secs = pcr / 27_000_000.0 + print(f" PCR: {pcr} ({pcr_secs:.6f}s)") + + # Hex dump + data = pkt.raw + print(f" Hex:") + for row_off in range(0, len(data), 16): + row = data[row_off:row_off + 16] + hex_part = ' '.join(f'{b:02X}' for b in row) + ascii_part = ''.join(chr(b) if 0x20 <= b < 0x7F else '.' for b in row) + print(f" {row_off:04X}: {hex_part:<48} {ascii_part}") + print() + + if max_count and shown >= max_count: + break + + except KeyboardInterrupt: + pass + finally: + if source is not sys.stdin.buffer: + source.close() + + if shown == 0: + if filter_pid is not None: + print(f"No packets found with PID 0x{filter_pid:04X}") + else: + print("No TS packets found.") + + +def cmd_monitor(args: argparse.Namespace) -> None: + """Live stream monitoring from stdin or file.""" + source = open_input(args.input) + reader = TSReader(source, verbose=args.verbose) + + pid_counts = {} + known_pids = set() + cc_last = {} + cc_errors = 0 + tei_count = 0 + total_packets = 0 + interval_packets = 0 + start_time = time.time() + last_report = start_time + + print(f"MPEG-2 TS Live Monitor") + print(f"{'=' * 60}") + print(f"Ctrl-C to stop\n") + + try: + for pkt in reader.iter_packets(): + total_packets += 1 + interval_packets += 1 + pid = pkt.pid + + pid_counts[pid] = pid_counts.get(pid, 0) + 1 + + # New PID detection + if pid not in known_pids: + known_pids.add(pid) + name = KNOWN_PIDS.get(pid, "") + label = f" ({name})" if name else "" + elapsed = time.time() - start_time + print(f" [{elapsed:>7.1f}s] New PID: 0x{pid:04X}{label}") + + # TEI + if pkt.tei: + tei_count += 1 + elapsed = time.time() - start_time + print(f" [{elapsed:>7.1f}s] TEI error on PID 0x{pid:04X}") + + # CC check + if pkt.adaptation & 0x01 and pid != 0x1FFF: + if pid in cc_last: + expected = (cc_last[pid] + 1) & 0x0F + if pkt.continuity != expected and pkt.continuity != cc_last[pid]: + cc_errors += 1 + elapsed = time.time() - start_time + print(f" [{elapsed:>7.1f}s] CC error PID 0x{pid:04X}: " + f"expected {expected}, got {pkt.continuity}") + cc_last[pid] = pkt.continuity + + # Periodic status + now = time.time() + if now - last_report >= 1.0: + elapsed = now - start_time + total_bytes = total_packets * TS_PACKET_SIZE + bitrate = (interval_packets * TS_PACKET_SIZE * 8) + if bitrate >= 1e6: + rate_str = f"{bitrate / 1e6:.2f} Mbps" + else: + rate_str = f"{bitrate / 1e3:.1f} kbps" + sys.stderr.write( + f"\r {total_packets:>10,} pkts " + f"{total_bytes:>12,} bytes " + f"{rate_str:>12} " + f"PIDs:{len(known_pids):>3} " + f"CCerr:{cc_errors} " + f"TEI:{tei_count} " + f"({elapsed:.0f}s) " + ) + sys.stderr.flush() + interval_packets = 0 + last_report = now + + except KeyboardInterrupt: + pass + finally: + if source is not sys.stdin.buffer: + source.close() + + elapsed = time.time() - start_time + total_bytes = total_packets * TS_PACKET_SIZE + print(f"\n\nMonitor Summary") + print(f"{'=' * 40}") + print(f" Duration: {elapsed:.1f}s") + print(f" Packets: {total_packets:,}") + print(f" Bytes: {total_bytes:,}") + print(f" Unique PIDs: {len(known_pids)}") + print(f" CC errors: {cc_errors}") + print(f" TEI errors: {tei_count}") + + if elapsed > 0: + avg_bitrate = (total_bytes * 8) / elapsed + if avg_bitrate >= 1e6: + print(f" Avg bitrate: {avg_bitrate / 1e6:.2f} Mbps") + else: + print(f" Avg bitrate: {avg_bitrate / 1e3:.1f} kbps") + + +# -- CLI -- + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + description="MPEG-2 Transport Stream analyzer for Genpix SkyWalker-1", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog="""\ +examples: + %(prog)s capture.ts + %(prog)s analyze capture.ts + %(prog)s pids capture.ts + %(prog)s pat capture.ts + %(prog)s pmt capture.ts + %(prog)s dump capture.ts --pid 0x100 --count 5 + %(prog)s monitor - + tune.py stream --stdout | %(prog)s monitor - +""") + parser.add_argument('-v', '--verbose', action='store_true', + help="Show sync search details and debug info") + + sub = parser.add_subparsers(dest='command') + + # analyze (default) + p_analyze = sub.add_parser('analyze', help="Full stream analysis (default)") + p_analyze.add_argument('input', help="TS file path or '-' for stdin") + p_analyze.add_argument('--max-packets', type=int, default=0, + help="Max packets to analyze (0 = all)") + + # pids + p_pids = sub.add_parser('pids', help="Quick PID summary table") + p_pids.add_argument('input', help="TS file path or '-' for stdin") + p_pids.add_argument('--max-packets', type=int, default=0, + help="Max packets to analyze (0 = all)") + + # pat + p_pat = sub.add_parser('pat', help="Parse Program Association Table") + p_pat.add_argument('input', help="TS file path or '-' for stdin") + + # pmt + p_pmt = sub.add_parser('pmt', help="Parse Program Map Tables") + p_pmt.add_argument('input', help="TS file path or '-' for stdin") + + # dump + p_dump = sub.add_parser('dump', help="Hex dump of TS packets") + p_dump.add_argument('input', help="TS file path or '-' for stdin") + p_dump.add_argument('--pid', type=lambda x: int(x, 0), default=None, + help="Filter by PID (hex: 0x100 or decimal: 256)") + p_dump.add_argument('--count', type=int, default=10, + help="Max packets to dump (default: 10)") + + # monitor + p_monitor = sub.add_parser('monitor', help="Live stream monitoring") + p_monitor.add_argument('input', help="TS file path or '-' for stdin") + + return parser + + +def main(): + parser = build_parser() + + # Handle bare filename without subcommand: default to 'analyze' + # Insert 'analyze' after any global flags but before the filename + subcmds = {'analyze', 'pids', 'pat', 'pmt', 'dump', 'monitor'} + argv = sys.argv[1:] + if argv: + first_pos = None + insert_idx = 0 + for i, a in enumerate(argv): + if not a.startswith('-'): + first_pos = a + insert_idx = i + break + # Skip flag and its value if it takes one (currently none do) + insert_idx = i + 1 + if first_pos is not None and first_pos not in subcmds: + argv.insert(insert_idx, 'analyze') + + args = parser.parse_args(argv) + + if not args.command: + parser.print_help() + sys.exit(1) + + dispatch = { + 'analyze': cmd_analyze, + 'pids': cmd_pids, + 'pat': cmd_pat, + 'pmt': cmd_pmt, + 'dump': cmd_dump, + 'monitor': cmd_monitor, + } + + handler = dispatch.get(args.command) + if handler is None: + parser.print_help() + sys.exit(1) + + handler(args) + + +if __name__ == '__main__': + main() diff --git a/tools/tune.py b/tools/tune.py index 48cda50..e37c889 100755 --- a/tools/tune.py +++ b/tools/tune.py @@ -751,7 +751,9 @@ examples: sub = parser.add_subparsers(dest='command') # status - sub.add_parser('status', help="Show device config, FW version, signal status") + p_status = sub.add_parser('status', help="Show device config, FW version, signal status") + p_status.add_argument('--json', action='store_true', default=False, + help="Output machine-readable JSON") # tune p_tune = sub.add_parser('tune', help="Tune to a transponder") @@ -774,6 +776,8 @@ examples: help="Signal lock timeout in seconds (default: 10)") p_tune.add_argument('--extra-volt', action='store_true', help="Enable +1V LNB voltage boost for long cables") + p_tune.add_argument('--json', action='store_true', default=False, + help="Output machine-readable JSON") # stream p_stream = sub.add_parser('stream', help="Stream MPEG-2 TS data")