skywalker-1/tools/ts_analyze.py
Ryan Malloy c7b5932cc0 Add EEPROM flash tool, TS analyzer, DVB-S2 investigation, and tune.py bugfix
New tools:
- tools/eeprom_write.py: EEPROM firmware flash with backup, verify, dry-run
- tools/ts_analyze.py: MPEG-2 transport stream analyzer with PAT/PMT parsing

DVB-S2 investigation confirms BCM4500 hardware limitation (no LDPC/BCH silicon).

Fix --json flag on tune.py subcommands (argparse parent/child scoping).
All tools verified against live SkyWalker-1 hardware.
2026-02-11 14:46:20 -07:00

898 lines
30 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Genpix SkyWalker-1 MPEG-2 Transport Stream analyzer.
Parses and analyzes 188-byte MPEG-2 TS packets from .ts files captured
by tune.py, stdin pipes, or any standard transport stream source.
Supports PID analysis, PAT/PMT parsing, continuity counter checking,
scrambling detection, hex packet dumps, and live stream monitoring.
Reference: ISO/IEC 13818-1 (MPEG-2 Systems)
TS packet: 188 bytes, sync byte 0x47
"""
import sys
import struct
import argparse
import time
import os
TS_PACKET_SIZE = 188
TS_SYNC_BYTE = 0x47
# Well-known PID assignments (ISO 13818-1 Table 2-3)
KNOWN_PIDS = {
0x0000: "PAT",
0x0001: "CAT",
0x0002: "TSDT",
0x0010: "NIT/ST",
0x0011: "SDT/BAT/ST",
0x0012: "EIT/ST",
0x0013: "RST/ST",
0x0014: "TDT/TOT/ST",
0x001E: "DIT",
0x001F: "SIT",
0x1FFF: "Null",
}
# Stream type identifiers (ISO 13818-1 Table 2-36)
STREAM_TYPES = {
0x00: "Reserved",
0x01: "MPEG-1 Video (11172-2)",
0x02: "MPEG-2 Video (13818-2)",
0x03: "MPEG-1 Audio (11172-3)",
0x04: "MPEG-2 Audio (13818-3)",
0x05: "Private Sections (13818-1)",
0x06: "PES Private Data",
0x07: "MHEG",
0x08: "DSM-CC",
0x09: "H.222.1",
0x0A: "DSM-CC Type A",
0x0B: "DSM-CC Type B",
0x0C: "DSM-CC Type C",
0x0D: "DSM-CC Type D",
0x0E: "Auxiliary",
0x0F: "MPEG-2 AAC Audio",
0x10: "MPEG-4 Visual",
0x11: "MPEG-4 AAC Audio (LATM)",
0x15: "Metadata in PES",
0x1B: "H.264/AVC Video",
0x24: "H.265/HEVC Video",
0x42: "AVS Video",
0x81: "AC-3 Audio (ATSC)",
0x82: "DTS Audio",
0x83: "Dolby TrueHD",
0x84: "Dolby Digital Plus (EAC-3)",
0x85: "DTS-HD",
0x86: "DTS-HD Master Audio",
0x87: "EAC-3 Audio (ATSC)",
0xEA: "VC-1 Video",
}
class TSPacket:
"""Parsed MPEG-2 transport stream packet header."""
__slots__ = (
'sync', 'tei', 'pusi', 'priority', 'pid',
'scrambling', 'adaptation', 'continuity',
'adaptation_field', 'payload', 'raw',
)
def __init__(self, data: bytes):
if len(data) != TS_PACKET_SIZE:
raise ValueError(f"Packet must be {TS_PACKET_SIZE} bytes, got {len(data)}")
self.raw = data
self.sync = data[0]
self.tei = bool(data[1] & 0x80)
self.pusi = bool(data[1] & 0x40)
self.priority = bool(data[1] & 0x20)
self.pid = ((data[1] & 0x1F) << 8) | data[2]
self.scrambling = (data[3] >> 6) & 0x03
self.adaptation = (data[3] >> 4) & 0x03
self.continuity = data[3] & 0x0F
# Parse adaptation field and payload boundaries
offset = 4
self.adaptation_field = None
self.payload = None
if self.adaptation & 0x02:
# Adaptation field present
if offset < TS_PACKET_SIZE:
af_len = data[offset]
af_end = offset + 1 + af_len
if af_end <= TS_PACKET_SIZE:
self.adaptation_field = data[offset:af_end]
offset = af_end
if self.adaptation & 0x01:
# Payload present
if offset < TS_PACKET_SIZE:
self.payload = data[offset:]
def has_pcr(self) -> bool:
"""Check if adaptation field contains a PCR."""
if self.adaptation_field is None or len(self.adaptation_field) < 7:
return False
af_flags = self.adaptation_field[1] if len(self.adaptation_field) > 1 else 0
return bool(af_flags & 0x10)
def get_pcr(self) -> int:
"""Extract PCR value (in 27 MHz clock ticks). Returns -1 if no PCR."""
if not self.has_pcr():
return -1
# PCR is 6 bytes starting at adaptation_field[2]
af = self.adaptation_field
pcr_base = (af[2] << 25) | (af[3] << 17) | (af[4] << 9) | \
(af[5] << 1) | ((af[6] >> 7) & 0x01)
pcr_ext = ((af[6] & 0x01) << 8) | af[7]
return pcr_base * 300 + pcr_ext
class TSReader:
"""Reads TS packets from a file or stream, handling sync alignment."""
def __init__(self, source, verbose: bool = False):
self.source = source
self.verbose = verbose
self.offset = 0
self._sync_offset = -1
def find_sync(self, data: bytes) -> int:
"""Find sync byte alignment in raw data. Returns byte offset or -1."""
# Need at least 3 consecutive sync bytes to confirm alignment
for i in range(min(len(data), TS_PACKET_SIZE)):
if data[i] != TS_SYNC_BYTE:
continue
# Check for consecutive sync bytes at 188-byte intervals
ok = True
for check in range(1, 4):
pos = i + check * TS_PACKET_SIZE
if pos >= len(data):
# Not enough data to confirm, accept if at least one more matches
if check >= 2:
break
ok = False
break
if data[pos] != TS_SYNC_BYTE:
ok = False
break
if ok:
return i
return -1
def iter_packets(self, max_packets: int = 0):
"""Yield TSPacket objects from the source."""
buf = b''
synced = False
count = 0
while True:
chunk = self.source.read(65536)
if not chunk:
break
buf += chunk
if not synced:
sync_off = self.find_sync(buf)
if sync_off < 0:
# Keep last 187 bytes in case sync straddles chunk boundary
if len(buf) > TS_PACKET_SIZE * 4:
buf = buf[-(TS_PACKET_SIZE - 1):]
continue
self._sync_offset = sync_off + self.offset
if self.verbose and sync_off > 0:
print(f" Sync found at byte offset {sync_off}", file=sys.stderr)
buf = buf[sync_off:]
synced = True
while len(buf) >= TS_PACKET_SIZE:
pkt_data = buf[:TS_PACKET_SIZE]
buf = buf[TS_PACKET_SIZE:]
if pkt_data[0] != TS_SYNC_BYTE:
# Lost sync, try to re-acquire
synced = False
if self.verbose:
print(f" Sync lost, re-scanning...", file=sys.stderr)
break
count += 1
yield TSPacket(pkt_data)
if max_packets and count >= max_packets:
return
self.offset += len(chunk)
@property
def sync_offset(self) -> int:
return self._sync_offset
class PSIParser:
"""Parse PSI sections from TS packet payloads."""
def __init__(self):
self._section_bufs = {} # pid -> accumulated bytes
def feed(self, pkt: TSPacket) -> dict:
"""Feed a packet, return parsed section dict or None."""
if pkt.payload is None:
return None
pid = pkt.pid
payload = pkt.payload
if pkt.pusi:
# Payload Unit Start Indicator set
if len(payload) < 1:
return None
pointer = payload[0]
payload = payload[1 + pointer:]
self._section_bufs[pid] = payload
elif pid in self._section_bufs:
self._section_bufs[pid] += payload
else:
return None
return self._try_parse(pid)
def _try_parse(self, pid: int) -> dict:
"""Try to parse a complete section from the buffer."""
buf = self._section_bufs.get(pid, b'')
if len(buf) < 3:
return None
table_id = buf[0]
section_length = ((buf[1] & 0x0F) << 8) | buf[2]
total_len = 3 + section_length
if len(buf) < total_len:
return None # Incomplete, wait for more data
section = buf[:total_len]
# Clear buffer for next section
self._section_bufs[pid] = buf[total_len:]
if section_length < 5:
return None
result = {
"table_id": table_id,
"section_syntax": bool(buf[1] & 0x80),
"section_length": section_length,
"raw": section,
}
if result["section_syntax"]:
result["table_id_ext"] = (section[3] << 8) | section[4]
result["version"] = (section[5] >> 1) & 0x1F
result["current_next"] = section[5] & 0x01
result["section_number"] = section[6]
result["last_section_number"] = section[7]
result["data"] = section[8:-4]
result["crc32"] = struct.unpack_from('>I', section, total_len - 4)[0]
return result
def parse_pat(section: dict) -> dict:
"""Parse a Program Association Table section."""
if section is None or section["table_id"] != 0x00:
return None
transport_stream_id = section["table_id_ext"]
data = section["data"]
programs = {}
for i in range(0, len(data), 4):
if i + 4 > len(data):
break
prog_num = (data[i] << 8) | data[i + 1]
pmt_pid = ((data[i + 2] & 0x1F) << 8) | data[i + 3]
programs[prog_num] = pmt_pid
return {
"transport_stream_id": transport_stream_id,
"version": section["version"],
"programs": programs,
}
def parse_pmt(section: dict) -> dict:
"""Parse a Program Map Table section."""
if section is None or section["table_id"] != 0x02:
return None
program_number = section["table_id_ext"]
data = section["raw"]
if len(data) < 12:
return None
pcr_pid = ((data[8] & 0x1F) << 8) | data[9]
prog_info_len = ((data[10] & 0x0F) << 8) | data[11]
offset = 12 + prog_info_len
streams = []
while offset + 5 <= len(data) - 4: # -4 for CRC
stream_type = data[offset]
elementary_pid = ((data[offset + 1] & 0x1F) << 8) | data[offset + 2]
es_info_len = ((data[offset + 3] & 0x0F) << 8) | data[offset + 4]
streams.append({
"stream_type": stream_type,
"elementary_pid": elementary_pid,
"es_info_length": es_info_len,
"type_name": STREAM_TYPES.get(stream_type, f"Unknown (0x{stream_type:02X})"),
})
offset += 5 + es_info_len
return {
"program_number": program_number,
"version": section["version"],
"pcr_pid": pcr_pid,
"streams": streams,
}
def open_input(path: str):
"""Open TS input from a file path or stdin ('-')."""
if path == '-':
return sys.stdin.buffer
if not os.path.exists(path):
print(f"File not found: {path}")
sys.exit(1)
return open(path, 'rb')
def format_pid(pid: int, known: dict = None) -> str:
"""Format a PID with its known name if available."""
if known is None:
known = KNOWN_PIDS
name = known.get(pid, "")
if name:
return f"0x{pid:04X} ({name})"
return f"0x{pid:04X}"
# -- Subcommand handlers --
def cmd_analyze(args: argparse.Namespace) -> None:
"""Full transport stream analysis."""
source = open_input(args.input)
reader = TSReader(source, verbose=args.verbose)
pid_counts = {}
pid_cc = {} # Last continuity counter per PID
cc_errors = {}
tei_count = 0
scrambled_count = 0
total_packets = 0
first_pcr = None
first_pcr_pkt = 0
last_pcr = None
last_pcr_pkt = 0
print(f"MPEG-2 Transport Stream Analysis")
print(f"{'=' * 60}")
if args.input != '-':
file_size = os.path.getsize(args.input)
print(f"File: {args.input} ({file_size:,} bytes)")
print()
try:
for pkt in reader.iter_packets(max_packets=args.max_packets):
total_packets += 1
pid = pkt.pid
# PID counting
pid_counts[pid] = pid_counts.get(pid, 0) + 1
# TEI
if pkt.tei:
tei_count += 1
# Scrambling
if pkt.scrambling != 0:
scrambled_count += 1
# Continuity counter check (only for PIDs carrying payload)
if pkt.adaptation & 0x01 and pid != 0x1FFF:
if pid in pid_cc:
expected = (pid_cc[pid] + 1) & 0x0F
if pkt.continuity != expected and pkt.continuity != pid_cc[pid]:
cc_errors[pid] = cc_errors.get(pid, 0) + 1
pid_cc[pid] = pkt.continuity
# PCR extraction for bitrate calculation
if pkt.has_pcr():
pcr = pkt.get_pcr()
if pcr >= 0:
if first_pcr is None:
first_pcr = pcr
first_pcr_pkt = total_packets
last_pcr = pcr
last_pcr_pkt = total_packets
except KeyboardInterrupt:
print("\n (interrupted)")
finally:
if source is not sys.stdin.buffer:
source.close()
if total_packets == 0:
print("No valid TS packets found.")
return
# Summary
if reader.sync_offset > 0:
print(f"Sync offset: {reader.sync_offset} bytes (skipped leading garbage)")
print(f"Total packets: {total_packets:,}")
print(f"Total bytes: {total_packets * TS_PACKET_SIZE:,}")
print(f"Unique PIDs: {len(pid_counts)}")
print(f"TEI errors: {tei_count}")
print(f"Scrambled: {scrambled_count}")
# Bitrate
if first_pcr is not None and last_pcr is not None and last_pcr != first_pcr:
pcr_delta = last_pcr - first_pcr
pkt_delta = last_pcr_pkt - first_pcr_pkt
if pcr_delta > 0 and pkt_delta > 0:
duration = pcr_delta / 27_000_000.0 # PCR is 27 MHz clock
byte_count = pkt_delta * TS_PACKET_SIZE
bitrate = (byte_count * 8) / duration
if bitrate >= 1e6:
rate_str = f"{bitrate / 1e6:.2f} Mbps"
else:
rate_str = f"{bitrate / 1e3:.1f} kbps"
print(f"Duration: {duration:.2f}s (from PCR)")
print(f"Bitrate: {rate_str} (PCR-based)")
elif args.input != '-':
# File size estimate
file_size = os.path.getsize(args.input)
print(f"Bitrate: (no PCR found, cannot calculate from timing)")
# PID table
print(f"\n{'=' * 60}")
print(f"PID Distribution")
print(f"{'=' * 60}")
print(f" {'PID':>6} {'Count':>10} {'%':>7} {'CC Err':>6} Name")
print(f" {'---':>6} {'-----':>10} {'--':>7} {'------':>6} ----")
for pid in sorted(pid_counts.keys()):
count = pid_counts[pid]
pct = (count / total_packets) * 100
cc_err = cc_errors.get(pid, 0)
name = KNOWN_PIDS.get(pid, "")
cc_str = str(cc_err) if cc_err > 0 else "-"
print(f" 0x{pid:04X} {count:>10,} {pct:>6.2f}% {cc_str:>6} {name}")
# CC error summary
total_cc_errors = sum(cc_errors.values())
if total_cc_errors > 0:
print(f"\nContinuity errors: {total_cc_errors} total across "
f"{len(cc_errors)} PID(s)")
def cmd_pids(args: argparse.Namespace) -> None:
"""Quick PID summary table."""
source = open_input(args.input)
reader = TSReader(source, verbose=args.verbose)
pid_counts = {}
total = 0
try:
for pkt in reader.iter_packets(max_packets=args.max_packets):
total += 1
pid_counts[pkt.pid] = pid_counts.get(pkt.pid, 0) + 1
except KeyboardInterrupt:
pass
finally:
if source is not sys.stdin.buffer:
source.close()
if total == 0:
print("No TS packets found.")
return
print(f"PID Table ({total:,} packets)")
print(f"{'=' * 50}")
print(f" {'PID':>6} {'Count':>10} {'%':>7} Name")
print(f" {'---':>6} {'-----':>10} {'--':>7} ----")
for pid in sorted(pid_counts.keys()):
count = pid_counts[pid]
pct = (count / total) * 100
name = KNOWN_PIDS.get(pid, "")
print(f" 0x{pid:04X} {count:>10,} {pct:>6.2f}% {name}")
def cmd_pat(args: argparse.Namespace) -> None:
"""Parse and display the Program Association Table."""
source = open_input(args.input)
reader = TSReader(source, verbose=args.verbose)
psi = PSIParser()
pat_found = False
try:
for pkt in reader.iter_packets():
if pkt.pid != 0x0000:
continue
section = psi.feed(pkt)
if section is None:
continue
pat = parse_pat(section)
if pat is None:
continue
pat_found = True
print(f"Program Association Table (PAT)")
print(f"{'=' * 50}")
print(f" Transport Stream ID: 0x{pat['transport_stream_id']:04X} "
f"({pat['transport_stream_id']})")
print(f" Version: {pat['version']}")
print(f" Programs: {len(pat['programs'])}")
print()
print(f" {'Program':>10} {'PMT PID':>10} Note")
print(f" {'-------':>10} {'-------':>10} ----")
for prog, pmt_pid in sorted(pat['programs'].items()):
note = "NIT" if prog == 0 else ""
print(f" {prog:>10} 0x{pmt_pid:04X} {note}")
break
except KeyboardInterrupt:
pass
finally:
if source is not sys.stdin.buffer:
source.close()
if not pat_found:
print("No PAT (PID 0x0000) found in stream.")
def cmd_pmt(args: argparse.Namespace) -> None:
"""Parse and display Program Map Tables."""
source = open_input(args.input)
reader = TSReader(source, verbose=args.verbose)
psi_pat = PSIParser()
psi_pmt = PSIParser()
pat = None
pmt_pids = set()
pmts_found = {}
try:
for pkt in reader.iter_packets():
# First, collect PAT to learn PMT PIDs
if pkt.pid == 0x0000 and pat is None:
section = psi_pat.feed(pkt)
if section is not None:
pat = parse_pat(section)
if pat is not None:
for prog, pid in pat['programs'].items():
if prog != 0: # Skip NIT reference
pmt_pids.add(pid)
# Then collect PMT sections
if pkt.pid in pmt_pids and pkt.pid not in pmts_found:
section = psi_pmt.feed(pkt)
if section is not None:
pmt = parse_pmt(section)
if pmt is not None:
pmts_found[pkt.pid] = pmt
# Done when we have all PMTs
if pat is not None and len(pmts_found) >= len(pmt_pids):
break
except KeyboardInterrupt:
pass
finally:
if source is not sys.stdin.buffer:
source.close()
if pat is None:
print("No PAT found -- cannot locate PMTs.")
return
if not pmts_found:
print("PAT found but no PMT sections could be parsed.")
return
print(f"Program Map Tables")
print(f"{'=' * 60}")
print(f"Transport Stream ID: 0x{pat['transport_stream_id']:04X}")
print()
for pmt_pid in sorted(pmts_found.keys()):
pmt = pmts_found[pmt_pid]
print(f" Program {pmt['program_number']} (PMT PID 0x{pmt_pid:04X}, "
f"version {pmt['version']})")
print(f" PCR PID: 0x{pmt['pcr_pid']:04X}")
print(f" Streams:")
print(f" {'Type':>6} {'PID':>6} Description")
print(f" {'----':>6} {'---':>6} -----------")
for s in pmt['streams']:
print(f" 0x{s['stream_type']:02X} 0x{s['elementary_pid']:04X} "
f"{s['type_name']}")
print()
def cmd_dump(args: argparse.Namespace) -> None:
"""Hex dump of individual TS packets."""
source = open_input(args.input)
reader = TSReader(source, verbose=args.verbose)
filter_pid = args.pid
max_count = args.count
shown = 0
try:
for pkt in reader.iter_packets():
if filter_pid is not None and pkt.pid != filter_pid:
continue
shown += 1
scrambling_str = ["none", "reserved", "even key", "odd key"][pkt.scrambling]
adapt_str = ["reserved", "payload only", "adapt only", "adapt+payload"][pkt.adaptation]
print(f"Packet #{shown}")
print(f" PID: 0x{pkt.pid:04X} ({KNOWN_PIDS.get(pkt.pid, '')})")
print(f" TEI: {int(pkt.tei)} PUSI: {int(pkt.pusi)} "
f"Priority: {int(pkt.priority)}")
print(f" Scrambling: {scrambling_str} "
f"Adaptation: {adapt_str} CC: {pkt.continuity}")
if pkt.adaptation_field is not None and len(pkt.adaptation_field) > 1:
af_len = pkt.adaptation_field[0]
af_flags = pkt.adaptation_field[1] if len(pkt.adaptation_field) > 1 else 0
flags = []
if af_flags & 0x80: flags.append("discontinuity")
if af_flags & 0x40: flags.append("random_access")
if af_flags & 0x20: flags.append("ES_priority")
if af_flags & 0x10: flags.append("PCR")
if af_flags & 0x08: flags.append("OPCR")
if af_flags & 0x04: flags.append("splice_point")
if af_flags & 0x02: flags.append("private_data")
if af_flags & 0x01: flags.append("extension")
print(f" Adaptation field: {af_len} bytes, "
f"flags=[{', '.join(flags) if flags else 'none'}]")
if pkt.has_pcr():
pcr = pkt.get_pcr()
pcr_secs = pcr / 27_000_000.0
print(f" PCR: {pcr} ({pcr_secs:.6f}s)")
# Hex dump
data = pkt.raw
print(f" Hex:")
for row_off in range(0, len(data), 16):
row = data[row_off:row_off + 16]
hex_part = ' '.join(f'{b:02X}' for b in row)
ascii_part = ''.join(chr(b) if 0x20 <= b < 0x7F else '.' for b in row)
print(f" {row_off:04X}: {hex_part:<48} {ascii_part}")
print()
if max_count and shown >= max_count:
break
except KeyboardInterrupt:
pass
finally:
if source is not sys.stdin.buffer:
source.close()
if shown == 0:
if filter_pid is not None:
print(f"No packets found with PID 0x{filter_pid:04X}")
else:
print("No TS packets found.")
def cmd_monitor(args: argparse.Namespace) -> None:
"""Live stream monitoring from stdin or file."""
source = open_input(args.input)
reader = TSReader(source, verbose=args.verbose)
pid_counts = {}
known_pids = set()
cc_last = {}
cc_errors = 0
tei_count = 0
total_packets = 0
interval_packets = 0
start_time = time.time()
last_report = start_time
print(f"MPEG-2 TS Live Monitor")
print(f"{'=' * 60}")
print(f"Ctrl-C to stop\n")
try:
for pkt in reader.iter_packets():
total_packets += 1
interval_packets += 1
pid = pkt.pid
pid_counts[pid] = pid_counts.get(pid, 0) + 1
# New PID detection
if pid not in known_pids:
known_pids.add(pid)
name = KNOWN_PIDS.get(pid, "")
label = f" ({name})" if name else ""
elapsed = time.time() - start_time
print(f" [{elapsed:>7.1f}s] New PID: 0x{pid:04X}{label}")
# TEI
if pkt.tei:
tei_count += 1
elapsed = time.time() - start_time
print(f" [{elapsed:>7.1f}s] TEI error on PID 0x{pid:04X}")
# CC check
if pkt.adaptation & 0x01 and pid != 0x1FFF:
if pid in cc_last:
expected = (cc_last[pid] + 1) & 0x0F
if pkt.continuity != expected and pkt.continuity != cc_last[pid]:
cc_errors += 1
elapsed = time.time() - start_time
print(f" [{elapsed:>7.1f}s] CC error PID 0x{pid:04X}: "
f"expected {expected}, got {pkt.continuity}")
cc_last[pid] = pkt.continuity
# Periodic status
now = time.time()
if now - last_report >= 1.0:
elapsed = now - start_time
total_bytes = total_packets * TS_PACKET_SIZE
bitrate = (interval_packets * TS_PACKET_SIZE * 8)
if bitrate >= 1e6:
rate_str = f"{bitrate / 1e6:.2f} Mbps"
else:
rate_str = f"{bitrate / 1e3:.1f} kbps"
sys.stderr.write(
f"\r {total_packets:>10,} pkts "
f"{total_bytes:>12,} bytes "
f"{rate_str:>12} "
f"PIDs:{len(known_pids):>3} "
f"CCerr:{cc_errors} "
f"TEI:{tei_count} "
f"({elapsed:.0f}s) "
)
sys.stderr.flush()
interval_packets = 0
last_report = now
except KeyboardInterrupt:
pass
finally:
if source is not sys.stdin.buffer:
source.close()
elapsed = time.time() - start_time
total_bytes = total_packets * TS_PACKET_SIZE
print(f"\n\nMonitor Summary")
print(f"{'=' * 40}")
print(f" Duration: {elapsed:.1f}s")
print(f" Packets: {total_packets:,}")
print(f" Bytes: {total_bytes:,}")
print(f" Unique PIDs: {len(known_pids)}")
print(f" CC errors: {cc_errors}")
print(f" TEI errors: {tei_count}")
if elapsed > 0:
avg_bitrate = (total_bytes * 8) / elapsed
if avg_bitrate >= 1e6:
print(f" Avg bitrate: {avg_bitrate / 1e6:.2f} Mbps")
else:
print(f" Avg bitrate: {avg_bitrate / 1e3:.1f} kbps")
# -- CLI --
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="MPEG-2 Transport Stream analyzer for Genpix SkyWalker-1",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""\
examples:
%(prog)s capture.ts
%(prog)s analyze capture.ts
%(prog)s pids capture.ts
%(prog)s pat capture.ts
%(prog)s pmt capture.ts
%(prog)s dump capture.ts --pid 0x100 --count 5
%(prog)s monitor -
tune.py stream --stdout | %(prog)s monitor -
""")
parser.add_argument('-v', '--verbose', action='store_true',
help="Show sync search details and debug info")
sub = parser.add_subparsers(dest='command')
# analyze (default)
p_analyze = sub.add_parser('analyze', help="Full stream analysis (default)")
p_analyze.add_argument('input', help="TS file path or '-' for stdin")
p_analyze.add_argument('--max-packets', type=int, default=0,
help="Max packets to analyze (0 = all)")
# pids
p_pids = sub.add_parser('pids', help="Quick PID summary table")
p_pids.add_argument('input', help="TS file path or '-' for stdin")
p_pids.add_argument('--max-packets', type=int, default=0,
help="Max packets to analyze (0 = all)")
# pat
p_pat = sub.add_parser('pat', help="Parse Program Association Table")
p_pat.add_argument('input', help="TS file path or '-' for stdin")
# pmt
p_pmt = sub.add_parser('pmt', help="Parse Program Map Tables")
p_pmt.add_argument('input', help="TS file path or '-' for stdin")
# dump
p_dump = sub.add_parser('dump', help="Hex dump of TS packets")
p_dump.add_argument('input', help="TS file path or '-' for stdin")
p_dump.add_argument('--pid', type=lambda x: int(x, 0), default=None,
help="Filter by PID (hex: 0x100 or decimal: 256)")
p_dump.add_argument('--count', type=int, default=10,
help="Max packets to dump (default: 10)")
# monitor
p_monitor = sub.add_parser('monitor', help="Live stream monitoring")
p_monitor.add_argument('input', help="TS file path or '-' for stdin")
return parser
def main():
parser = build_parser()
# Handle bare filename without subcommand: default to 'analyze'
# Insert 'analyze' after any global flags but before the filename
subcmds = {'analyze', 'pids', 'pat', 'pmt', 'dump', 'monitor'}
argv = sys.argv[1:]
if argv:
first_pos = None
insert_idx = 0
for i, a in enumerate(argv):
if not a.startswith('-'):
first_pos = a
insert_idx = i
break
# Skip flag and its value if it takes one (currently none do)
insert_idx = i + 1
if first_pos is not None and first_pos not in subcmds:
argv.insert(insert_idx, 'analyze')
args = parser.parse_args(argv)
if not args.command:
parser.print_help()
sys.exit(1)
dispatch = {
'analyze': cmd_analyze,
'pids': cmd_pids,
'pat': cmd_pat,
'pmt': cmd_pmt,
'dump': cmd_dump,
'monitor': cmd_monitor,
}
handler = dispatch.get(args.command)
if handler is None:
parser.print_help()
sys.exit(1)
handler(args)
if __name__ == '__main__':
main()