# openocd-python/tests/test_error_paths.py
#
# 599 lines
# 23 KiB
# Python

"""Error-path tests for openocd-python.
Exercises every error condition and exception branch across the
connection, target, memory, register, flash, breakpoint, session,
and process subsystems.
Each test configures a mock server to return error responses (or
misbehave at the protocol level) and asserts that the correct
exception type is raised with a meaningful message.
"""
from __future__ import annotations
import asyncio
import pytest
from openocd.breakpoints import BreakpointError, BreakpointManager
from openocd.connection.tcl_rpc import TclRpcConnection
from openocd.errors import (
ConnectionError,
FlashError,
ProcessError,
TargetError,
TargetNotHaltedError,
TimeoutError,
)
from openocd.flash import Flash
from openocd.memory import Memory
from openocd.process import OpenOCDProcess
from openocd.registers import Registers
from openocd.session import Session, _get_or_create_loop
from openocd.target import Target
from tests.mock_server import MockOpenOCDServer
# ======================================================================
# Helpers: error-returning mock servers
# ======================================================================
@pytest.fixture
async def error_server():
    """Yield a running MockOpenOCDServer and stop it after the test.

    The server keeps its default response table, so plumbing commands
    such as "targets" continue to work.  Tests register their own
    error-producing routes with ``server.add_response()``.
    """
    mock = MockOpenOCDServer()
    await mock.start()
    yield mock
    # Teardown: executed by pytest once the requesting test has finished.
    await mock.stop()
@pytest.fixture
async def error_conn(error_server):
    """Yield a TclRpcConnection connected to ``error_server``.

    The connection uses a 5 second timeout and is closed after the test.
    """
    srv_host, srv_port = error_server.address
    connection = TclRpcConnection(timeout=5.0)
    await connection.connect(srv_host, srv_port)
    yield connection
    # Teardown: executed by pytest once the requesting test has finished.
    await connection.close()
# ======================================================================
# 1. Connection error paths
# ======================================================================
class TestConnectionErrors:
    """Errors at the transport / framing layer.

    Tests that start their own server or open their own connection release
    them in ``finally`` blocks, so a failed assertion does not leave
    listening sockets or open connections behind for later tests.
    """

    async def test_send_on_closed_connection(self):
        """send() after close() raises ConnectionError."""
        server = MockOpenOCDServer()
        await server.start()
        try:
            host, port = server.address
            conn = TclRpcConnection(timeout=5.0)
            await conn.connect(host, port)
            await conn.close()
            with pytest.raises(ConnectionError, match="Not connected"):
                await conn.send("targets")
        finally:
            # Always stop the mock server, even when the assertion fails.
            await server.stop()

    async def test_send_before_connect(self):
        """send() without a prior connect() raises ConnectionError."""
        conn = TclRpcConnection()
        with pytest.raises(ConnectionError, match="Not connected"):
            await conn.send("targets")

    async def test_timeout_when_server_never_responds(self):
        """A server that reads but never sends \\x1a triggers TimeoutError."""
        hang_event = asyncio.Event()

        async def _black_hole(reader, writer):
            await reader.read(4096)
            # never send a response -- wait until test signals us to stop
            try:
                await hang_event.wait()
            except asyncio.CancelledError:
                # Deliberate best-effort: the handler only has to go away.
                pass
            finally:
                writer.close()

        srv = await asyncio.start_server(_black_hole, "127.0.0.1", 0)
        try:
            await srv.start_serving()
            host, port = srv.sockets[0].getsockname()[:2]
            conn = TclRpcConnection(timeout=0.3)
            await conn.connect(host, port)
            try:
                with pytest.raises(TimeoutError):
                    await conn.send("targets")
            finally:
                await conn.close()
        finally:
            # Release the handler before closing, so wait_closed() is not
            # held up by a handler still parked on hang_event.
            hang_event.set()
            srv.close()
            await srv.wait_closed()

    async def test_server_closes_connection_mid_stream(self):
        """Server closing the socket without a separator raises ConnectionError."""

        async def _close_immediately(reader, writer):
            await reader.read(4096)
            # send partial data with no separator then close
            writer.write(b"partial response")
            await writer.drain()
            writer.close()
            await writer.wait_closed()

        srv = await asyncio.start_server(_close_immediately, "127.0.0.1", 0)
        try:
            await srv.start_serving()
            host, port = srv.sockets[0].getsockname()[:2]
            conn = TclRpcConnection(timeout=2.0)
            await conn.connect(host, port)
            try:
                with pytest.raises(ConnectionError, match="closed the connection"):
                    await conn.send("targets")
            finally:
                await conn.close()
        finally:
            srv.close()
            await srv.wait_closed()

    async def test_bounded_read_rejects_oversized_response(self):
        """Response exceeding MAX_RESPONSE_SIZE without separator raises ConnectionError."""

        async def _flood(reader, writer):
            await reader.read(4096)
            # send a lot of data with no separator
            chunk = b"A" * 65536
            try:
                while True:
                    writer.write(chunk)
                    await writer.drain()
            except (BrokenPipeError, ConnectionResetError):
                # Expected once the client gives up and drops the socket.
                pass
            finally:
                # Release the server-side transport however the loop ended.
                writer.close()

        srv = await asyncio.start_server(_flood, "127.0.0.1", 0)
        try:
            await srv.start_serving()
            host, port = srv.sockets[0].getsockname()[:2]
            conn = TclRpcConnection(timeout=10.0)
            await conn.connect(host, port)
            try:
                with pytest.raises(ConnectionError, match="exceeded"):
                    await conn.send("targets")
            finally:
                await conn.close()
        finally:
            srv.close()
            await srv.wait_closed()

    async def test_connect_refused(self):
        """Connecting to a port with nothing listening raises ConnectionError."""
        conn = TclRpcConnection(timeout=1.0)
        with pytest.raises(ConnectionError):
            await conn.connect("127.0.0.1", 1)
# ======================================================================
# 2. Target error paths
# ======================================================================
class TestTargetErrors:
    """Failure modes surfaced through the Target API."""

    async def test_halt_error_response(self, error_server, error_conn):
        """An error reply to ``halt`` is reported as TargetError."""
        error_server.add_response(r"^halt$", "error: target not responding")
        tgt = Target(error_conn)

        with pytest.raises(TargetError, match="halt failed"):
            await tgt.halt()

    async def test_halt_already_halted_is_not_error(self, error_server, error_conn):
        """An 'already halted' reply to ``halt`` must not raise."""
        error_server.add_response(r"^halt$", "error: target already halted")
        tgt = Target(error_conn)

        # halt() treats "already halted" as benign and returns a state.
        result = await tgt.halt()
        assert result.state in ("halted", "unknown")

    async def test_resume_error_response(self, error_server, error_conn):
        """An error reply to ``resume`` is reported as TargetError."""
        error_server.add_response(r"^resume", "error: cannot resume target")
        tgt = Target(error_conn)

        with pytest.raises(TargetError, match="resume failed"):
            await tgt.resume()

    async def test_step_error_response(self, error_server, error_conn):
        """An error reply to ``step`` is reported as TargetError."""
        error_server.add_response(r"^step", "error: step failed on target")
        tgt = Target(error_conn)

        with pytest.raises(TargetError, match="step failed"):
            await tgt.step()

    async def test_wait_halt_timeout(self, error_server, error_conn):
        """A 'timed out' reply to ``wait_halt`` becomes TimeoutError."""
        error_server.add_response(r"^wait_halt", "timed out while waiting for target")
        tgt = Target(error_conn)

        with pytest.raises(TimeoutError, match="did not halt"):
            await tgt.wait_halt(timeout_ms=100)

    async def test_wait_halt_time_out_variant(self, error_server, error_conn):
        """The two-word 'time out' spelling is also mapped to TimeoutError."""
        error_server.add_response(r"^wait_halt", "time out waiting for halt")
        tgt = Target(error_conn)

        with pytest.raises(TimeoutError, match="did not halt"):
            await tgt.wait_halt(timeout_ms=100)

    async def test_wait_halt_generic_error(self, error_server, error_conn):
        """A non-timeout error reply to ``wait_halt`` becomes TargetError."""
        error_server.add_response(r"^wait_halt", "error: target communication failure")
        tgt = Target(error_conn)

        with pytest.raises(TargetError, match="wait_halt failed"):
            await tgt.wait_halt(timeout_ms=100)

    async def test_state_unexpected_format(self, error_server, error_conn):
        """Unparseable ``targets`` output yields an all-'unknown' TargetState."""
        error_server.add_response(r"^targets$", "this is not valid target output")
        # The state parser also issues "reg pc"; make that lookup fail too.
        error_server.add_response(r"^reg\s+pc$", "no such register")
        tgt = Target(error_conn)

        result = await tgt.state()

        assert result.name == "unknown"
        assert result.state == "unknown"
        assert result.current_pc is None

    async def test_state_unrecognized_state_string(self, error_server, error_conn):
        """An unrecognised State column value is normalised to 'unknown'."""
        header_row = " TargetName Type Endian TapName State\n"
        rule_row = "-- ------------------ ---------- ------ ------------------ ------------\n"
        target_row = " 0* stm32f1x.cpu cortex_m little stm32f1x.cpu exploding"
        error_server.add_response(r"^targets$", header_row + rule_row + target_row)
        tgt = Target(error_conn)

        result = await tgt.state()

        assert result.name == "stm32f1x.cpu"
        assert result.state == "unknown"
        assert result.current_pc is None

    async def test_reset_error_response(self, error_server, error_conn):
        """An error reply to ``reset`` is reported as TargetError."""
        error_server.add_response(r"^reset\s+", "error: reset failed, adapter not found")
        tgt = Target(error_conn)

        with pytest.raises(TargetError, match="reset failed"):
            await tgt.reset("halt")
# ======================================================================
# 3. Memory error paths
# ======================================================================
class TestMemoryErrors:
    """Failure modes surfaced through the Memory API."""

    async def test_read_u32_target_not_halted(self, error_server, error_conn):
        """A 'target not halted' reply to a 32-bit read raises TargetError."""
        error_server.add_response(r"^read_memory\s+", "error: target not halted")
        memory = Memory(error_conn)

        with pytest.raises(TargetError, match="read_memory failed"):
            await memory.read_u32(0x20000000, 1)

    async def test_write_u32_error_response(self, error_server, error_conn):
        """An error reply to a 32-bit write raises TargetError."""
        error_server.add_response(r"^write_memory\s+", "error: target not halted")
        memory = Memory(error_conn)

        with pytest.raises(TargetError, match="write_memory failed"):
            await memory.write_u32(0x20000000, 0xDEADBEEF)

    async def test_read_u32_non_hex_tokens(self, error_server, error_conn):
        """A read reply made of non-hex tokens raises TargetError."""
        error_server.add_response(r"^read_memory\s+", "not_a_hex_value xyz !!!")
        memory = Memory(error_conn)

        with pytest.raises(TargetError, match="Cannot parse read_memory"):
            await memory.read_u32(0x20000000, 1)

    async def test_read_u8_error_response(self, error_server, error_conn):
        """An error reply to a byte read raises TargetError."""
        error_server.add_response(r"^read_memory\s+", "error: bus fault during memory read")
        memory = Memory(error_conn)

        with pytest.raises(TargetError, match="read_memory failed"):
            await memory.read_u8(0xFFFFFFFF, 4)

    async def test_write_bytes_error_response(self, error_server, error_conn):
        """An error reply to a raw byte write raises TargetError."""
        error_server.add_response(r"^write_memory\s+", "error: write access violation")
        memory = Memory(error_conn)

        with pytest.raises(TargetError, match="write_memory failed"):
            await memory.write_bytes(0x00000000, b"\x01\x02\x03")

    async def test_read_u16_error_response(self, error_server, error_conn):
        """An error reply to a 16-bit read raises TargetError."""
        error_server.add_response(r"^read_memory\s+", "error: alignment fault")
        memory = Memory(error_conn)

        with pytest.raises(TargetError, match="read_memory failed"):
            await memory.read_u16(0x20000001, 1)
# ======================================================================
# 4. Register error paths
# ======================================================================
class TestRegisterErrors:
    """Failure modes surfaced through the Registers API."""

    async def test_read_not_halted(self, error_server, error_conn):
        """Reading 'pc' while the target runs raises TargetNotHaltedError."""
        error_server.add_response(r"^reg\s+pc$", "target not halted")
        registers = Registers(error_conn)

        with pytest.raises(TargetNotHaltedError, match="halted"):
            await registers.read("pc")

    async def test_read_nonexistent_register(self, error_server, error_conn):
        """An unparseable reply for an unknown register raises TargetError."""
        error_server.add_response(r"^reg\s+nonexistent$", 'invalid command name "nonexistent"')
        registers = Registers(error_conn)

        with pytest.raises(TargetError, match="Cannot parse register"):
            await registers.read("nonexistent")

    async def test_write_not_halted(self, error_server, error_conn):
        """Writing 'pc' while the target runs raises TargetNotHaltedError."""
        error_server.add_response(r"^reg\s+pc\s+0x", "target not halted")
        registers = Registers(error_conn)

        with pytest.raises(TargetNotHaltedError, match="halted"):
            await registers.write("pc", 0x1234)

    async def test_write_generic_error(self, error_server, error_conn):
        """A write error unrelated to halt state raises TargetError."""
        error_server.add_response(r"^reg\s+r0\s+0x", "error: register write failed")
        registers = Registers(error_conn)

        with pytest.raises(TargetError, match="reg write failed"):
            await registers.write("r0", 0xDEAD)

    async def test_read_all_not_halted(self, error_server, error_conn):
        """read_all() while the target runs raises TargetNotHaltedError."""
        error_server.add_response(r"^reg$", "target not halted")
        registers = Registers(error_conn)

        with pytest.raises(TargetNotHaltedError, match="halted"):
            await registers.read_all()

    async def test_read_many_partial_failure(self, error_server, error_conn):
        """read_many() surfaces the first failing register read."""
        # Only "sp" is overridden to fail; "pc" keeps the default reply.
        error_server.add_response(r"^reg\s+sp$", "target not halted")
        registers = Registers(error_conn)

        with pytest.raises(TargetNotHaltedError):
            await registers.read_many(["pc", "sp"])
# ======================================================================
# 5. Flash error paths
# ======================================================================
class TestFlashErrors:
    """Error conditions from the Flash subsystem.

    Each test registers an error reply for one flash command on the mock
    server and checks that the matching Flash method raises FlashError
    with the command name in the message.
    """

    async def test_banks_error(self, error_server, error_conn):
        """flash.banks() with an error response raises FlashError."""
        error_server.add_response(r"^flash banks$", "error: no flash banks configured")
        flash = Flash(error_conn)
        with pytest.raises(FlashError, match="flash banks"):
            await flash.banks()

    async def test_info_error(self, error_server, error_conn):
        """flash.info() with an error response raises FlashError."""
        error_server.add_response(r"^flash info\s+", "error: invalid bank number")
        flash = Flash(error_conn)
        with pytest.raises(FlashError, match="flash info"):
            await flash.info(bank=99)

    async def test_erase_sector_invalid_range(self, error_server, error_conn):
        """erase_sector with first > last raises FlashError locally."""
        # No server route is registered: the range check is expected to
        # fail before any command is sent.
        flash = Flash(error_conn)
        with pytest.raises(FlashError, match="Invalid sector range"):
            await flash.erase_sector(bank=0, first=10, last=5)

    async def test_erase_sector_error_response(self, error_server, error_conn):
        """erase_sector with error from server raises FlashError."""
        error_server.add_response(r"^flash erase_sector\s+", "error: erase failed")
        flash = Flash(error_conn)
        with pytest.raises(FlashError, match="flash erase_sector"):
            await flash.erase_sector(bank=0, first=0, last=3)

    async def test_write_image_error(self, error_server, error_conn):
        """write_image with error from server raises FlashError."""
        # Imported outside the ``raises`` block so that block contains only
        # the single call that is expected to fail.
        from pathlib import Path

        error_server.add_response(r"^flash write_image\s+", "error: flash write failed")
        flash = Flash(error_conn)
        # NOTE(review): the image path presumably need not exist because the
        # mock server answers with an error -- confirm against Flash.write_image.
        image = Path("/tmp/fake_firmware.bin")
        with pytest.raises(FlashError, match="flash write_image"):
            await flash.write_image(image, verify=False)

    async def test_protect_error(self, error_server, error_conn):
        """flash.protect() with error response raises FlashError."""
        error_server.add_response(r"^flash protect\s+", "error: protection change not supported")
        flash = Flash(error_conn)
        with pytest.raises(FlashError, match="flash protect"):
            await flash.protect(bank=0, first=0, last=3, on=True)
# ======================================================================
# 6. Breakpoint error paths
# ======================================================================
class TestBreakpointErrors:
    """Failure modes surfaced through the BreakpointManager API."""

    async def test_add_breakpoint_error(self, error_server, error_conn):
        """An error reply to ``bp`` is reported as BreakpointError."""
        reply = "error: can not add breakpoint, resource not available"
        error_server.add_response(r"^bp\s+0x", reply)
        manager = BreakpointManager(error_conn)

        with pytest.raises(BreakpointError, match="bp 0x"):
            await manager.add(0x08001234)

    async def test_remove_breakpoint_error(self, error_server, error_conn):
        """An error reply to ``rbp`` is reported as BreakpointError."""
        error_server.add_response(r"^rbp\s+", "error: no breakpoint at address")
        manager = BreakpointManager(error_conn)

        with pytest.raises(BreakpointError, match="rbp 0x"):
            await manager.remove(0x08001234)

    async def test_add_watchpoint_error(self, error_server, error_conn):
        """An error reply to ``wp`` is reported as BreakpointError."""
        error_server.add_response(r"^wp\s+0x", "error: no free watchpoint comparator")
        manager = BreakpointManager(error_conn)

        with pytest.raises(BreakpointError, match="wp 0x"):
            await manager.add_watchpoint(0x20000000, 4)

    async def test_remove_watchpoint_error(self, error_server, error_conn):
        """An error reply to ``rwp`` is reported as BreakpointError."""
        error_server.add_response(r"^rwp\s+", "error: no watchpoint at address")
        manager = BreakpointManager(error_conn)

        with pytest.raises(BreakpointError, match="rwp 0x"):
            await manager.remove_watchpoint(0x20000000)
# ======================================================================
# 7. Session error paths
# ======================================================================
class TestSessionErrors:
    """Failure modes surfaced through the Session layer."""

    async def test_get_or_create_loop_from_async_context(self):
        """Calling _get_or_create_loop() from a coroutine raises RuntimeError."""
        with pytest.raises(RuntimeError, match="Cannot use sync API"):
            _get_or_create_loop()

    async def test_command_on_closed_session(self, mock_ocd):
        """command() on a session that was closed raises ConnectionError."""
        host, port, _ = mock_ocd
        session = await Session.connect(host, port, timeout=5.0)
        await session.close()

        with pytest.raises(ConnectionError, match="Not connected"):
            await session.command("targets")

    async def test_connect_to_nonexistent_host(self):
        """Session.connect() to an address nobody serves raises ConnectionError."""
        with pytest.raises(ConnectionError):
            await Session.connect("127.0.0.1", 1, timeout=1.0)

    async def test_double_close_is_safe(self, mock_ocd):
        """A second close() on the same session must not raise."""
        host, port, _ = mock_ocd
        session = await Session.connect(host, port, timeout=5.0)
        await session.close()
        # The repeated close is expected to be a no-op.
        await session.close()
# ======================================================================
# 8. Process error paths
# ======================================================================
class TestProcessErrors:
    """Failure modes surfaced through the OpenOCDProcess manager."""

    async def test_start_empty_config(self):
        """An empty config string is rejected with ProcessError."""
        manager = OpenOCDProcess()

        with pytest.raises(ProcessError, match="[Ee]mpty config"):
            # /bin/true stands in for the real binary so that start()
            # gets as far as validating the config string.
            await manager.start("", openocd_bin="/bin/true")

    async def test_start_dangling_flag(self):
        """A trailing ``-f`` without a value is rejected with ProcessError."""
        manager = OpenOCDProcess()

        with pytest.raises(ProcessError, match="requires an argument"):
            await manager.start("-f", openocd_bin="/bin/true")

    async def test_start_dangling_c_flag(self):
        """A trailing ``-c`` without a value is rejected with ProcessError."""
        manager = OpenOCDProcess()

        with pytest.raises(ProcessError, match="requires an argument"):
            await manager.start("-c", openocd_bin="/bin/true")

    async def test_start_nonexistent_binary(self):
        """Pointing start() at a missing binary raises ProcessError."""
        manager = OpenOCDProcess()
        missing_binary = "/nonexistent/path/to/openocd"

        with pytest.raises(ProcessError):
            await manager.start("interface/cmsis-dap.cfg", openocd_bin=missing_binary)

    async def test_pid_is_none_before_start(self):
        """A process that was never started reports ``pid`` as None."""
        manager = OpenOCDProcess()
        assert manager.pid is None

    async def test_running_is_false_before_start(self):
        """A process that was never started reports ``running`` as False."""
        manager = OpenOCDProcess()
        assert manager.running is False

    async def test_stop_before_start_is_safe(self):
        """stop() on a process that was never started must not raise."""
        manager = OpenOCDProcess()
        # Expected to be a no-op; the test passes if nothing is raised.
        await manager.stop()
# ======================================================================
# 9. Notification connection error paths
# ======================================================================
class TestNotificationErrors:
    """Failure modes of the notification subsystem."""

    async def test_enable_notifications_before_connect(self):
        """enable_notifications() on an unconnected connection raises ConnectionError."""
        unconnected = TclRpcConnection(timeout=1.0)

        with pytest.raises(ConnectionError, match="Not connected"):
            await unconnected.enable_notifications()