openocd-python/src/openocd/svd/peripheral.py
Ryan Malloy bc7cb77ec4 Fix reliability issues from code review, add error-path tests
Critical fixes:
- Separate notification and command connections to eliminate dual-reader
  race condition on the TCL RPC stream (C-1)
- Fix _get_or_create_loop() swallowing its own RuntimeError, causing
  deadlock when sync API called from async context (C-2)
- Add bounds checking to config string parser (C-3)
- Clean up OpenOCD subprocess on connection failure in Session.start (H-1)

Defense in depth:
- Add MAX_RESPONSE_SIZE (10MB) guard against unbounded buffer growth
- Preserve bytes after separator in _read_until_separator remainder buffer
- Set notification_failed flag when listener crashes, warn on next send
- Standardize error detection to case-insensitive across all modules
- Escape TCL special characters in RTT channelwrite to prevent injection
- Redirect OpenOCD stdout to DEVNULL to prevent pipe buffer deadlock
- Run SVD XML parsing in asyncio.to_thread to avoid blocking event loop

Consistency:
- Cache SyncSession subsystem wrappers (match async Session pattern)
- Make DecodedRegister frozen (match all other dataclasses)
- Add py.typed marker for PEP 561 type checker support
- Accept list[str] config in OpenOCDProcess.start for paths with spaces

Tests:
- Add 50 error-path tests covering connection, target, memory, register,
  flash, breakpoint, session, process, and notification failure modes
2026-02-12 18:52:38 -07:00

187 lines
6.1 KiB
Python

"""SVDManager — combines SVD parsing, register decoding, and hardware reads.
This is the primary interface for SVD-based register inspection. It ties
the SVD parser, bitfield decoder, and the Memory subsystem together so
callers can do things like:
decoded = await svd.read_register("GPIOA", "ODR")
print(decoded)
"""
from __future__ import annotations
import asyncio
import logging
from pathlib import Path
from typing import TYPE_CHECKING
from openocd.svd.decoder import decode_register
from openocd.svd.parser import SVDParserWrapper
from openocd.types import DecodedRegister
if TYPE_CHECKING:
from openocd.connection.base import Connection
from openocd.memory import Memory
log = logging.getLogger(__name__)
class SVDManager:
    """High-level SVD register access: parse, read, decode.

    Ties together an ``SVDParserWrapper`` (peripheral/register metadata),
    ``decode_register`` (bitfield decoding) and a ``Memory`` subsystem
    (hardware reads).
    """

    def __init__(self, conn: Connection, memory: Memory) -> None:
        """Create a manager with an empty (not yet loaded) SVD parser.

        Args:
            conn: Connection to the target. It is stored on the instance but
                not used by any method of this class.
            memory: Memory subsystem used to read register values.
        """
        self._conn = conn
        self._memory = memory
        self._parser = SVDParserWrapper()

    @property
    def loaded(self) -> bool:
        """Whether an SVD file has been loaded."""
        return self._parser.loaded

    async def load(self, svd_path: Path) -> None:
        """Parse an SVD file and make its peripherals available.

        The parser's ``load`` is a blocking call, so it is dispatched with
        ``asyncio.to_thread`` instead of being run on the event loop.

        Args:
            svd_path: Path to the .svd XML file.

        Raises:
            SVDError: If the file is missing or unparseable (propagated
                from the parser).
        """
        await asyncio.to_thread(self._parser.load, svd_path)

    def list_peripherals(self) -> list[str]:
        """Return sorted peripheral names from the loaded SVD.

        Raises:
            SVDError: If no SVD is loaded.
        """
        return self._parser.list_peripherals()

    def list_registers(self, peripheral: str) -> list[str]:
        """Return sorted register names for a peripheral.

        Args:
            peripheral: Peripheral name (e.g. "GPIOA").

        Raises:
            SVDError: If no SVD is loaded or peripheral not found.
        """
        return self._parser.list_registers(peripheral)

    @staticmethod
    def _register_address(periph_obj, reg_obj) -> int:
        """Return the absolute address: peripheral base + register offset."""
        return periph_obj.base_address + reg_obj.address_offset

    async def _read_and_decode(self, periph_obj, reg_obj, address: int) -> DecodedRegister:
        """Read one 32-bit word at *address* and decode it for *reg_obj*."""
        # NOTE(review): the read is always 32 bits wide; the register's
        # declared size in the SVD is not consulted here.
        values = await self._memory.read_u32(address)
        return decode_register(periph_obj, reg_obj, values[0])

    async def read_register(self, peripheral: str, register: str) -> DecodedRegister:
        """Read a register from hardware and decode it using SVD metadata.

        The register's memory-mapped address is computed from the SVD
        (peripheral base address + register offset), 32 bits are read from
        the target, and the raw value is decoded into named bitfields.

        Args:
            peripheral: Peripheral name (e.g. "GPIOA").
            register: Register name (e.g. "ODR").

        Returns:
            DecodedRegister with address, raw value, and decoded fields.

        Raises:
            SVDError: If peripheral/register not found.
            TargetError: If the memory read fails.
        """
        periph_obj = self._parser.get_peripheral(peripheral)
        reg_obj = self._parser.get_register(peripheral, register)
        address = self._register_address(periph_obj, reg_obj)
        return await self._read_and_decode(periph_obj, reg_obj, address)

    async def read_peripheral(self, peripheral: str) -> dict[str, DecodedRegister]:
        """Read and decode every readable register in a peripheral.

        This is best-effort: a register whose read or decode raises is
        logged at WARNING level and left out of the result, so one
        unreadable register does not abort the whole dump.

        Args:
            peripheral: Peripheral name.

        Returns:
            Dict mapping register name to its DecodedRegister. Registers
            that could not be read are omitted; the dict is empty when the
            peripheral has no registers.

        Raises:
            SVDError: If peripheral not found.
        """
        periph_obj = self._parser.get_peripheral(peripheral)
        result: dict[str, DecodedRegister] = {}
        for reg_obj in periph_obj.registers or []:
            # Computed outside the try so the address is available to the
            # warning message below.
            address = self._register_address(periph_obj, reg_obj)
            try:
                result[reg_obj.name] = await self._read_and_decode(
                    periph_obj, reg_obj, address
                )
            except Exception as exc:
                # Deliberately broad: skip unreadable registers
                # (write-only, reserved, etc.) and keep going.
                log.warning(
                    "Failed to read %s.%s @ 0x%08X: %s",
                    peripheral,
                    reg_obj.name,
                    address,
                    exc,
                )
        return result

    def decode(self, peripheral: str, register: str, value: int) -> DecodedRegister:
        """Decode a raw value without reading hardware.

        Useful when you already have the register value (from a log,
        a previous read, or a known reset value).

        Args:
            peripheral: Peripheral name.
            register: Register name.
            value: Raw 32-bit register value.

        Returns:
            DecodedRegister with the decoded bitfields.
        """
        periph_obj = self._parser.get_peripheral(peripheral)
        reg_obj = self._parser.get_register(peripheral, register)
        return decode_register(periph_obj, reg_obj, value)
class SyncSVDManager:
    """Blocking facade over an ``SVDManager``.

    Coroutine methods of the wrapped manager are driven to completion on
    the supplied event loop; plain methods are forwarded directly.
    """

    def __init__(self, manager: SVDManager, loop: asyncio.AbstractEventLoop) -> None:
        """Store the wrapped async manager and the loop used to run it."""
        self._manager = manager
        self._loop = loop

    @property
    def loaded(self) -> bool:
        """Forward the wrapped manager's ``loaded`` flag."""
        wrapped = self._manager
        return wrapped.loaded

    def load(self, svd_path: Path) -> None:
        """Run the wrapped manager's ``load`` to completion on the loop."""
        drive = self._loop.run_until_complete
        pending = self._manager.load(svd_path)
        drive(pending)

    def list_peripherals(self) -> list[str]:
        """Forward to the wrapped manager's ``list_peripherals``."""
        names = self._manager.list_peripherals()
        return names

    def list_registers(self, peripheral: str) -> list[str]:
        """Forward to the wrapped manager's ``list_registers``."""
        names = self._manager.list_registers(peripheral)
        return names

    def read_register(self, peripheral: str, register: str) -> DecodedRegister:
        """Run the wrapped ``read_register`` coroutine and return its result."""
        drive = self._loop.run_until_complete
        pending = self._manager.read_register(peripheral, register)
        return drive(pending)

    def read_peripheral(self, peripheral: str) -> dict[str, DecodedRegister]:
        """Run the wrapped ``read_peripheral`` coroutine and return its result."""
        drive = self._loop.run_until_complete
        pending = self._manager.read_peripheral(peripheral)
        return drive(pending)

    def decode(self, peripheral: str, register: str, value: int) -> DecodedRegister:
        """Forward to the wrapped manager's ``decode`` (no hardware access here)."""
        decoded = self._manager.decode(peripheral, register, value)
        return decoded