Add console-probe package for firmware console discovery
Automated deep probe of Winegard firmware console interfaces. Discovers available commands via help parsing, submenu probing, and wordlist brute-force. Handles prompt-terminated serial I/O with the > character termination strategy. Modules: serial_io (prompt-aware I/O), discovery (auto-discovery), profile (DeviceProfile dataclass), report (JSON output), cli (argparse).
This commit is contained in:
parent
13a9d804c6
commit
1c27a8d15d
1
src/console_probe/__init__.py
Normal file
1
src/console_probe/__init__.py
Normal file
@ -0,0 +1 @@
|
||||
"""Generic embedded console command prober."""
|
||||
362
src/console_probe/cli.py
Normal file
362
src/console_probe/cli.py
Normal file
@ -0,0 +1,362 @@
|
||||
"""CLI entry point for the embedded console probe."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import serial # pyright: ignore[reportMissingImports]
|
||||
|
||||
from console_probe.discovery import (
|
||||
auto_discover,
|
||||
discover_submenu_help,
|
||||
enter_submenu,
|
||||
generate_candidates,
|
||||
navigate_to_root,
|
||||
parse_help_output,
|
||||
probe_commands,
|
||||
)
|
||||
from console_probe.profile import DeviceProfile
|
||||
from console_probe.report import write_json_report
|
||||
from console_probe.serial_io import send_cmd
|
||||
|
||||
LINE_ENDINGS = {"cr": "\r", "lf": "\n", "crlf": "\r\n"}
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Probe for hidden commands on an embedded console",
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
epilog="""\
|
||||
examples:
|
||||
%(prog)s --port /dev/ttyUSB2 --baud 115200
|
||||
%(prog)s --deep
|
||||
%(prog)s --discover-only --json /tmp/discover.json
|
||||
%(prog)s --submenu mot
|
||||
%(prog)s --wordlist scripts/wordlists/winegard.txt
|
||||
%(prog)s --prompt "U-Boot>" --error "Unknown command"
|
||||
""",
|
||||
)
|
||||
|
||||
conn = parser.add_argument_group("connection")
|
||||
conn.add_argument(
|
||||
"--port", default="/dev/ttyUSB0", help="Serial port (default: /dev/ttyUSB0)"
|
||||
)
|
||||
conn.add_argument(
|
||||
"--baud", type=int, default=115200, help="Baud rate (default: 115200)"
|
||||
)
|
||||
conn.add_argument(
|
||||
"--line-ending",
|
||||
choices=LINE_ENDINGS,
|
||||
default="cr",
|
||||
help="Line ending to send (default: cr)",
|
||||
)
|
||||
|
||||
disc = parser.add_argument_group("discovery overrides")
|
||||
disc.add_argument(
|
||||
"--prompt", default=None, help="Override auto-detected root prompt"
|
||||
)
|
||||
disc.add_argument(
|
||||
"--error", default=None, help="Override auto-detected error string"
|
||||
)
|
||||
disc.add_argument(
|
||||
"--help-cmd", default="?", help="Command to request help (default: ?)"
|
||||
)
|
||||
disc.add_argument(
|
||||
"--exit-cmd", default="q", help="Command to exit submenu (default: q)"
|
||||
)
|
||||
|
||||
probe = parser.add_argument_group("probing")
|
||||
probe.add_argument(
|
||||
"--discover-only",
|
||||
action="store_true",
|
||||
help="Discover commands via help only (no brute-force probing)",
|
||||
)
|
||||
probe.add_argument(
|
||||
"--deep", action="store_true", help="Probe all discovered submenus"
|
||||
)
|
||||
probe.add_argument(
|
||||
"--submenu", type=str, default=None, help="Probe a single submenu by name"
|
||||
)
|
||||
probe.add_argument(
|
||||
"--timeout",
|
||||
type=float,
|
||||
default=0.5,
|
||||
help="Per-command timeout in seconds (default: 0.5)",
|
||||
)
|
||||
probe.add_argument(
|
||||
"--blocklist",
|
||||
default="reboot,stow,def,q,Q",
|
||||
help="Comma-separated commands to never send (default: reboot,stow,def,q,Q)",
|
||||
)
|
||||
probe.add_argument(
|
||||
"--wordlist",
|
||||
action="append",
|
||||
default=None,
|
||||
metavar="FILE",
|
||||
help="Extra candidate words file (one per line, repeatable)",
|
||||
)
|
||||
probe.add_argument(
|
||||
"--bundled",
|
||||
action="append",
|
||||
default=None,
|
||||
metavar="FILE",
|
||||
help="Bundled wordlist file (alias for --wordlist, repeatable)",
|
||||
)
|
||||
|
||||
output = parser.add_argument_group("output")
|
||||
output.add_argument(
|
||||
"--json", metavar="FILE", default=None, help="Write results as JSON to FILE"
|
||||
)
|
||||
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
def _run_discover_only(
|
||||
ser: serial.Serial,
|
||||
profile: DeviceProfile,
|
||||
help_cmd: str,
|
||||
) -> dict[str, list[tuple[str, str]]]:
|
||||
"""Discover commands via submenu help only (no brute-force).
|
||||
|
||||
Enters each discovered submenu, queries help, stores results in
|
||||
``profile.submenu_help``, and prints a summary.
|
||||
|
||||
Returns an empty results dict (no probe hits).
|
||||
"""
|
||||
all_results: dict[str, list[tuple[str, str]]] = {}
|
||||
total_commands = 0
|
||||
|
||||
submenus = list(profile.submenus)
|
||||
if not submenus:
|
||||
print(" No submenus to discover. Use --deep for brute-force probing.")
|
||||
return all_results
|
||||
|
||||
print(f"\nPhase 2: Discovering commands in {len(submenus)} submenus...\n")
|
||||
|
||||
for menu in submenus:
|
||||
label = menu.upper()
|
||||
sub_prompt = enter_submenu(ser, menu, profile)
|
||||
print(f" [{label}] Prompt: {sub_prompt}")
|
||||
|
||||
entries = discover_submenu_help(ser, menu, profile, help_cmd=help_cmd)
|
||||
profile.submenu_help[label] = entries
|
||||
total_commands += len(entries)
|
||||
|
||||
if entries:
|
||||
print(f" [{label}] Found {len(entries)} commands:")
|
||||
for entry in entries:
|
||||
params = f" {entry.params}" if entry.params else ""
|
||||
desc = f" -- {entry.description}" if entry.description else ""
|
||||
print(f" {entry.name}{params}{desc}")
|
||||
else:
|
||||
print(f" [{label}] No commands found in help output.")
|
||||
|
||||
# Empty probe results for this menu (discover-only)
|
||||
all_results[label] = []
|
||||
|
||||
navigate_to_root(ser, profile)
|
||||
print()
|
||||
|
||||
print(
|
||||
f"Discovery complete: {total_commands} commands across {len(submenus)} submenus"
|
||||
)
|
||||
return all_results
|
||||
|
||||
|
||||
def _run_deep_probe(
|
||||
ser: serial.Serial,
|
||||
profile: DeviceProfile,
|
||||
candidates: list[str],
|
||||
submenus_to_probe: list[str],
|
||||
help_cmd: str,
|
||||
timeout: float,
|
||||
) -> dict[str, list[tuple[str, str]]]:
|
||||
"""Run submenu help discovery, then brute-force probe.
|
||||
|
||||
For each submenu: discover help first, then probe candidates.
|
||||
This populates both ``profile.submenu_help`` and the probe results,
|
||||
enabling the report to show ``undiscovered`` commands (probe-only).
|
||||
"""
|
||||
all_results: dict[str, list[tuple[str, str]]] = {}
|
||||
|
||||
for menu in submenus_to_probe:
|
||||
label = menu.upper()
|
||||
print(f"\n=== {label} submenu ===\n")
|
||||
sub_prompt = enter_submenu(ser, menu, profile)
|
||||
print(f" Prompt: {sub_prompt}")
|
||||
|
||||
# Step 1: Discover via help
|
||||
print(" Querying help...")
|
||||
entries = discover_submenu_help(ser, menu, profile, help_cmd=help_cmd)
|
||||
profile.submenu_help[label] = entries
|
||||
help_names = {e.name for e in entries}
|
||||
names_str = ", ".join(sorted(help_names))
|
||||
print(f" Help discovered {len(entries)} commands: {names_str}")
|
||||
|
||||
# Step 2: Brute-force probe
|
||||
print(f" Probing {len(candidates)} candidates...")
|
||||
sub_hits = probe_commands(
|
||||
ser, candidates, sub_prompt, label, profile, timeout=timeout
|
||||
)
|
||||
all_results[label] = sub_hits
|
||||
|
||||
# Classify
|
||||
undiscovered = [
|
||||
(cmd, resp) for cmd, resp in sub_hits if cmd.lower() not in help_names
|
||||
]
|
||||
|
||||
print(f"\n--- {label} Results ---")
|
||||
print(f" Help commands: {len(entries)}")
|
||||
print(f" Probe hits: {len(sub_hits)}")
|
||||
print(f" Undiscovered (probe-only): {len(undiscovered)}")
|
||||
for cmd, resp in undiscovered:
|
||||
print(f" '{cmd}' -> {resp}")
|
||||
|
||||
navigate_to_root(ser, profile)
|
||||
|
||||
return all_results
|
||||
|
||||
|
||||
def main() -> None:
|
||||
args = parse_args()
|
||||
|
||||
profile = DeviceProfile(
|
||||
port=args.port,
|
||||
baud=args.baud,
|
||||
line_ending=LINE_ENDINGS[args.line_ending],
|
||||
exit_cmd=args.exit_cmd,
|
||||
)
|
||||
|
||||
if args.prompt:
|
||||
profile.root_prompt = args.prompt
|
||||
profile.prompts = [args.prompt]
|
||||
if args.error:
|
||||
profile.error_string = args.error
|
||||
|
||||
# Merge --bundled into --wordlist
|
||||
wordlist_raw = (args.wordlist or []) + (args.bundled or [])
|
||||
wordlist_paths = [Path(p) for p in wordlist_raw] if wordlist_raw else None
|
||||
|
||||
# Build candidates (skip if discover-only)
|
||||
blocklist = {w.strip() for w in args.blocklist.split(",") if w.strip()}
|
||||
if not args.discover_only:
|
||||
candidates = generate_candidates(blocklist, wordlist_paths)
|
||||
print(f"Generated {len(candidates)} candidate commands\n")
|
||||
else:
|
||||
candidates = []
|
||||
|
||||
# Open serial
|
||||
ser = serial.Serial(profile.port, profile.baud, timeout=1)
|
||||
ser.reset_input_buffer()
|
||||
|
||||
all_results: dict[str, list[tuple[str, str]]] = {}
|
||||
|
||||
try:
|
||||
# Auto-discovery
|
||||
if not profile.root_prompt or not profile.error_string:
|
||||
profile = auto_discover(ser, profile)
|
||||
else:
|
||||
print("Using overrides:")
|
||||
print(f" Root prompt: {profile.root_prompt}")
|
||||
print(f' Error string: "{profile.error_string}"')
|
||||
|
||||
print(f" Sending help command: {args.help_cmd}")
|
||||
help_resp = send_cmd(ser, args.help_cmd, profile, timeout=2.0)
|
||||
commands, submenus = parse_help_output(help_resp, profile)
|
||||
profile.known_commands = commands
|
||||
if not profile.submenus:
|
||||
profile.submenus = submenus
|
||||
for sub in profile.submenus:
|
||||
sub_prompt = f"{sub.upper()}>"
|
||||
if sub_prompt not in profile.prompts:
|
||||
profile.prompts.append(sub_prompt)
|
||||
|
||||
if commands:
|
||||
print(
|
||||
f" Known commands ({len(commands)}): {', '.join(sorted(commands))}"
|
||||
)
|
||||
if submenus:
|
||||
print(f" Submenus ({len(submenus)}): {', '.join(submenus)}")
|
||||
print()
|
||||
|
||||
if not profile.root_prompt:
|
||||
print(
|
||||
"ERROR: Could not determine root prompt. Use --prompt to specify.",
|
||||
file=sys.stderr,
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
prompt = navigate_to_root(ser, profile)
|
||||
print(f"Starting at: {prompt}\n")
|
||||
|
||||
# --- Discover-only mode ---
|
||||
if args.discover_only:
|
||||
all_results = _run_discover_only(ser, profile, args.help_cmd)
|
||||
|
||||
if args.json:
|
||||
write_json_report(Path(args.json), profile, all_results)
|
||||
return
|
||||
|
||||
# --- Standard probing ---
|
||||
root_label = profile.root_prompt.rstrip(">$#").strip() or "ROOT"
|
||||
print(
|
||||
f"=== Probing {profile.root_prompt} (root) "
|
||||
f"-- {len(candidates)} candidates ===\n"
|
||||
)
|
||||
|
||||
root_hits = probe_commands(
|
||||
ser,
|
||||
candidates,
|
||||
profile.root_prompt,
|
||||
root_label,
|
||||
profile,
|
||||
timeout=args.timeout,
|
||||
)
|
||||
all_results[root_label] = root_hits
|
||||
|
||||
unknown_hits = [
|
||||
(cmd, resp)
|
||||
for cmd, resp in root_hits
|
||||
if cmd.lower() not in profile.known_commands
|
||||
]
|
||||
known_count = len(root_hits) - len(unknown_hits)
|
||||
|
||||
print("\n--- Root Results ---")
|
||||
print(f" Total hits: {len(root_hits)}")
|
||||
print(f" Known commands: {known_count}")
|
||||
print(f" UNKNOWN commands: {len(unknown_hits)}")
|
||||
for cmd, resp in unknown_hits:
|
||||
print(f" '{cmd}' -> {resp}")
|
||||
|
||||
# Submenu probing
|
||||
submenus_to_probe: list[str] = []
|
||||
if args.submenu:
|
||||
submenus_to_probe.append(args.submenu)
|
||||
elif args.deep:
|
||||
submenus_to_probe = list(profile.submenus)
|
||||
|
||||
if submenus_to_probe:
|
||||
sub_results = _run_deep_probe(
|
||||
ser,
|
||||
profile,
|
||||
candidates,
|
||||
submenus_to_probe,
|
||||
args.help_cmd,
|
||||
args.timeout,
|
||||
)
|
||||
all_results.update(sub_results)
|
||||
|
||||
if args.json:
|
||||
write_json_report(Path(args.json), profile, all_results)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\n\nInterrupted.")
|
||||
finally:
|
||||
ser.close()
|
||||
print("\nPort closed.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
847
src/console_probe/discovery.py
Normal file
847
src/console_probe/discovery.py
Normal file
@ -0,0 +1,847 @@
|
||||
"""Auto-discovery, help parsing, navigation, and probing for embedded consoles."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
import sys
|
||||
import time
|
||||
|
||||
import serial # pyright: ignore[reportMissingImports]
|
||||
|
||||
from console_probe.profile import DeviceProfile, HelpEntry
|
||||
from console_probe.serial_io import PROMPT_RE, detect_prompt, send_cmd
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Help output parsing
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Common parameter placeholder names that should never be treated as commands.
|
||||
# These appear in help text like ``help [<command>]`` or ``set <value>``.
|
||||
_PARAM_PLACEHOLDERS: set[str] = {
|
||||
"command",
|
||||
"commands",
|
||||
"parameter",
|
||||
"parameters",
|
||||
"value",
|
||||
"values",
|
||||
"index",
|
||||
"name",
|
||||
"arg",
|
||||
"args",
|
||||
"argument",
|
||||
"arguments",
|
||||
"option",
|
||||
"options",
|
||||
"number",
|
||||
"string",
|
||||
"text",
|
||||
"file",
|
||||
"path",
|
||||
"addr",
|
||||
"address",
|
||||
"size",
|
||||
"count",
|
||||
"offset",
|
||||
"length",
|
||||
"data",
|
||||
"byte",
|
||||
"bytes",
|
||||
"word",
|
||||
"type",
|
||||
"mode",
|
||||
"level",
|
||||
"pin",
|
||||
"port",
|
||||
"id",
|
||||
"key",
|
||||
}
|
||||
|
||||
|
||||
def _inside_brackets(text: str, pos: int) -> bool:
|
||||
"""Return True if *pos* falls inside ``[...]`` in *text*.
|
||||
|
||||
Counts unmatched ``[`` characters before *pos*. If the count is odd,
|
||||
the position is inside brackets (parameter syntax).
|
||||
"""
|
||||
depth = 0
|
||||
for i in range(pos):
|
||||
ch = text[i]
|
||||
if ch == "[":
|
||||
depth += 1
|
||||
elif ch == "]":
|
||||
depth = max(0, depth - 1)
|
||||
return depth > 0
|
||||
|
||||
|
||||
# Shared regex patterns for help parsing
|
||||
_CMD_DASH_RE = re.compile(r"^\s*(\w[\w.]*)\s+[-\u2014]\s+")
|
||||
_CMD_SPACES_RE = re.compile(r"^\s{0,4}(\w[\w.]*)\s{2,}")
|
||||
_BARE_CMD_RE = re.compile(r"^\s{0,4}(\w[\w.]*)\s*$")
|
||||
_BRACKET_RE = re.compile(r"<(\w[\w.]*)>")
|
||||
_ENTER_RE = re.compile(r"[Ee]nter\s+<?(\w+)>?", re.IGNORECASE)
|
||||
_MENU_RE = re.compile(r"(\w+)\s+[Mm]enu")
|
||||
_SUBMENU_RE = re.compile(r"(\w+)\s+[Ss]ub-?[Mm]enu")
|
||||
|
||||
|
||||
def parse_help_output(
|
||||
help_text: str,
|
||||
profile: DeviceProfile,
|
||||
) -> tuple[set[str], list[str]]:
|
||||
"""Parse help output for command names and submenu hints.
|
||||
|
||||
Returns (known_commands, submenu_names).
|
||||
"""
|
||||
commands: set[str] = set()
|
||||
submenus: list[str] = []
|
||||
|
||||
for line in help_text.split("\n"):
|
||||
stripped = line.strip()
|
||||
if not stripped:
|
||||
continue
|
||||
if PROMPT_RE.fullmatch(stripped):
|
||||
continue
|
||||
|
||||
# Try angle-bracket commands first (e.g. "Enter <a3981> - ...")
|
||||
bracket_match = _BRACKET_RE.search(stripped)
|
||||
if bracket_match:
|
||||
cmd_name = bracket_match.group(1).lower()
|
||||
# Reject if inside [...] brackets or is a known placeholder
|
||||
if (
|
||||
len(cmd_name) <= 20
|
||||
and not _inside_brackets(stripped, bracket_match.start())
|
||||
and cmd_name not in _PARAM_PLACEHOLDERS
|
||||
):
|
||||
commands.add(cmd_name)
|
||||
else:
|
||||
# Fall back to generic patterns
|
||||
for pat in (_CMD_DASH_RE, _CMD_SPACES_RE, _BARE_CMD_RE):
|
||||
m = pat.match(stripped)
|
||||
if m:
|
||||
cmd_name = m.group(1).lower()
|
||||
if len(cmd_name) <= 20:
|
||||
commands.add(cmd_name)
|
||||
break
|
||||
|
||||
# Look for submenu hints
|
||||
enter_match = _ENTER_RE.search(stripped)
|
||||
if enter_match:
|
||||
sub = enter_match.group(1).lower()
|
||||
if sub not in submenus and len(sub) <= 20:
|
||||
submenus.append(sub)
|
||||
else:
|
||||
for pat in (_SUBMENU_RE, _MENU_RE):
|
||||
m = pat.search(stripped)
|
||||
if m:
|
||||
sub = m.group(1).lower()
|
||||
if sub not in submenus and len(sub) <= 20:
|
||||
submenus.append(sub)
|
||||
break
|
||||
|
||||
return commands, submenus
|
||||
|
||||
|
||||
def parse_help_structured(
|
||||
help_text: str,
|
||||
profile: DeviceProfile,
|
||||
) -> list[HelpEntry]:
|
||||
"""Parse help output into structured entries with descriptions and params.
|
||||
|
||||
Same regex logic as ``parse_help_output`` but captures description text
|
||||
(everything after `` - `` or double-space separator) and parameter syntax
|
||||
(text in ``[...]`` / ``<...>``).
|
||||
"""
|
||||
entries: list[HelpEntry] = []
|
||||
seen: set[str] = set()
|
||||
|
||||
# Extended patterns that capture descriptions
|
||||
# "command - description" or "command --- description"
|
||||
cmd_desc_re = re.compile(r"^\s*(\w[\w.]*)\s+[-\u2014]+\s+(.*)")
|
||||
# "command Description text" (2+ spaces after command)
|
||||
cmd_space_desc_re = re.compile(r"^\s{0,4}(\w[\w.]*)\s{2,}(\S.*)")
|
||||
|
||||
# Parameter syntax: text in [...] or <...> on the same line as the command
|
||||
param_re = re.compile(r"(\[.*?\]|<\w[\w.]*>)")
|
||||
|
||||
for line in help_text.split("\n"):
|
||||
stripped = line.strip()
|
||||
if not stripped:
|
||||
continue
|
||||
if PROMPT_RE.fullmatch(stripped):
|
||||
continue
|
||||
|
||||
cmd_name: str | None = None
|
||||
description = ""
|
||||
|
||||
# Try angle-bracket commands: "Enter <a3981> - Motor driver"
|
||||
bracket_match = _BRACKET_RE.search(stripped)
|
||||
if bracket_match:
|
||||
candidate = bracket_match.group(1).lower()
|
||||
if (
|
||||
len(candidate) <= 20
|
||||
and not _inside_brackets(stripped, bracket_match.start())
|
||||
and candidate not in _PARAM_PLACEHOLDERS
|
||||
):
|
||||
cmd_name = candidate
|
||||
# Description is everything after the bracket match + separator
|
||||
rest = stripped[bracket_match.end() :]
|
||||
desc_m = re.match(r"\s*[-\u2014]+\s*(.*)", rest)
|
||||
if desc_m:
|
||||
description = desc_m.group(1).strip()
|
||||
elif rest.strip():
|
||||
description = rest.strip()
|
||||
|
||||
if not cmd_name:
|
||||
# "command - description"
|
||||
m = cmd_desc_re.match(stripped)
|
||||
if m:
|
||||
cmd_name = m.group(1).lower()
|
||||
description = m.group(2).strip()
|
||||
else:
|
||||
# "command Description"
|
||||
m = cmd_space_desc_re.match(stripped)
|
||||
if m:
|
||||
cmd_name = m.group(1).lower()
|
||||
description = m.group(2).strip()
|
||||
else:
|
||||
# Bare command name
|
||||
m = _BARE_CMD_RE.match(stripped)
|
||||
if m:
|
||||
cmd_name = m.group(1).lower()
|
||||
|
||||
if cmd_name and len(cmd_name) <= 20 and cmd_name not in seen:
|
||||
# Extract parameter syntax from the full line
|
||||
params_parts = param_re.findall(stripped)
|
||||
# Filter out parts that are just the command name in brackets
|
||||
params = " ".join(
|
||||
p for p in params_parts if p.strip("[]<>").lower() != cmd_name
|
||||
)
|
||||
|
||||
seen.add(cmd_name)
|
||||
entries.append(
|
||||
HelpEntry(
|
||||
name=cmd_name,
|
||||
description=description,
|
||||
params=params,
|
||||
)
|
||||
)
|
||||
|
||||
return entries
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Submenu help discovery
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def discover_submenu_help(
|
||||
ser: serial.Serial,
|
||||
menu: str,
|
||||
profile: DeviceProfile,
|
||||
help_cmd: str = "?",
|
||||
extra_help_cmds: list[str] | None = None,
|
||||
) -> list[HelpEntry]:
|
||||
"""Query help in the current submenu and parse structured entries.
|
||||
|
||||
Assumes the caller has already navigated into the submenu.
|
||||
|
||||
Sends *help_cmd* (default ``?``), then tries *extra_help_cmds*
|
||||
(default ``["man"]``) for firmware with paginated help (e.g. DVB submenu).
|
||||
Merges results from all help commands, deduplicating by command name.
|
||||
"""
|
||||
if extra_help_cmds is None:
|
||||
extra_help_cmds = ["man"]
|
||||
|
||||
# Primary help
|
||||
help_resp = send_cmd(ser, help_cmd, profile, timeout=3.0)
|
||||
entries = parse_help_structured(help_resp, profile)
|
||||
|
||||
# Try extra help commands for multi-page help
|
||||
seen_names = {e.name for e in entries}
|
||||
for extra_cmd in extra_help_cmds:
|
||||
extra_resp = send_cmd(ser, extra_cmd, profile, timeout=3.0)
|
||||
extra_entries = parse_help_structured(extra_resp, profile)
|
||||
|
||||
# Only merge if the response looks like real help output
|
||||
# (at least 3 entries and has new commands)
|
||||
new_entries = [e for e in extra_entries if e.name not in seen_names]
|
||||
if len(extra_entries) >= 3 and new_entries:
|
||||
entries.extend(new_entries)
|
||||
seen_names.update(e.name for e in new_entries)
|
||||
|
||||
return entries
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Error string detection
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def detect_error_string(ser: serial.Serial, profile: DeviceProfile) -> str | None:
|
||||
"""Send a garbage command and extract the error message template."""
|
||||
resp = send_cmd(ser, "__xyzzy_probe__", profile, timeout=1.0)
|
||||
|
||||
# Strip the echo of our command and any prompt
|
||||
lines = resp.replace("__xyzzy_probe__", "").strip().split("\n")
|
||||
content_lines = []
|
||||
for line in lines:
|
||||
stripped = line.strip()
|
||||
if not stripped:
|
||||
continue
|
||||
if PROMPT_RE.fullmatch(stripped):
|
||||
continue
|
||||
stripped = PROMPT_RE.sub("", stripped).strip()
|
||||
if stripped:
|
||||
content_lines.append(stripped)
|
||||
|
||||
if content_lines:
|
||||
return content_lines[0].strip()
|
||||
return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Auto-discovery
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def auto_discover(ser: serial.Serial, profile: DeviceProfile) -> DeviceProfile:
|
||||
"""Run the discovery sequence and populate the profile."""
|
||||
print("Phase 1: Auto-discovering device console...\n")
|
||||
|
||||
prompt = detect_prompt(ser, profile)
|
||||
if prompt:
|
||||
profile.root_prompt = prompt
|
||||
profile.prompts = [prompt]
|
||||
print(f" Root prompt: {prompt}")
|
||||
else:
|
||||
print(" WARNING: Could not detect root prompt.")
|
||||
print(" Use --prompt to specify manually.")
|
||||
|
||||
err = detect_error_string(ser, profile)
|
||||
if err:
|
||||
profile.error_string = err
|
||||
print(f' Error string: "{err}"')
|
||||
else:
|
||||
print(" WARNING: Could not detect error string.")
|
||||
print(" Use --error to specify manually.")
|
||||
|
||||
print(" Sending help command: ?")
|
||||
help_resp = send_cmd(ser, "?", profile, timeout=2.0)
|
||||
|
||||
commands, submenus = parse_help_output(help_resp, profile)
|
||||
if commands:
|
||||
profile.known_commands = commands
|
||||
print(f" Known commands ({len(commands)}): {', '.join(sorted(commands))}")
|
||||
else:
|
||||
print(" No commands parsed from help output.")
|
||||
|
||||
if submenus:
|
||||
profile.submenus = submenus
|
||||
print(f" Detected submenus ({len(submenus)}): {', '.join(submenus)}")
|
||||
else:
|
||||
print(" No submenus detected from help output.")
|
||||
|
||||
# Build prompt list from submenus
|
||||
for sub in profile.submenus:
|
||||
sub_prompt = f"{sub.upper()}>"
|
||||
if sub_prompt not in profile.prompts:
|
||||
profile.prompts.append(sub_prompt)
|
||||
|
||||
print()
|
||||
return profile
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Navigation
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def navigate_to_root(ser: serial.Serial, profile: DeviceProfile) -> str:
|
||||
"""Send exit command until we're at the root prompt."""
|
||||
# Check if already at root (bare CR won't kill anything)
|
||||
resp = send_cmd(ser, "", profile)
|
||||
if profile.root_prompt and profile.root_prompt in resp:
|
||||
return profile.root_prompt
|
||||
|
||||
for _ in range(5):
|
||||
resp = send_cmd(ser, profile.exit_cmd, profile)
|
||||
if profile.root_prompt and profile.root_prompt in resp:
|
||||
return profile.root_prompt
|
||||
last_line = resp.strip().split("\n")[-1].strip()
|
||||
m = PROMPT_RE.search(last_line)
|
||||
if m:
|
||||
detected = m.group(1)
|
||||
if not profile.root_prompt:
|
||||
profile.root_prompt = detected
|
||||
if detected == profile.root_prompt:
|
||||
return detected
|
||||
|
||||
# Last resort
|
||||
resp = send_cmd(ser, "", profile)
|
||||
last_line = resp.strip().split("\n")[-1].strip()
|
||||
return last_line
|
||||
|
||||
|
||||
def enter_submenu(ser: serial.Serial, menu: str, profile: DeviceProfile) -> str:
|
||||
"""Enter a submenu and return the prompt we land on."""
|
||||
navigate_to_root(ser, profile)
|
||||
resp = send_cmd(ser, menu, profile)
|
||||
lines = resp.strip().split("\n")
|
||||
last = lines[-1].strip() if lines else ""
|
||||
|
||||
m = PROMPT_RE.search(last)
|
||||
if m:
|
||||
new_prompt = m.group(1)
|
||||
if new_prompt not in profile.prompts:
|
||||
profile.prompts.append(new_prompt)
|
||||
return new_prompt
|
||||
return last
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Response cleaning
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def clean_response(
|
||||
resp: str,
|
||||
cmd: str,
|
||||
profile: DeviceProfile,
|
||||
) -> str:
|
||||
"""Strip echo, prompts, and whitespace from a response."""
|
||||
clean = resp
|
||||
|
||||
for suffix in ("\r\n", "\r", "\n", ""):
|
||||
clean = clean.replace(f"{cmd}{suffix}", "", 1)
|
||||
|
||||
clean = clean.strip()
|
||||
|
||||
for p in profile.prompts:
|
||||
clean = clean.replace(p, "")
|
||||
|
||||
clean = PROMPT_RE.sub("", clean)
|
||||
return clean.strip()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Probing
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def probe_commands(
|
||||
ser: serial.Serial,
|
||||
candidates: list[str],
|
||||
prompt: str,
|
||||
label: str,
|
||||
profile: DeviceProfile,
|
||||
timeout: float = 0.5,
|
||||
) -> list[tuple[str, str]]:
|
||||
"""Probe candidates at the current menu level. Returns (cmd, preview) hits."""
|
||||
hits = []
|
||||
total = len(candidates)
|
||||
|
||||
for i, cmd in enumerate(candidates):
|
||||
if (i + 1) % 50 == 0:
|
||||
print(f" [{label}] Progress: {i + 1}/{total}...", flush=True)
|
||||
|
||||
resp = send_cmd(ser, cmd, profile, timeout=timeout)
|
||||
clean = clean_response(resp, cmd, profile)
|
||||
|
||||
is_error = profile.error_string and profile.error_string in resp
|
||||
if not is_error and clean:
|
||||
preview = clean[:100].replace("\r\n", " | ").replace("\n", " | ")
|
||||
hits.append((cmd, preview))
|
||||
print(f" *** HIT: '{cmd}' -> {preview}", flush=True)
|
||||
|
||||
# Recover from submenu exits
|
||||
if "Terminating shell" in resp or "exiting" in resp.lower():
|
||||
if profile.root_prompt and prompt != profile.root_prompt:
|
||||
print(f" ('{cmd}' exited submenu, re-entering...)")
|
||||
menu_name = prompt.replace(">", "").strip().lower()
|
||||
enter_submenu(ser, menu_name, profile)
|
||||
else:
|
||||
print(f" ('{cmd}' terminated shell, waiting for restart...)")
|
||||
time.sleep(1.0)
|
||||
check = send_cmd(ser, "", profile)
|
||||
if profile.root_prompt and profile.root_prompt not in check:
|
||||
print(
|
||||
" WARNING: Shell did not restart. Remaining "
|
||||
"results may be incomplete.",
|
||||
flush=True,
|
||||
)
|
||||
|
||||
return hits
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Candidate generation
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def generate_candidates(
|
||||
blocklist: set[str],
|
||||
wordlist_paths: list | None = None,
|
||||
) -> list[str]:
|
||||
"""Build the candidate command list.
|
||||
|
||||
Includes generic embedded debug commands + single chars + two-letter combos.
|
||||
Merges in any external wordlist files. Applies blocklist last.
|
||||
"""
|
||||
import string
|
||||
from pathlib import Path
|
||||
|
||||
candidates: list[str] = []
|
||||
|
||||
# Single characters
|
||||
candidates.extend(list(string.ascii_lowercase))
|
||||
candidates.extend(list(string.ascii_uppercase))
|
||||
candidates.extend(list(string.digits))
|
||||
|
||||
# Generic embedded debug commands
|
||||
generic = [
|
||||
# Memory access
|
||||
"md",
|
||||
"mw",
|
||||
"mm",
|
||||
"mr",
|
||||
"mem",
|
||||
"peek",
|
||||
"poke",
|
||||
"rd",
|
||||
"wr",
|
||||
"read",
|
||||
"write",
|
||||
"dump",
|
||||
"load",
|
||||
"save",
|
||||
"md.b",
|
||||
"md.w",
|
||||
"md.l",
|
||||
"x",
|
||||
"xx",
|
||||
"xd",
|
||||
# Flash
|
||||
"flash",
|
||||
"fl",
|
||||
"erase",
|
||||
"program",
|
||||
"verify",
|
||||
"protect",
|
||||
"flinfo",
|
||||
"fldump",
|
||||
"flashdump",
|
||||
# Boot / system
|
||||
"boot",
|
||||
"reboot",
|
||||
"reset",
|
||||
"go",
|
||||
"run",
|
||||
"exec",
|
||||
"jump",
|
||||
"bootd",
|
||||
"bootm",
|
||||
"bootp",
|
||||
"version",
|
||||
"ver",
|
||||
"info",
|
||||
"about",
|
||||
"sysinfo",
|
||||
"uptime",
|
||||
"date",
|
||||
"time",
|
||||
"clk",
|
||||
"clock",
|
||||
# Debug
|
||||
"debug",
|
||||
"dbg",
|
||||
"trace",
|
||||
"log",
|
||||
"print",
|
||||
"echo",
|
||||
"test",
|
||||
"diag",
|
||||
"selftest",
|
||||
"bist",
|
||||
"bench",
|
||||
"assert",
|
||||
"crash",
|
||||
"fault",
|
||||
"panic",
|
||||
# Shell / OS
|
||||
"sh",
|
||||
"shell",
|
||||
"cmd",
|
||||
"command",
|
||||
"cli",
|
||||
"task",
|
||||
"tasks",
|
||||
"ps",
|
||||
"top",
|
||||
"threads",
|
||||
"kill",
|
||||
"suspend",
|
||||
"resume",
|
||||
"heap",
|
||||
"stack",
|
||||
"free",
|
||||
"malloc",
|
||||
"meminfo",
|
||||
"cpu",
|
||||
"cpuinfo",
|
||||
"temp",
|
||||
"temperature",
|
||||
# Network / comms
|
||||
"ping",
|
||||
"net",
|
||||
"ifconfig",
|
||||
"ip",
|
||||
"mac",
|
||||
"uart",
|
||||
"serial",
|
||||
"spi",
|
||||
"i2c",
|
||||
"can",
|
||||
# Service / factory
|
||||
"factory",
|
||||
"service",
|
||||
"mfg",
|
||||
"production",
|
||||
"prod",
|
||||
"cal",
|
||||
"calibrate",
|
||||
"calibration",
|
||||
"config",
|
||||
"cfg",
|
||||
"setup",
|
||||
"settings",
|
||||
"hidden",
|
||||
"secret",
|
||||
"admin",
|
||||
"su",
|
||||
"root",
|
||||
"login",
|
||||
"password",
|
||||
"passwd",
|
||||
"auth",
|
||||
"unlock",
|
||||
# Update
|
||||
"update",
|
||||
"upgrade",
|
||||
"firmware",
|
||||
"fw",
|
||||
"ota",
|
||||
"download",
|
||||
"upload",
|
||||
"xmodem",
|
||||
"ymodem",
|
||||
"zmodem",
|
||||
"tftp",
|
||||
"ftp",
|
||||
# Generic hardware
|
||||
"sw",
|
||||
"hw",
|
||||
"id",
|
||||
"sn",
|
||||
"help",
|
||||
"man",
|
||||
"usage",
|
||||
# Two-letter combos
|
||||
"bl",
|
||||
"bt",
|
||||
"db",
|
||||
"dm",
|
||||
"dp",
|
||||
"ds",
|
||||
"dt",
|
||||
"eb",
|
||||
"ed",
|
||||
"ee",
|
||||
"ef",
|
||||
"em",
|
||||
"en",
|
||||
"ep",
|
||||
"er",
|
||||
"es",
|
||||
"et",
|
||||
"fa",
|
||||
"fb",
|
||||
"fc",
|
||||
"fd",
|
||||
"fe",
|
||||
"ff",
|
||||
"fg",
|
||||
"fh",
|
||||
"fi",
|
||||
"fj",
|
||||
"ga",
|
||||
"gb",
|
||||
"gc",
|
||||
"gd",
|
||||
"ge",
|
||||
"gf",
|
||||
"gg",
|
||||
"gh",
|
||||
"gi",
|
||||
"gj",
|
||||
"ha",
|
||||
"hb",
|
||||
"hc",
|
||||
"hd",
|
||||
"he",
|
||||
"hf",
|
||||
"hg",
|
||||
"hh",
|
||||
"hi",
|
||||
"hj",
|
||||
"ia",
|
||||
"ib",
|
||||
"ic",
|
||||
"io",
|
||||
"ir",
|
||||
"ka",
|
||||
"kb",
|
||||
"kc",
|
||||
"kd",
|
||||
"ke",
|
||||
"la",
|
||||
"lb",
|
||||
"lc",
|
||||
"ld",
|
||||
"le",
|
||||
"lf",
|
||||
"lg",
|
||||
"lh",
|
||||
"li",
|
||||
"lj",
|
||||
"ma",
|
||||
"mb",
|
||||
"mc",
|
||||
"me",
|
||||
"mf",
|
||||
"mg",
|
||||
"mh",
|
||||
"mi",
|
||||
"mj",
|
||||
"na",
|
||||
"nb",
|
||||
"nc",
|
||||
"nd",
|
||||
"ne",
|
||||
"nf",
|
||||
"ng",
|
||||
"nh",
|
||||
"ni",
|
||||
"nj",
|
||||
"oa",
|
||||
"ob",
|
||||
"oc",
|
||||
"od",
|
||||
"oe",
|
||||
"of",
|
||||
"og",
|
||||
"oh",
|
||||
"oi",
|
||||
"oj",
|
||||
"pa",
|
||||
"pb",
|
||||
"pc",
|
||||
"pd",
|
||||
"pe",
|
||||
"pf",
|
||||
"pg",
|
||||
"ph",
|
||||
"pi",
|
||||
"pj",
|
||||
"ra",
|
||||
"rb",
|
||||
"rc",
|
||||
"re",
|
||||
"rf",
|
||||
"rg",
|
||||
"rh",
|
||||
"ri",
|
||||
"rj",
|
||||
"sa",
|
||||
"sb",
|
||||
"sc",
|
||||
"sd",
|
||||
"se",
|
||||
"sf",
|
||||
"sg",
|
||||
"si",
|
||||
"sj",
|
||||
"ta",
|
||||
"tb",
|
||||
"tc",
|
||||
"td",
|
||||
"te",
|
||||
"tf",
|
||||
"tg",
|
||||
"th",
|
||||
"ti",
|
||||
"tj",
|
||||
"ua",
|
||||
"ub",
|
||||
"uc",
|
||||
"ud",
|
||||
"ue",
|
||||
"uf",
|
||||
"ug",
|
||||
"uh",
|
||||
"ui",
|
||||
"uj",
|
||||
"va",
|
||||
"vb",
|
||||
"vc",
|
||||
"vd",
|
||||
"ve",
|
||||
"vf",
|
||||
"vg",
|
||||
"vh",
|
||||
"vi",
|
||||
"vj",
|
||||
"wa",
|
||||
"wb",
|
||||
"wc",
|
||||
"wd",
|
||||
"we",
|
||||
"wf",
|
||||
"wg",
|
||||
"wh",
|
||||
"wi",
|
||||
"wj",
|
||||
"za",
|
||||
"zb",
|
||||
"zc",
|
||||
"zd",
|
||||
"ze",
|
||||
"zf",
|
||||
]
|
||||
candidates.extend(generic)
|
||||
|
||||
# Merge external wordlists
|
||||
if wordlist_paths:
|
||||
for wl_path in wordlist_paths:
|
||||
p = Path(wl_path) if not isinstance(wl_path, Path) else wl_path
|
||||
try:
|
||||
text = p.read_text()
|
||||
for line in text.split("\n"):
|
||||
word = line.strip()
|
||||
if word and not word.startswith("#"):
|
||||
candidates.append(word)
|
||||
except OSError as exc:
|
||||
print(
|
||||
f"WARNING: Could not read wordlist {p}: {exc}",
|
||||
file=sys.stderr,
|
||||
)
|
||||
|
||||
# Deduplicate preserving order
|
||||
seen: set[str] = set()
|
||||
unique: list[str] = []
|
||||
for c in candidates:
|
||||
if c not in seen:
|
||||
seen.add(c)
|
||||
unique.append(c)
|
||||
|
||||
# Apply blocklist
|
||||
unique = [c for c in unique if c not in blocklist]
|
||||
return unique
|
||||
34
src/console_probe/profile.py
Normal file
34
src/console_probe/profile.py
Normal file
@ -0,0 +1,34 @@
|
||||
"""Device profile and data structures for console probing."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
|
||||
@dataclass
|
||||
class HelpEntry:
|
||||
"""A single command parsed from firmware help output.
|
||||
|
||||
Captures the command name, any parameter syntax shown in brackets/angles,
|
||||
and the description text (everything after the separator).
|
||||
"""
|
||||
|
||||
name: str # command name (lowercase)
|
||||
description: str = "" # help description text
|
||||
params: str = "" # parameter syntax from brackets, e.g. "[<motor> [angle]]"
|
||||
|
||||
|
||||
@dataclass
|
||||
class DeviceProfile:
|
||||
"""Everything we know (or detected) about the attached console."""
|
||||
|
||||
port: str = "/dev/ttyUSB0"
|
||||
baud: int = 115200
|
||||
root_prompt: str = "" # e.g. "TRK>"
|
||||
prompts: list[str] = field(default_factory=list) # all known prompts
|
||||
error_string: str = "" # e.g. "Invalid command."
|
||||
known_commands: set[str] = field(default_factory=set) # from help output
|
||||
submenus: list[str] = field(default_factory=list) # detected submenu names
|
||||
exit_cmd: str = "q"
|
||||
line_ending: str = "\r"
|
||||
submenu_help: dict[str, list[HelpEntry]] = field(default_factory=dict)
|
||||
105
src/console_probe/report.py
Normal file
105
src/console_probe/report.py
Normal file
@ -0,0 +1,105 @@
|
||||
"""JSON report generation for console probe results."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
from console_probe.profile import DeviceProfile
|
||||
|
||||
FORMAT_VERSION = 2
|
||||
|
||||
|
||||
def build_report(
|
||||
profile: DeviceProfile,
|
||||
results: dict[str, list[tuple[str, str]]],
|
||||
) -> dict:
|
||||
"""Build the full JSON report structure.
|
||||
|
||||
``results`` maps menu labels (e.g. "TRK", "MOT") to lists of
|
||||
``(cmd, preview)`` probe hits.
|
||||
|
||||
When ``profile.submenu_help`` is populated, the report includes a
|
||||
``menus`` section with per-submenu ``help_commands``, ``probe_hits``,
|
||||
``undiscovered`` (probe-only commands not in help), and stats.
|
||||
"""
|
||||
report: dict = {
|
||||
"format_version": FORMAT_VERSION,
|
||||
"device": {"port": profile.port, "baud": profile.baud},
|
||||
"detected": {
|
||||
"root_prompt": profile.root_prompt,
|
||||
"error_string": profile.error_string,
|
||||
"known_commands": sorted(profile.known_commands),
|
||||
"submenus": profile.submenus,
|
||||
},
|
||||
}
|
||||
|
||||
# Per-submenu structured output
|
||||
if profile.submenu_help:
|
||||
menus: dict = {}
|
||||
for label, help_entries in profile.submenu_help.items():
|
||||
help_names = {e.name for e in help_entries}
|
||||
|
||||
# Probe hits for this menu (may be empty if --discover-only)
|
||||
probe_hits = results.get(label, [])
|
||||
|
||||
# Undiscovered = probe hits whose names are NOT in help output
|
||||
undiscovered = [
|
||||
{"cmd": cmd, "response": resp}
|
||||
for cmd, resp in probe_hits
|
||||
if cmd.lower() not in help_names
|
||||
]
|
||||
|
||||
menus[label] = {
|
||||
"prompt": f"{label}>",
|
||||
"help_commands": [
|
||||
{
|
||||
"cmd": e.name,
|
||||
"description": e.description,
|
||||
"params": e.params,
|
||||
}
|
||||
for e in help_entries
|
||||
],
|
||||
"probe_hits": [
|
||||
{"cmd": cmd, "response": resp} for cmd, resp in probe_hits
|
||||
],
|
||||
"undiscovered": undiscovered,
|
||||
"stats": {
|
||||
"help_count": len(help_entries),
|
||||
"probe_count": len(probe_hits),
|
||||
"undiscovered_count": len(undiscovered),
|
||||
},
|
||||
}
|
||||
|
||||
report["menus"] = menus
|
||||
|
||||
# Legacy results section (backward compat)
|
||||
report["results"] = {}
|
||||
for label, hits in results.items():
|
||||
known_hits = [
|
||||
(cmd, resp) for cmd, resp in hits if cmd.lower() in profile.known_commands
|
||||
]
|
||||
unknown_hits = [
|
||||
(cmd, resp)
|
||||
for cmd, resp in hits
|
||||
if cmd.lower() not in profile.known_commands
|
||||
]
|
||||
report["results"][label] = {
|
||||
"total_hits": len(hits),
|
||||
"known": len(known_hits),
|
||||
"unknown": len(unknown_hits),
|
||||
"hits": [{"cmd": cmd, "response": resp} for cmd, resp in hits],
|
||||
}
|
||||
|
||||
return report
|
||||
|
||||
|
||||
def write_json_report(
|
||||
path: Path,
|
||||
profile: DeviceProfile,
|
||||
results: dict[str, list[tuple[str, str]]],
|
||||
) -> None:
|
||||
"""Write machine-readable JSON probe report."""
|
||||
report = build_report(profile, results)
|
||||
path.write_text(json.dumps(report, indent=2) + "\n")
|
||||
print(f"\nJSON report written to {path}")
|
||||
105
src/console_probe/serial_io.py
Normal file
105
src/console_probe/serial_io.py
Normal file
@ -0,0 +1,105 @@
|
||||
"""Serial I/O for prompt-terminated embedded consoles."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
import time
|
||||
|
||||
import serial # pyright: ignore[reportMissingImports]
|
||||
|
||||
from console_probe.profile import DeviceProfile
|
||||
|
||||
PROMPT_RE = re.compile(r"(\S+[>$#])\s*$")
|
||||
|
||||
|
||||
def _is_prompt_terminated(text: str, profile: DeviceProfile) -> bool:
|
||||
"""Check if *text* ends with a firmware prompt (not parameter syntax).
|
||||
|
||||
When ``profile.prompts`` is populated, checks whether the last line ends
|
||||
with one of the known prompt strings. Also accepts any PROMPT_RE match
|
||||
on the last line **if** that line contains no ``[`` bracket (which would
|
||||
indicate parameter syntax like ``[<parameters>]``).
|
||||
|
||||
When ``profile.prompts`` is empty (initial discovery phase), falls back
|
||||
to the original heuristic: ``stripped.endswith(">")``.
|
||||
"""
|
||||
stripped = text.rstrip()
|
||||
if not stripped:
|
||||
return False
|
||||
|
||||
last_line = stripped.split("\n")[-1]
|
||||
|
||||
if profile.prompts:
|
||||
# Check known prompts first (fast path)
|
||||
last_stripped = last_line.rstrip()
|
||||
for p in profile.prompts:
|
||||
if last_stripped.endswith(p):
|
||||
return True
|
||||
|
||||
# Accept a PROMPT_RE match only if no brackets on that line
|
||||
if "[" not in last_line:
|
||||
m = PROMPT_RE.search(last_line)
|
||||
if m:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
# No known prompts yet — fallback to bare > check
|
||||
return stripped.endswith(">")
|
||||
|
||||
|
||||
def send_cmd(
|
||||
ser: serial.Serial,
|
||||
cmd: str,
|
||||
profile: DeviceProfile,
|
||||
timeout: float = 1.0,
|
||||
) -> str:
|
||||
"""Send *cmd* + line-ending, read until a known prompt or timeout."""
|
||||
ser.reset_input_buffer()
|
||||
ser.write(f"{cmd}{profile.line_ending}".encode("ascii", errors="replace"))
|
||||
ser.timeout = timeout
|
||||
|
||||
buf = bytearray()
|
||||
deadline = time.monotonic() + timeout
|
||||
while time.monotonic() < deadline:
|
||||
chunk = ser.read(4096)
|
||||
if chunk:
|
||||
buf.extend(chunk)
|
||||
text = buf.decode("utf-8", errors="replace")
|
||||
if _is_prompt_terminated(text, profile):
|
||||
break
|
||||
elif buf:
|
||||
text = buf.decode("utf-8", errors="replace")
|
||||
if _is_prompt_terminated(text, profile):
|
||||
break
|
||||
|
||||
return buf.decode("utf-8", errors="replace")
|
||||
|
||||
|
||||
def detect_prompt(ser: serial.Serial, profile: DeviceProfile) -> str | None:
|
||||
"""Send a bare line-ending and extract the prompt from the response."""
|
||||
ser.reset_input_buffer()
|
||||
ser.write(profile.line_ending.encode("ascii"))
|
||||
ser.timeout = 2.0
|
||||
|
||||
buf = bytearray()
|
||||
deadline = time.monotonic() + 2.0
|
||||
while time.monotonic() < deadline:
|
||||
chunk = ser.read(4096)
|
||||
if chunk:
|
||||
buf.extend(chunk)
|
||||
text = buf.decode("utf-8", errors="replace")
|
||||
tail = text.rstrip()
|
||||
if tail.endswith((">", "#", "$")):
|
||||
break
|
||||
elif buf:
|
||||
break
|
||||
|
||||
text = buf.decode("utf-8", errors="replace").strip()
|
||||
if not text:
|
||||
return None
|
||||
|
||||
# Take the last line, look for a prompt-like token
|
||||
last_line = text.split("\n")[-1].strip()
|
||||
m = PROMPT_RE.search(last_line)
|
||||
return m.group(1) if m else (last_line if last_line else None)
|
||||
Loading…
x
Reference in New Issue
Block a user