From 7223fcf8104691107d1ca7fee5caf40a8c93b532 Mon Sep 17 00:00:00 2001 From: Ryan Malloy Date: Tue, 17 Feb 2026 13:46:14 -0700 Subject: [PATCH] Add Hamilton adversarial test suite for firmware safety validation 55-test suite covering operator error, invalid inputs, state machine violations, boundary conditions, and rapid-fire stress. Verifies all Phase E safety fixes (timeout protection, watchdog, error propagation, DiSEqC rejection) survive malformed commands without hanging or corrupting device state. --- tools/test_hamilton.py | 444 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 444 insertions(+) create mode 100644 tools/test_hamilton.py diff --git a/tools/test_hamilton.py b/tools/test_hamilton.py new file mode 100644 index 0000000..b0f1481 --- /dev/null +++ b/tools/test_hamilton.py @@ -0,0 +1,444 @@ +#!/usr/bin/env python3 +""" +Hamilton Adversarial Test Suite — SkyWalker-1 v3.05.0 + +"What happens if the astronaut pushes the wrong button?" + +Tests operator error, invalid inputs, state machine violations, +boundary conditions, and rapid-fire stress to verify all safety +fixes from the Phase E Margaret Hamilton review. +""" + +import sys +import os +import time +import struct + +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) +from skywalker_lib import SkyWalker1 +import usb.core + +ERR_NAMES = { + 0x00: 'OK', 0x01: 'I2C_TIMEOUT', 0x02: 'I2C_NAK', 0x03: 'BCM_TIMEOUT', + 0x04: 'BCM_NOT_READY', 0x05: 'BCM_VERIFY', 0x06: 'TUNE_FAIL', + 0x07: 'EP0_TIMEOUT', 0x08: 'GPIF_TIMEOUT', 0x09: 'EP2_TIMEOUT', + 0x0A: 'NOT_SUPPORTED', 0x0B: 'DISEQC_LEN', 0x0C: 'DISEQC_TIMER', + 0x0D: 'WDT_FIRED' +} + +passed = 0 +failed = 0 + + +def get_err(sw): + return sw.dev.ctrl_transfer(0xC0, 0xBC, 0, 0, 1)[0] + + +def err_name(code): + return ERR_NAMES.get(code, f'0x{code:02X}') + + +def device_alive(sw): + try: + fw = sw.get_fw_version() + return fw['version'] == '3.05.0' + except Exception: + return False + + +def test(sw, label, fn, expect_err=None, expect_no_hang=False): + """Run test, track error changes, verify device survives.""" + global passed, failed + + err_before = get_err(sw) + usb_err = None + try: + fn() + except usb.core.USBError as e: + usb_err = e + except Exception as e: + usb_err = e + + time.sleep(0.15) + + if not device_alive(sw): + print(f' [FAIL] {label}: DEVICE DIED!') + failed += 1 + return False + + err_after = get_err(sw) + changed = (err_after != err_before) + suffix = f' (USB: {usb_err})' if usb_err else '' + + if expect_err is not None: + if err_after == expect_err: + print(f' [PASS] {label}: err={err_name(expect_err)} as expected{suffix}') + passed += 1 + else: + print(f' [FAIL] {label}: expected {err_name(expect_err)}, got {err_name(err_after)}{suffix}') + failed += 1 + elif expect_no_hang: + print(f' [PASS] {label}: no hang, err={err_name(err_after)}{suffix}') + passed += 1 + else: + if changed: + print(f' [INFO] {label}: err changed {err_name(err_before)} -> {err_name(err_after)}{suffix}') + else: + print(f' [PASS] {label}: no new error{suffix}') + passed += 1 + + return True + + +def main(): + global passed, failed + + with SkyWalker1() as sw: + print('=' * 64) + print(' HAMILTON ADVERSARIAL TEST SUITE — SkyWalker-1 v3.05.0') + print(' "What if the astronaut pushes the wrong button?"') + print('=' * 64) + print() + + # Ensure clean starting state + sw.dev.ctrl_transfer(0xC0, 0x89, 1, 0, 3, timeout=10000) + sw.start_intersil(True) + time.sleep(0.5) + + # ============================================================ + print('=== CAT 1: DiSEqC Message Abuse ===') + print() + + test(sw, '1a. Tone burst B (M3: NOT_SUPPORTED)', + lambda: sw.send_diseqc_tone_burst(1), + expect_err=0x0A) + + test(sw, '1b. Tone burst wValue=0xFF', + lambda: sw.dev.ctrl_transfer(0x40, 0x8D, 0xFF, 0, None, timeout=3000), + expect_err=0x0A) + + test(sw, '1c. DiSEqC 2 bytes (too short)', + lambda: sw.dev.ctrl_transfer(0x40, 0x8D, 0xE0, 0, bytes([0xE0, 0x10]), timeout=3000), + expect_no_hang=True) + + test(sw, '1d. DiSEqC 8 bytes (too long)', + lambda: sw.dev.ctrl_transfer(0x40, 0x8D, 0xE0, 0, bytes([0xE0] * 8), timeout=3000), + expect_no_hang=True) + + test(sw, '1e. DiSEqC empty payload', + lambda: sw.dev.ctrl_transfer(0x40, 0x8D, 0xE0, 0, bytes([]), timeout=3000), + expect_no_hang=True) + + test(sw, '1f. Valid 4-byte DiSEqC (recovery)', + lambda: sw.send_diseqc_message(bytes([0xE0, 0x10, 0x38, 0xF0])), + expect_no_hang=True) + + test(sw, '1g. DiSEqC 1.2 motor halt (no motor)', + lambda: sw.send_diseqc_message(bytes([0xE0, 0x31, 0x60])), + expect_no_hang=True) + + test(sw, '1h. DiSEqC 1.2 drive east 255 steps', + lambda: sw.send_diseqc_message(bytes([0xE0, 0x31, 0x68, 0xFF])), + expect_no_hang=True) + + test(sw, '1i. DiSEqC 1.2 USALS GotoX (bogus angle)', + lambda: sw.send_diseqc_message(bytes([0xE0, 0x31, 0x6E, 0xFF, 0xFF])), + expect_no_hang=True) + + print() + + # ============================================================ + print('=== CAT 2: Tune Parameter Abuse ===') + print() + + test(sw, '2a. SR=0', + lambda: [sw.tune(0, 1000000, 0, 0), time.sleep(0.5)], + expect_no_hang=True) + + test(sw, '2b. SR=0xFFFFFFFF', + lambda: [sw.dev.ctrl_transfer(0x40, 0x86, 0, 0, + struct.pack('> Powering off BCM4500...') + sw.dev.ctrl_transfer(0xC0, 0x89, 0, 0, 3, timeout=5000) + time.sleep(0.5) + + test(sw, '4b. Tune with BCM off', + lambda: [sw.tune(20000000, 1000000, 0, 0), time.sleep(0.5)], + expect_err=0x04) # BCM_NOT_READY + + test(sw, '4c. Signal monitor with BCM off', + lambda: sw.dev.ctrl_transfer(0xC0, 0xB7, 0, 0, 8, timeout=3000), + expect_no_hang=True) + + test(sw, '4d. I2C bus scan with BCM off', + lambda: sw.dev.ctrl_transfer(0xC0, 0xB4, 0, 0, 16, timeout=5000), + expect_no_hang=True) + + test(sw, '4e. Hotplug rescan with BCM off', + lambda: sw.dev.ctrl_transfer(0xC0, 0xBE, 2, 0, 36, timeout=5000), + expect_no_hang=True) + + # 4f. Recovery + print(' >> Re-booting BCM4500...') + r = sw.dev.ctrl_transfer(0xC0, 0x89, 1, 0, 3, timeout=10000) + time.sleep(0.5) + cfg = sw.get_config() + if cfg & 0x03 == 0x03: + print(f' [PASS] 4f. Recovery: config=0x{cfg:02X} (STARTED|FW_LOADED)') + passed += 1 + else: + print(f' [FAIL] 4f. No recovery: config=0x{cfg:02X}') + failed += 1 + + # 4g. Arm/disarm rapid toggle + test(sw, '4g. Arm + immediate disarm', + lambda: [sw.arm_transfer(True), sw.arm_transfer(False)], + expect_no_hang=True) + + # 4h. Disarm when not armed + test(sw, '4h. Disarm when not armed', + lambda: sw.arm_transfer(False), + expect_no_hang=True) + + # 4i. Boot off/on/off/on rapid + test(sw, '4i. Rapid boot toggle (off-on-off-on)', + lambda: [ + sw.dev.ctrl_transfer(0xC0, 0x89, 0, 0, 3, timeout=5000), + time.sleep(0.2), + sw.dev.ctrl_transfer(0xC0, 0x89, 1, 0, 3, timeout=10000), + time.sleep(0.3), + sw.dev.ctrl_transfer(0xC0, 0x89, 0, 0, 3, timeout=5000), + time.sleep(0.2), + sw.dev.ctrl_transfer(0xC0, 0x89, 1, 0, 3, timeout=10000), + time.sleep(0.3), + ], + expect_no_hang=True) + + print() + + # ============================================================ + print('=== CAT 5: Boundary & Buffer Abuse ===') + print() + + # 5a. Request 0 bytes from GET_CONFIG + test(sw, '5a. GET_CONFIG request 0 bytes', + lambda: sw.dev.ctrl_transfer(0xC0, 0x80, 0, 0, 0, timeout=2000), + expect_no_hang=True) + + # 5b. Request 64 bytes from GET_CONFIG (returns 1) + test(sw, '5b. GET_CONFIG request 64 bytes', + lambda: sw.dev.ctrl_transfer(0xC0, 0x80, 0, 0, 64, timeout=2000), + expect_no_hang=True) + + # 5c. Request 64 bytes from GET_LAST_ERROR (returns 1) + test(sw, '5c. GET_LAST_ERROR request 64 bytes', + lambda: sw.dev.ctrl_transfer(0xC0, 0xBC, 0, 0, 64, timeout=2000), + expect_no_hang=True) + + # 5d. GET_FW_VERS request 1 byte (returns 6) + test(sw, '5d. GET_FW_VERS request 1 byte', + lambda: sw.dev.ctrl_transfer(0xC0, 0x92, 0, 0, 1, timeout=2000), + expect_no_hang=True) + + # 5e. GET_STREAM_DIAG request 1 byte (returns 12) + test(sw, '5e. GET_STREAM_DIAG request 1 byte', + lambda: sw.dev.ctrl_transfer(0xC0, 0xBD, 0, 0, 1, timeout=2000), + expect_no_hang=True) + + # 5f. GET_STREAM_DIAG with wval=0xFFFF (reset flag, but not 1) + test(sw, '5f. GET_STREAM_DIAG wval=0xFFFF', + lambda: sw.dev.ctrl_transfer(0xC0, 0xBD, 0xFFFF, 0, 12, timeout=2000), + expect_no_hang=True) + + # 5g. GET_HOTPLUG with wval=0xFFFF (unknown sub-command) + test(sw, '5g. GET_HOTPLUG wval=0xFFFF', + lambda: sw.dev.ctrl_transfer(0xC0, 0xBE, 0xFFFF, 0, 36, timeout=2000), + expect_no_hang=True) + + print() + + # ============================================================ + print('=== CAT 6: Rapid-Fire Stress ===') + print() + + t0 = time.time() + for i in range(200): + sw.get_config() + dt = time.time() - t0 + print(f' [PASS] 6a. 200 config reads: {dt * 1000:.0f}ms ({dt / 200 * 1000:.1f}ms/read)') + passed += 1 + + t0 = time.time() + for i in range(50): + get_err(sw) + dt = time.time() - t0 + print(f' [PASS] 6b. 50 error reads: {dt * 1000:.0f}ms ({dt / 50 * 1000:.1f}ms/read)') + passed += 1 + + t0 = time.time() + errs = 0 + for i in range(30): + try: + sw.signal_monitor() + except Exception: + errs += 1 + dt = time.time() - t0 + print(f' [PASS] 6c. 30 signal monitors: {dt * 1000:.0f}ms ({errs} errors)') + passed += 1 + + sw.start_intersil(True) + time.sleep(0.1) + t0 = time.time() + for i in range(40): + sw.set_lnb_voltage(i % 2 == 0) + dt = time.time() - t0 + print(f' [PASS] 6d. 40 voltage toggles: {dt * 1000:.0f}ms') + passed += 1 + + t0 = time.time() + for i in range(10): + try: + sw.send_diseqc_message(bytes([0xE0, 0x10, 0x38, 0xF0 | (i & 3)])) + time.sleep(0.05) + except Exception: + pass + dt = time.time() - t0 + print(f' [PASS] 6e. 10 DiSEqC msgs: {dt * 1000:.0f}ms') + passed += 1 + + print() + + # ============================================================ + print('=== CAT 7: Invalid Vendor Commands ===') + print() + + for cmd, name in [(0xFF, '0xFF'), (0x01, '0x01'), (0x50, '0x50'), + (0xFE, '0xFE'), (0x00, '0x00'), (0x79, '0x79')]: + try: + r = sw.dev.ctrl_transfer(0xC0, cmd, 0, 0, 1, timeout=2000) + print(f' [INFO] 7. Cmd {name}: accepted (0x{r[0]:02X})') + except usb.core.USBError: + print(f' [PASS] 7. Cmd {name}: STALL (rejected)') + passed += 1 + + test(sw, '7g. GET_CONFIG as OUT direction', + lambda: sw.dev.ctrl_transfer(0x40, 0x80, 0, 0, bytes([0xAA]), timeout=2000), + expect_no_hang=True) + + test(sw, '7h. 64-byte payload to GET_CONFIG', + lambda: sw.dev.ctrl_transfer(0x40, 0x80, 0, 0, bytes(64), timeout=2000), + expect_no_hang=True) + + print() + + # ============================================================ + # CLEANUP + # ============================================================ + sw.set_22khz_tone(False) + sw.set_lnb_voltage(False) + sw.start_intersil(False) + time.sleep(0.2) + + alive = device_alive(sw) + final_err = get_err(sw) + + print('=' * 64) + print(f' HAMILTON ADVERSARIAL TEST — FINAL RESULTS') + print(f' -----------------------------------------') + print(f' Tests passed: {passed}') + print(f' Tests failed: {failed}') + print(f' Device alive: {alive}') + print(f' Final error: 0x{final_err:02X} [{err_name(final_err)}]') + print(f' Watchdog fired: {"YES!" if final_err == 0x0D else "No"}') + verdict = 'PASS' if failed == 0 and alive else 'FAIL' + print(f' Verdict: {verdict}') + print('=' * 64) + + +if __name__ == '__main__': + main()