599 lines
23 KiB
Python
599 lines
23 KiB
Python
"""Error-path tests for openocd-python.
|
|
|
|
Exercises every error condition and exception branch across the
|
|
connection, target, memory, register, flash, breakpoint, session,
|
|
and process subsystems.
|
|
|
|
Each test configures a mock server to return error responses (or
|
|
misbehave at the protocol level) and asserts that the correct
|
|
exception type is raised with a meaningful message.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
|
|
import pytest
|
|
|
|
from openocd.breakpoints import BreakpointError, BreakpointManager
|
|
from openocd.connection.tcl_rpc import TclRpcConnection
|
|
from openocd.errors import (
|
|
ConnectionError,
|
|
FlashError,
|
|
ProcessError,
|
|
TargetError,
|
|
TargetNotHaltedError,
|
|
TimeoutError,
|
|
)
|
|
from openocd.flash import Flash
|
|
from openocd.memory import Memory
|
|
from openocd.process import OpenOCDProcess
|
|
from openocd.registers import Registers
|
|
from openocd.session import Session, _get_or_create_loop
|
|
from openocd.target import Target
|
|
from tests.mock_server import MockOpenOCDServer
|
|
|
|
# ======================================================================
|
|
# Helpers: error-returning mock servers
|
|
# ======================================================================
|
|
|
|
|
|
@pytest.fixture
|
|
async def error_server():
|
|
"""A MockOpenOCDServer pre-wired to return error strings.
|
|
|
|
The default response table is left intact so that "targets" and
|
|
similar plumbing commands still work. Individual tests prepend
|
|
error-producing routes via ``server.add_response()``.
|
|
"""
|
|
server = MockOpenOCDServer()
|
|
await server.start()
|
|
yield server
|
|
await server.stop()
|
|
|
|
|
|
@pytest.fixture
|
|
async def error_conn(error_server):
|
|
"""A TclRpcConnection wired to the error mock server."""
|
|
host, port = error_server.address
|
|
conn = TclRpcConnection(timeout=5.0)
|
|
await conn.connect(host, port)
|
|
yield conn
|
|
await conn.close()
|
|
|
|
|
|
# ======================================================================
|
|
# 1. Connection error paths
|
|
# ======================================================================
|
|
|
|
|
|
class TestConnectionErrors:
|
|
"""Errors at the transport / framing layer."""
|
|
|
|
async def test_send_on_closed_connection(self):
|
|
"""send() after close() raises ConnectionError."""
|
|
server = MockOpenOCDServer()
|
|
await server.start()
|
|
host, port = server.address
|
|
|
|
conn = TclRpcConnection(timeout=5.0)
|
|
await conn.connect(host, port)
|
|
await conn.close()
|
|
|
|
with pytest.raises(ConnectionError, match="Not connected"):
|
|
await conn.send("targets")
|
|
|
|
await server.stop()
|
|
|
|
async def test_send_before_connect(self):
|
|
"""send() without a prior connect() raises ConnectionError."""
|
|
conn = TclRpcConnection()
|
|
with pytest.raises(ConnectionError, match="Not connected"):
|
|
await conn.send("targets")
|
|
|
|
async def test_timeout_when_server_never_responds(self):
|
|
"""A server that reads but never sends \\x1a triggers TimeoutError."""
|
|
hang_event = asyncio.Event()
|
|
|
|
async def _black_hole(reader, writer):
|
|
await reader.read(4096)
|
|
# never send a response -- wait until test signals us to stop
|
|
try:
|
|
await hang_event.wait()
|
|
except asyncio.CancelledError:
|
|
pass
|
|
finally:
|
|
writer.close()
|
|
|
|
srv = await asyncio.start_server(_black_hole, "127.0.0.1", 0)
|
|
await srv.start_serving()
|
|
host, port = srv.sockets[0].getsockname()[:2]
|
|
|
|
conn = TclRpcConnection(timeout=0.3)
|
|
await conn.connect(host, port)
|
|
|
|
with pytest.raises(TimeoutError):
|
|
await conn.send("targets")
|
|
|
|
await conn.close()
|
|
hang_event.set()
|
|
srv.close()
|
|
await srv.wait_closed()
|
|
|
|
async def test_server_closes_connection_mid_stream(self):
|
|
"""Server closing the socket without a separator raises ConnectionError."""
|
|
|
|
async def _close_immediately(reader, writer):
|
|
await reader.read(4096)
|
|
# send partial data with no separator then close
|
|
writer.write(b"partial response")
|
|
await writer.drain()
|
|
writer.close()
|
|
await writer.wait_closed()
|
|
|
|
srv = await asyncio.start_server(_close_immediately, "127.0.0.1", 0)
|
|
await srv.start_serving()
|
|
host, port = srv.sockets[0].getsockname()[:2]
|
|
|
|
conn = TclRpcConnection(timeout=2.0)
|
|
await conn.connect(host, port)
|
|
|
|
with pytest.raises(ConnectionError, match="closed the connection"):
|
|
await conn.send("targets")
|
|
|
|
await conn.close()
|
|
srv.close()
|
|
await srv.wait_closed()
|
|
|
|
async def test_bounded_read_rejects_oversized_response(self):
|
|
"""Response exceeding MAX_RESPONSE_SIZE without separator raises ConnectionError."""
|
|
|
|
async def _flood(reader, writer):
|
|
await reader.read(4096)
|
|
# send a lot of data with no separator
|
|
chunk = b"A" * 65536
|
|
try:
|
|
while True:
|
|
writer.write(chunk)
|
|
await writer.drain()
|
|
except (BrokenPipeError, ConnectionResetError):
|
|
pass
|
|
|
|
srv = await asyncio.start_server(_flood, "127.0.0.1", 0)
|
|
await srv.start_serving()
|
|
host, port = srv.sockets[0].getsockname()[:2]
|
|
|
|
conn = TclRpcConnection(timeout=10.0)
|
|
await conn.connect(host, port)
|
|
|
|
with pytest.raises(ConnectionError, match="exceeded"):
|
|
await conn.send("targets")
|
|
|
|
await conn.close()
|
|
srv.close()
|
|
await srv.wait_closed()
|
|
|
|
async def test_connect_refused(self):
|
|
"""Connecting to a port with nothing listening raises ConnectionError."""
|
|
conn = TclRpcConnection(timeout=1.0)
|
|
with pytest.raises(ConnectionError):
|
|
await conn.connect("127.0.0.1", 1)
|
|
|
|
|
|
# ======================================================================
|
|
# 2. Target error paths
|
|
# ======================================================================
|
|
|
|
|
|
class TestTargetErrors:
|
|
"""Error conditions from the Target subsystem."""
|
|
|
|
async def test_halt_error_response(self, error_server, error_conn):
|
|
"""halt() with an error response raises TargetError."""
|
|
error_server.add_response(r"^halt$", "error: target not responding")
|
|
target = Target(error_conn)
|
|
|
|
with pytest.raises(TargetError, match="halt failed"):
|
|
await target.halt()
|
|
|
|
async def test_halt_already_halted_is_not_error(self, error_server, error_conn):
|
|
"""halt() when response says 'already halted' should NOT raise."""
|
|
error_server.add_response(r"^halt$", "error: target already halted")
|
|
target = Target(error_conn)
|
|
# "already halted" is a benign condition -- halt() checks for it
|
|
state = await target.halt()
|
|
assert state.state in ("halted", "unknown")
|
|
|
|
async def test_resume_error_response(self, error_server, error_conn):
|
|
"""resume() with an error response raises TargetError."""
|
|
error_server.add_response(r"^resume", "error: cannot resume target")
|
|
target = Target(error_conn)
|
|
|
|
with pytest.raises(TargetError, match="resume failed"):
|
|
await target.resume()
|
|
|
|
async def test_step_error_response(self, error_server, error_conn):
|
|
"""step() with an error response raises TargetError."""
|
|
error_server.add_response(r"^step", "error: step failed on target")
|
|
target = Target(error_conn)
|
|
|
|
with pytest.raises(TargetError, match="step failed"):
|
|
await target.step()
|
|
|
|
async def test_wait_halt_timeout(self, error_server, error_conn):
|
|
"""wait_halt receiving 'timed out' raises TimeoutError."""
|
|
error_server.add_response(r"^wait_halt", "timed out while waiting for target")
|
|
target = Target(error_conn)
|
|
|
|
with pytest.raises(TimeoutError, match="did not halt"):
|
|
await target.wait_halt(timeout_ms=100)
|
|
|
|
async def test_wait_halt_time_out_variant(self, error_server, error_conn):
|
|
"""wait_halt receiving 'time out' (two words) also raises TimeoutError."""
|
|
error_server.add_response(r"^wait_halt", "time out waiting for halt")
|
|
target = Target(error_conn)
|
|
|
|
with pytest.raises(TimeoutError, match="did not halt"):
|
|
await target.wait_halt(timeout_ms=100)
|
|
|
|
async def test_wait_halt_generic_error(self, error_server, error_conn):
|
|
"""wait_halt with a generic error (not timeout) raises TargetError."""
|
|
error_server.add_response(r"^wait_halt", "error: target communication failure")
|
|
target = Target(error_conn)
|
|
|
|
with pytest.raises(TargetError, match="wait_halt failed"):
|
|
await target.wait_halt(timeout_ms=100)
|
|
|
|
async def test_state_unexpected_format(self, error_server, error_conn):
|
|
"""targets returning garbage still produces a TargetState with 'unknown'."""
|
|
error_server.add_response(r"^targets$", "this is not valid target output")
|
|
# Also need to suppress the reg pc call that _parse_state makes
|
|
error_server.add_response(r"^reg\s+pc$", "no such register")
|
|
target = Target(error_conn)
|
|
|
|
state = await target.state()
|
|
assert state.name == "unknown"
|
|
assert state.state == "unknown"
|
|
assert state.current_pc is None
|
|
|
|
async def test_state_unrecognized_state_string(self, error_server, error_conn):
|
|
"""A target row with a bizarre state string normalizes to 'unknown'."""
|
|
weird_table = (
|
|
" TargetName Type Endian TapName State\n"
|
|
"-- ------------------ ---------- ------ ------------------ ------------\n"
|
|
" 0* stm32f1x.cpu cortex_m little stm32f1x.cpu exploding"
|
|
)
|
|
error_server.add_response(r"^targets$", weird_table)
|
|
target = Target(error_conn)
|
|
|
|
state = await target.state()
|
|
assert state.name == "stm32f1x.cpu"
|
|
assert state.state == "unknown"
|
|
assert state.current_pc is None
|
|
|
|
async def test_reset_error_response(self, error_server, error_conn):
|
|
"""reset() with an error response raises TargetError."""
|
|
error_server.add_response(r"^reset\s+", "error: reset failed, adapter not found")
|
|
target = Target(error_conn)
|
|
|
|
with pytest.raises(TargetError, match="reset failed"):
|
|
await target.reset("halt")
|
|
|
|
|
|
# ======================================================================
|
|
# 3. Memory error paths
|
|
# ======================================================================
|
|
|
|
|
|
class TestMemoryErrors:
|
|
"""Error conditions from the Memory subsystem."""
|
|
|
|
async def test_read_u32_target_not_halted(self, error_server, error_conn):
|
|
"""read_u32 with 'target not halted' in response raises TargetError."""
|
|
error_server.add_response(r"^read_memory\s+", "error: target not halted")
|
|
mem = Memory(error_conn)
|
|
|
|
with pytest.raises(TargetError, match="read_memory failed"):
|
|
await mem.read_u32(0x20000000, 1)
|
|
|
|
async def test_write_u32_error_response(self, error_server, error_conn):
|
|
"""write_u32 with an error response raises TargetError."""
|
|
error_server.add_response(r"^write_memory\s+", "error: target not halted")
|
|
mem = Memory(error_conn)
|
|
|
|
with pytest.raises(TargetError, match="write_memory failed"):
|
|
await mem.write_u32(0x20000000, 0xDEADBEEF)
|
|
|
|
async def test_read_u32_non_hex_tokens(self, error_server, error_conn):
|
|
"""read_memory returning non-hex garbage raises TargetError."""
|
|
error_server.add_response(r"^read_memory\s+", "not_a_hex_value xyz !!!")
|
|
mem = Memory(error_conn)
|
|
|
|
with pytest.raises(TargetError, match="Cannot parse read_memory"):
|
|
await mem.read_u32(0x20000000, 1)
|
|
|
|
async def test_read_u8_error_response(self, error_server, error_conn):
|
|
"""read_u8 with an error response raises TargetError."""
|
|
error_server.add_response(r"^read_memory\s+", "error: bus fault during memory read")
|
|
mem = Memory(error_conn)
|
|
|
|
with pytest.raises(TargetError, match="read_memory failed"):
|
|
await mem.read_u8(0xFFFFFFFF, 4)
|
|
|
|
async def test_write_bytes_error_response(self, error_server, error_conn):
|
|
"""write_bytes with an error response raises TargetError."""
|
|
error_server.add_response(r"^write_memory\s+", "error: write access violation")
|
|
mem = Memory(error_conn)
|
|
|
|
with pytest.raises(TargetError, match="write_memory failed"):
|
|
await mem.write_bytes(0x00000000, b"\x01\x02\x03")
|
|
|
|
async def test_read_u16_error_response(self, error_server, error_conn):
|
|
"""read_u16 with an error response raises TargetError."""
|
|
error_server.add_response(r"^read_memory\s+", "error: alignment fault")
|
|
mem = Memory(error_conn)
|
|
|
|
with pytest.raises(TargetError, match="read_memory failed"):
|
|
await mem.read_u16(0x20000001, 1)
|
|
|
|
|
|
# ======================================================================
|
|
# 4. Register error paths
|
|
# ======================================================================
|
|
|
|
|
|
class TestRegisterErrors:
|
|
"""Error conditions from the Registers subsystem."""
|
|
|
|
async def test_read_not_halted(self, error_server, error_conn):
|
|
"""read('pc') when target is not halted raises TargetNotHaltedError."""
|
|
error_server.add_response(r"^reg\s+pc$", "target not halted")
|
|
regs = Registers(error_conn)
|
|
|
|
with pytest.raises(TargetNotHaltedError, match="halted"):
|
|
await regs.read("pc")
|
|
|
|
async def test_read_nonexistent_register(self, error_server, error_conn):
|
|
"""read('nonexistent') with unparseable response raises TargetError."""
|
|
error_server.add_response(r"^reg\s+nonexistent$", 'invalid command name "nonexistent"')
|
|
regs = Registers(error_conn)
|
|
|
|
with pytest.raises(TargetError, match="Cannot parse register"):
|
|
await regs.read("nonexistent")
|
|
|
|
async def test_write_not_halted(self, error_server, error_conn):
|
|
"""write('pc', val) when target is not halted raises TargetNotHaltedError."""
|
|
error_server.add_response(r"^reg\s+pc\s+0x", "target not halted")
|
|
regs = Registers(error_conn)
|
|
|
|
with pytest.raises(TargetNotHaltedError, match="halted"):
|
|
await regs.write("pc", 0x1234)
|
|
|
|
async def test_write_generic_error(self, error_server, error_conn):
|
|
"""write() with a non-halted-related error raises TargetError."""
|
|
error_server.add_response(r"^reg\s+r0\s+0x", "error: register write failed")
|
|
regs = Registers(error_conn)
|
|
|
|
with pytest.raises(TargetError, match="reg write failed"):
|
|
await regs.write("r0", 0xDEAD)
|
|
|
|
async def test_read_all_not_halted(self, error_server, error_conn):
|
|
"""read_all() when target is not halted raises TargetNotHaltedError."""
|
|
error_server.add_response(r"^reg$", "target not halted")
|
|
regs = Registers(error_conn)
|
|
|
|
with pytest.raises(TargetNotHaltedError, match="halted"):
|
|
await regs.read_all()
|
|
|
|
async def test_read_many_partial_failure(self, error_server, error_conn):
|
|
"""read_many() should propagate the first register read failure."""
|
|
# pc succeeds, but sp returns not-halted
|
|
error_server.add_response(r"^reg\s+sp$", "target not halted")
|
|
regs = Registers(error_conn)
|
|
|
|
with pytest.raises(TargetNotHaltedError):
|
|
await regs.read_many(["pc", "sp"])
|
|
|
|
|
|
# ======================================================================
|
|
# 5. Flash error paths
|
|
# ======================================================================
|
|
|
|
|
|
class TestFlashErrors:
|
|
"""Error conditions from the Flash subsystem."""
|
|
|
|
async def test_banks_error(self, error_server, error_conn):
|
|
"""flash.banks() with an error response raises FlashError."""
|
|
error_server.add_response(r"^flash banks$", "error: no flash banks configured")
|
|
flash = Flash(error_conn)
|
|
|
|
with pytest.raises(FlashError, match="flash banks"):
|
|
await flash.banks()
|
|
|
|
async def test_info_error(self, error_server, error_conn):
|
|
"""flash.info() with an error response raises FlashError."""
|
|
error_server.add_response(r"^flash info\s+", "error: invalid bank number")
|
|
flash = Flash(error_conn)
|
|
|
|
with pytest.raises(FlashError, match="flash info"):
|
|
await flash.info(bank=99)
|
|
|
|
async def test_erase_sector_invalid_range(self, error_server, error_conn):
|
|
"""erase_sector with first > last raises FlashError locally."""
|
|
flash = Flash(error_conn)
|
|
|
|
with pytest.raises(FlashError, match="Invalid sector range"):
|
|
await flash.erase_sector(bank=0, first=10, last=5)
|
|
|
|
async def test_erase_sector_error_response(self, error_server, error_conn):
|
|
"""erase_sector with error from server raises FlashError."""
|
|
error_server.add_response(r"^flash erase_sector\s+", "error: erase failed")
|
|
flash = Flash(error_conn)
|
|
|
|
with pytest.raises(FlashError, match="flash erase_sector"):
|
|
await flash.erase_sector(bank=0, first=0, last=3)
|
|
|
|
async def test_write_image_error(self, error_server, error_conn):
|
|
"""write_image with error from server raises FlashError."""
|
|
error_server.add_response(r"^flash write_image\s+", "error: flash write failed")
|
|
flash = Flash(error_conn)
|
|
|
|
with pytest.raises(FlashError, match="flash write_image"):
|
|
from pathlib import Path
|
|
|
|
await flash.write_image(Path("/tmp/fake_firmware.bin"), verify=False)
|
|
|
|
async def test_protect_error(self, error_server, error_conn):
|
|
"""flash.protect() with error response raises FlashError."""
|
|
error_server.add_response(r"^flash protect\s+", "error: protection change not supported")
|
|
flash = Flash(error_conn)
|
|
|
|
with pytest.raises(FlashError, match="flash protect"):
|
|
await flash.protect(bank=0, first=0, last=3, on=True)
|
|
|
|
|
|
# ======================================================================
|
|
# 6. Breakpoint error paths
|
|
# ======================================================================
|
|
|
|
|
|
class TestBreakpointErrors:
|
|
"""Error conditions from the BreakpointManager subsystem."""
|
|
|
|
async def test_add_breakpoint_error(self, error_server, error_conn):
|
|
"""add() with error response raises BreakpointError."""
|
|
error_server.add_response(
|
|
r"^bp\s+0x", "error: can not add breakpoint, resource not available"
|
|
)
|
|
bp = BreakpointManager(error_conn)
|
|
|
|
with pytest.raises(BreakpointError, match="bp 0x"):
|
|
await bp.add(0x08001234)
|
|
|
|
async def test_remove_breakpoint_error(self, error_server, error_conn):
|
|
"""remove() with error response raises BreakpointError."""
|
|
error_server.add_response(r"^rbp\s+", "error: no breakpoint at address")
|
|
bp = BreakpointManager(error_conn)
|
|
|
|
with pytest.raises(BreakpointError, match="rbp 0x"):
|
|
await bp.remove(0x08001234)
|
|
|
|
async def test_add_watchpoint_error(self, error_server, error_conn):
|
|
"""add_watchpoint() with error response raises BreakpointError."""
|
|
error_server.add_response(r"^wp\s+0x", "error: no free watchpoint comparator")
|
|
bp = BreakpointManager(error_conn)
|
|
|
|
with pytest.raises(BreakpointError, match="wp 0x"):
|
|
await bp.add_watchpoint(0x20000000, 4)
|
|
|
|
async def test_remove_watchpoint_error(self, error_server, error_conn):
|
|
"""remove_watchpoint() with error response raises BreakpointError."""
|
|
error_server.add_response(r"^rwp\s+", "error: no watchpoint at address")
|
|
bp = BreakpointManager(error_conn)
|
|
|
|
with pytest.raises(BreakpointError, match="rwp 0x"):
|
|
await bp.remove_watchpoint(0x20000000)
|
|
|
|
|
|
# ======================================================================
|
|
# 7. Session error paths
|
|
# ======================================================================
|
|
|
|
|
|
class TestSessionErrors:
|
|
"""Error conditions from the Session layer."""
|
|
|
|
async def test_get_or_create_loop_from_async_context(self):
|
|
"""_get_or_create_loop() inside a running loop raises RuntimeError."""
|
|
with pytest.raises(RuntimeError, match="Cannot use sync API"):
|
|
_get_or_create_loop()
|
|
|
|
async def test_command_on_closed_session(self, mock_ocd):
|
|
"""session.command() after close() raises ConnectionError."""
|
|
host, port, _server = mock_ocd
|
|
sess = await Session.connect(host, port, timeout=5.0)
|
|
await sess.close()
|
|
|
|
with pytest.raises(ConnectionError, match="Not connected"):
|
|
await sess.command("targets")
|
|
|
|
async def test_connect_to_nonexistent_host(self):
|
|
"""Session.connect() to a bogus address raises ConnectionError."""
|
|
with pytest.raises(ConnectionError):
|
|
await Session.connect("127.0.0.1", 1, timeout=1.0)
|
|
|
|
async def test_double_close_is_safe(self, mock_ocd):
|
|
"""Calling close() twice on a session should not raise."""
|
|
host, port, _server = mock_ocd
|
|
sess = await Session.connect(host, port, timeout=5.0)
|
|
await sess.close()
|
|
await sess.close() # should be a no-op
|
|
|
|
|
|
# ======================================================================
|
|
# 8. Process error paths
|
|
# ======================================================================
|
|
|
|
|
|
class TestProcessErrors:
|
|
"""Error conditions from the OpenOCDProcess manager."""
|
|
|
|
async def test_start_empty_config(self):
|
|
"""start() with an empty config string raises ProcessError."""
|
|
proc = OpenOCDProcess()
|
|
with pytest.raises(ProcessError, match="[Ee]mpty config"):
|
|
# Use /bin/true as a stand-in binary so we reach config validation
|
|
await proc.start("", openocd_bin="/bin/true")
|
|
|
|
async def test_start_dangling_flag(self):
|
|
"""start() with a trailing -f and no argument raises ProcessError."""
|
|
proc = OpenOCDProcess()
|
|
with pytest.raises(ProcessError, match="requires an argument"):
|
|
await proc.start("-f", openocd_bin="/bin/true")
|
|
|
|
async def test_start_dangling_c_flag(self):
|
|
"""start() with a trailing -c and no argument raises ProcessError."""
|
|
proc = OpenOCDProcess()
|
|
with pytest.raises(ProcessError, match="requires an argument"):
|
|
await proc.start("-c", openocd_bin="/bin/true")
|
|
|
|
async def test_start_nonexistent_binary(self):
|
|
"""start() with a nonexistent binary path raises ProcessError."""
|
|
proc = OpenOCDProcess()
|
|
with pytest.raises(ProcessError):
|
|
await proc.start(
|
|
"interface/cmsis-dap.cfg",
|
|
openocd_bin="/nonexistent/path/to/openocd",
|
|
)
|
|
|
|
async def test_pid_is_none_before_start(self):
|
|
"""pid property is None before start()."""
|
|
proc = OpenOCDProcess()
|
|
assert proc.pid is None
|
|
|
|
async def test_running_is_false_before_start(self):
|
|
"""running property is False before start()."""
|
|
proc = OpenOCDProcess()
|
|
assert proc.running is False
|
|
|
|
async def test_stop_before_start_is_safe(self):
|
|
"""stop() before start() should not raise."""
|
|
proc = OpenOCDProcess()
|
|
await proc.stop() # no-op, no exception
|
|
|
|
|
|
# ======================================================================
|
|
# 9. Notification connection error paths
|
|
# ======================================================================
|
|
|
|
|
|
class TestNotificationErrors:
|
|
"""Error conditions for the notification subsystem."""
|
|
|
|
async def test_enable_notifications_before_connect(self):
|
|
"""enable_notifications() before connect() raises ConnectionError."""
|
|
conn = TclRpcConnection(timeout=1.0)
|
|
with pytest.raises(ConnectionError, match="Not connected"):
|
|
await conn.enable_notifications()
|