diff --git a/src/content/docs/examples/debug-session.mdx b/src/content/docs/examples/debug-session.mdx index c5b9179..b8dc5af 100644 --- a/src/content/docs/examples/debug-session.mdx +++ b/src/content/docs/examples/debug-session.mdx @@ -1,6 +1,204 @@ --- title: Debug Session -description: Complete example of a debug session with target control +description: Complete walkthrough of connecting to a target, reading state, inspecting registers and memory, single-stepping, and resuming. --- -Content coming soon. +import { Tabs, TabItem, Aside } from '@astrojs/starlight/components'; + +This example demonstrates a typical debug workflow: connect to OpenOCD, halt the target, inspect registers and memory, single-step through code, and resume execution. + +## Async version + +```python +import asyncio +import logging +from openocd import Session, TargetError, TargetNotHaltedError, ConnectionError + +logging.basicConfig(level=logging.INFO) +log = logging.getLogger("debug-session") + + +async def main(): + # --- Connect --- + try: + session = await Session.connect(host="localhost", port=6666, timeout=5.0) + except ConnectionError as e: + log.error("Cannot connect to OpenOCD: %s", e) + log.error("Make sure OpenOCD is running with TCL RPC enabled on port 6666") + return + + async with session: + # --- Check initial state --- + state = await session.target.state() + log.info("Target: %s, State: %s", state.name, state.state) + + # --- Halt --- + if state.state != "halted": + log.info("Halting target...") + try: + state = await session.target.halt() + except TargetError as e: + log.error("Failed to halt: %s", e) + return + log.info("Halted at PC=0x%08X", state.current_pc or 0) + + # --- Read registers --- + pc = await session.registers.pc() + sp = await session.registers.sp() + lr = await session.registers.lr() + log.info("PC=0x%08X SP=0x%08X LR=0x%08X", pc, sp, lr) + + # Read all general-purpose registers + all_regs = await session.registers.read_all() + for name, reg in sorted(all_regs.items(), key=lambda x: x[1].number): + if reg.number <= 15: # r0-r15 on ARM + log.info(" %-6s = 0x%08X%s", + reg.name, reg.value, + " (dirty)" if reg.dirty else "") + + # --- Read memory around PC --- + log.info("Memory at PC (8 words):") + words = await session.memory.read_u32(pc, count=8) + for i, word in enumerate(words): + log.info(" 0x%08X: 0x%08X", pc + i * 4, word) + + # --- Hexdump of stack --- + log.info("Stack (64 bytes from SP):") + hexdump = await session.memory.hexdump(sp, 64) + for line in hexdump.splitlines(): + log.info(" %s", line) + + # --- Single-step --- + log.info("Single-stepping 3 instructions...") + for i in range(3): + state = await session.target.step() + new_pc = state.current_pc or 0 + log.info(" Step %d: PC=0x%08X", i + 1, new_pc) + + # --- Resume --- + log.info("Resuming execution...") + await session.target.resume() + + # Verify the target is running + state = await session.target.state() + log.info("Target state after resume: %s", state.state) + + log.info("Session closed") + + +asyncio.run(main()) +``` + +## Sync version + +```python +import logging +from openocd import Session, TargetError, ConnectionError + +logging.basicConfig(level=logging.INFO) +log = logging.getLogger("debug-session") + + +def main(): + # --- Connect --- + try: + session = Session.connect_sync(host="localhost", port=6666, timeout=5.0) + except ConnectionError as e: + log.error("Cannot connect to OpenOCD: %s", e) + return + + with session: + # --- Check initial state --- + state = session.target.state() + log.info("Target: %s, State: %s", state.name, state.state) + + # --- Halt --- + if state.state != "halted": + log.info("Halting target...") + try: + state = session.target.halt() + except TargetError as e: + log.error("Failed to halt: %s", e) + return + log.info("Halted at PC=0x%08X", state.current_pc or 0) + + # --- Read registers --- + pc = session.registers.pc() + sp = session.registers.sp() + lr = session.registers.lr() + log.info("PC=0x%08X SP=0x%08X LR=0x%08X", pc, sp, lr) + + # Read all general-purpose registers + all_regs = session.registers.read_all() + for name, reg in sorted(all_regs.items(), key=lambda x: x[1].number): + if reg.number <= 15: + log.info(" %-6s = 0x%08X%s", + reg.name, reg.value, + " (dirty)" if reg.dirty else "") + + # --- Read memory around PC --- + log.info("Memory at PC (8 words):") + words = session.memory.read_u32(pc, count=8) + for i, word in enumerate(words): + log.info(" 0x%08X: 0x%08X", pc + i * 4, word) + + # --- Hexdump of stack --- + log.info("Stack (64 bytes from SP):") + hexdump = session.memory.hexdump(sp, 64) + for line in hexdump.splitlines(): + log.info(" %s", line) + + # --- Single-step --- + log.info("Single-stepping 3 instructions...") + for i in range(3): + state = session.target.step() + new_pc = state.current_pc or 0 + log.info(" Step %d: PC=0x%08X", i + 1, new_pc) + + # --- Resume --- + log.info("Resuming execution...") + session.target.resume() + + state = session.target.state() + log.info("Target state after resume: %s", state.state) + + log.info("Session closed") + + +main() +``` + +## Expected output + +``` +INFO:debug-session:Target: stm32f1x.cpu, State: running +INFO:debug-session:Halting target... +INFO:debug-session:Halted at PC=0x08001234 +INFO:debug-session:PC=0x08001234 SP=0x20004FF0 LR=0x08000A51 +INFO:debug-session: r0 = 0x00000001 +INFO:debug-session: r1 = 0x20000100 +INFO:debug-session: r2 = 0x00000000 +INFO:debug-session: ... +INFO:debug-session:Memory at PC (8 words): +INFO:debug-session: 0x08001234: 0x4B02B510 +INFO:debug-session: 0x08001238: 0x47984601 +INFO:debug-session: ... +INFO:debug-session:Stack (64 bytes from SP): +INFO:debug-session: 20004FF0: 08 00 0A 51 00 00 00 00 01 00 00 00 00 01 00 20 |...Q........... | +INFO:debug-session: ... +INFO:debug-session:Single-stepping 3 instructions... +INFO:debug-session: Step 1: PC=0x08001236 +INFO:debug-session: Step 2: PC=0x08001238 +INFO:debug-session: Step 3: PC=0x0800123A +INFO:debug-session:Resuming execution... +INFO:debug-session:Target state after resume: running +INFO:debug-session:Session closed +``` + +## Key points + +- Always check the target state before operating. Some operations (register reads, memory inspection) require a halted target. +- The `TargetState` dataclass includes `current_pc` only when the target is halted. Check for `None` before using it. +- Use `session.registers.read_all()` for a snapshot of all registers, or individual accessors like `session.registers.pc()` for specific values. +- `session.memory.hexdump()` returns a formatted string -- it does not print directly. This lets you log or display it however you prefer. +- The session context manager (`async with` / `with`) ensures the connection is closed cleanly on exit, even if an exception occurs. diff --git a/src/content/docs/examples/flash-programming.mdx b/src/content/docs/examples/flash-programming.mdx index 2510d0d..49634cf 100644 --- a/src/content/docs/examples/flash-programming.mdx +++ b/src/content/docs/examples/flash-programming.mdx @@ -1,6 +1,267 @@ --- title: Flash Programming -description: End-to-end flash erase, write, and verify example +description: End-to-end firmware update workflow with erase, program, verify, and reset. --- -Content coming soon. +import { Tabs, TabItem, Aside } from '@astrojs/starlight/components'; + +This example demonstrates a complete firmware update workflow: inspect the flash layout, erase, program a firmware image, verify, and reset the target. + +## Async version + +```python +import asyncio +import logging +import sys +from pathlib import Path + +from openocd import Session, FlashError, ConnectionError + +logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") +log = logging.getLogger("flash-program") + + +async def flash_firmware(firmware_path: Path): + """Program a firmware image and verify it.""" + + if not firmware_path.exists(): + log.error("Firmware file not found: %s", firmware_path) + return False + + file_size = firmware_path.stat().st_size + log.info("Firmware: %s (%d bytes)", firmware_path.name, file_size) + + try: + session = await Session.connect(timeout=5.0) + except ConnectionError as e: + log.error("Cannot connect to OpenOCD: %s", e) + return False + + async with session: + # --- Halt the target first --- + log.info("Halting target...") + await session.target.halt() + + # --- Inspect flash layout --- + banks = await session.flash.banks() + log.info("Found %d flash bank(s):", len(banks)) + for bank in banks: + log.info(" Bank #%d: %s @ 0x%08X, size=0x%X (%d KB)", + bank.index, bank.name, bank.base, bank.size, + bank.size // 1024) + + # Get detailed sector info for bank 0 + bank_info = await session.flash.info(0) + log.info("Bank 0 has %d sectors", len(bank_info.sectors)) + if bank_info.sectors: + first = bank_info.sectors[0] + last = bank_info.sectors[-1] + log.info(" First sector: offset=0x%X, size=0x%X", first.offset, first.size) + log.info(" Last sector: offset=0x%X, size=0x%X", last.offset, last.size) + + # Check if firmware fits + if file_size > bank_info.size: + log.error("Firmware (%d bytes) is larger than flash bank (%d bytes)", + file_size, bank_info.size) + return False + + # --- Erase --- + log.info("Erasing flash bank 0...") + try: + await session.flash.erase_all(bank=0) + log.info("Erase complete") + except FlashError as e: + log.error("Erase failed: %s", e) + return False + + # --- Program --- + log.info("Programming %s...", firmware_path.name) + try: + await session.flash.write_image( + firmware_path, + erase=False, # Already erased above + verify=True, # Built-in post-write verification + ) + log.info("Programming and verification complete") + except FlashError as e: + log.error("Programming failed: %s", e) + return False + + # --- Optional: second verification pass --- + log.info("Running standalone verification...") + if firmware_path.suffix in (".bin",): + matches = await session.flash.verify(bank=0, path=firmware_path) + if matches: + log.info("Standalone verification PASSED") + else: + log.error("Standalone verification FAILED") + return False + + # --- Reset and run --- + log.info("Resetting target to run new firmware...") + await session.target.reset(mode="run") + + # Brief pause, then check state + await asyncio.sleep(0.5) + state = await session.target.state() + log.info("Target state: %s", state.state) + + log.info("Firmware update complete") + return True + + +async def main(): + if len(sys.argv) < 2: + print(f"Usage: {sys.argv[0]} ") + sys.exit(1) + + firmware = Path(sys.argv[1]) + success = await flash_firmware(firmware) + sys.exit(0 if success else 1) + + +asyncio.run(main()) +``` + +## Sync version + +```python +import logging +import sys +from pathlib import Path + +from openocd import Session, FlashError, ConnectionError + +logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") +log = logging.getLogger("flash-program") + + +def flash_firmware(firmware_path: Path) -> bool: + """Program a firmware image and verify it.""" + + if not firmware_path.exists(): + log.error("Firmware file not found: %s", firmware_path) + return False + + file_size = firmware_path.stat().st_size + log.info("Firmware: %s (%d bytes)", firmware_path.name, file_size) + + try: + session = Session.connect_sync(timeout=5.0) + except ConnectionError as e: + log.error("Cannot connect to OpenOCD: %s", e) + return False + + with session: + log.info("Halting target...") + session.target.halt() + + # Inspect flash + banks = session.flash.banks() + log.info("Found %d flash bank(s)", len(banks)) + + bank_info = session.flash.info(0) + log.info("Bank 0: %d sectors, %d KB", + len(bank_info.sectors), bank_info.size // 1024) + + if file_size > bank_info.size: + log.error("Firmware too large for flash bank") + return False + + # Erase + log.info("Erasing...") + try: + session.flash.erase_all(bank=0) + except FlashError as e: + log.error("Erase failed: %s", e) + return False + + # Program with built-in verify + log.info("Programming %s...", firmware_path.name) + try: + session.flash.write_image(firmware_path, erase=False, verify=True) + log.info("Programming complete") + except FlashError as e: + log.error("Programming failed: %s", e) + return False + + # Reset and run + log.info("Resetting target...") + session.target.reset(mode="run") + log.info("Done") + return True + + +def main(): + if len(sys.argv) < 2: + print(f"Usage: {sys.argv[0]} ") + sys.exit(1) + + success = flash_firmware(Path(sys.argv[1])) + sys.exit(0 if success else 1) + + +main() +``` + +## Expected output + +``` +INFO: Firmware: firmware.hex (32768 bytes) +INFO: Halting target... +INFO: Found 1 flash bank(s): +INFO: Bank #0: stm32f1x.flash @ 0x08000000, size=0x20000 (128 KB) +INFO: Bank 0 has 128 sectors +INFO: First sector: offset=0x0, size=0x400 +INFO: Last sector: offset=0x1FC00, size=0x400 +INFO: Erasing flash bank 0... +INFO: Erase complete +INFO: Programming firmware.hex... +INFO: Programming and verification complete +INFO: Running standalone verification... +INFO: Standalone verification PASSED +INFO: Resetting target to run new firmware... +INFO: Target state: running +INFO: Firmware update complete +``` + +## Selective sector erase + +For faster updates when only part of the flash changes, erase only the affected sectors instead of the whole bank: + +```python +async with await Session.connect() as session: + await session.target.halt() + + bank = await session.flash.info(0) + firmware_size = Path("firmware.bin").stat().st_size + + # Calculate which sectors the firmware occupies + last_sector = 0 + cumulative = 0 + for sector in bank.sectors: + cumulative = sector.offset + sector.size + if cumulative >= firmware_size: + last_sector = sector.index + break + + log.info("Erasing sectors 0-%d (covers %d bytes)", last_sector, cumulative) + await session.flash.erase_sector(bank=0, first=0, last=last_sector) + await session.flash.write_image(Path("firmware.bin"), erase=False, verify=True) +``` + +## Protecting the bootloader + +After programming, protect the bootloader region so it cannot be accidentally overwritten: + +```python +async with await Session.connect() as session: + await session.target.halt() + + # Program the full image + await session.flash.write_image(Path("firmware.hex"), erase=True, verify=True) + + # Protect the first 2 sectors (bootloader) + await session.flash.protect(bank=0, first=0, last=1, on=True) + log.info("Bootloader sectors protected") +``` diff --git a/src/content/docs/examples/svd-inspection.mdx b/src/content/docs/examples/svd-inspection.mdx index 6a01345..65f4f81 100644 --- a/src/content/docs/examples/svd-inspection.mdx +++ b/src/content/docs/examples/svd-inspection.mdx @@ -1,6 +1,224 @@ --- title: SVD Inspection -description: Load an SVD file and decode peripheral registers live +description: Load an SVD file, browse peripherals, and decode hardware register values into named bitfields. --- -Content coming soon. +import { Tabs, TabItem, Aside } from '@astrojs/starlight/components'; + +This example demonstrates using SVD metadata to inspect hardware registers on a live target. Instead of manually masking and shifting bits from raw hex values, the SVD subsystem decodes register contents into named fields with descriptions. + +## Async version + +```python +import asyncio +import logging +from pathlib import Path + +from openocd import Session, SVDError, ConnectionError + +logging.basicConfig(level=logging.INFO, format="%(message)s") +log = logging.getLogger("svd-inspect") + + +async def main(): + svd_path = Path("STM32F103.svd") + if not svd_path.exists(): + log.error("SVD file not found: %s", svd_path) + log.error("Download from your vendor's CMSIS pack or cmsis-svd-data on GitHub") + return + + try: + session = await Session.connect(timeout=5.0) + except ConnectionError as e: + log.error("Cannot connect to OpenOCD: %s", e) + return + + async with session: + # --- Load SVD --- + log.info("Loading SVD file: %s", svd_path) + await session.svd.load(svd_path) + + # --- Browse peripherals --- + peripherals = session.svd.list_peripherals() + log.info("Found %d peripherals", len(peripherals)) + log.info("First 10: %s", ", ".join(peripherals[:10])) + + # --- Browse registers in GPIOA --- + gpio_regs = session.svd.list_registers("GPIOA") + log.info("\nGPIOA registers: %s", ", ".join(gpio_regs)) + + # --- Halt and read live registers --- + await session.target.halt() + + # Read and decode GPIOA.ODR (Output Data Register) + log.info("\n--- GPIOA Output Data Register ---") + odr = await session.svd.read_register("GPIOA", "ODR") + log.info("%s", odr) + + # Read and decode GPIOA.IDR (Input Data Register) + log.info("\n--- GPIOA Input Data Register ---") + idr = await session.svd.read_register("GPIOA", "IDR") + log.info("%s", idr) + + # --- Inspect clock configuration --- + log.info("\n--- RCC Clock Control Register ---") + rcc_cr = await session.svd.read_register("RCC", "CR") + log.info("%s", rcc_cr) + + # Check specific fields + for field in rcc_cr.fields: + if field.name in ("HSEON", "HSERDY", "PLLON", "PLLRDY"): + status = "ON" if field.value else "OFF" + log.info(" %s: %s - %s", field.name, status, field.description) + + # --- Read an entire peripheral --- + log.info("\n--- All USART1 registers ---") + try: + usart_regs = await session.svd.read_peripheral("USART1") + for name, decoded in usart_regs.items(): + log.info("%s", decoded) + log.info("") # blank line between registers + except SVDError as e: + log.warning("Could not read USART1: %s", e) + + # --- Decode without hardware read --- + log.info("\n--- Offline decode example ---") + # Suppose we captured this value from a log file + captured_value = 0x0300_0083 + decoded = session.svd.decode("RCC", "CR", captured_value) + log.info("Decoding RCC.CR = 0x%08X:", captured_value) + log.info("%s", decoded) + + # --- Access individual field values programmatically --- + log.info("\n--- Programmatic field access ---") + odr = await session.svd.read_register("GPIOA", "ODR") + pin_states = [] + for field in odr.fields: + if field.value: + pin_states.append(field.name) + if pin_states: + log.info("GPIOA pins driven high: %s", ", ".join(pin_states)) + else: + log.info("No GPIOA pins driven high") + + # Resume the target + await session.target.resume() + + +asyncio.run(main()) +``` + +## Sync version + +```python +import logging +from pathlib import Path + +from openocd import Session, SVDError, ConnectionError + +logging.basicConfig(level=logging.INFO, format="%(message)s") +log = logging.getLogger("svd-inspect") + + +def main(): + svd_path = Path("STM32F103.svd") + if not svd_path.exists(): + log.error("SVD file not found: %s", svd_path) + return + + try: + session = Session.connect_sync(timeout=5.0) + except ConnectionError as e: + log.error("Cannot connect to OpenOCD: %s", e) + return + + with session: + # Load SVD + log.info("Loading SVD file: %s", svd_path) + session.svd.load(svd_path) + + # Browse + peripherals = session.svd.list_peripherals() + log.info("Found %d peripherals", len(peripherals)) + + gpio_regs = session.svd.list_registers("GPIOA") + log.info("GPIOA registers: %s", ", ".join(gpio_regs)) + + # Halt and read + session.target.halt() + + log.info("\n--- GPIOA Output Data Register ---") + odr = session.svd.read_register("GPIOA", "ODR") + log.info("%s", odr) + + log.info("\n--- RCC Clock Control Register ---") + rcc_cr = session.svd.read_register("RCC", "CR") + log.info("%s", rcc_cr) + + for field in rcc_cr.fields: + if field.name in ("HSEON", "HSERDY", "PLLON", "PLLRDY"): + status = "ON" if field.value else "OFF" + log.info(" %s: %s", field.name, status) + + # Decode a captured value without reading hardware + decoded = session.svd.decode("RCC", "CR", 0x0300_0083) + log.info("\nOffline decode of RCC.CR = 0x03000083:") + log.info("%s", decoded) + + session.target.resume() + + +main() +``` + +## Expected output + +``` +Loading SVD file: STM32F103.svd +Found 51 peripherals +First 10: ADC1, ADC2, AFIO, BKP, CAN, CRC, DAC, DMA1, DMA2, EXTI + +GPIOA registers: BRR, BSRR, CRH, CRL, IDR, LCKR, ODR + +--- GPIOA Output Data Register --- +GPIOA.ODR @ 0x4001080C = 0x00000001 + [ 0:0] ODR0 = 0x1 Port output data bit 0 + [ 1:1] ODR1 = 0x0 Port output data bit 1 + [ 2:2] ODR2 = 0x0 Port output data bit 2 + [ 3:3] ODR3 = 0x0 Port output data bit 3 + ... + +--- GPIOA Input Data Register --- +GPIOA.IDR @ 0x40010808 = 0x0000FFFD + [ 0:0] IDR0 = 0x1 Port input data bit 0 + [ 1:1] IDR1 = 0x0 Port input data bit 1 + ... + +--- RCC Clock Control Register --- +RCC.CR @ 0x40021000 = 0x03000083 + [ 0:0] HSION = 0x1 Internal high-speed clock enable + [ 1:1] HSIRDY = 0x1 Internal high-speed clock ready flag + [ 16:16] HSEON = 0x1 HSE clock enable + [ 17:17] HSERDY = 0x1 External high-speed clock ready flag + [ 24:24] PLLON = 0x1 PLL enable + [ 25:25] PLLRDY = 0x1 PLL clock ready flag + HSEON: ON - HSE clock enable + HSERDY: ON - External high-speed clock ready flag + PLLON: ON - PLL enable + PLLRDY: ON - PLL clock ready flag + +--- Offline decode example --- +Decoding RCC.CR = 0x03000083: +RCC.CR @ 0x40021000 = 0x03000083 + [ 0:0] HSION = 0x1 Internal high-speed clock enable + ... +``` + +## Key points + +- **`load()` is required first.** All other SVD methods raise `SVDError` if no SVD file has been loaded. +- **`list_peripherals()` and `list_registers()` are synchronous** -- they operate on in-memory data after the initial parse. No `await` needed in async mode. +- **`read_register()` reads from live hardware.** It computes the register's memory-mapped address from the SVD, reads 32 bits via the memory subsystem, and decodes the result. The target should be halted for consistent reads. +- **`decode()` works offline.** Pass a raw integer value and it returns the same `DecodedRegister` structure without touching hardware. Useful for analyzing values from log files or crash dumps. +- **`read_peripheral()` is fault-tolerant.** It silently skips write-only or otherwise unreadable registers and logs warnings. The returned dict contains only registers that were successfully read. +- **`DecodedRegister.__str__`** produces formatted output. Each field shows its bit range, name, value, and description -- no manual bit manipulation needed. diff --git a/src/content/docs/guides/breakpoints.mdx b/src/content/docs/guides/breakpoints.mdx index a1fe3df..17e496f 100644 --- a/src/content/docs/guides/breakpoints.mdx +++ b/src/content/docs/guides/breakpoints.mdx @@ -1,6 +1,259 @@ --- -title: Breakpoints & Watchpoints -description: Set and manage hardware and software breakpoints +title: Breakpoints and Watchpoints +description: Set, remove, and list hardware/software breakpoints and data watchpoints. --- -Content coming soon. +import { Tabs, TabItem, Aside } from '@astrojs/starlight/components'; + +The `BreakpointManager` subsystem wraps OpenOCD's `bp`, `rbp`, `wp`, and `rwp` commands to manage breakpoints that pause execution at a specific instruction address, and watchpoints that trigger on data memory access. + +Access it through the session: + +```python +# Async +session.breakpoints + +# Sync +sync_session.breakpoints +``` + +## Hardware vs. software breakpoints + +Breakpoints come in two flavors, each with different tradeoffs: + +**Software breakpoints** replace the instruction at the target address with a special breakpoint instruction (e.g. `BKPT` on ARM). When the CPU executes that address, it traps into debug mode. The original instruction is restored transparently. Software breakpoints are plentiful but require writable memory -- they do not work in flash unless the debugger patches flash. + +**Hardware breakpoints** use dedicated comparator registers built into the CPU core (e.g. the Flash Patch and Breakpoint unit on Cortex-M). They work on any memory region including flash, but the number available is limited -- typically 4 to 8 on Cortex-M devices. + + + +## Setting breakpoints + +`add(address, length=2, hw=False)` sets a breakpoint at the given instruction address. + +The `length` parameter indicates the instruction size: +- `2` for Thumb instructions (16-bit, the default on Cortex-M) +- `4` for ARM instructions (32-bit) + + + +```python +import asyncio +from openocd import Session + +async def main(): + async with await Session.connect() as session: + # Software breakpoint on a Thumb instruction + await session.breakpoints.add(0x0800_1234, length=2) + + # Hardware breakpoint on an ARM instruction + await session.breakpoints.add(0x0800_5678, length=4, hw=True) + +asyncio.run(main()) +``` + + +```python +from openocd import Session + +with Session.connect_sync() as session: + # Software breakpoint on a Thumb instruction + session.breakpoints.add(0x0800_1234, length=2) + + # Hardware breakpoint on an ARM instruction + session.breakpoints.add(0x0800_5678, length=4, hw=True) +``` + + + +## Removing breakpoints + +`remove(address)` removes the breakpoint at the specified address. + + + +```python +async with await Session.connect() as session: + await session.breakpoints.add(0x0800_1234) + # ... debug ... + await session.breakpoints.remove(0x0800_1234) +``` + + +```python +with Session.connect_sync() as session: + session.breakpoints.add(0x0800_1234) + # ... debug ... + session.breakpoints.remove(0x0800_1234) +``` + + + +## Listing breakpoints + +`list()` returns all active breakpoints as a list of `Breakpoint` dataclasses. + + + +```python +async with await Session.connect() as session: + await session.breakpoints.add(0x0800_1234) + await session.breakpoints.add(0x0800_5678, hw=True) + + bps = await session.breakpoints.list() + for bp in bps: + print(f"BP #{bp.number}: 0x{bp.address:08X} " + f"type={bp.type} len={bp.length} enabled={bp.enabled}") +``` + + +```python +with Session.connect_sync() as session: + session.breakpoints.add(0x0800_1234) + session.breakpoints.add(0x0800_5678, hw=True) + + bps = session.breakpoints.list() + for bp in bps: + print(f"BP #{bp.number}: 0x{bp.address:08X} " + f"type={bp.type} len={bp.length} enabled={bp.enabled}") +``` + + + +## Data watchpoints + +Watchpoints monitor data memory access rather than instruction execution. They trigger when the CPU reads from, writes to, or accesses a specific address range. + +### Setting a watchpoint + +`add_watchpoint(address, length, access="rw")` creates a data watchpoint. + +The `access` parameter controls which operations trigger the watchpoint: +- `"r"` -- read access only +- `"w"` -- write access only +- `"rw"` -- any access (read or write) + + + +```python +async with await Session.connect() as session: + # Break on any write to a 4-byte variable + await session.breakpoints.add_watchpoint( + address=0x2000_0100, + length=4, + access="w" + ) + + # Break on any access to a buffer + await session.breakpoints.add_watchpoint( + address=0x2000_0200, + length=16, + access="rw" + ) +``` + + +```python +with Session.connect_sync() as session: + session.breakpoints.add_watchpoint( + address=0x2000_0100, length=4, access="w" + ) + session.breakpoints.add_watchpoint( + address=0x2000_0200, length=16, access="rw" + ) +``` + + + + + +### Removing a watchpoint + +`remove_watchpoint(address)` removes the watchpoint at the given address. + +```python +await session.breakpoints.remove_watchpoint(0x2000_0100) +``` + +### Listing watchpoints + +`list_watchpoints()` returns all active watchpoints. + + + +```python +async with await Session.connect() as session: + await session.breakpoints.add_watchpoint(0x2000_0100, 4, "w") + + wps = await session.breakpoints.list_watchpoints() + for wp in wps: + print(f"WP #{wp.number}: 0x{wp.address:08X} " + f"len={wp.length} access={wp.access}") +``` + + +```python +with Session.connect_sync() as session: + session.breakpoints.add_watchpoint(0x2000_0100, 4, "w") + + wps = session.breakpoints.list_watchpoints() + for wp in wps: + print(f"WP #{wp.number}: 0x{wp.address:08X} " + f"len={wp.length} access={wp.access}") +``` + + + +## Data types + +### Breakpoint + +| Field | Type | Description | +|-------|------|-------------| +| `number` | `int` | Breakpoint index | +| `type` | `Literal["hw", "sw"]` | Hardware or software breakpoint | +| `address` | `int` | Instruction address | +| `length` | `int` | Instruction length in bytes (2 = Thumb, 4 = ARM) | +| `enabled` | `bool` | Whether the breakpoint is active | + +### Watchpoint + +| Field | Type | Description | +|-------|------|-------------| +| `number` | `int` | Watchpoint index | +| `address` | `int` | Watched memory address | +| `length` | `int` | Size of watched region in bytes | +| `access` | `Literal["r", "w", "rw"]` | Access type that triggers the watchpoint | + +## Error handling + +Breakpoint and watchpoint operations raise `BreakpointError` (a subclass of `OpenOCDError`) when OpenOCD reports a failure. + +```python +from openocd.breakpoints import BreakpointError + +try: + session.breakpoints.add(0x0800_1234, hw=True) +except BreakpointError as e: + print(f"Could not set breakpoint: {e}") +``` + +Common failure causes: +- No hardware breakpoint comparators available (reduce the number of HW breakpoints) +- Target not halted when attempting to set a software breakpoint +- Invalid address or length + +## Method reference + +| Method | Return Type | Description | +|--------|-------------|-------------| +| `add(address, length=2, hw=False)` | `None` | Set a breakpoint | +| `remove(address)` | `None` | Remove a breakpoint | +| `list()` | `list[Breakpoint]` | List active breakpoints | +| `add_watchpoint(address, length, access="rw")` | `None` | Set a data watchpoint | +| `remove_watchpoint(address)` | `None` | Remove a watchpoint | +| `list_watchpoints()` | `list[Watchpoint]` | List active watchpoints | diff --git a/src/content/docs/guides/event-callbacks.mdx b/src/content/docs/guides/event-callbacks.mdx index 3a638e7..5a14bad 100644 --- a/src/content/docs/guides/event-callbacks.mdx +++ b/src/content/docs/guides/event-callbacks.mdx @@ -1,6 +1,192 @@ --- title: Event Callbacks -description: Subscribe to target events and state changes +description: Receive asynchronous notifications when the target halts, resumes, resets, or changes state. --- -Content coming soon. +import { Tabs, TabItem, Aside } from '@astrojs/starlight/components'; + +The `EventManager` enables asynchronous target state notifications from OpenOCD. When enabled, OpenOCD pushes messages over a dedicated TCP connection whenever the target halts, resumes, resets, or a GDB client attaches/detaches. You register callbacks to react to these events without polling. + +The `Session` class also provides convenience shortcuts (`on_halt`, `on_reset`) for the most common cases. + +## Architecture: dual-socket design + +OpenOCD's TCL RPC notification system uses a **separate TCP connection** from the command channel. This prevents notifications from interleaving with command responses on the same stream. + +When you enable notifications: + +1. A second TCP connection opens to the same OpenOCD host and port +2. The command `tcl_notifications on` is sent on this second connection +3. A background asyncio task reads notification messages from this dedicated socket +4. Incoming messages are dispatched to registered callbacks + +The command connection remains unaffected -- you can send commands and receive notifications simultaneously without race conditions. + +## Quick start with Session shortcuts + +The simplest way to react to events is through the `Session.on_halt()` and `Session.on_reset()` methods. These register notification callbacks that filter for specific keywords in the message. + +```python +import asyncio +from openocd import Session + +async def main(): + async with await Session.connect() as session: + # Register event handlers + session.on_halt(lambda msg: print(f"Target halted: {msg}")) + session.on_reset(lambda msg: print(f"Target reset: {msg}")) + + # Resume the target and wait for it to hit a breakpoint + await session.breakpoints.add(0x0800_1234, hw=True) + await session.target.resume() + + # Give it time to trigger + await asyncio.sleep(2.0) + +asyncio.run(main()) +``` + + + +## Using EventManager directly + +For finer control, use the `EventManager` class. It supports multiple event types and allows registering and unregistering individual callbacks. + +### Enabling notifications + +`enable()` opens the notification socket and starts the background listener. Call this before registering callbacks. + +```python +from openocd.events import EventManager + +async with await Session.connect() as session: + events = EventManager(session._conn) + await events.enable() + print(f"Notifications enabled: {events.enabled}") +``` + +### Registering callbacks + +`on(event_type, callback)` registers a callback for a specific event. Matching is case-insensitive substring: a notification containing "halted" anywhere in its text triggers all callbacks registered for the `"halted"` event type. + +```python +from openocd.events import ( + EventManager, + EVENT_HALTED, + EVENT_RESUMED, + EVENT_RESET, + EVENT_GDB_ATTACHED, + EVENT_GDB_DETACHED, +) + +def on_halted(msg: str) -> None: + print(f"[HALT] {msg}") + +def on_resumed(msg: str) -> None: + print(f"[RESUME] {msg}") + +events = EventManager(session._conn) +await events.enable() + +events.on(EVENT_HALTED, on_halted) +events.on(EVENT_RESUMED, on_resumed) +events.on(EVENT_RESET, lambda msg: print(f"[RESET] {msg}")) +events.on(EVENT_GDB_ATTACHED, lambda msg: print(f"[GDB+] {msg}")) +events.on(EVENT_GDB_DETACHED, lambda msg: print(f"[GDB-] {msg}")) +``` + +### Unregistering callbacks + +`off(event_type, callback)` removes a previously registered callback. If the callback was not registered, `off()` silently does nothing. + +```python +events.off(EVENT_HALTED, on_halted) +``` + +## Known event types + +The `events` module defines constants for known notification types: + +| Constant | String Value | Trigger | +|----------|-------------|---------| +| `EVENT_HALTED` | `"halted"` | Target entered halted state | +| `EVENT_RESUMED` | `"resumed"` | Target resumed execution | +| `EVENT_RESET` | `"reset"` | Target was reset | +| `EVENT_GDB_ATTACHED` | `"gdb-attached"` | GDB client connected | +| `EVENT_GDB_DETACHED` | `"gdb-detached"` | GDB client disconnected | + +These are not an exhaustive list -- OpenOCD may emit other notification strings depending on configuration. You can register callbacks for any substring pattern. + +## Complete example + +```python +import asyncio +from openocd import Session +from openocd.events import EventManager, EVENT_HALTED, EVENT_RESUMED + +halt_count = 0 + +def count_halts(msg: str) -> None: + global halt_count + halt_count += 1 + print(f"Halt #{halt_count}: {msg}") + +async def main(): + async with await Session.connect() as session: + # Set up event monitoring + events = EventManager(session._conn) + await events.enable() + events.on(EVENT_HALTED, count_halts) + events.on(EVENT_RESUMED, lambda m: print(f"Resumed: {m}")) + + # Set a breakpoint and let the target run + await session.breakpoints.add(0x0800_1234, hw=True) + await session.target.resume() + + # Monitor for 5 seconds + await asyncio.sleep(5.0) + + print(f"Total halts observed: {halt_count}") + +asyncio.run(main()) +``` + +## Callback behavior + +Callbacks run synchronously within the notification reader's asyncio task. Keep these points in mind: + +- Callbacks receive the **full notification message string** as their single argument. +- Callbacks should be fast and non-blocking. Dispatch long-running work to a separate task. +- Exceptions in callbacks are caught and logged -- they do not crash the notification loop. +- Multiple callbacks for the same event type are called in registration order. +- The same callback function will not be registered twice for the same event type. + +```python +# Fast callback -- good +def on_halt(msg: str) -> None: + print(f"Halted: {msg}") + +# Dispatching slow work -- good +def on_halt_with_work(msg: str) -> None: + asyncio.create_task(analyze_state(msg)) +``` + +## EventManager API reference + +| Member | Type | Description | +|--------|------|-------------| +| `enable()` | `async` | Send `tcl_notifications on`, open notification socket | +| `on(event_type, callback)` | sync | Register a callback (case-insensitive substring match) | +| `off(event_type, callback)` | sync | Unregister a callback | +| `enabled` (property) | `bool` | Whether notifications are active | + +## Session shortcuts + +| Method | Trigger Keyword | Description | +|--------|----------------|-------------| +| `session.on_halt(callback)` | `"halted"` | Register a halt callback on the connection | +| `session.on_reset(callback)` | `"reset"` | Register a reset callback on the connection | + +These shortcuts register directly on the connection's notification handler and work whenever the notification listener is active. diff --git a/src/content/docs/guides/flash-programming.mdx b/src/content/docs/guides/flash-programming.mdx index 219e83c..8d45e39 100644 --- a/src/content/docs/guides/flash-programming.mdx +++ b/src/content/docs/guides/flash-programming.mdx @@ -1,6 +1,316 @@ --- title: Flash Programming -description: Erase, write, and verify flash memory +description: Read, write, erase, verify, and protect on-chip flash memory banks through OpenOCD. --- -Content coming soon. +import { Tabs, TabItem, Aside } from '@astrojs/starlight/components'; + +The `Flash` subsystem wraps OpenOCD's `flash` command family for programming on-chip flash memory. It handles bank enumeration, sector-level erase, raw byte read/write through temporary files, high-level firmware image programming, verification, and write protection. + +Access it through the session: + +```python +# Async +session.flash + +# Sync +sync_session.flash +``` + +## Listing flash banks + +`banks()` returns a list of `FlashBank` descriptors for every flash bank OpenOCD has configured. These come without detailed sector information -- use `info()` for that. + + + +```python +import asyncio +from openocd import Session + +async def main(): + async with await Session.connect() as session: + banks = await session.flash.banks() + for bank in banks: + print(f"Bank #{bank.index}: {bank.name}") + print(f" Base: 0x{bank.base:08X}, Size: 0x{bank.size:X}") + print(f" Bus width: {bank.bus_width}, Chip width: {bank.chip_width}") + print(f" Target: {bank.target}") + +asyncio.run(main()) +``` + + +```python +from openocd import Session + +with Session.connect_sync() as session: + banks = session.flash.banks() + for bank in banks: + print(f"Bank #{bank.index}: {bank.name}") + print(f" Base: 0x{bank.base:08X}, Size: 0x{bank.size:X}") + print(f" Bus width: {bank.bus_width}, Chip width: {bank.chip_width}") + print(f" Target: {bank.target}") +``` + + + +## Getting bank details with sectors + +`info(bank=0)` returns a `FlashBank` with its `sectors` list populated. Each sector is a `FlashSector` with index, offset, size, and protection status. + + + +```python +async with await Session.connect() as session: + bank = await session.flash.info(0) + print(f"{bank.name}: {len(bank.sectors)} sectors") + for sector in bank.sectors: + prot = "protected" if sector.protected else "unprotected" + print(f" Sector {sector.index}: offset=0x{sector.offset:X}, " + f"size=0x{sector.size:X}, {prot}") +``` + + +```python +with Session.connect_sync() as session: + bank = session.flash.info(0) + print(f"{bank.name}: {len(bank.sectors)} sectors") + for sector in bank.sectors: + prot = "protected" if sector.protected else "unprotected" + print(f" Sector {sector.index}: offset=0x{sector.offset:X}, " + f"size=0x{sector.size:X}, {prot}") +``` + + + +## Reading flash + +Two methods for reading flash content: + +- **`read(bank, offset, size)`** -- returns raw `bytes` by writing to a temp file, then reading the data back through TCL or from the local filesystem. +- **`read_to_file(bank, path)`** -- dumps the entire bank directly to a file on disk. + + + +```python +from pathlib import Path + +async with await Session.connect() as session: + # Read 256 bytes from the start of bank 0 + data = await session.flash.read(bank=0, offset=0, size=256) + print(f"Read {len(data)} bytes: {data[:16].hex()}") + + # Dump the entire bank to a file + await session.flash.read_to_file(bank=0, path=Path("flash_dump.bin")) +``` + + +```python +from pathlib import Path + +with Session.connect_sync() as session: + data = session.flash.read(bank=0, offset=0, size=256) + print(f"Read {len(data)} bytes: {data[:16].hex()}") + + session.flash.read_to_file(bank=0, path=Path("flash_dump.bin")) +``` + + + + + +## Writing flash + +### Raw byte write + +`write(bank, offset, data)` writes raw bytes to a flash bank at a given offset. Like `read()`, it uses a temporary file since OpenOCD's `flash write_bank` reads from a file. + + + +```python +async with await Session.connect() as session: + config_data = b"\x01\x02\x03\x04" + await session.flash.write(bank=0, offset=0x1000, data=config_data) +``` + + +```python +with Session.connect_sync() as session: + config_data = b"\x01\x02\x03\x04" + session.flash.write(bank=0, offset=0x1000, data=config_data) +``` + + + + + +### Firmware image programming + +`write_image(path, erase=True, verify=True)` is the high-level "flash and go" command. It handles erase, write, and verification in one operation. It accepts `.bin`, `.hex`, and `.elf` files. + + + +```python +from pathlib import Path + +async with await Session.connect() as session: + firmware = Path("build/firmware.hex") + await session.flash.write_image(firmware, erase=True, verify=True) + print("Firmware programmed and verified") +``` + + +```python +from pathlib import Path + +with Session.connect_sync() as session: + firmware = Path("build/firmware.hex") + session.flash.write_image(firmware, erase=True, verify=True) + print("Firmware programmed and verified") +``` + + + +When `verify=True`, the method runs `verify_image` after writing. If the verification finds a mismatch, it raises `FlashError`. + +## Erasing flash + +### Erase a sector range + +`erase_sector(bank, first, last)` erases sectors from `first` to `last` (both inclusive). Validates that `first <= last` and raises `FlashError` if the range is invalid. + + + +```python +async with await Session.connect() as session: + # Erase sectors 0 through 3 in bank 0 + await session.flash.erase_sector(bank=0, first=0, last=3) +``` + + +```python +with Session.connect_sync() as session: + session.flash.erase_sector(bank=0, first=0, last=3) +``` + + + +### Erase an entire bank + +`erase_all(bank=0)` queries the bank info to find the last sector, then erases the full range. + +```python +await session.flash.erase_all(bank=0) +``` + +## Write protection + +`protect(bank, first, last, on)` sets or clears hardware write protection on a range of sectors. + + + +```python +async with await Session.connect() as session: + # Protect the bootloader (sectors 0-1) + await session.flash.protect(bank=0, first=0, last=1, on=True) + + # Unprotect application sectors (2-7) + await session.flash.protect(bank=0, first=2, last=7, on=False) +``` + + +```python +with Session.connect_sync() as session: + session.flash.protect(bank=0, first=0, last=1, on=True) + session.flash.protect(bank=0, first=2, last=7, on=False) +``` + + + +## Verifying flash + +`verify(bank, path)` compares flash contents against a reference binary file and returns `True` if they match, `False` otherwise. + + + +```python +from pathlib import Path + +async with await Session.connect() as session: + matches = await session.flash.verify(bank=0, path=Path("golden.bin")) + if matches: + print("Flash contents match reference file") + else: + print("MISMATCH detected") +``` + + +```python +from pathlib import Path + +with Session.connect_sync() as session: + matches = session.flash.verify(bank=0, path=Path("golden.bin")) + if matches: + print("Flash contents match reference file") + else: + print("MISMATCH detected") +``` + + + +## Data types + +### FlashBank + +| Field | Type | Description | +|-------|------|-------------| +| `index` | `int` | Bank number | +| `name` | `str` | Bank name (e.g. `stm32f1x.flash`) | +| `base` | `int` | Base address | +| `size` | `int` | Total size in bytes | +| `bus_width` | `int` | Bus width | +| `chip_width` | `int` | Chip width | +| `target` | `str` | Associated target or driver name | +| `sectors` | `list[FlashSector]` | Sector list (empty from `banks()`, populated from `info()`) | + +### FlashSector + +| Field | Type | Description | +|-------|------|-------------| +| `index` | `int` | Sector number within the bank | +| `offset` | `int` | Byte offset from the bank base | +| `size` | `int` | Sector size in bytes | +| `protected` | `bool` | Whether write protection is enabled | + +## Error handling + +All flash operations raise `FlashError` on failure. The error message includes the OpenOCD response for diagnostics. + +```python +from openocd import Session, FlashError + +try: + with Session.connect_sync() as session: + session.flash.write_image(Path("firmware.bin")) +except FlashError as e: + print(f"Flash operation failed: {e}") +``` + +## Method reference + +| Method | Return Type | Description | +|--------|-------------|-------------| +| `banks()` | `list[FlashBank]` | List all configured flash banks | +| `info(bank=0)` | `FlashBank` | Detailed bank info with sectors | +| `read(bank, offset, size)` | `bytes` | Read raw flash via temp file | +| `read_to_file(bank, path)` | `None` | Dump entire bank to file | +| `write(bank, offset, data)` | `None` | Write raw bytes via temp file | +| `write_image(path, erase=True, verify=True)` | `None` | High-level flash programming | +| `erase_sector(bank, first, last)` | `None` | Erase a sector range | +| `erase_all(bank=0)` | `None` | Erase entire bank | +| `protect(bank, first, last, on)` | `None` | Set/clear write protection | +| `verify(bank, path)` | `bool` | Verify flash against a file | diff --git a/src/content/docs/guides/jtag-operations.mdx b/src/content/docs/guides/jtag-operations.mdx index 7468a4c..2a49592 100644 --- a/src/content/docs/guides/jtag-operations.mdx +++ b/src/content/docs/guides/jtag-operations.mdx @@ -1,6 +1,308 @@ --- title: JTAG Operations -description: Low-level JTAG scan chain access and TAP control +description: Scan chain enumeration, IR/DR scan operations, TAP state control, and boundary scan file execution. --- -Content coming soon. +import { Tabs, TabItem, Aside } from '@astrojs/starlight/components'; + +The `JTAGController` provides direct access to the JTAG interface: chain discovery, register scanning, TAP state machine control, and boundary scan file execution. It acts as a facade that delegates to specialized submodules (`chain`, `scan`, `state`, `boundary`). + +Access it through the session: + +```python +# Async +session.jtag + +# Sync +sync_session.jtag +``` + + + +## Scan chain discovery + +`scan_chain()` queries OpenOCD for every TAP (Test Access Port) on the JTAG chain and returns them as `TAPInfo` dataclasses. + + + +```python +import asyncio +from openocd import Session + +async def main(): + async with await Session.connect() as session: + taps = await session.jtag.scan_chain() + for tap in taps: + print(f"TAP: {tap.name}") + print(f" Chip: {tap.chip}, TAP name: {tap.tap_name}") + print(f" IDCODE: 0x{tap.idcode:08X}") + print(f" IR length: {tap.ir_length} bits") + print(f" Enabled: {tap.enabled}") + +asyncio.run(main()) +``` + + +```python +from openocd import Session + +with Session.connect_sync() as session: + taps = session.jtag.scan_chain() + for tap in taps: + print(f"TAP: {tap.name}") + print(f" Chip: {tap.chip}, TAP name: {tap.tap_name}") + print(f" IDCODE: 0x{tap.idcode:08X}") + print(f" IR length: {tap.ir_length} bits") + print(f" Enabled: {tap.enabled}") +``` + + + +A typical STM32 output: + +``` +TAP: stm32f1x.cpu + Chip: stm32f1x, TAP name: cpu + IDCODE: 0x3BA00477 + IR length: 4 bits + Enabled: True +``` + +## Adding a new TAP + +`new_tap(chip, tap, ir_len, expected_id=None)` declares a new TAP on the chain. This is typically done before scan chain initialization, but can be useful when dynamically configuring multi-device chains. + + + +```python +async with await Session.connect() as session: + # Declare a TAP with known IDCODE + await session.jtag.new_tap( + chip="fpga", + tap="bs", + ir_len=6, + expected_id=0x0362D093 + ) + + # Declare a TAP without IDCODE verification + await session.jtag.new_tap(chip="cpld", tap="cpu", ir_len=8) +``` + + +```python +with Session.connect_sync() as session: + session.jtag.new_tap( + chip="fpga", tap="bs", ir_len=6, expected_id=0x0362D093 + ) +``` + + + +## Scan operations + +### Instruction register scan + +`irscan(tap, instruction)` shifts an instruction into a TAP's instruction register (IR) and returns the value shifted out. + + + +```python +async with await Session.connect() as session: + # Select the IDCODE instruction (commonly 0x0E on ARM DAPs) + shifted_out = await session.jtag.irscan("stm32f1x.cpu", 0x0E) + print(f"IR shifted out: 0x{shifted_out:X}") +``` + + +```python +with Session.connect_sync() as session: + shifted_out = session.jtag.irscan("stm32f1x.cpu", 0x0E) + print(f"IR shifted out: 0x{shifted_out:X}") +``` + + + +### Data register scan + +`drscan(tap, bits, value)` shifts a value of the specified bit width through the data register (DR) and returns the captured output. + + + +```python +async with await Session.connect() as session: + # Read IDCODE: shift 32 bits through DR after selecting IDCODE via IR + await session.jtag.irscan("stm32f1x.cpu", 0x0E) + idcode = await session.jtag.drscan("stm32f1x.cpu", 32, 0x0) + print(f"IDCODE: 0x{idcode:08X}") +``` + + +```python +with Session.connect_sync() as session: + session.jtag.irscan("stm32f1x.cpu", 0x0E) + idcode = session.jtag.drscan("stm32f1x.cpu", 32, 0x0) + print(f"IDCODE: 0x{idcode:08X}") +``` + + + +### Run-Test/Idle clocking + +`runtest(cycles)` clocks the specified number of TCK pulses while the TAP controller is in the Run-Test/Idle state. Some devices require idle clocking between operations. + +```python +# Clock 100 TCK cycles in Run-Test/Idle +await session.jtag.runtest(100) +``` + +The cycle count must be non-negative; passing a negative value raises `JTAGError`. + +## TAP state machine control + +`pathmove(states)` walks the TAP controller through an explicit sequence of IEEE 1149.1 states. Each state must be a legal single-step transition from the previous one. OpenOCD validates the path and reports an error for illegal transitions. + + + +```python +from openocd import Session, JTAGState + +async with await Session.connect() as session: + await session.jtag.pathmove([ + JTAGState.DRSELECT, + JTAGState.DRCAPTURE, + JTAGState.DRSHIFT, + ]) +``` + + +```python +from openocd import Session, JTAGState + +with Session.connect_sync() as session: + session.jtag.pathmove([ + JTAGState.DRSELECT, + JTAGState.DRCAPTURE, + JTAGState.DRSHIFT, + ]) +``` + + + +The list must contain at least one state. An empty list raises `JTAGError`. + +### JTAGState enum + +The `JTAGState` enum defines all 16 IEEE 1149.1 TAP controller states: + +| State | Description | +|-------|-------------| +| `RESET` | Test-Logic-Reset | +| `IDLE` | Run-Test/Idle | +| `DRSELECT` | Select-DR-Scan | +| `DRCAPTURE` | Capture-DR | +| `DRSHIFT` | Shift-DR | +| `DREXIT1` | Exit1-DR | +| `DRPAUSE` | Pause-DR | +| `DREXIT2` | Exit2-DR | +| `DRUPDATE` | Update-DR | +| `IRSELECT` | Select-IR-Scan | +| `IRCAPTURE` | Capture-IR | +| `IRSHIFT` | Shift-IR | +| `IREXIT1` | Exit1-IR | +| `IRPAUSE` | Pause-IR | +| `IREXIT2` | Exit2-IR | +| `IRUPDATE` | Update-IR | + +`JTAGState` is a `str` enum, so `JTAGState.IDLE.value` returns the string `"IDLE"`. + +## Boundary scan files + +### SVF execution + +`svf(path, tap=None, quiet=False, progress=True)` executes a Serial Vector Format file. SVF files describe JTAG test vectors and are commonly used for FPGA configuration, board-level test, and CPLD programming. + + + +```python +from pathlib import Path + +async with await Session.connect() as session: + await session.jtag.svf( + path=Path("board_test.svf"), + tap="fpga.bs", + quiet=False, + progress=True, + ) +``` + + +```python +from pathlib import Path + +with Session.connect_sync() as session: + session.jtag.svf( + path=Path("board_test.svf"), + tap="fpga.bs", + quiet=False, + progress=True, + ) +``` + + + +Parameters: +- `path` -- path to the `.svf` file +- `tap` -- restrict operations to a specific TAP (optional; when `None`, OpenOCD applies vectors to the appropriate TAP) +- `quiet` -- suppress per-statement logging inside OpenOCD +- `progress` -- show a progress indicator (default `True`) + +### XSVF execution + +`xsvf(tap, path)` executes a Xilinx-extended SVF file against a specific TAP. + +```python +from pathlib import Path + +await session.jtag.xsvf("cpld.bs", Path("config.xsvf")) +``` + +## Data types + +### TAPInfo + +| Field | Type | Description | +|-------|------|-------------| +| `name` | `str` | Full TAP name (e.g. `stm32f1x.cpu`) | +| `chip` | `str` | Chip portion of the name | +| `tap_name` | `str` | TAP portion of the name | +| `idcode` | `int` | Detected IDCODE | +| `ir_length` | `int` | Instruction register length in bits | +| `enabled` | `bool` | Whether the TAP is enabled | + +## Error handling + +All JTAG operations raise `JTAGError` on failure. + +```python +from openocd import JTAGError + +try: + await session.jtag.irscan("nonexistent.tap", 0x0E) +except JTAGError as e: + print(f"JTAG error: {e}") +``` + +## Method reference + +| Method | Return Type | Description | +|--------|-------------|-------------| +| `scan_chain()` | `list[TAPInfo]` | Enumerate TAPs on the chain | +| `new_tap(chip, tap, ir_len, expected_id=None)` | `None` | Declare a new TAP | +| `irscan(tap, instruction)` | `int` | Shift instruction into IR | +| `drscan(tap, bits, value)` | `int` | Shift data through DR | +| `runtest(cycles)` | `None` | Clock TCK in Run-Test/Idle | +| `pathmove(states)` | `None` | Walk TAP through state sequence | +| `svf(path, tap=None, quiet=False, progress=True)` | `None` | Execute SVF file | +| `xsvf(tap, path)` | `None` | Execute XSVF file | diff --git a/src/content/docs/guides/rtt-communication.mdx b/src/content/docs/guides/rtt-communication.mdx index ffc881c..9fc155b 100644 --- a/src/content/docs/guides/rtt-communication.mdx +++ b/src/content/docs/guides/rtt-communication.mdx @@ -1,6 +1,237 @@ --- title: RTT Communication -description: Real-Time Transfer channel setup and data streaming +description: High-speed bidirectional data transfer between host and target using SEGGER Real-Time Transfer. --- -Content coming soon. +import { Tabs, TabItem, Aside } from '@astrojs/starlight/components'; + +The `RTTManager` subsystem provides access to SEGGER Real-Time Transfer (RTT), a protocol for high-speed bidirectional communication between a debug host and an embedded target. RTT uses a shared control block in target RAM rather than dedicated hardware, making it significantly faster than semihosting while requiring no additional pins. + +Access it through the session: + +```python +session.rtt +``` + +## How RTT works + +RTT places a small control block in the target's RAM that contains ring buffers for communication. The control block starts with a known identifier string (typically `"SEGGER RTT"`) so the debugger can locate it by scanning a RAM region. + +Communication flows through numbered **channels**: +- **Up-channels** (target to host): the firmware writes data that the host reads +- **Down-channels** (host to target): the host writes data that the firmware reads + +Channel 0 is conventionally used as a terminal for `printf`-style logging. + + + +## Setup and lifecycle + +The typical RTT flow is: **setup** (configure search parameters), then **start** (find the control block and activate channels), then **read/write**, then **stop**. + +### Configuring the search region + +`setup(address, size, id_string="SEGGER RTT")` tells OpenOCD where to look for the RTT control block in target RAM. + + + +```python +import asyncio +from openocd import Session + +async def main(): + async with await Session.connect() as session: + # Search for the control block in the first 8KB of SRAM + await session.rtt.setup( + address=0x2000_0000, + size=0x2000, + id_string="SEGGER RTT" + ) + +asyncio.run(main()) +``` + + +```python +from openocd import Session + +with Session.connect_sync() as session: + # Use the raw command interface for sync RTT access + session.command('rtt setup 0x20000000 0x2000 "SEGGER RTT"') +``` + + + +Parameters: +- `address` -- start address of the RAM region to search +- `size` -- size of the search region in bytes +- `id_string` -- RTT control block identifier (default `"SEGGER RTT"`) + + + +### Starting RTT + +`start()` scans the configured region for the control block and activates all discovered channels. + +```python +await session.rtt.start() +``` + +Raises `OpenOCDError` if the control block is not found. Make sure the target firmware is running and has initialized RTT before calling `start()`. + +### Stopping RTT + +`stop()` deactivates RTT communication. + +```python +await session.rtt.stop() +``` + +## Discovering channels + +`channels()` returns a list of `RTTChannel` descriptors after `start()` has been called. + +```python +async with await Session.connect() as session: + await session.rtt.setup(address=0x2000_0000, size=0x2000) + await session.rtt.start() + + channels = await session.rtt.channels() + for ch in channels: + direction = "target->host" if ch.direction == "up" else "host->target" + print(f"Channel {ch.index}: {ch.name} " + f"(size={ch.size}, {direction})") +``` + +Typical output: + +``` +Channel 0: Terminal (size=1024, target->host) +Channel 0: Terminal (size=16, host->target) +``` + +## Reading data + +`read(channel)` reads pending data from an up-channel (target to host). Returns an empty string if nothing is available. + +```python +data = await session.rtt.read(0) +if data: + print(f"Received: {data}") +``` + +For continuous monitoring, poll in a loop: + +```python +import asyncio + +async def rtt_monitor(session): + await session.rtt.setup(address=0x2000_0000, size=0x2000) + await session.rtt.start() + + try: + while True: + data = await session.rtt.read(0) + if data: + print(data, end="", flush=True) + await asyncio.sleep(0.05) # 50ms poll interval + finally: + await session.rtt.stop() +``` + +## Writing data + +`write(channel, data)` sends a string to a down-channel (host to target). + +```python +await session.rtt.write(0, "help\n") + +# Wait for the target to process and respond +await asyncio.sleep(0.1) +response = await session.rtt.read(0) +print(response) +``` + + + +## Complete example + +```python +import asyncio +from openocd import Session + +async def main(): + async with await Session.connect() as session: + # Make sure the target is running (RTT needs active firmware) + await session.target.reset(mode="run") + await asyncio.sleep(0.5) # Let firmware initialize + + # Configure and start RTT + await session.rtt.setup( + address=0x2000_0000, + size=0x4000, + id_string="SEGGER RTT" + ) + await session.rtt.start() + + # List available channels + channels = await session.rtt.channels() + print(f"Found {len(channels)} RTT channels") + + # Read for a few seconds + for _ in range(20): + data = await session.rtt.read(0) + if data: + print(f"[RTT] {data}", end="") + await asyncio.sleep(0.1) + + await session.rtt.stop() + +asyncio.run(main()) +``` + +## Data types + +### RTTChannel + +| Field | Type | Description | +|-------|------|-------------| +| `index` | `int` | Channel number | +| `name` | `str` | Channel name (e.g. `Terminal`) | +| `size` | `int` | Buffer size in bytes | +| `direction` | `Literal["up", "down"]` | `up` = target-to-host, `down` = host-to-target | + +## Error handling + +RTT operations raise `OpenOCDError` on failure (there is no dedicated RTT exception type). + +```python +from openocd import OpenOCDError + +try: + await session.rtt.start() +except OpenOCDError as e: + print(f"RTT failed: {e}") +``` + +Common failure causes: +- Control block not found (firmware not running, wrong search address or size) +- RTT not set up before calling `start()` +- Channel index out of range + +## Method reference + +| Method | Return Type | Description | +|--------|-------------|-------------| +| `setup(address, size, id_string="SEGGER RTT")` | `None` | Configure control block search | +| `start()` | `None` | Find control block, activate channels | +| `stop()` | `None` | Deactivate RTT | +| `channels()` | `list[RTTChannel]` | List discovered channels | +| `read(channel)` | `str` | Read from an up-channel | +| `write(channel, data)` | `None` | Write to a down-channel | diff --git a/src/content/docs/guides/svd-decoding.mdx b/src/content/docs/guides/svd-decoding.mdx index 95b3c4e..17d7480 100644 --- a/src/content/docs/guides/svd-decoding.mdx +++ b/src/content/docs/guides/svd-decoding.mdx @@ -1,6 +1,258 @@ --- title: SVD Register Decoding -description: Decode peripheral registers using CMSIS-SVD metadata +description: Load CMSIS-SVD files and decode hardware register values into named bitfields. --- -Content coming soon. +import { Tabs, TabItem, Aside } from '@astrojs/starlight/components'; + +The `SVDManager` subsystem integrates CMSIS-SVD metadata with live hardware reads. Given an SVD file describing a microcontroller's peripheral registers, it can read a register from the target and decode its raw value into named bitfields -- turning opaque hex values into human-readable output. + +Access it through the session: + +```python +# Async +session.svd + +# Sync +sync_session.svd +``` + + + +## Prerequisites + +The SVD subsystem requires the `cmsis-svd` package, which is included in the default dependencies: + +```bash +uv add openocd-python +``` + +## Loading an SVD file + +`load(svd_path)` parses an SVD XML file and indexes its peripherals and registers. The file parse runs in a background thread (via `asyncio.to_thread`) to avoid blocking the event loop on large SVD files. + + + +```python +import asyncio +from pathlib import Path +from openocd import Session + +async def main(): + async with await Session.connect() as session: + await session.svd.load(Path("STM32F103.svd")) + print(f"SVD loaded: {session.svd.loaded}") + +asyncio.run(main()) +``` + + +```python +from pathlib import Path +from openocd import Session + +with Session.connect_sync() as session: + session.svd.load(Path("STM32F103.svd")) + print(f"SVD loaded: {session.svd.loaded}") +``` + + + +Raises `SVDError` if the file is not found or cannot be parsed. + +## Browsing peripherals and registers + +After loading, enumerate what is available. Both `list_peripherals()` and `list_registers()` are synchronous methods (no `await` needed) since they operate on in-memory data. + + + +```python +async with await Session.connect() as session: + await session.svd.load(Path("STM32F103.svd")) + + # List all peripherals + peripherals = session.svd.list_peripherals() + print(f"Found {len(peripherals)} peripherals:") + for name in peripherals[:10]: + print(f" {name}") + + # List registers in a specific peripheral + regs = session.svd.list_registers("GPIOA") + print(f"\nGPIOA registers: {', '.join(regs)}") +``` + + +```python +with Session.connect_sync() as session: + session.svd.load(Path("STM32F103.svd")) + + peripherals = session.svd.list_peripherals() + print(f"Found {len(peripherals)} peripherals:") + for name in peripherals[:10]: + print(f" {name}") + + regs = session.svd.list_registers("GPIOA") + print(f"\nGPIOA registers: {', '.join(regs)}") +``` + + + +## Reading and decoding a register + +`read_register(peripheral, register)` is the primary method. It computes the register's memory-mapped address from the SVD metadata, reads 32 bits from that address on the target, and decodes the raw value into named bitfields. + + + +```python +async with await Session.connect() as session: + await session.svd.load(Path("STM32F103.svd")) + await session.target.halt() + + decoded = await session.svd.read_register("GPIOA", "ODR") + print(decoded) +``` + + +```python +with Session.connect_sync() as session: + session.svd.load(Path("STM32F103.svd")) + session.target.halt() + + decoded = session.svd.read_register("GPIOA", "ODR") + print(decoded) +``` + + + +The `DecodedRegister.__str__` method formats the output: + +``` +GPIOA.ODR @ 0x4001080C = 0x00000001 + [ 0:0] ODR0 = 0x1 Port output data bit 0 + [ 1:1] ODR1 = 0x0 Port output data bit 1 + [ 2:2] ODR2 = 0x0 Port output data bit 2 + ... +``` + +Each line shows the bit range, field name, extracted value, and the description from the SVD metadata. + +## Reading an entire peripheral + +`read_peripheral(peripheral)` reads and decodes every register in a peripheral, returning a dict keyed by register name. + + + +```python +async with await Session.connect() as session: + await session.svd.load(Path("STM32F103.svd")) + await session.target.halt() + + all_regs = await session.svd.read_peripheral("GPIOA") + for name, decoded in all_regs.items(): + print(f"\n{decoded}") +``` + + +```python +with Session.connect_sync() as session: + session.svd.load(Path("STM32F103.svd")) + session.target.halt() + + all_regs = session.svd.read_peripheral("GPIOA") + for name, decoded in all_regs.items(): + print(f"\n{decoded}") +``` + + + + + +## Decoding without hardware + +`decode(peripheral, register, value)` decodes a raw integer value using SVD metadata without performing any hardware read. Useful when you already have the value from a log file, a previous read, or a known reset value. + + + +```python +async with await Session.connect() as session: + await session.svd.load(Path("STM32F103.svd")) + + # Decode a known value -- no target read needed + decoded = session.svd.decode("RCC", "CR", 0x0300_0083) + print(decoded) +``` + + +```python +with Session.connect_sync() as session: + session.svd.load(Path("STM32F103.svd")) + + decoded = session.svd.decode("RCC", "CR", 0x0300_0083) + print(decoded) +``` + + + +`decode()` is synchronous in both APIs -- it operates purely on in-memory data. + +## Data types + +### DecodedRegister + +| Field | Type | Description | +|-------|------|-------------| +| `peripheral` | `str` | Peripheral name (e.g. `GPIOA`) | +| `register` | `str` | Register name (e.g. `ODR`) | +| `address` | `int` | Memory-mapped address | +| `raw_value` | `int` | Raw 32-bit value read from hardware | +| `fields` | `list[BitField]` | Decoded bitfields, sorted by bit offset | + +### BitField + +| Field | Type | Description | +|-------|------|-------------| +| `name` | `str` | Field name from the SVD (e.g. `ODR0`) | +| `offset` | `int` | Bit offset within the register | +| `width` | `int` | Field width in bits | +| `value` | `int` | Extracted field value | +| `description` | `str` | Description from the SVD metadata | + +## Error handling + +```python +from openocd import SVDError + +try: + session.svd.list_peripherals() +except SVDError as e: + print(f"No SVD loaded: {e}") + +try: + decoded = await session.svd.read_register("NONEXISTENT", "REG") +except SVDError as e: + print(f"Lookup failed: {e}") +``` + +Common `SVDError` cases: +- No SVD file loaded (call `load()` first) +- Peripheral name not found in the SVD +- Register name not found within the peripheral +- SVD file does not exist or is malformed + +Hardware read failures from `read_register` and `read_peripheral` raise `TargetError`, not `SVDError`. + +## Method reference + +| Method | Return Type | Description | +|--------|-------------|-------------| +| `load(svd_path)` | `None` | Parse an SVD XML file | +| `loaded` (property) | `bool` | Whether an SVD is loaded | +| `list_peripherals()` | `list[str]` | Sorted peripheral names | +| `list_registers(peripheral)` | `list[str]` | Sorted register names for a peripheral | +| `read_register(peripheral, register)` | `DecodedRegister` | Read from hardware and decode | +| `read_peripheral(peripheral)` | `dict[str, DecodedRegister]` | Read all registers in a peripheral | +| `decode(peripheral, register, value)` | `DecodedRegister` | Decode without hardware read | diff --git a/src/content/docs/guides/transport-adapter.mdx b/src/content/docs/guides/transport-adapter.mdx index 11466cd..344c26f 100644 --- a/src/content/docs/guides/transport-adapter.mdx +++ b/src/content/docs/guides/transport-adapter.mdx @@ -1,6 +1,155 @@ --- -title: Transport & Adapter -description: Query and configure debug transport and adapter settings +title: Transport and Adapter +description: Query the debug transport, identify the adapter interface, and control the clock speed. --- -Content coming soon. +import { Tabs, TabItem, Aside } from '@astrojs/starlight/components'; + +The `Transport` subsystem provides access to the debug transport layer -- the physical protocol used to communicate between the debug adapter and the target. OpenOCD supports several transports including JTAG, SWD, and SWIM, used with adapter hardware like CMSIS-DAP, ST-Link, and J-Link probes. + +Access it through the session: + +```python +session.transport +``` + +## Querying the active transport + +`select()` returns the name of the currently active transport as a string. + + + +```python +import asyncio +from openocd import Session + +async def main(): + async with await Session.connect() as session: + transport = await session.transport.select() + print(f"Active transport: {transport}") + # Typical output: "swd" or "jtag" + +asyncio.run(main()) +``` + + +```python +from openocd import Session + +with Session.connect_sync() as session: + transport = session.command("transport select").strip() + print(f"Active transport: {transport}") +``` + + + +Common return values: + +| Value | Protocol | +|-------|----------| +| `"jtag"` | IEEE 1149.1 JTAG | +| `"swd"` | ARM Serial Wire Debug | +| `"swim"` | STM8 Single Wire Interface Module | + +## Listing available transports + +`list()` returns all transports supported by the current adapter configuration. + + + +```python +async with await Session.connect() as session: + available = await session.transport.list() + print(f"Available transports: {', '.join(available)}") +``` + + +```python +with Session.connect_sync() as session: + response = session.command("transport list").strip() + print(f"Available transports: {response}") +``` + + + +A CMSIS-DAP adapter typically supports both `["jtag", "swd"]`, while an ST-Link V2 may report `["hla_swd"]`. + +## Identifying the adapter + +`adapter_info()` returns a description of the connected debug adapter. It tries the `adapter name` command first (OpenOCD 0.12+) and falls back to `adapter info` for older versions. + +```python +async with await Session.connect() as session: + info = await session.transport.adapter_info() + print(f"Adapter: {info}") + # Example: "cmsis-dap" or "st-link" +``` + +## Adapter clock speed + +`adapter_speed(khz=None)` gets or sets the debug adapter clock frequency in kHz. When called without arguments it returns the current speed. When called with a value it sets the speed and returns the new value. + + + +```python +async with await Session.connect() as session: + # Query current speed + current = await session.transport.adapter_speed() + print(f"Current speed: {current} kHz") + + # Set to 4 MHz + new_speed = await session.transport.adapter_speed(4000) + print(f"Speed set to: {new_speed} kHz") +``` + + +```python +with Session.connect_sync() as session: + current = session.command("adapter speed").strip() + print(f"Current speed: {current}") + + session.command("adapter speed 4000") + print("Speed set to 4000 kHz") +``` + + + + + + + +## Common adapter interfaces + +| Adapter | Typical Transports | Notes | +|---------|--------------------|-------| +| CMSIS-DAP | JTAG, SWD | Open standard, wide device support | +| ST-Link | SWD (HLA) | ST Microelectronics probes | +| J-Link | JTAG, SWD | SEGGER probes, high performance | +| FTDI | JTAG, SWD | FT2232-based adapters | +| Raspberry Pi GPIO | JTAG, SWD | Direct bitbang via GPIO pins | + +## Error handling + +Transport and adapter operations raise `OpenOCDError` on failure. + +```python +from openocd import OpenOCDError + +try: + speed = await session.transport.adapter_speed(99999) +except OpenOCDError as e: + print(f"Speed setting failed: {e}") +``` + +## Method reference + +| Method | Return Type | Description | +|--------|-------------|-------------| +| `select()` | `str` | Get the active transport name | +| `list()` | `list[str]` | List available transports | +| `adapter_info()` | `str` | Get adapter description string | +| `adapter_speed(khz=None)` | `int` | Get or set adapter speed in kHz | diff --git a/src/content/docs/reference/connection-layer.mdx b/src/content/docs/reference/connection-layer.mdx index df7fd64..72700d6 100644 --- a/src/content/docs/reference/connection-layer.mdx +++ b/src/content/docs/reference/connection-layer.mdx @@ -1,6 +1,196 @@ --- title: Connection Layer -description: TclRpcConnection and TelnetConnection internals +description: Internals of TclRpcConnection, TelnetConnection, and OpenOCDProcess. --- -Content coming soon. +import { Aside } from '@astrojs/starlight/components'; + +The connection layer provides the transport between `openocd-python` and the OpenOCD process. Most users never interact with these classes directly -- the `Session` facade handles everything. This reference is for contributors, advanced users, and anyone debugging connection issues. + +## Connection ABC + +All connection backends implement the abstract base class in `openocd.connection.base`: + +```python +class Connection(ABC): + async def connect(self, host: str, port: int) -> None: ... + async def send(self, command: str) -> str: ... + async def close(self) -> None: ... + async def enable_notifications(self) -> None: ... + def on_notification(self, callback: Callable[[str], None]) -> None: ... +``` + +| Method | Description | +|--------|-------------| +| `connect(host, port)` | Open a TCP connection to the given host and port | +| `send(command)` | Send a command string and return the response | +| `close()` | Close the connection and release resources | +| `enable_notifications()` | Enable asynchronous event notifications from OpenOCD | +| `on_notification(callback)` | Register a callback for incoming notification messages | + +## TclRpcConnection + +**Module:** `openocd.connection.tcl_rpc` + +The primary connection backend. Speaks OpenOCD's TCL RPC binary protocol on port 6666. + +### Protocol + +The TCL RPC protocol uses a simple framing scheme: + +- **Client sends:** `command_bytes` + `\x1a` +- **Server replies:** `response_bytes` + `\x1a` + +The `\x1a` byte (ASCII SUB / Ctrl-Z) acts as an unambiguous message delimiter. Commands and responses are UTF-8 encoded strings. + +### Constructor + +```python +TclRpcConnection(timeout: float = 10.0) +``` + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `timeout` | `float` | `10.0` | Timeout in seconds for connect and send operations | + +### Dual-socket design + +`TclRpcConnection` maintains **two separate TCP connections** to OpenOCD: + +1. **Command socket** -- handles request/response pairs. An async lock serializes all commands to prevent interleaving. +2. **Notification socket** -- opened by `enable_notifications()`. Sends `tcl_notifications on` and then exclusively reads unsolicited event messages. + +This separation prevents notifications from corrupting the command response stream, which would happen if both shared a single socket with two concurrent readers. + +### Command flow + +1. Acquire the async lock +2. Write `command.encode("utf-8") + b"\x1a"` to the command socket +3. Read from the socket until `\x1a` is found in the response stream +4. Any bytes after the separator are preserved in a remainder buffer for the next call +5. Release the lock +6. Return the response decoded as UTF-8 + +### Constants + +| Constant | Value | Description | +|----------|-------|-------------| +| `SEPARATOR` | `b"\x1a"` | Message delimiter byte | +| `DEFAULT_TIMEOUT` | `10.0` | Default timeout in seconds | +| `MAX_RESPONSE_SIZE` | `10 * 1024 * 1024` | 10 MB guard against runaway reads | + +### Notification loop + +When `enable_notifications()` is called: + +1. A second TCP connection opens to the same host:port +2. `tcl_notifications on\x1a` is sent and the acknowledgement consumed +3. A background `asyncio.Task` enters `_notification_loop()`, which reads messages delimited by `\x1a` +4. Each message is dispatched to all registered callbacks +5. If the notification connection drops, `_notification_failed` is set to `True` and subsequent `send()` calls log a warning + +### Error conditions + +- **Connection closed:** If `send()` reads zero bytes, `ConnectionError` is raised +- **Response too large:** If the response buffer exceeds `MAX_RESPONSE_SIZE` without a separator, `ConnectionError` is raised (likely connected to the wrong port) +- **Timeout:** If the response does not arrive within the configured timeout, `TimeoutError` is raised + +## TelnetConnection + +**Module:** `openocd.connection.telnet` + +A fallback connection backend that speaks to OpenOCD's human-oriented telnet interface on port 4444. + +### Protocol + +- **Client sends:** `command\n` +- **Server replies:** response text ending with `"> "` prompt + +The telnet connection: +- Reads until the `"> "` prompt after each command +- Strips the echoed command from the first line of the response +- Does **not** support notifications (`enable_notifications()` logs a warning and does nothing) + +### Constructor + +```python +TelnetConnection(timeout: float = 10.0) +``` + +### Limitations + +| Feature | TclRpcConnection | TelnetConnection | +|---------|------------------|------------------| +| Default port | 6666 | 4444 | +| Binary framing | `\x1a` delimiter | `"> "` prompt | +| Notifications | Supported (dual-socket) | Not supported | +| Output consistency | Structured | Varies by version | +| Recommended | Yes | Fallback only | + + + +## OpenOCDProcess + +**Module:** `openocd.process` + +Spawns and manages an OpenOCD subprocess. Used internally by `Session.start()`. + +### Constructor + +```python +OpenOCDProcess() +``` + +### Properties + +| Property | Type | Description | +|----------|------|-------------| +| `pid` | `int \| None` | Process ID, or `None` if not started | +| `running` | `bool` | Whether the process is still alive | +| `tcl_port` | `int` | The TCL RPC port (default 6666) | + +### start() + +```python +async def start( + config: str | list[str], + extra_args: list[str] | None = None, + tcl_port: int = 6666, + openocd_bin: str | None = None, +) -> None +``` + +Build the command line and spawn `openocd` as an async subprocess. + +The `config` parameter accepts: +- A string like `"interface/cmsis-dap.cfg -f target/stm32f1x.cfg"` (automatically split and prefixed with `-f` where needed) +- A pre-split list like `["-f", "my board/config.cfg"]` (used as-is, preserving paths with spaces) + +The method always appends `-c "tcl_port "` to ensure the TCL RPC port matches what `Session` will connect to. + +OpenOCD binary detection: +1. Uses `openocd_bin` if provided +2. Otherwise calls `shutil.which("openocd")` +3. Raises `ProcessError` if not found + +### wait_ready() + +```python +async def wait_ready(timeout: float = 10.0) -> None +``` + +Poll the TCL RPC port every 0.25 seconds until it accepts a TCP connection, or raise `TimeoutError`. Also checks whether the process has died and reports stderr output if so. + +### stop() + +```python +async def stop() -> None +``` + +Graceful shutdown sequence: +1. Send `SIGTERM` +2. Wait up to 5 seconds for the process to exit +3. If still running after 5 seconds, send `SIGKILL` +4. Wait for final exit diff --git a/src/content/docs/reference/exceptions.mdx b/src/content/docs/reference/exceptions.mdx index cee1017..9a980d4 100644 --- a/src/content/docs/reference/exceptions.mdx +++ b/src/content/docs/reference/exceptions.mdx @@ -1,6 +1,304 @@ --- title: Exceptions -description: Exception hierarchy rooted at OpenOCDError +description: Complete exception hierarchy rooted at OpenOCDError with guidance on when each is raised. --- -Content coming soon. +import { Aside } from '@astrojs/starlight/components'; + +All exceptions in `openocd-python` inherit from `OpenOCDError`, allowing callers to catch broadly or narrowly as needed. Import them from the top-level package: + +```python +from openocd import ( + OpenOCDError, + ConnectionError, + TimeoutError, + TargetError, + TargetNotHaltedError, + FlashError, + JTAGError, + SVDError, + ProcessError, +) +from openocd.breakpoints import BreakpointError +``` + +## Hierarchy + +``` +OpenOCDError + +-- ConnectionError + +-- TimeoutError + +-- TargetError + | +-- TargetNotHaltedError + +-- FlashError + +-- JTAGError + +-- SVDError + +-- ProcessError + +-- BreakpointError +``` + + + +## OpenOCDError + +```python +class OpenOCDError(Exception) +``` + +Base exception for all `openocd-python` errors. Catch this to handle any library error in a single clause. + +```python +from openocd import OpenOCDError + +try: + async with await Session.connect() as session: + await session.target.halt() + data = await session.memory.read_u32(0x0800_0000) +except OpenOCDError as e: + print(f"Something went wrong: {e}") +``` + +## ConnectionError + +```python +class ConnectionError(OpenOCDError) +``` + +Raised when the library cannot establish or maintain a TCP connection to OpenOCD. + +**When raised:** +- `Session.connect()` or `Session.start()` cannot reach the TCL RPC port +- The OpenOCD process closes the connection mid-session +- A response exceeds `MAX_RESPONSE_SIZE` (10 MB) without a separator -- likely connected to the wrong port +- The notification socket fails to open + +**What to check:** +- Is OpenOCD running and listening on the expected port? +- Is the host/port correct? +- Is a firewall blocking the connection? + +```python +from openocd import ConnectionError + +try: + session = await Session.connect(host="192.168.1.100", port=6666) +except ConnectionError as e: + print(f"Cannot reach OpenOCD: {e}") +``` + +## TimeoutError + +```python +class TimeoutError(OpenOCDError) +``` + +Raised when an operation exceeds its deadline. + +**When raised:** +- Connection attempt times out +- A command does not receive a response within the configured timeout +- `OpenOCDProcess.wait_ready()` exceeds its timeout waiting for the TCL port to accept connections + +```python +from openocd import TimeoutError + +try: + session = await Session.connect(timeout=2.0) +except TimeoutError: + print("OpenOCD did not respond within 2 seconds") +``` + +## TargetError + +```python +class TargetError(OpenOCDError) +``` + +Raised when a target operation fails -- the target is not responding or returned an error. + +**When raised:** +- `target.halt()`, `target.resume()`, `target.step()`, or `target.reset()` fails +- `memory.read_*()` or `memory.write_*()` encounters an error response +- `registers.read()` or `registers.write()` fails (for reasons other than "not halted") + +```python +from openocd import TargetError + +try: + state = await session.target.halt() +except TargetError as e: + print(f"Target operation failed: {e}") +``` + +### TargetNotHaltedError + +```python +class TargetNotHaltedError(TargetError) +``` + +A specialization of `TargetError` for operations that require a halted target. + +**When raised:** +- Register reads/writes when the target is still running +- Any operation that OpenOCD rejects with "target not halted" + +```python +from openocd import TargetNotHaltedError + +try: + regs = await session.registers.read_all() +except TargetNotHaltedError: + print("Halt the target before reading registers") + await session.target.halt() + regs = await session.registers.read_all() +``` + +## FlashError + +```python +class FlashError(OpenOCDError) +``` + +Raised when a flash memory operation fails. + +**When raised:** +- `flash.banks()` or `flash.info()` cannot parse OpenOCD output +- `flash.read()`, `flash.write()`, `flash.erase_sector()`, or `flash.erase_all()` encounters an error +- `flash.write_image()` fails to program or verify the image +- `flash.verify()` encounters an error (note: a mismatch returns `False` rather than raising) +- `flash.protect()` fails +- `flash.erase_sector()` receives an invalid sector range (`first > last`) + +```python +from openocd import FlashError + +try: + await session.flash.write_image(Path("firmware.bin")) +except FlashError as e: + print(f"Flash programming failed: {e}") +``` + +## JTAGError + +```python +class JTAGError(OpenOCDError) +``` + +Raised when a JTAG operation fails. + +**When raised:** +- `jtag.scan_chain()` encounters an error +- `jtag.irscan()`, `jtag.drscan()`, or `jtag.runtest()` fails +- `jtag.pathmove()` receives an empty state list or an invalid state transition +- `jtag.svf()` or `jtag.xsvf()` encounters an error during file execution +- `jtag.new_tap()` fails + +```python +from openocd import JTAGError + +try: + taps = await session.jtag.scan_chain() +except JTAGError as e: + print(f"JTAG chain error: {e}") +``` + +## SVDError + +```python +class SVDError(OpenOCDError) +``` + +Raised when SVD file loading or metadata lookup fails. + +**When raised:** +- `svd.load()` given a path that does not exist +- `svd.load()` encounters a parse error in the XML +- `svd.list_peripherals()` or `svd.list_registers()` called before `load()` +- `svd.read_register()` or `svd.decode()` references a peripheral or register not in the SVD + +```python +from openocd import SVDError + +try: + await session.svd.load(Path("nonexistent.svd")) +except SVDError as e: + print(f"SVD error: {e}") +``` + +## ProcessError + +```python +class ProcessError(OpenOCDError) +``` + +Raised when OpenOCD subprocess management fails. + +**When raised:** +- `Session.start()` cannot find the `openocd` binary +- The OpenOCD process exits unexpectedly during startup (error message includes stderr output) +- An empty config string is passed + +```python +from openocd import ProcessError + +try: + session = await Session.start("target/stm32f1x.cfg") +except ProcessError as e: + print(f"OpenOCD failed to start: {e}") +``` + +## BreakpointError + +```python +class BreakpointError(OpenOCDError) +``` + +Raised when a breakpoint or watchpoint operation fails. Defined in `openocd.breakpoints`. + +**When raised:** +- `breakpoints.add()` or `breakpoints.remove()` encounters an error (e.g. no HW breakpoints available) +- `breakpoints.add_watchpoint()` or `breakpoints.remove_watchpoint()` fails + +```python +from openocd.breakpoints import BreakpointError + +try: + await session.breakpoints.add(0x0800_1234, hw=True) +except BreakpointError as e: + print(f"Breakpoint error: {e}") +``` + +## Catching at different levels + +```python +from openocd import ( + OpenOCDError, + ConnectionError, + TargetError, + TargetNotHaltedError, +) + +try: + async with await Session.connect() as session: + await session.target.halt() + pc = await session.registers.pc() + data = await session.memory.read_u32(pc, 4) + +except TargetNotHaltedError: + # Most specific: target needs to be halted + print("Target is running -- halt it first") + +except TargetError as e: + # Broader: any target-related failure + print(f"Target problem: {e}") + +except ConnectionError as e: + # Connection lost + print(f"Lost connection to OpenOCD: {e}") + +except OpenOCDError as e: + # Catch-all for anything from this library + print(f"OpenOCD error: {e}") +``` diff --git a/src/content/docs/reference/method-index.mdx b/src/content/docs/reference/method-index.mdx index 935c1ae..9ceadbc 100644 --- a/src/content/docs/reference/method-index.mdx +++ b/src/content/docs/reference/method-index.mdx @@ -1,6 +1,142 @@ --- title: Method Index -description: Alphabetical index of all public methods +description: Comprehensive index of every public method grouped by subsystem. --- -Content coming soon. +A quick-reference index of every public method in `openocd-python`, organized by subsystem. Each entry shows the method signature, return type, and a brief description. + +## Session + +| Method | Return | Description | +|--------|--------|-------------| +| `connect(host="localhost", port=6666, timeout=10.0)` | `Session` | Connect to a running OpenOCD instance | +| `start(config, *, tcl_port=6666, openocd_bin=None, timeout=10.0, extra_args=None)` | `Session` | Spawn OpenOCD and connect | +| `connect_sync(host="localhost", port=6666, **kwargs)` | `SyncSession` | Sync version of `connect()` | +| `start_sync(config, **kwargs)` | `SyncSession` | Sync version of `start()` | +| `close()` | `None` | Close connection and stop subprocess | +| `command(cmd)` | `str` | Send a raw command, return response | +| `on_halt(callback)` | `None` | Register halt notification callback | +| `on_reset(callback)` | `None` | Register reset notification callback | + +**Properties:** `target`, `memory`, `registers`, `flash`, `jtag`, `breakpoints`, `rtt`, `svd`, `transport` + +## Target + +| Method | Return | Description | +|--------|--------|-------------| +| `halt()` | `TargetState` | Halt the target | +| `resume(address=None)` | `None` | Resume execution, optionally from an address | +| `step(address=None)` | `TargetState` | Single-step one instruction | +| `reset(mode="halt")` | `None` | Reset the target (`"run"`, `"halt"`, or `"init"`) | +| `wait_halt(timeout_ms=5000)` | `TargetState` | Block until halted or timeout | +| `state()` | `TargetState` | Query current target state | + +## Memory + +| Method | Return | Description | +|--------|--------|-------------| +| `read_u8(addr, count=1)` | `list[int]` | Read 8-bit values | +| `read_u16(addr, count=1)` | `list[int]` | Read 16-bit values | +| `read_u32(addr, count=1)` | `list[int]` | Read 32-bit values | +| `read_u64(addr, count=1)` | `list[int]` | Read 64-bit values | +| `read_bytes(addr, size)` | `bytes` | Read raw bytes | +| `write_u8(addr, values)` | `None` | Write 8-bit values | +| `write_u16(addr, values)` | `None` | Write 16-bit values | +| `write_u32(addr, values)` | `None` | Write 32-bit values | +| `write_bytes(addr, data)` | `None` | Write raw bytes | +| `search(pattern, start, end)` | `list[int]` | Search for byte pattern in memory | +| `dump(addr, size, path)` | `None` | Dump memory region to file | +| `hexdump(addr, size)` | `str` | Formatted hex+ASCII dump | + +## Registers + +| Method | Return | Description | +|--------|--------|-------------| +| `read(name)` | `int` | Read a single register by name | +| `write(name, value)` | `None` | Write a value to a register | +| `read_all()` | `dict[str, Register]` | Read all registers | +| `read_many(names)` | `dict[str, int]` | Read several registers by name | +| `pc()` | `int` | Read the program counter | +| `sp()` | `int` | Read the stack pointer | +| `lr()` | `int` | Read the link register | +| `xpsr()` | `int` | Read the xPSR register | + +## Flash + +| Method | Return | Description | +|--------|--------|-------------| +| `banks()` | `list[FlashBank]` | List all configured flash banks | +| `info(bank=0)` | `FlashBank` | Detailed bank info with sectors | +| `read(bank, offset, size)` | `bytes` | Read raw flash content | +| `read_to_file(bank, path)` | `None` | Dump entire bank to file | +| `write(bank, offset, data)` | `None` | Write raw bytes to flash | +| `write_image(path, erase=True, verify=True)` | `None` | High-level flash programming | +| `erase_sector(bank, first, last)` | `None` | Erase a sector range | +| `erase_all(bank=0)` | `None` | Erase entire bank | +| `protect(bank, first, last, on)` | `None` | Set/clear write protection | +| `verify(bank, path)` | `bool` | Verify flash against a file | + +## BreakpointManager + +| Method | Return | Description | +|--------|--------|-------------| +| `add(address, length=2, hw=False)` | `None` | Set a breakpoint | +| `remove(address)` | `None` | Remove a breakpoint | +| `list()` | `list[Breakpoint]` | List active breakpoints | +| `add_watchpoint(address, length, access="rw")` | `None` | Set a data watchpoint | +| `remove_watchpoint(address)` | `None` | Remove a watchpoint | +| `list_watchpoints()` | `list[Watchpoint]` | List active watchpoints | + +## JTAGController + +| Method | Return | Description | +|--------|--------|-------------| +| `scan_chain()` | `list[TAPInfo]` | Enumerate TAPs on the chain | +| `new_tap(chip, tap, ir_len, expected_id=None)` | `None` | Declare a new TAP | +| `irscan(tap, instruction)` | `int` | Shift instruction into IR | +| `drscan(tap, bits, value)` | `int` | Shift data through DR | +| `runtest(cycles)` | `None` | Clock TCK in Run-Test/Idle | +| `pathmove(states)` | `None` | Walk TAP through state sequence | +| `svf(path, tap=None, *, quiet=False, progress=True)` | `None` | Execute SVF file | +| `xsvf(tap, path)` | `None` | Execute XSVF file | + +## SVDManager + +| Method / Property | Return | Description | +|-------------------|--------|-------------| +| `load(svd_path)` | `None` | Parse an SVD XML file | +| `loaded` (property) | `bool` | Whether an SVD is loaded | +| `list_peripherals()` | `list[str]` | Sorted peripheral names | +| `list_registers(peripheral)` | `list[str]` | Sorted register names | +| `read_register(peripheral, register)` | `DecodedRegister` | Read from hardware and decode | +| `read_peripheral(peripheral)` | `dict[str, DecodedRegister]` | Read all registers in a peripheral | +| `decode(peripheral, register, value)` | `DecodedRegister` | Decode without hardware read | + +## RTTManager + +| Method | Return | Description | +|--------|--------|-------------| +| `setup(address, size, id_string="SEGGER RTT")` | `None` | Configure control block search | +| `start()` | `None` | Find control block, activate channels | +| `stop()` | `None` | Deactivate RTT | +| `channels()` | `list[RTTChannel]` | List discovered channels | +| `read(channel)` | `str` | Read from an up-channel | +| `write(channel, data)` | `None` | Write to a down-channel | + +## Transport + +| Method | Return | Description | +|--------|--------|-------------| +| `select()` | `str` | Get the active transport name | +| `list()` | `list[str]` | List available transports | +| `adapter_info()` | `str` | Get adapter description | +| `adapter_speed(khz=None)` | `int` | Get or set adapter speed in kHz | + +## EventManager + +| Method / Property | Return | Description | +|-------------------|--------|-------------| +| `enable()` | `None` | Enable TCL notifications | +| `on(event_type, callback)` | `None` | Register event callback | +| `off(event_type, callback)` | `None` | Unregister event callback | +| `enabled` (property) | `bool` | Whether notifications are active | diff --git a/src/content/docs/reference/session-api.mdx b/src/content/docs/reference/session-api.mdx index 1ce6c60..2c91171 100644 --- a/src/content/docs/reference/session-api.mdx +++ b/src/content/docs/reference/session-api.mdx @@ -1,6 +1,235 @@ --- title: Session API -description: Complete Session and SyncSession class reference +description: Complete reference for Session and SyncSession, the main entry points to openocd-python. --- -Content coming soon. +import { Tabs, TabItem, Aside } from '@astrojs/starlight/components'; + +`Session` is the primary entry point for all interaction with OpenOCD. It manages the connection lifecycle and provides lazy access to every subsystem (target, memory, registers, flash, JTAG, breakpoints, RTT, SVD, transport). + +`SyncSession` is the synchronous counterpart for use outside async contexts. + +## Session (async) + +### Constructor + +```python +Session.__init__(connection: TclRpcConnection, process: OpenOCDProcess | None = None) +``` + +You rarely call this directly. Use the factory methods `connect()` or `start()` instead. + +| Parameter | Type | Description | +|-----------|------|-------------| +| `connection` | `TclRpcConnection` | An established TCL RPC connection | +| `process` | `OpenOCDProcess \| None` | Optional managed subprocess (closed on `close()`) | + +### Factory methods + +#### Session.connect() + +```python +@classmethod +async def connect( + host: str = "localhost", + port: int = 6666, + timeout: float = 10.0, +) -> Session +``` + +Connect to an already-running OpenOCD instance. + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `host` | `str` | `"localhost"` | OpenOCD host address | +| `port` | `int` | `6666` | TCL RPC port | +| `timeout` | `float` | `10.0` | Connection timeout in seconds | + +**Returns:** `Session` + +**Raises:** `ConnectionError`, `TimeoutError` + +```python +async with await Session.connect() as session: + state = await session.target.state() +``` + +#### Session.start() + +```python +@classmethod +async def start( + config: str | Path, + *, + tcl_port: int = 6666, + openocd_bin: str | None = None, + timeout: float = 10.0, + extra_args: list[str] | None = None, +) -> Session +``` + +Spawn an OpenOCD process with the given configuration, wait for it to become ready, then connect. + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `config` | `str \| Path` | *(required)* | Config file path or `-f`/`-c` flags string | +| `tcl_port` | `int` | `6666` | TCL RPC port for the spawned process | +| `openocd_bin` | `str \| None` | `None` | Custom OpenOCD binary path (auto-detected if `None`) | +| `timeout` | `float` | `10.0` | Seconds to wait for OpenOCD readiness | +| `extra_args` | `list[str] \| None` | `None` | Additional CLI arguments | + +**Returns:** `Session` + +**Raises:** `ProcessError`, `ConnectionError`, `TimeoutError` + +```python +async with await Session.start("interface/cmsis-dap.cfg -f target/stm32f1x.cfg") as session: + await session.target.halt() +``` + +The config parameter accepts several formats: +- File path: `"board/stm32f1discovery.cfg"` +- Flag string: `"-f interface/cmsis-dap.cfg -f target/stm32f1x.cfg"` +- List form: `["-f", "interface/cmsis-dap.cfg", "-f", "target/stm32f1x.cfg"]` + +#### Session.connect_sync() + +```python +@classmethod +def connect_sync( + host: str = "localhost", + port: int = 6666, + **kwargs, +) -> SyncSession +``` + +Synchronous version of `connect()`. Returns a `SyncSession`. + +**Raises:** `RuntimeError` if called from inside a running async event loop. + +```python +with Session.connect_sync() as session: + state = session.target.state() +``` + +#### Session.start_sync() + +```python +@classmethod +def start_sync(config: str | Path, **kwargs) -> SyncSession +``` + +Synchronous version of `start()`. Returns a `SyncSession`. + +### Context manager + +`Session` implements `__aenter__` and `__aexit__`. On exit, it calls `close()`. + +```python +async with await Session.connect() as session: + # session is open here + pass +# session.close() called automatically +``` + +### Methods + +#### close() + +```python +async def close() -> None +``` + +Close the TCP connection and stop the OpenOCD subprocess if one was spawned via `start()`. + +#### command() + +```python +async def command(cmd: str) -> str +``` + +Send a raw OpenOCD command string and return the response. This is the escape hatch for commands not covered by the typed subsystem APIs. + +| Parameter | Type | Description | +|-----------|------|-------------| +| `cmd` | `str` | Any valid OpenOCD command | + +**Returns:** The raw response string from OpenOCD. + +```python +resp = await session.command("version") +print(resp) +``` + +#### on_halt() + +```python +def on_halt(callback: Callable[[str], None]) -> None +``` + +Register a callback that fires when a notification containing "halted" is received. + +#### on_reset() + +```python +def on_reset(callback: Callable[[str], None]) -> None +``` + +Register a callback that fires when a notification containing "reset" is received. + +### Properties (lazy subsystems) + +Each property instantiates its subsystem on first access. All subsystems share the same underlying connection. + +| Property | Async Type | Description | +|----------|-----------|-------------| +| `target` | `Target` | Halt, resume, step, reset, state queries | +| `memory` | `Memory` | Read/write target memory at various widths | +| `registers` | `Registers` | Read/write CPU registers | +| `flash` | `Flash` | Flash bank enumeration, programming, erase, verify | +| `jtag` | `JTAGController` | JTAG scan chain, IR/DR scan, boundary scan | +| `breakpoints` | `BreakpointManager` | Set/remove breakpoints and watchpoints | +| `rtt` | `RTTManager` | SEGGER RTT channel read/write | +| `svd` | `SVDManager` | SVD file loading, register decoding | +| `transport` | `Transport` | Transport and adapter queries | + +--- + +## SyncSession + +Wraps an async `Session` for synchronous use. Every method runs through `loop.run_until_complete()` on an internally managed event loop. + +### Context manager + +```python +with Session.connect_sync() as session: + print(session.target.state()) +``` + +Calls `close()` on exit. + +### Methods + +#### command() + +```python +def command(cmd: str) -> str +``` + +Synchronous version of `Session.command()`. + +### Properties (lazy sync subsystems) + +| Property | Sync Type | Description | +|----------|----------|-------------| +| `target` | `SyncTarget` | Halt, resume, step, reset, state | +| `memory` | `SyncMemory` | Read/write memory | +| `registers` | `SyncRegisters` | Read/write registers | +| `flash` | `SyncFlash` | Flash operations | +| `jtag` | `SyncJTAGController` | JTAG operations | +| `breakpoints` | `SyncBreakpointManager` | Breakpoints and watchpoints | +| `svd` | `SyncSVDManager` | SVD loading and decoding | + + diff --git a/src/content/docs/reference/types.mdx b/src/content/docs/reference/types.mdx index 38278d7..5ec0efa 100644 --- a/src/content/docs/reference/types.mdx +++ b/src/content/docs/reference/types.mdx @@ -1,6 +1,304 @@ --- title: Types -description: Frozen dataclass types returned by the API +description: Complete reference for all frozen dataclass types and enums returned by the API. --- -Content coming soon. +import { Aside } from '@astrojs/starlight/components'; + +All types in `openocd-python` are **frozen dataclasses** -- they are immutable after construction. Nothing mutable leaves the API surface. Import them from the top-level package: + +```python +from openocd import ( + TargetState, Register, FlashSector, FlashBank, + TAPInfo, JTAGState, MemoryRegion, BitField, + DecodedRegister, Breakpoint, Watchpoint, RTTChannel, +) +``` + +## Target types + +### TargetState + +Snapshot of target execution state, returned by `target.halt()`, `target.step()`, `target.state()`, and `target.wait_halt()`. + +```python +@dataclass(frozen=True) +class TargetState: + name: str + state: Literal["running", "halted", "reset", "debug-running", "unknown"] + current_pc: int | None = None +``` + +| Field | Type | Default | Description | +|-------|------|---------|-------------| +| `name` | `str` | *(required)* | Target name (e.g. `stm32f1x.cpu`) | +| `state` | `Literal[...]` | *(required)* | One of `"running"`, `"halted"`, `"reset"`, `"debug-running"`, `"unknown"` | +| `current_pc` | `int \| None` | `None` | Program counter (populated only when halted) | + +## Register types + +### Register + +A single CPU register, returned by `registers.read_all()`. + +```python +@dataclass(frozen=True) +class Register: + name: str + number: int + value: int + size: int + dirty: bool = False +``` + +| Field | Type | Default | Description | +|-------|------|---------|-------------| +| `name` | `str` | *(required)* | Register name (e.g. `r0`, `pc`, `xPSR`) | +| `number` | `int` | *(required)* | Register number in OpenOCD's numbering | +| `value` | `int` | *(required)* | Current register value | +| `size` | `int` | *(required)* | Register width in bits (typically 32) | +| `dirty` | `bool` | `False` | Whether the register has been modified but not committed | + +## Flash types + +### FlashSector + +One sector inside a flash bank. + +```python +@dataclass(frozen=True) +class FlashSector: + index: int + offset: int + size: int + protected: bool +``` + +| Field | Type | Description | +|-------|------|-------------| +| `index` | `int` | Sector number within the bank | +| `offset` | `int` | Byte offset from the bank base address | +| `size` | `int` | Sector size in bytes | +| `protected` | `bool` | `True` if write protection is enabled | + +### FlashBank + +A flash bank reported by OpenOCD, returned by `flash.banks()` and `flash.info()`. + +```python +@dataclass(frozen=True) +class FlashBank: + index: int + name: str + base: int + size: int + bus_width: int + chip_width: int + target: str + sectors: list[FlashSector] = field(default_factory=list) +``` + +| Field | Type | Default | Description | +|-------|------|---------|-------------| +| `index` | `int` | *(required)* | Bank number | +| `name` | `str` | *(required)* | Bank name (e.g. `stm32f1x.flash`) | +| `base` | `int` | *(required)* | Base address in memory | +| `size` | `int` | *(required)* | Total bank size in bytes | +| `bus_width` | `int` | *(required)* | Bus width | +| `chip_width` | `int` | *(required)* | Chip width | +| `target` | `str` | *(required)* | Associated target or driver name | +| `sectors` | `list[FlashSector]` | `[]` | Sector list (populated only by `flash.info()`) | + +## JTAG types + +### TAPInfo + +One TAP discovered on the JTAG chain, returned by `jtag.scan_chain()`. + +```python +@dataclass(frozen=True) +class TAPInfo: + name: str + chip: str + tap_name: str + idcode: int + ir_length: int + enabled: bool +``` + +| Field | Type | Description | +|-------|------|-------------| +| `name` | `str` | Full dotted name (e.g. `stm32f1x.cpu`) | +| `chip` | `str` | Chip portion of the name (e.g. `stm32f1x`) | +| `tap_name` | `str` | TAP portion of the name (e.g. `cpu`) | +| `idcode` | `int` | Detected IDCODE value | +| `ir_length` | `int` | Instruction register length in bits | +| `enabled` | `bool` | Whether the TAP is enabled in the scan chain | + +### JTAGState + +Enum of all 16 IEEE 1149.1 TAP controller states. Used with `jtag.pathmove()`. + +```python +class JTAGState(str, Enum): + RESET = "RESET" + IDLE = "IDLE" + DRSELECT = "DRSELECT" + DRCAPTURE = "DRCAPTURE" + DRSHIFT = "DRSHIFT" + DREXIT1 = "DREXIT1" + DRPAUSE = "DRPAUSE" + DREXIT2 = "DREXIT2" + DRUPDATE = "DRUPDATE" + IRSELECT = "IRSELECT" + IRCAPTURE = "IRCAPTURE" + IRSHIFT = "IRSHIFT" + IREXIT1 = "IREXIT1" + IRPAUSE = "IRPAUSE" + IREXIT2 = "IREXIT2" + IRUPDATE = "IRUPDATE" +``` + +`JTAGState` is a `str` enum, so each member's `.value` is its name as a string. + +## Memory types + +### MemoryRegion + +A chunk of memory read from the target. + +```python +@dataclass(frozen=True) +class MemoryRegion: + address: int + size: int + data: bytes +``` + +| Field | Type | Description | +|-------|------|-------------| +| `address` | `int` | Start address of the region | +| `size` | `int` | Size of the region in bytes | +| `data` | `bytes` | Raw memory contents | + +## SVD types + +### BitField + +One decoded bitfield inside a register, part of `DecodedRegister.fields`. + +```python +@dataclass(frozen=True) +class BitField: + name: str + offset: int + width: int + value: int + description: str +``` + +| Field | Type | Description | +|-------|------|-------------| +| `name` | `str` | Field name from the SVD (e.g. `ODR0`) | +| `offset` | `int` | Bit offset within the register | +| `width` | `int` | Field width in bits | +| `value` | `int` | Extracted field value | +| `description` | `str` | Description from SVD metadata | + +### DecodedRegister + +A register value decoded into named bitfields via SVD, returned by `svd.read_register()` and `svd.decode()`. + +```python +@dataclass(frozen=True) +class DecodedRegister: + peripheral: str + register: str + address: int + raw_value: int + fields: list[BitField] = field(default_factory=list) +``` + +| Field | Type | Default | Description | +|-------|------|---------|-------------| +| `peripheral` | `str` | *(required)* | Peripheral name (e.g. `GPIOA`) | +| `register` | `str` | *(required)* | Register name (e.g. `ODR`) | +| `address` | `int` | *(required)* | Memory-mapped address | +| `raw_value` | `int` | *(required)* | Raw 32-bit value | +| `fields` | `list[BitField]` | `[]` | Decoded bitfields, sorted by bit offset | + +`DecodedRegister` implements `__str__` for formatted output: + +``` +GPIOA.ODR @ 0x4001080C = 0x00000001 + [ 0:0] ODR0 = 0x1 Port output data bit 0 + [ 1:1] ODR1 = 0x0 Port output data bit 1 +``` + +Multi-bit fields show the range (e.g. `[7:4]`), single-bit fields show just the offset (e.g. `[0]` displayed as `[ 0:0]`). + +## Breakpoint types + +### Breakpoint + +An active breakpoint, returned by `breakpoints.list()`. + +```python +@dataclass(frozen=True) +class Breakpoint: + number: int + type: Literal["hw", "sw"] + address: int + length: int + enabled: bool +``` + +| Field | Type | Description | +|-------|------|-------------| +| `number` | `int` | Breakpoint index | +| `type` | `Literal["hw", "sw"]` | Hardware or software breakpoint | +| `address` | `int` | Instruction address | +| `length` | `int` | Instruction length in bytes (2 = Thumb, 4 = ARM) | +| `enabled` | `bool` | Whether the breakpoint is active | + +### Watchpoint + +An active data watchpoint, returned by `breakpoints.list_watchpoints()`. + +```python +@dataclass(frozen=True) +class Watchpoint: + number: int + address: int + length: int + access: Literal["r", "w", "rw"] +``` + +| Field | Type | Description | +|-------|------|-------------| +| `number` | `int` | Watchpoint index | +| `address` | `int` | Watched memory address | +| `length` | `int` | Size of watched region in bytes | +| `access` | `Literal["r", "w", "rw"]` | Access type: read, write, or both | + +## RTT types + +### RTTChannel + +An RTT channel descriptor, returned by `rtt.channels()`. + +```python +@dataclass(frozen=True) +class RTTChannel: + index: int + name: str + size: int + direction: Literal["up", "down"] +``` + +| Field | Type | Description | +|-------|------|-------------| +| `index` | `int` | Channel number | +| `name` | `str` | Channel name (e.g. `Terminal`) | +| `size` | `int` | Buffer size in bytes | +| `direction` | `Literal["up", "down"]` | `up` = target-to-host, `down` = host-to-target |