From 1c27a8d15d595a0d756d7d0305b16922dfd7c165 Mon Sep 17 00:00:00 2001 From: Ryan Malloy Date: Sat, 14 Feb 2026 18:05:15 -0700 Subject: [PATCH] Add console-probe package for firmware console discovery Automated deep probe of Winegard firmware console interfaces. Discovers available commands via help parsing, submenu probing, and wordlist brute-force. Handles prompt-terminated serial I/O with the > character termination strategy. Modules: serial_io (prompt-aware I/O), discovery (auto-discovery), profile (DeviceProfile dataclass), report (JSON output), cli (argparse). --- src/console_probe/__init__.py | 1 + src/console_probe/cli.py | 362 ++++++++++++++ src/console_probe/discovery.py | 847 +++++++++++++++++++++++++++++++++ src/console_probe/profile.py | 34 ++ src/console_probe/report.py | 105 ++++ src/console_probe/serial_io.py | 105 ++++ 6 files changed, 1454 insertions(+) create mode 100644 src/console_probe/__init__.py create mode 100644 src/console_probe/cli.py create mode 100644 src/console_probe/discovery.py create mode 100644 src/console_probe/profile.py create mode 100644 src/console_probe/report.py create mode 100644 src/console_probe/serial_io.py diff --git a/src/console_probe/__init__.py b/src/console_probe/__init__.py new file mode 100644 index 0000000..6b661e6 --- /dev/null +++ b/src/console_probe/__init__.py @@ -0,0 +1 @@ +"""Generic embedded console command prober.""" diff --git a/src/console_probe/cli.py b/src/console_probe/cli.py new file mode 100644 index 0000000..13491af --- /dev/null +++ b/src/console_probe/cli.py @@ -0,0 +1,362 @@ +"""CLI entry point for the embedded console probe.""" + +from __future__ import annotations + +import argparse +import sys +from pathlib import Path + +import serial # pyright: ignore[reportMissingImports] + +from console_probe.discovery import ( + auto_discover, + discover_submenu_help, + enter_submenu, + generate_candidates, + navigate_to_root, + parse_help_output, + probe_commands, +) +from console_probe.profile import DeviceProfile +from console_probe.report import write_json_report +from console_probe.serial_io import send_cmd + +LINE_ENDINGS = {"cr": "\r", "lf": "\n", "crlf": "\r\n"} + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Probe for hidden commands on an embedded console", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog="""\ +examples: + %(prog)s --port /dev/ttyUSB2 --baud 115200 + %(prog)s --deep + %(prog)s --discover-only --json /tmp/discover.json + %(prog)s --submenu mot + %(prog)s --wordlist scripts/wordlists/winegard.txt + %(prog)s --prompt "U-Boot>" --error "Unknown command" +""", + ) + + conn = parser.add_argument_group("connection") + conn.add_argument( + "--port", default="/dev/ttyUSB0", help="Serial port (default: /dev/ttyUSB0)" + ) + conn.add_argument( + "--baud", type=int, default=115200, help="Baud rate (default: 115200)" + ) + conn.add_argument( + "--line-ending", + choices=LINE_ENDINGS, + default="cr", + help="Line ending to send (default: cr)", + ) + + disc = parser.add_argument_group("discovery overrides") + disc.add_argument( + "--prompt", default=None, help="Override auto-detected root prompt" + ) + disc.add_argument( + "--error", default=None, help="Override auto-detected error string" + ) + disc.add_argument( + "--help-cmd", default="?", help="Command to request help (default: ?)" + ) + disc.add_argument( + "--exit-cmd", default="q", help="Command to exit submenu (default: q)" + ) + + probe = parser.add_argument_group("probing") + probe.add_argument( + "--discover-only", + action="store_true", + help="Discover commands via help only (no brute-force probing)", + ) + probe.add_argument( + "--deep", action="store_true", help="Probe all discovered submenus" + ) + probe.add_argument( + "--submenu", type=str, default=None, help="Probe a single submenu by name" + ) + probe.add_argument( + "--timeout", + type=float, + default=0.5, + help="Per-command timeout in seconds (default: 0.5)", + ) + probe.add_argument( + "--blocklist", + default="reboot,stow,def,q,Q", + help="Comma-separated commands to never send (default: reboot,stow,def,q,Q)", + ) + probe.add_argument( + "--wordlist", + action="append", + default=None, + metavar="FILE", + help="Extra candidate words file (one per line, repeatable)", + ) + probe.add_argument( + "--bundled", + action="append", + default=None, + metavar="FILE", + help="Bundled wordlist file (alias for --wordlist, repeatable)", + ) + + output = parser.add_argument_group("output") + output.add_argument( + "--json", metavar="FILE", default=None, help="Write results as JSON to FILE" + ) + + return parser.parse_args() + + +def _run_discover_only( + ser: serial.Serial, + profile: DeviceProfile, + help_cmd: str, +) -> dict[str, list[tuple[str, str]]]: + """Discover commands via submenu help only (no brute-force). + + Enters each discovered submenu, queries help, stores results in + ``profile.submenu_help``, and prints a summary. + + Returns an empty results dict (no probe hits). + """ + all_results: dict[str, list[tuple[str, str]]] = {} + total_commands = 0 + + submenus = list(profile.submenus) + if not submenus: + print(" No submenus to discover. Use --deep for brute-force probing.") + return all_results + + print(f"\nPhase 2: Discovering commands in {len(submenus)} submenus...\n") + + for menu in submenus: + label = menu.upper() + sub_prompt = enter_submenu(ser, menu, profile) + print(f" [{label}] Prompt: {sub_prompt}") + + entries = discover_submenu_help(ser, menu, profile, help_cmd=help_cmd) + profile.submenu_help[label] = entries + total_commands += len(entries) + + if entries: + print(f" [{label}] Found {len(entries)} commands:") + for entry in entries: + params = f" {entry.params}" if entry.params else "" + desc = f" -- {entry.description}" if entry.description else "" + print(f" {entry.name}{params}{desc}") + else: + print(f" [{label}] No commands found in help output.") + + # Empty probe results for this menu (discover-only) + all_results[label] = [] + + navigate_to_root(ser, profile) + print() + + print( + f"Discovery complete: {total_commands} commands across {len(submenus)} submenus" + ) + return all_results + + +def _run_deep_probe( + ser: serial.Serial, + profile: DeviceProfile, + candidates: list[str], + submenus_to_probe: list[str], + help_cmd: str, + timeout: float, +) -> dict[str, list[tuple[str, str]]]: + """Run submenu help discovery, then brute-force probe. + + For each submenu: discover help first, then probe candidates. + This populates both ``profile.submenu_help`` and the probe results, + enabling the report to show ``undiscovered`` commands (probe-only). + """ + all_results: dict[str, list[tuple[str, str]]] = {} + + for menu in submenus_to_probe: + label = menu.upper() + print(f"\n=== {label} submenu ===\n") + sub_prompt = enter_submenu(ser, menu, profile) + print(f" Prompt: {sub_prompt}") + + # Step 1: Discover via help + print(" Querying help...") + entries = discover_submenu_help(ser, menu, profile, help_cmd=help_cmd) + profile.submenu_help[label] = entries + help_names = {e.name for e in entries} + names_str = ", ".join(sorted(help_names)) + print(f" Help discovered {len(entries)} commands: {names_str}") + + # Step 2: Brute-force probe + print(f" Probing {len(candidates)} candidates...") + sub_hits = probe_commands( + ser, candidates, sub_prompt, label, profile, timeout=timeout + ) + all_results[label] = sub_hits + + # Classify + undiscovered = [ + (cmd, resp) for cmd, resp in sub_hits if cmd.lower() not in help_names + ] + + print(f"\n--- {label} Results ---") + print(f" Help commands: {len(entries)}") + print(f" Probe hits: {len(sub_hits)}") + print(f" Undiscovered (probe-only): {len(undiscovered)}") + for cmd, resp in undiscovered: + print(f" '{cmd}' -> {resp}") + + navigate_to_root(ser, profile) + + return all_results + + +def main() -> None: + args = parse_args() + + profile = DeviceProfile( + port=args.port, + baud=args.baud, + line_ending=LINE_ENDINGS[args.line_ending], + exit_cmd=args.exit_cmd, + ) + + if args.prompt: + profile.root_prompt = args.prompt + profile.prompts = [args.prompt] + if args.error: + profile.error_string = args.error + + # Merge --bundled into --wordlist + wordlist_raw = (args.wordlist or []) + (args.bundled or []) + wordlist_paths = [Path(p) for p in wordlist_raw] if wordlist_raw else None + + # Build candidates (skip if discover-only) + blocklist = {w.strip() for w in args.blocklist.split(",") if w.strip()} + if not args.discover_only: + candidates = generate_candidates(blocklist, wordlist_paths) + print(f"Generated {len(candidates)} candidate commands\n") + else: + candidates = [] + + # Open serial + ser = serial.Serial(profile.port, profile.baud, timeout=1) + ser.reset_input_buffer() + + all_results: dict[str, list[tuple[str, str]]] = {} + + try: + # Auto-discovery + if not profile.root_prompt or not profile.error_string: + profile = auto_discover(ser, profile) + else: + print("Using overrides:") + print(f" Root prompt: {profile.root_prompt}") + print(f' Error string: "{profile.error_string}"') + + print(f" Sending help command: {args.help_cmd}") + help_resp = send_cmd(ser, args.help_cmd, profile, timeout=2.0) + commands, submenus = parse_help_output(help_resp, profile) + profile.known_commands = commands + if not profile.submenus: + profile.submenus = submenus + for sub in profile.submenus: + sub_prompt = f"{sub.upper()}>" + if sub_prompt not in profile.prompts: + profile.prompts.append(sub_prompt) + + if commands: + print( + f" Known commands ({len(commands)}): {', '.join(sorted(commands))}" + ) + if submenus: + print(f" Submenus ({len(submenus)}): {', '.join(submenus)}") + print() + + if not profile.root_prompt: + print( + "ERROR: Could not determine root prompt. Use --prompt to specify.", + file=sys.stderr, + ) + sys.exit(1) + + prompt = navigate_to_root(ser, profile) + print(f"Starting at: {prompt}\n") + + # --- Discover-only mode --- + if args.discover_only: + all_results = _run_discover_only(ser, profile, args.help_cmd) + + if args.json: + write_json_report(Path(args.json), profile, all_results) + return + + # --- Standard probing --- + root_label = profile.root_prompt.rstrip(">$#").strip() or "ROOT" + print( + f"=== Probing {profile.root_prompt} (root) " + f"-- {len(candidates)} candidates ===\n" + ) + + root_hits = probe_commands( + ser, + candidates, + profile.root_prompt, + root_label, + profile, + timeout=args.timeout, + ) + all_results[root_label] = root_hits + + unknown_hits = [ + (cmd, resp) + for cmd, resp in root_hits + if cmd.lower() not in profile.known_commands + ] + known_count = len(root_hits) - len(unknown_hits) + + print("\n--- Root Results ---") + print(f" Total hits: {len(root_hits)}") + print(f" Known commands: {known_count}") + print(f" UNKNOWN commands: {len(unknown_hits)}") + for cmd, resp in unknown_hits: + print(f" '{cmd}' -> {resp}") + + # Submenu probing + submenus_to_probe: list[str] = [] + if args.submenu: + submenus_to_probe.append(args.submenu) + elif args.deep: + submenus_to_probe = list(profile.submenus) + + if submenus_to_probe: + sub_results = _run_deep_probe( + ser, + profile, + candidates, + submenus_to_probe, + args.help_cmd, + args.timeout, + ) + all_results.update(sub_results) + + if args.json: + write_json_report(Path(args.json), profile, all_results) + + except KeyboardInterrupt: + print("\n\nInterrupted.") + finally: + ser.close() + print("\nPort closed.") + + +if __name__ == "__main__": + main() diff --git a/src/console_probe/discovery.py b/src/console_probe/discovery.py new file mode 100644 index 0000000..25fd847 --- /dev/null +++ b/src/console_probe/discovery.py @@ -0,0 +1,847 @@ +"""Auto-discovery, help parsing, navigation, and probing for embedded consoles.""" + +from __future__ import annotations + +import re +import sys +import time + +import serial # pyright: ignore[reportMissingImports] + +from console_probe.profile import DeviceProfile, HelpEntry +from console_probe.serial_io import PROMPT_RE, detect_prompt, send_cmd + +# --------------------------------------------------------------------------- +# Help output parsing +# --------------------------------------------------------------------------- + +# Common parameter placeholder names that should never be treated as commands. +# These appear in help text like ``help []`` or ``set ``. +_PARAM_PLACEHOLDERS: set[str] = { + "command", + "commands", + "parameter", + "parameters", + "value", + "values", + "index", + "name", + "arg", + "args", + "argument", + "arguments", + "option", + "options", + "number", + "string", + "text", + "file", + "path", + "addr", + "address", + "size", + "count", + "offset", + "length", + "data", + "byte", + "bytes", + "word", + "type", + "mode", + "level", + "pin", + "port", + "id", + "key", +} + + +def _inside_brackets(text: str, pos: int) -> bool: + """Return True if *pos* falls inside ``[...]`` in *text*. + + Counts unmatched ``[`` characters before *pos*. If the count is odd, + the position is inside brackets (parameter syntax). + """ + depth = 0 + for i in range(pos): + ch = text[i] + if ch == "[": + depth += 1 + elif ch == "]": + depth = max(0, depth - 1) + return depth > 0 + + +# Shared regex patterns for help parsing +_CMD_DASH_RE = re.compile(r"^\s*(\w[\w.]*)\s+[-\u2014]\s+") +_CMD_SPACES_RE = re.compile(r"^\s{0,4}(\w[\w.]*)\s{2,}") +_BARE_CMD_RE = re.compile(r"^\s{0,4}(\w[\w.]*)\s*$") +_BRACKET_RE = re.compile(r"<(\w[\w.]*)>") +_ENTER_RE = re.compile(r"[Ee]nter\s+?", re.IGNORECASE) +_MENU_RE = re.compile(r"(\w+)\s+[Mm]enu") +_SUBMENU_RE = re.compile(r"(\w+)\s+[Ss]ub-?[Mm]enu") + + +def parse_help_output( + help_text: str, + profile: DeviceProfile, +) -> tuple[set[str], list[str]]: + """Parse help output for command names and submenu hints. + + Returns (known_commands, submenu_names). + """ + commands: set[str] = set() + submenus: list[str] = [] + + for line in help_text.split("\n"): + stripped = line.strip() + if not stripped: + continue + if PROMPT_RE.fullmatch(stripped): + continue + + # Try angle-bracket commands first (e.g. "Enter - ...") + bracket_match = _BRACKET_RE.search(stripped) + if bracket_match: + cmd_name = bracket_match.group(1).lower() + # Reject if inside [...] brackets or is a known placeholder + if ( + len(cmd_name) <= 20 + and not _inside_brackets(stripped, bracket_match.start()) + and cmd_name not in _PARAM_PLACEHOLDERS + ): + commands.add(cmd_name) + else: + # Fall back to generic patterns + for pat in (_CMD_DASH_RE, _CMD_SPACES_RE, _BARE_CMD_RE): + m = pat.match(stripped) + if m: + cmd_name = m.group(1).lower() + if len(cmd_name) <= 20: + commands.add(cmd_name) + break + + # Look for submenu hints + enter_match = _ENTER_RE.search(stripped) + if enter_match: + sub = enter_match.group(1).lower() + if sub not in submenus and len(sub) <= 20: + submenus.append(sub) + else: + for pat in (_SUBMENU_RE, _MENU_RE): + m = pat.search(stripped) + if m: + sub = m.group(1).lower() + if sub not in submenus and len(sub) <= 20: + submenus.append(sub) + break + + return commands, submenus + + +def parse_help_structured( + help_text: str, + profile: DeviceProfile, +) -> list[HelpEntry]: + """Parse help output into structured entries with descriptions and params. + + Same regex logic as ``parse_help_output`` but captures description text + (everything after `` - `` or double-space separator) and parameter syntax + (text in ``[...]`` / ``<...>``). + """ + entries: list[HelpEntry] = [] + seen: set[str] = set() + + # Extended patterns that capture descriptions + # "command - description" or "command --- description" + cmd_desc_re = re.compile(r"^\s*(\w[\w.]*)\s+[-\u2014]+\s+(.*)") + # "command Description text" (2+ spaces after command) + cmd_space_desc_re = re.compile(r"^\s{0,4}(\w[\w.]*)\s{2,}(\S.*)") + + # Parameter syntax: text in [...] or <...> on the same line as the command + param_re = re.compile(r"(\[.*?\]|<\w[\w.]*>)") + + for line in help_text.split("\n"): + stripped = line.strip() + if not stripped: + continue + if PROMPT_RE.fullmatch(stripped): + continue + + cmd_name: str | None = None + description = "" + + # Try angle-bracket commands: "Enter - Motor driver" + bracket_match = _BRACKET_RE.search(stripped) + if bracket_match: + candidate = bracket_match.group(1).lower() + if ( + len(candidate) <= 20 + and not _inside_brackets(stripped, bracket_match.start()) + and candidate not in _PARAM_PLACEHOLDERS + ): + cmd_name = candidate + # Description is everything after the bracket match + separator + rest = stripped[bracket_match.end() :] + desc_m = re.match(r"\s*[-\u2014]+\s*(.*)", rest) + if desc_m: + description = desc_m.group(1).strip() + elif rest.strip(): + description = rest.strip() + + if not cmd_name: + # "command - description" + m = cmd_desc_re.match(stripped) + if m: + cmd_name = m.group(1).lower() + description = m.group(2).strip() + else: + # "command Description" + m = cmd_space_desc_re.match(stripped) + if m: + cmd_name = m.group(1).lower() + description = m.group(2).strip() + else: + # Bare command name + m = _BARE_CMD_RE.match(stripped) + if m: + cmd_name = m.group(1).lower() + + if cmd_name and len(cmd_name) <= 20 and cmd_name not in seen: + # Extract parameter syntax from the full line + params_parts = param_re.findall(stripped) + # Filter out parts that are just the command name in brackets + params = " ".join( + p for p in params_parts if p.strip("[]<>").lower() != cmd_name + ) + + seen.add(cmd_name) + entries.append( + HelpEntry( + name=cmd_name, + description=description, + params=params, + ) + ) + + return entries + + +# --------------------------------------------------------------------------- +# Submenu help discovery +# --------------------------------------------------------------------------- + + +def discover_submenu_help( + ser: serial.Serial, + menu: str, + profile: DeviceProfile, + help_cmd: str = "?", + extra_help_cmds: list[str] | None = None, +) -> list[HelpEntry]: + """Query help in the current submenu and parse structured entries. + + Assumes the caller has already navigated into the submenu. + + Sends *help_cmd* (default ``?``), then tries *extra_help_cmds* + (default ``["man"]``) for firmware with paginated help (e.g. DVB submenu). + Merges results from all help commands, deduplicating by command name. + """ + if extra_help_cmds is None: + extra_help_cmds = ["man"] + + # Primary help + help_resp = send_cmd(ser, help_cmd, profile, timeout=3.0) + entries = parse_help_structured(help_resp, profile) + + # Try extra help commands for multi-page help + seen_names = {e.name for e in entries} + for extra_cmd in extra_help_cmds: + extra_resp = send_cmd(ser, extra_cmd, profile, timeout=3.0) + extra_entries = parse_help_structured(extra_resp, profile) + + # Only merge if the response looks like real help output + # (at least 3 entries and has new commands) + new_entries = [e for e in extra_entries if e.name not in seen_names] + if len(extra_entries) >= 3 and new_entries: + entries.extend(new_entries) + seen_names.update(e.name for e in new_entries) + + return entries + + +# --------------------------------------------------------------------------- +# Error string detection +# --------------------------------------------------------------------------- + + +def detect_error_string(ser: serial.Serial, profile: DeviceProfile) -> str | None: + """Send a garbage command and extract the error message template.""" + resp = send_cmd(ser, "__xyzzy_probe__", profile, timeout=1.0) + + # Strip the echo of our command and any prompt + lines = resp.replace("__xyzzy_probe__", "").strip().split("\n") + content_lines = [] + for line in lines: + stripped = line.strip() + if not stripped: + continue + if PROMPT_RE.fullmatch(stripped): + continue + stripped = PROMPT_RE.sub("", stripped).strip() + if stripped: + content_lines.append(stripped) + + if content_lines: + return content_lines[0].strip() + return None + + +# --------------------------------------------------------------------------- +# Auto-discovery +# --------------------------------------------------------------------------- + + +def auto_discover(ser: serial.Serial, profile: DeviceProfile) -> DeviceProfile: + """Run the discovery sequence and populate the profile.""" + print("Phase 1: Auto-discovering device console...\n") + + prompt = detect_prompt(ser, profile) + if prompt: + profile.root_prompt = prompt + profile.prompts = [prompt] + print(f" Root prompt: {prompt}") + else: + print(" WARNING: Could not detect root prompt.") + print(" Use --prompt to specify manually.") + + err = detect_error_string(ser, profile) + if err: + profile.error_string = err + print(f' Error string: "{err}"') + else: + print(" WARNING: Could not detect error string.") + print(" Use --error to specify manually.") + + print(" Sending help command: ?") + help_resp = send_cmd(ser, "?", profile, timeout=2.0) + + commands, submenus = parse_help_output(help_resp, profile) + if commands: + profile.known_commands = commands + print(f" Known commands ({len(commands)}): {', '.join(sorted(commands))}") + else: + print(" No commands parsed from help output.") + + if submenus: + profile.submenus = submenus + print(f" Detected submenus ({len(submenus)}): {', '.join(submenus)}") + else: + print(" No submenus detected from help output.") + + # Build prompt list from submenus + for sub in profile.submenus: + sub_prompt = f"{sub.upper()}>" + if sub_prompt not in profile.prompts: + profile.prompts.append(sub_prompt) + + print() + return profile + + +# --------------------------------------------------------------------------- +# Navigation +# --------------------------------------------------------------------------- + + +def navigate_to_root(ser: serial.Serial, profile: DeviceProfile) -> str: + """Send exit command until we're at the root prompt.""" + # Check if already at root (bare CR won't kill anything) + resp = send_cmd(ser, "", profile) + if profile.root_prompt and profile.root_prompt in resp: + return profile.root_prompt + + for _ in range(5): + resp = send_cmd(ser, profile.exit_cmd, profile) + if profile.root_prompt and profile.root_prompt in resp: + return profile.root_prompt + last_line = resp.strip().split("\n")[-1].strip() + m = PROMPT_RE.search(last_line) + if m: + detected = m.group(1) + if not profile.root_prompt: + profile.root_prompt = detected + if detected == profile.root_prompt: + return detected + + # Last resort + resp = send_cmd(ser, "", profile) + last_line = resp.strip().split("\n")[-1].strip() + return last_line + + +def enter_submenu(ser: serial.Serial, menu: str, profile: DeviceProfile) -> str: + """Enter a submenu and return the prompt we land on.""" + navigate_to_root(ser, profile) + resp = send_cmd(ser, menu, profile) + lines = resp.strip().split("\n") + last = lines[-1].strip() if lines else "" + + m = PROMPT_RE.search(last) + if m: + new_prompt = m.group(1) + if new_prompt not in profile.prompts: + profile.prompts.append(new_prompt) + return new_prompt + return last + + +# --------------------------------------------------------------------------- +# Response cleaning +# --------------------------------------------------------------------------- + + +def clean_response( + resp: str, + cmd: str, + profile: DeviceProfile, +) -> str: + """Strip echo, prompts, and whitespace from a response.""" + clean = resp + + for suffix in ("\r\n", "\r", "\n", ""): + clean = clean.replace(f"{cmd}{suffix}", "", 1) + + clean = clean.strip() + + for p in profile.prompts: + clean = clean.replace(p, "") + + clean = PROMPT_RE.sub("", clean) + return clean.strip() + + +# --------------------------------------------------------------------------- +# Probing +# --------------------------------------------------------------------------- + + +def probe_commands( + ser: serial.Serial, + candidates: list[str], + prompt: str, + label: str, + profile: DeviceProfile, + timeout: float = 0.5, +) -> list[tuple[str, str]]: + """Probe candidates at the current menu level. Returns (cmd, preview) hits.""" + hits = [] + total = len(candidates) + + for i, cmd in enumerate(candidates): + if (i + 1) % 50 == 0: + print(f" [{label}] Progress: {i + 1}/{total}...", flush=True) + + resp = send_cmd(ser, cmd, profile, timeout=timeout) + clean = clean_response(resp, cmd, profile) + + is_error = profile.error_string and profile.error_string in resp + if not is_error and clean: + preview = clean[:100].replace("\r\n", " | ").replace("\n", " | ") + hits.append((cmd, preview)) + print(f" *** HIT: '{cmd}' -> {preview}", flush=True) + + # Recover from submenu exits + if "Terminating shell" in resp or "exiting" in resp.lower(): + if profile.root_prompt and prompt != profile.root_prompt: + print(f" ('{cmd}' exited submenu, re-entering...)") + menu_name = prompt.replace(">", "").strip().lower() + enter_submenu(ser, menu_name, profile) + else: + print(f" ('{cmd}' terminated shell, waiting for restart...)") + time.sleep(1.0) + check = send_cmd(ser, "", profile) + if profile.root_prompt and profile.root_prompt not in check: + print( + " WARNING: Shell did not restart. Remaining " + "results may be incomplete.", + flush=True, + ) + + return hits + + +# --------------------------------------------------------------------------- +# Candidate generation +# --------------------------------------------------------------------------- + + +def generate_candidates( + blocklist: set[str], + wordlist_paths: list | None = None, +) -> list[str]: + """Build the candidate command list. + + Includes generic embedded debug commands + single chars + two-letter combos. + Merges in any external wordlist files. Applies blocklist last. + """ + import string + from pathlib import Path + + candidates: list[str] = [] + + # Single characters + candidates.extend(list(string.ascii_lowercase)) + candidates.extend(list(string.ascii_uppercase)) + candidates.extend(list(string.digits)) + + # Generic embedded debug commands + generic = [ + # Memory access + "md", + "mw", + "mm", + "mr", + "mem", + "peek", + "poke", + "rd", + "wr", + "read", + "write", + "dump", + "load", + "save", + "md.b", + "md.w", + "md.l", + "x", + "xx", + "xd", + # Flash + "flash", + "fl", + "erase", + "program", + "verify", + "protect", + "flinfo", + "fldump", + "flashdump", + # Boot / system + "boot", + "reboot", + "reset", + "go", + "run", + "exec", + "jump", + "bootd", + "bootm", + "bootp", + "version", + "ver", + "info", + "about", + "sysinfo", + "uptime", + "date", + "time", + "clk", + "clock", + # Debug + "debug", + "dbg", + "trace", + "log", + "print", + "echo", + "test", + "diag", + "selftest", + "bist", + "bench", + "assert", + "crash", + "fault", + "panic", + # Shell / OS + "sh", + "shell", + "cmd", + "command", + "cli", + "task", + "tasks", + "ps", + "top", + "threads", + "kill", + "suspend", + "resume", + "heap", + "stack", + "free", + "malloc", + "meminfo", + "cpu", + "cpuinfo", + "temp", + "temperature", + # Network / comms + "ping", + "net", + "ifconfig", + "ip", + "mac", + "uart", + "serial", + "spi", + "i2c", + "can", + # Service / factory + "factory", + "service", + "mfg", + "production", + "prod", + "cal", + "calibrate", + "calibration", + "config", + "cfg", + "setup", + "settings", + "hidden", + "secret", + "admin", + "su", + "root", + "login", + "password", + "passwd", + "auth", + "unlock", + # Update + "update", + "upgrade", + "firmware", + "fw", + "ota", + "download", + "upload", + "xmodem", + "ymodem", + "zmodem", + "tftp", + "ftp", + # Generic hardware + "sw", + "hw", + "id", + "sn", + "help", + "man", + "usage", + # Two-letter combos + "bl", + "bt", + "db", + "dm", + "dp", + "ds", + "dt", + "eb", + "ed", + "ee", + "ef", + "em", + "en", + "ep", + "er", + "es", + "et", + "fa", + "fb", + "fc", + "fd", + "fe", + "ff", + "fg", + "fh", + "fi", + "fj", + "ga", + "gb", + "gc", + "gd", + "ge", + "gf", + "gg", + "gh", + "gi", + "gj", + "ha", + "hb", + "hc", + "hd", + "he", + "hf", + "hg", + "hh", + "hi", + "hj", + "ia", + "ib", + "ic", + "io", + "ir", + "ka", + "kb", + "kc", + "kd", + "ke", + "la", + "lb", + "lc", + "ld", + "le", + "lf", + "lg", + "lh", + "li", + "lj", + "ma", + "mb", + "mc", + "me", + "mf", + "mg", + "mh", + "mi", + "mj", + "na", + "nb", + "nc", + "nd", + "ne", + "nf", + "ng", + "nh", + "ni", + "nj", + "oa", + "ob", + "oc", + "od", + "oe", + "of", + "og", + "oh", + "oi", + "oj", + "pa", + "pb", + "pc", + "pd", + "pe", + "pf", + "pg", + "ph", + "pi", + "pj", + "ra", + "rb", + "rc", + "re", + "rf", + "rg", + "rh", + "ri", + "rj", + "sa", + "sb", + "sc", + "sd", + "se", + "sf", + "sg", + "si", + "sj", + "ta", + "tb", + "tc", + "td", + "te", + "tf", + "tg", + "th", + "ti", + "tj", + "ua", + "ub", + "uc", + "ud", + "ue", + "uf", + "ug", + "uh", + "ui", + "uj", + "va", + "vb", + "vc", + "vd", + "ve", + "vf", + "vg", + "vh", + "vi", + "vj", + "wa", + "wb", + "wc", + "wd", + "we", + "wf", + "wg", + "wh", + "wi", + "wj", + "za", + "zb", + "zc", + "zd", + "ze", + "zf", + ] + candidates.extend(generic) + + # Merge external wordlists + if wordlist_paths: + for wl_path in wordlist_paths: + p = Path(wl_path) if not isinstance(wl_path, Path) else wl_path + try: + text = p.read_text() + for line in text.split("\n"): + word = line.strip() + if word and not word.startswith("#"): + candidates.append(word) + except OSError as exc: + print( + f"WARNING: Could not read wordlist {p}: {exc}", + file=sys.stderr, + ) + + # Deduplicate preserving order + seen: set[str] = set() + unique: list[str] = [] + for c in candidates: + if c not in seen: + seen.add(c) + unique.append(c) + + # Apply blocklist + unique = [c for c in unique if c not in blocklist] + return unique diff --git a/src/console_probe/profile.py b/src/console_probe/profile.py new file mode 100644 index 0000000..a878660 --- /dev/null +++ b/src/console_probe/profile.py @@ -0,0 +1,34 @@ +"""Device profile and data structures for console probing.""" + +from __future__ import annotations + +from dataclasses import dataclass, field + + +@dataclass +class HelpEntry: + """A single command parsed from firmware help output. + + Captures the command name, any parameter syntax shown in brackets/angles, + and the description text (everything after the separator). + """ + + name: str # command name (lowercase) + description: str = "" # help description text + params: str = "" # parameter syntax from brackets, e.g. "[ [angle]]" + + +@dataclass +class DeviceProfile: + """Everything we know (or detected) about the attached console.""" + + port: str = "/dev/ttyUSB0" + baud: int = 115200 + root_prompt: str = "" # e.g. "TRK>" + prompts: list[str] = field(default_factory=list) # all known prompts + error_string: str = "" # e.g. "Invalid command." + known_commands: set[str] = field(default_factory=set) # from help output + submenus: list[str] = field(default_factory=list) # detected submenu names + exit_cmd: str = "q" + line_ending: str = "\r" + submenu_help: dict[str, list[HelpEntry]] = field(default_factory=dict) diff --git a/src/console_probe/report.py b/src/console_probe/report.py new file mode 100644 index 0000000..e28b9f1 --- /dev/null +++ b/src/console_probe/report.py @@ -0,0 +1,105 @@ +"""JSON report generation for console probe results.""" + +from __future__ import annotations + +import json +from pathlib import Path + +from console_probe.profile import DeviceProfile + +FORMAT_VERSION = 2 + + +def build_report( + profile: DeviceProfile, + results: dict[str, list[tuple[str, str]]], +) -> dict: + """Build the full JSON report structure. + + ``results`` maps menu labels (e.g. "TRK", "MOT") to lists of + ``(cmd, preview)`` probe hits. + + When ``profile.submenu_help`` is populated, the report includes a + ``menus`` section with per-submenu ``help_commands``, ``probe_hits``, + ``undiscovered`` (probe-only commands not in help), and stats. + """ + report: dict = { + "format_version": FORMAT_VERSION, + "device": {"port": profile.port, "baud": profile.baud}, + "detected": { + "root_prompt": profile.root_prompt, + "error_string": profile.error_string, + "known_commands": sorted(profile.known_commands), + "submenus": profile.submenus, + }, + } + + # Per-submenu structured output + if profile.submenu_help: + menus: dict = {} + for label, help_entries in profile.submenu_help.items(): + help_names = {e.name for e in help_entries} + + # Probe hits for this menu (may be empty if --discover-only) + probe_hits = results.get(label, []) + + # Undiscovered = probe hits whose names are NOT in help output + undiscovered = [ + {"cmd": cmd, "response": resp} + for cmd, resp in probe_hits + if cmd.lower() not in help_names + ] + + menus[label] = { + "prompt": f"{label}>", + "help_commands": [ + { + "cmd": e.name, + "description": e.description, + "params": e.params, + } + for e in help_entries + ], + "probe_hits": [ + {"cmd": cmd, "response": resp} for cmd, resp in probe_hits + ], + "undiscovered": undiscovered, + "stats": { + "help_count": len(help_entries), + "probe_count": len(probe_hits), + "undiscovered_count": len(undiscovered), + }, + } + + report["menus"] = menus + + # Legacy results section (backward compat) + report["results"] = {} + for label, hits in results.items(): + known_hits = [ + (cmd, resp) for cmd, resp in hits if cmd.lower() in profile.known_commands + ] + unknown_hits = [ + (cmd, resp) + for cmd, resp in hits + if cmd.lower() not in profile.known_commands + ] + report["results"][label] = { + "total_hits": len(hits), + "known": len(known_hits), + "unknown": len(unknown_hits), + "hits": [{"cmd": cmd, "response": resp} for cmd, resp in hits], + } + + return report + + +def write_json_report( + path: Path, + profile: DeviceProfile, + results: dict[str, list[tuple[str, str]]], +) -> None: + """Write machine-readable JSON probe report.""" + report = build_report(profile, results) + path.write_text(json.dumps(report, indent=2) + "\n") + print(f"\nJSON report written to {path}") diff --git a/src/console_probe/serial_io.py b/src/console_probe/serial_io.py new file mode 100644 index 0000000..e499b20 --- /dev/null +++ b/src/console_probe/serial_io.py @@ -0,0 +1,105 @@ +"""Serial I/O for prompt-terminated embedded consoles.""" + +from __future__ import annotations + +import re +import time + +import serial # pyright: ignore[reportMissingImports] + +from console_probe.profile import DeviceProfile + +PROMPT_RE = re.compile(r"(\S+[>$#])\s*$") + + +def _is_prompt_terminated(text: str, profile: DeviceProfile) -> bool: + """Check if *text* ends with a firmware prompt (not parameter syntax). + + When ``profile.prompts`` is populated, checks whether the last line ends + with one of the known prompt strings. Also accepts any PROMPT_RE match + on the last line **if** that line contains no ``[`` bracket (which would + indicate parameter syntax like ``[]``). + + When ``profile.prompts`` is empty (initial discovery phase), falls back + to the original heuristic: ``stripped.endswith(">")``. + """ + stripped = text.rstrip() + if not stripped: + return False + + last_line = stripped.split("\n")[-1] + + if profile.prompts: + # Check known prompts first (fast path) + last_stripped = last_line.rstrip() + for p in profile.prompts: + if last_stripped.endswith(p): + return True + + # Accept a PROMPT_RE match only if no brackets on that line + if "[" not in last_line: + m = PROMPT_RE.search(last_line) + if m: + return True + + return False + + # No known prompts yet — fallback to bare > check + return stripped.endswith(">") + + +def send_cmd( + ser: serial.Serial, + cmd: str, + profile: DeviceProfile, + timeout: float = 1.0, +) -> str: + """Send *cmd* + line-ending, read until a known prompt or timeout.""" + ser.reset_input_buffer() + ser.write(f"{cmd}{profile.line_ending}".encode("ascii", errors="replace")) + ser.timeout = timeout + + buf = bytearray() + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + chunk = ser.read(4096) + if chunk: + buf.extend(chunk) + text = buf.decode("utf-8", errors="replace") + if _is_prompt_terminated(text, profile): + break + elif buf: + text = buf.decode("utf-8", errors="replace") + if _is_prompt_terminated(text, profile): + break + + return buf.decode("utf-8", errors="replace") + + +def detect_prompt(ser: serial.Serial, profile: DeviceProfile) -> str | None: + """Send a bare line-ending and extract the prompt from the response.""" + ser.reset_input_buffer() + ser.write(profile.line_ending.encode("ascii")) + ser.timeout = 2.0 + + buf = bytearray() + deadline = time.monotonic() + 2.0 + while time.monotonic() < deadline: + chunk = ser.read(4096) + if chunk: + buf.extend(chunk) + text = buf.decode("utf-8", errors="replace") + tail = text.rstrip() + if tail.endswith((">", "#", "$")): + break + elif buf: + break + + text = buf.decode("utf-8", errors="replace").strip() + if not text: + return None + + # Take the last line, look for a prompt-like token + last_line = text.split("\n")[-1].strip() + m = PROMPT_RE.search(last_line) + return m.group(1) if m else (last_line if last_line else None)