"""Error-path tests for openocd-python. Exercises every error condition and exception branch across the connection, target, memory, register, flash, breakpoint, session, and process subsystems. Each test configures a mock server to return error responses (or misbehave at the protocol level) and asserts that the correct exception type is raised with a meaningful message. """ from __future__ import annotations import asyncio import pytest from openocd.breakpoints import BreakpointError, BreakpointManager from openocd.connection.tcl_rpc import TclRpcConnection from openocd.errors import ( ConnectionError, FlashError, ProcessError, TargetError, TargetNotHaltedError, TimeoutError, ) from openocd.flash import Flash from openocd.memory import Memory from openocd.process import OpenOCDProcess from openocd.registers import Registers from openocd.session import Session, _get_or_create_loop from openocd.target import Target from tests.mock_server import MockOpenOCDServer # ====================================================================== # Helpers: error-returning mock servers # ====================================================================== @pytest.fixture async def error_server(): """A MockOpenOCDServer pre-wired to return error strings. The default response table is left intact so that "targets" and similar plumbing commands still work. Individual tests prepend error-producing routes via ``server.add_response()``. """ server = MockOpenOCDServer() await server.start() yield server await server.stop() @pytest.fixture async def error_conn(error_server): """A TclRpcConnection wired to the error mock server.""" host, port = error_server.address conn = TclRpcConnection(timeout=5.0) await conn.connect(host, port) yield conn await conn.close() # ====================================================================== # 1. 
# Connection error paths
# ======================================================================


class TestConnectionErrors:
    """Errors at the transport / framing layer.

    Every test that starts a server or opens a connection releases it in
    a ``finally`` block, so a failing assertion cannot leak a listening
    socket or a parked handler task into later tests.
    """

    async def test_send_on_closed_connection(self):
        """send() after close() raises ConnectionError."""
        server = MockOpenOCDServer()
        await server.start()
        try:
            host, port = server.address
            conn = TclRpcConnection(timeout=5.0)
            await conn.connect(host, port)
            await conn.close()

            with pytest.raises(ConnectionError, match="Not connected"):
                await conn.send("targets")
        finally:
            # Stop the mock server even when the assertion above fails.
            await server.stop()

    async def test_send_before_connect(self):
        """send() without a prior connect() raises ConnectionError."""
        conn = TclRpcConnection()
        with pytest.raises(ConnectionError, match="Not connected"):
            await conn.send("targets")

    async def test_timeout_when_server_never_responds(self):
        """A server that reads but never sends \\x1a triggers TimeoutError."""
        hang_event = asyncio.Event()

        async def _black_hole(reader, writer):
            await reader.read(4096)
            # Never send a response -- wait until the test signals us to stop.
            try:
                await hang_event.wait()
            except asyncio.CancelledError:
                pass
            finally:
                writer.close()

        # asyncio.start_server() begins accepting connections by default,
        # so no separate start_serving() call is needed.
        srv = await asyncio.start_server(_black_hole, "127.0.0.1", 0)
        try:
            host, port = srv.sockets[0].getsockname()[:2]
            conn = TclRpcConnection(timeout=0.3)
            await conn.connect(host, port)
            try:
                with pytest.raises(TimeoutError):
                    await conn.send("targets")
            finally:
                await conn.close()
        finally:
            # Release the parked handler before waiting for the server to
            # shut down, whether or not the assertion passed.
            hang_event.set()
            srv.close()
            await srv.wait_closed()

    async def test_server_closes_connection_mid_stream(self):
        """Server closing the socket without a separator raises ConnectionError."""

        async def _close_immediately(reader, writer):
            await reader.read(4096)
            # Send partial data with no separator, then close.
            writer.write(b"partial response")
            await writer.drain()
            writer.close()
            await writer.wait_closed()

        srv = await asyncio.start_server(_close_immediately, "127.0.0.1", 0)
        try:
            host, port = srv.sockets[0].getsockname()[:2]
            conn = TclRpcConnection(timeout=2.0)
            await conn.connect(host, port)
            try:
                with pytest.raises(ConnectionError, match="closed the connection"):
                    await conn.send("targets")
            finally:
                await conn.close()
        finally:
            srv.close()
            await srv.wait_closed()

    async def test_bounded_read_rejects_oversized_response(self):
        """Response exceeding MAX_RESPONSE_SIZE without separator raises ConnectionError."""

        async def _flood(reader, writer):
            await reader.read(4096)
            # Send a lot of data with no separator until the peer goes away.
            chunk = b"A" * 65536
            try:
                while True:
                    writer.write(chunk)
                    await writer.drain()
            except (BrokenPipeError, ConnectionResetError):
                pass
            finally:
                # Always release the server-side transport so that
                # srv.wait_closed() is not left waiting on this connection.
                writer.close()

        srv = await asyncio.start_server(_flood, "127.0.0.1", 0)
        try:
            host, port = srv.sockets[0].getsockname()[:2]
            conn = TclRpcConnection(timeout=10.0)
            await conn.connect(host, port)
            try:
                with pytest.raises(ConnectionError, match="exceeded"):
                    await conn.send("targets")
            finally:
                await conn.close()
        finally:
            srv.close()
            await srv.wait_closed()

    async def test_connect_refused(self):
        """Connecting to a port with nothing listening raises ConnectionError."""
        conn = TclRpcConnection(timeout=1.0)
        with pytest.raises(ConnectionError):
            await conn.connect("127.0.0.1", 1)


# ======================================================================
# 2.
# Target error paths
# ======================================================================


class TestTargetErrors:
    """Failure modes surfaced by the Target subsystem."""

    async def test_halt_error_response(self, error_server, error_conn):
        """An error reply to ``halt`` is reported as TargetError."""
        error_server.add_response(
            r"^halt$",
            "error: target not responding",
        )
        tgt = Target(error_conn)

        with pytest.raises(TargetError, match="halt failed"):
            await tgt.halt()

    async def test_halt_already_halted_is_not_error(self, error_server, error_conn):
        """An 'already halted' reply to ``halt`` must not raise."""
        error_server.add_response(
            r"^halt$",
            "error: target already halted",
        )
        tgt = Target(error_conn)

        # "already halted" is a benign condition -- halt() checks for it.
        result = await tgt.halt()
        acceptable = ("halted", "unknown")
        assert result.state in acceptable

    async def test_resume_error_response(self, error_server, error_conn):
        """An error reply to ``resume`` is reported as TargetError."""
        error_server.add_response(
            r"^resume",
            "error: cannot resume target",
        )
        tgt = Target(error_conn)

        with pytest.raises(TargetError, match="resume failed"):
            await tgt.resume()

    async def test_step_error_response(self, error_server, error_conn):
        """An error reply to ``step`` is reported as TargetError."""
        error_server.add_response(
            r"^step",
            "error: step failed on target",
        )
        tgt = Target(error_conn)

        with pytest.raises(TargetError, match="step failed"):
            await tgt.step()

    async def test_wait_halt_timeout(self, error_server, error_conn):
        """A 'timed out' reply to ``wait_halt`` becomes TimeoutError."""
        error_server.add_response(
            r"^wait_halt",
            "timed out while waiting for target",
        )
        tgt = Target(error_conn)

        with pytest.raises(TimeoutError, match="did not halt"):
            await tgt.wait_halt(timeout_ms=100)

    async def test_wait_halt_time_out_variant(self, error_server, error_conn):
        """The two-word 'time out' spelling also becomes TimeoutError."""
        error_server.add_response(
            r"^wait_halt",
            "time out waiting for halt",
        )
        tgt = Target(error_conn)

        with pytest.raises(TimeoutError, match="did not halt"):
            await tgt.wait_halt(timeout_ms=100)

    async def test_wait_halt_generic_error(self, error_server, error_conn):
        """A non-timeout error reply to ``wait_halt`` becomes TargetError."""
        error_server.add_response(
            r"^wait_halt",
            "error: target communication failure",
        )
        tgt = Target(error_conn)

        with pytest.raises(TargetError, match="wait_halt failed"):
            await tgt.wait_halt(timeout_ms=100)

    async def test_state_unexpected_format(self, error_server, error_conn):
        """Garbage from ``targets`` still yields an all-'unknown' TargetState."""
        error_server.add_response(
            r"^targets$",
            "this is not valid target output",
        )
        # The reg pc call that _parse_state makes has to be suppressed too.
        error_server.add_response(
            r"^reg\s+pc$",
            "no such register",
        )
        tgt = Target(error_conn)

        snapshot = await tgt.state()

        assert snapshot.name == "unknown"
        assert snapshot.state == "unknown"
        assert snapshot.current_pc is None

    async def test_state_unrecognized_state_string(self, error_server, error_conn):
        """A target row with an unrecognized state string maps to 'unknown'."""
        weird_table = (
            " TargetName Type Endian TapName State\n"
            "-- ------------------ ---------- ------ ------------------ ------------\n"
            " 0* stm32f1x.cpu cortex_m little stm32f1x.cpu exploding"
        )
        error_server.add_response(r"^targets$", weird_table)
        tgt = Target(error_conn)

        snapshot = await tgt.state()

        assert snapshot.name == "stm32f1x.cpu"
        assert snapshot.state == "unknown"
        assert snapshot.current_pc is None

    async def test_reset_error_response(self, error_server, error_conn):
        """An error reply to ``reset`` is reported as TargetError."""
        error_server.add_response(
            r"^reset\s+",
            "error: reset failed, adapter not found",
        )
        tgt = Target(error_conn)

        with pytest.raises(TargetError, match="reset failed"):
            await tgt.reset("halt")


# ======================================================================
# 3.
# Memory error paths
# ======================================================================


class TestMemoryErrors:
    """Failure modes surfaced by the Memory subsystem."""

    async def test_read_u32_target_not_halted(self, error_server, error_conn):
        """A 'target not halted' reply makes read_u32 raise TargetError."""
        error_server.add_response(r"^read_memory\s+", "error: target not halted")
        memory = Memory(error_conn)

        with pytest.raises(TargetError, match="read_memory failed"):
            await memory.read_u32(0x20000000, 1)

    async def test_write_u32_error_response(self, error_server, error_conn):
        """An error reply makes write_u32 raise TargetError."""
        error_server.add_response(r"^write_memory\s+", "error: target not halted")
        memory = Memory(error_conn)

        with pytest.raises(TargetError, match="write_memory failed"):
            await memory.write_u32(0x20000000, 0xDEADBEEF)

    async def test_read_u32_non_hex_tokens(self, error_server, error_conn):
        """Non-hex garbage from read_memory makes read_u32 raise TargetError."""
        error_server.add_response(r"^read_memory\s+", "not_a_hex_value xyz !!!")
        memory = Memory(error_conn)

        with pytest.raises(TargetError, match="Cannot parse read_memory"):
            await memory.read_u32(0x20000000, 1)

    async def test_read_u8_error_response(self, error_server, error_conn):
        """An error reply makes read_u8 raise TargetError."""
        error_server.add_response(
            r"^read_memory\s+",
            "error: bus fault during memory read",
        )
        memory = Memory(error_conn)

        with pytest.raises(TargetError, match="read_memory failed"):
            await memory.read_u8(0xFFFFFFFF, 4)

    async def test_write_bytes_error_response(self, error_server, error_conn):
        """An error reply makes write_bytes raise TargetError."""
        error_server.add_response(
            r"^write_memory\s+",
            "error: write access violation",
        )
        memory = Memory(error_conn)

        with pytest.raises(TargetError, match="write_memory failed"):
            await memory.write_bytes(0x00000000, b"\x01\x02\x03")

    async def test_read_u16_error_response(self, error_server, error_conn):
        """An error reply makes read_u16 raise TargetError."""
        error_server.add_response(r"^read_memory\s+", "error: alignment fault")
        memory = Memory(error_conn)

        with pytest.raises(TargetError, match="read_memory failed"):
            await memory.read_u16(0x20000001, 1)


# ======================================================================
# 4.
# Register error paths
# ======================================================================


class TestRegisterErrors:
    """Failure modes surfaced by the Registers subsystem."""

    async def test_read_not_halted(self, error_server, error_conn):
        """read('pc') on a running target raises TargetNotHaltedError."""
        error_server.add_response(r"^reg\s+pc$", "target not halted")
        registers = Registers(error_conn)

        with pytest.raises(TargetNotHaltedError, match="halted"):
            await registers.read("pc")

    async def test_read_nonexistent_register(self, error_server, error_conn):
        """read('nonexistent') with an unparseable reply raises TargetError."""
        error_server.add_response(
            r"^reg\s+nonexistent$",
            "invalid command name \"nonexistent\"",
        )
        registers = Registers(error_conn)

        with pytest.raises(TargetError, match="Cannot parse register"):
            await registers.read("nonexistent")

    async def test_write_not_halted(self, error_server, error_conn):
        """write('pc', val) on a running target raises TargetNotHaltedError."""
        error_server.add_response(r"^reg\s+pc\s+0x", "target not halted")
        registers = Registers(error_conn)

        with pytest.raises(TargetNotHaltedError, match="halted"):
            await registers.write("pc", 0x1234)

    async def test_write_generic_error(self, error_server, error_conn):
        """write() with an error unrelated to halting raises TargetError."""
        error_server.add_response(
            r"^reg\s+r0\s+0x",
            "error: register write failed",
        )
        registers = Registers(error_conn)

        with pytest.raises(TargetError, match="reg write failed"):
            await registers.write("r0", 0xDEAD)

    async def test_read_all_not_halted(self, error_server, error_conn):
        """read_all() on a running target raises TargetNotHaltedError."""
        error_server.add_response(r"^reg$", "target not halted")
        registers = Registers(error_conn)

        with pytest.raises(TargetNotHaltedError, match="halted"):
            await registers.read_all()

    async def test_read_many_partial_failure(self, error_server, error_conn):
        """read_many() propagates the first register read failure."""
        # pc succeeds, but sp returns not-halted
        error_server.add_response(r"^reg\s+sp$", "target not halted")
        registers = Registers(error_conn)
        wanted = ["pc", "sp"]

        with pytest.raises(TargetNotHaltedError):
            await registers.read_many(wanted)


# ======================================================================
# 5. Flash error paths
# ======================================================================


class TestFlashErrors:
    """Failure modes surfaced by the Flash subsystem."""

    async def test_banks_error(self, error_server, error_conn):
        """An error reply makes flash.banks() raise FlashError."""
        error_server.add_response(
            r"^flash banks$",
            "error: no flash banks configured",
        )
        flash_api = Flash(error_conn)

        with pytest.raises(FlashError, match="flash banks"):
            await flash_api.banks()

    async def test_info_error(self, error_server, error_conn):
        """An error reply makes flash.info() raise FlashError."""
        error_server.add_response(
            r"^flash info\s+",
            "error: invalid bank number",
        )
        flash_api = Flash(error_conn)

        with pytest.raises(FlashError, match="flash info"):
            await flash_api.info(bank=99)

    async def test_erase_sector_invalid_range(self, error_server, error_conn):
        """erase_sector with first > last raises FlashError locally."""
        flash_api = Flash(error_conn)

        with pytest.raises(FlashError, match="Invalid sector range"):
            await flash_api.erase_sector(bank=0, first=10, last=5)

    async def test_erase_sector_error_response(self, error_server, error_conn):
        """An error reply from the server makes erase_sector raise FlashError."""
        error_server.add_response(
            r"^flash erase_sector\s+",
            "error: erase failed",
        )
        flash_api = Flash(error_conn)

        with pytest.raises(FlashError, match="flash erase_sector"):
            await flash_api.erase_sector(bank=0, first=0, last=3)

    async def test_write_image_error(self, error_server, error_conn):
        """An error reply from the server makes write_image raise FlashError."""
        from pathlib import Path

        error_server.add_response(
            r"^flash write_image\s+",
            "error: flash write failed",
        )
        flash_api = Flash(error_conn)
        image = Path("/tmp/fake_firmware.bin")

        with pytest.raises(FlashError, match="flash write_image"):
            await flash_api.write_image(image, verify=False)

    async def test_protect_error(self, error_server, error_conn):
        """An error reply makes flash.protect() raise FlashError."""
        error_server.add_response(
            r"^flash protect\s+",
            "error: protection change not supported",
        )
        flash_api = Flash(error_conn)

        with pytest.raises(FlashError, match="flash protect"):
            await flash_api.protect(bank=0, first=0, last=3, on=True)


# ======================================================================
# 6. Breakpoint error paths
# ======================================================================


class TestBreakpointErrors:
    """Failure modes surfaced by the BreakpointManager subsystem."""

    async def test_add_breakpoint_error(self, error_server, error_conn):
        """An error reply makes add() raise BreakpointError."""
        error_server.add_response(
            r"^bp\s+0x",
            "error: can not add breakpoint, resource not available",
        )
        manager = BreakpointManager(error_conn)

        with pytest.raises(BreakpointError, match="bp 0x"):
            await manager.add(0x08001234)

    async def test_remove_breakpoint_error(self, error_server, error_conn):
        """An error reply makes remove() raise BreakpointError."""
        error_server.add_response(
            r"^rbp\s+",
            "error: no breakpoint at address",
        )
        manager = BreakpointManager(error_conn)

        with pytest.raises(BreakpointError, match="rbp 0x"):
            await manager.remove(0x08001234)

    async def test_add_watchpoint_error(self, error_server, error_conn):
        """An error reply makes add_watchpoint() raise BreakpointError."""
        error_server.add_response(
            r"^wp\s+0x",
            "error: no free watchpoint comparator",
        )
        manager = BreakpointManager(error_conn)

        with pytest.raises(BreakpointError, match="wp 0x"):
            await manager.add_watchpoint(0x20000000, 4)

    async def test_remove_watchpoint_error(self, error_server, error_conn):
        """An error reply makes remove_watchpoint() raise BreakpointError."""
        error_server.add_response(
            r"^rwp\s+",
            "error: no watchpoint at address",
        )
        manager = BreakpointManager(error_conn)

        with pytest.raises(BreakpointError, match="rwp 0x"):
            await manager.remove_watchpoint(0x20000000)


# ======================================================================
# 7. Session error paths
# ======================================================================


class TestSessionErrors:
    """Failure modes surfaced by the Session layer."""

    async def test_get_or_create_loop_from_async_context(self):
        """_get_or_create_loop() inside a running loop raises RuntimeError."""
        with pytest.raises(RuntimeError, match="Cannot use sync API"):
            _get_or_create_loop()

    async def test_command_on_closed_session(self, mock_ocd):
        """session.command() after close() raises ConnectionError."""
        host, port, _server = mock_ocd
        session = await Session.connect(host, port, timeout=5.0)
        await session.close()

        with pytest.raises(ConnectionError, match="Not connected"):
            await session.command("targets")

    async def test_connect_to_nonexistent_host(self):
        """Session.connect() to a bogus address raises ConnectionError."""
        with pytest.raises(ConnectionError):
            await Session.connect("127.0.0.1", 1, timeout=1.0)

    async def test_double_close_is_safe(self, mock_ocd):
        """Closing a session twice must not raise."""
        host, port, _server = mock_ocd
        session = await Session.connect(host, port, timeout=5.0)

        await session.close()
        # The second close should be a no-op.
        await session.close()


# ======================================================================
# 8.
# Process error paths
# ======================================================================


class TestProcessErrors:
    """Failure modes surfaced by the OpenOCDProcess manager."""

    async def test_start_empty_config(self):
        """start() given an empty config string raises ProcessError."""
        manager = OpenOCDProcess()

        with pytest.raises(ProcessError, match="[Ee]mpty config"):
            # /bin/true stands in for the binary so that config validation
            # is reached.
            await manager.start("", openocd_bin="/bin/true")

    async def test_start_dangling_flag(self):
        """start() given a trailing -f with no argument raises ProcessError."""
        manager = OpenOCDProcess()

        with pytest.raises(ProcessError, match="requires an argument"):
            await manager.start("-f", openocd_bin="/bin/true")

    async def test_start_dangling_c_flag(self):
        """start() given a trailing -c with no argument raises ProcessError."""
        manager = OpenOCDProcess()

        with pytest.raises(ProcessError, match="requires an argument"):
            await manager.start("-c", openocd_bin="/bin/true")

    async def test_start_nonexistent_binary(self):
        """start() given a nonexistent binary path raises ProcessError."""
        manager = OpenOCDProcess()

        with pytest.raises(ProcessError):
            await manager.start(
                "interface/cmsis-dap.cfg",
                openocd_bin="/nonexistent/path/to/openocd",
            )

    async def test_pid_is_none_before_start(self):
        """The pid property is None until start() has been called."""
        manager = OpenOCDProcess()
        observed_pid = manager.pid
        assert observed_pid is None

    async def test_running_is_false_before_start(self):
        """The running property is False until start() has been called."""
        manager = OpenOCDProcess()
        is_running = manager.running
        assert is_running is False

    async def test_stop_before_start_is_safe(self):
        """stop() ahead of start() must not raise."""
        manager = OpenOCDProcess()
        # Expected to be a no-op that raises nothing.
        await manager.stop()


# ======================================================================
# 9.
# Notification connection error paths
# ======================================================================


class TestNotificationErrors:
    """Failure modes of the notification subsystem."""

    async def test_enable_notifications_before_connect(self):
        """enable_notifications() ahead of connect() raises ConnectionError."""
        rpc = TclRpcConnection(timeout=1.0)

        with pytest.raises(ConnectionError, match="Not connected"):
            await rpc.enable_notifications()