diff --git a/mcp/skywalker-mcp/src/skywalker_mcp/server.py b/mcp/skywalker-mcp/src/skywalker_mcp/server.py index 94c627e..930c652 100644 --- a/mcp/skywalker-mcp/src/skywalker_mcp/server.py +++ b/mcp/skywalker-mcp/src/skywalker_mcp/server.py @@ -16,7 +16,7 @@ from pathlib import Path from fastmcp import FastMCP, Context # Add the tools directory to path so we can import the hardware library -_TOOLS_DIR = Path(__file__).resolve().parents[3] / "tools" +_TOOLS_DIR = Path(__file__).resolve().parents[4] / "tools" if str(_TOOLS_DIR) not in sys.path: sys.path.insert(0, str(_TOOLS_DIR)) @@ -34,6 +34,9 @@ from signal_analysis import adaptive_noise_floor, detect_peaks_enhanced # noqa: from survey_engine import SurveyEngine # noqa: E402 +MOTOR_WATCHDOG_SECS = 30 + + class DeviceBridge: """Thread-safe wrapper around SkyWalker1 for MCP tool access. @@ -42,11 +45,16 @@ class DeviceBridge: Internal access pattern: always use `with bridge.lock:` then `bridge._dev.method()`. The `call()` convenience method does this automatically for simple cases. + + Includes a motor watchdog: when continuous drive is started, a background + asyncio task will automatically halt the motor after MOTOR_WATCHDOG_SECS + unless cancelled by a halt or new motor command. """ def __init__(self, device: SkyWalker1): self._dev = device self._lock = threading.RLock() + self._motor_watchdog: asyncio.Task | None = None def call(self, method_name: str, *args, **kwargs): """Call a SkyWalker1 method under the lock.""" @@ -57,6 +65,34 @@ class DeviceBridge: def lock(self) -> threading.RLock: return self._lock + def cancel_motor_watchdog(self): + """Cancel any running motor watchdog timer.""" + if self._motor_watchdog is not None and not self._motor_watchdog.done(): + self._motor_watchdog.cancel() + self._motor_watchdog = None + + def start_motor_watchdog(self, timeout: float = MOTOR_WATCHDOG_SECS): + """Start or restart the motor watchdog timer. + + After `timeout` seconds, the motor is automatically halted. + Any subsequent motor command or explicit halt cancels the watchdog. + """ + self.cancel_motor_watchdog() + + async def _watchdog(): + await asyncio.sleep(timeout) + print( + f"skywalker-mcp: MOTOR WATCHDOG fired after {timeout}s — halting motor", + file=sys.stderr, + ) + with self._lock: + try: + self._dev.motor_halt() + except Exception as e: + print(f"skywalker-mcp: watchdog halt failed: {e}", file=sys.stderr) + + self._motor_watchdog = asyncio.create_task(_watchdog()) + # Global bridge reference, set during lifespan _bridge: DeviceBridge | None = None @@ -74,6 +110,8 @@ async def lifespan(server: FastMCP): print(f"skywalker-mcp: device open, fw {dev.get_fw_version()['version']}", file=sys.stderr) yield {"bridge": _bridge} finally: + if _bridge is not None: + _bridge.cancel_motor_watchdog() _bridge = None dev.close() print("skywalker-mcp: device closed", file=sys.stderr) @@ -81,9 +119,9 @@ async def lifespan(server: FastMCP): mcp = FastMCP( "skywalker-mcp", - description="MCP server for the Genpix SkyWalker-1 DVB-S USB receiver. " - "Provides spectrum sweep, signal monitoring, carrier survey, " - "dish motor control, and transport stream analysis.", + instructions="MCP server for the Genpix SkyWalker-1 DVB-S USB receiver. " + "Provides spectrum sweep, signal monitoring, carrier survey, " + "dish motor control, and transport stream analysis.", lifespan=lifespan, ) @@ -438,7 +476,8 @@ async def move_dish( else: bridge._dev.motor_drive_west(steps) mode = "continuous (send halt to stop)" if steps == 0 else "stepped" - return {"action": action, "steps": steps, "mode": mode, "status": "driving"} + return {"action": action, "steps": steps, "mode": mode, "status": "driving", + "continuous": steps == 0} elif action == "goto": slot = int(value) @@ -463,7 +502,24 @@ async def move_dish( else: return {"error": f"Unknown action '{action}'. Valid: halt, east, west, goto, gotox"} - return await asyncio.to_thread(_move) + result = await asyncio.to_thread(_move) + + # Motor watchdog management: cancel on halt/goto/gotox, start on continuous drive + if "error" not in result: + if action == "halt": + bridge.cancel_motor_watchdog() + elif action in ("goto", "gotox"): + # GotoX/Goto have inherent motor-stop at destination + bridge.cancel_motor_watchdog() + elif result.get("continuous"): + bridge.start_motor_watchdog() + result["watchdog_secs"] = MOTOR_WATCHDOG_SECS + result["warning"] = ( + f"Motor watchdog active: auto-halt in {MOTOR_WATCHDOG_SECS}s. " + "Send action='halt' to stop sooner." + ) + + return result @mcp.tool() diff --git a/mcp/skywalker-mcp/tests/conftest.py b/mcp/skywalker-mcp/tests/conftest.py new file mode 100644 index 0000000..dd6ba73 --- /dev/null +++ b/mcp/skywalker-mcp/tests/conftest.py @@ -0,0 +1,176 @@ +"""Shared fixtures for skywalker-mcp tests.""" + +from unittest.mock import MagicMock + +import pytest + +import skywalker_mcp.server as srv +from skywalker_mcp.server import DeviceBridge + + +class MockSkyWalker1: + """Mock SkyWalker1 device that returns plausible data without USB hardware.""" + + def __init__(self, verbose=False): + self.verbose = verbose + self._motor_halted = False + self._armed = False + self._lnb_on = True + self._calls = [] + + def _record(self, method, *args, **kwargs): + self._calls.append((method, args, kwargs)) + + def open(self): + self._record("open") + + def close(self): + self._record("close") + + def ensure_booted(self): + self._record("ensure_booted") + + def get_fw_version(self): + self._record("get_fw_version") + return {"version": "3.05.0-test", "date": "2026-02-17", "raw": b"\x03\x05\x00"} + + def get_config(self): + self._record("get_config") + return 0x3F + + def get_usb_speed(self): + self._record("get_usb_speed") + return 2 + + def get_serial_number(self): + self._record("get_serial_number") + return bytes([0xDE, 0xAD, 0xBE, 0xEF]) + + def get_last_error(self): + self._record("get_last_error") + return 0x00 + + def signal_monitor(self): + self._record("signal_monitor") + return { + "snr_raw": 200, "snr_db": 8.5, "snr_pct": 42.5, + "agc1": 1200, "agc2": 800, "power_db": -45.3, + "locked": True, "lock": 0x1F, "status": 0x01, + } + + def get_stream_diag(self, reset=False): + self._record("get_stream_diag", reset=reset) + return {"poll_count": 100, "overflow_count": 0, "sync_loss": 0, "armed": self._armed} + + def sweep_spectrum(self, start_mhz, stop_mhz, step_mhz=5.0, dwell_ms=15): + self._record("sweep_spectrum", start_mhz, stop_mhz, step_mhz=step_mhz, dwell_ms=dwell_ms) + n_points = int((stop_mhz - start_mhz) / step_mhz) + 1 + freqs = [start_mhz + i * step_mhz for i in range(n_points)] + powers = [] + for f in freqs: + base = -50.0 + if abs(f - 1420.0) < 5: + base += 8.0 * (1.0 - abs(f - 1420.0) / 5.0) + powers.append(base) + raw = [(int((p + 60) * 100), 0) for p in powers] + return freqs, powers, raw + + def tune_monitor(self, sr_sps, freq_khz, mod_idx, fec_idx, dwell_ms): + self._record("tune_monitor", sr_sps, freq_khz, mod_idx, fec_idx, dwell_ms) + return { + "snr_raw": 180, "snr_db": 7.8, "snr_pct": 39.0, + "agc1": 1100, "agc2": 750, "power_db": -46.1, + "locked": True, "lock": 0x1F, "status": 0x01, + "dwell_ms": dwell_ms, + } + + def adaptive_blind_scan(self, freq_khz, sr_min, sr_max, sr_step): + self._record("adaptive_blind_scan", freq_khz, sr_min, sr_max, sr_step) + return {"freq_khz": freq_khz, "locked": True, "sr_sps": 20000000} + + def motor_halt(self): + self._record("motor_halt") + self._motor_halted = True + + def motor_drive_east(self, steps): + self._record("motor_drive_east", steps) + self._motor_halted = False + + def motor_drive_west(self, steps): + self._record("motor_drive_west", steps) + self._motor_halted = False + + def motor_goto_position(self, slot): + self._record("motor_goto_position", slot) + + def motor_goto_x(self, observer_lon, sat_lon): + self._record("motor_goto_x", observer_lon, sat_lon) + + def motor_store_position(self, slot): + self._record("motor_store_position", slot) + + def start_intersil(self, on=True): + self._record("start_intersil", on=on) + self._lnb_on = on + + def set_lnb_voltage(self, high): + self._record("set_lnb_voltage", high) + + def set_22khz_tone(self, on): + self._record("set_22khz_tone", on) + + def i2c_bus_scan(self): + self._record("i2c_bus_scan") + return [0x08, 0x61, 0x51] + + def i2c_raw_read(self, slave, register): + self._record("i2c_raw_read", slave, register) + return 0xAB + + def arm_transfer(self, on): + self._record("arm_transfer", on) + self._armed = on + + def read_stream(self, timeout=500): + self._record("read_stream", timeout=timeout) + if self._armed: + return bytes([0x47, 0x00, 0x00, 0x10] + [0xFF] * 184) + return None + + +class MockContext: + """Minimal mock of FastMCP Context for direct tool function calls. + + Provides the bridge via request_context.lifespan_context["bridge"], + matching what _get_bridge(ctx) expects. + """ + + def __init__(self, bridge: DeviceBridge): + self.request_context = MagicMock() + self.request_context.lifespan_context = {"bridge": bridge} + self._progress = [] + + async def report_progress(self, current, total): + self._progress.append((current, total)) + + +@pytest.fixture +def mock_device(): + """Provide a fresh MockSkyWalker1 instance.""" + return MockSkyWalker1() + + +@pytest.fixture +def bridge(mock_device): + """Provide a DeviceBridge wrapping the mock device.""" + b = DeviceBridge(mock_device) + srv._bridge = b + yield b + b.cancel_motor_watchdog() + srv._bridge = None + + +@pytest.fixture +def ctx(bridge): + """Provide a MockContext wired to the bridge.""" + return MockContext(bridge) diff --git a/mcp/skywalker-mcp/tests/test_server.py b/mcp/skywalker-mcp/tests/test_server.py new file mode 100644 index 0000000..083bac6 --- /dev/null +++ b/mcp/skywalker-mcp/tests/test_server.py @@ -0,0 +1,503 @@ +"""Tests for skywalker-mcp server tools, validation, and safety. + +Calls async tool functions directly with a MockContext. No real USB hardware +or MCP transport needed — tests validation, safety, and response formatting. +""" + +import asyncio + +import pytest + +from skywalker_mcp.server import ( + get_device_status as _get_device_status, + get_signal_quality as _get_signal_quality, + get_stream_diagnostics as _get_stream_diagnostics, + sweep_spectrum as _sweep_spectrum, + tune_frequency as _tune_frequency, + run_blind_scan as _run_blind_scan, + move_dish as _move_dish, + jog_dish as _jog_dish, + store_position as _store_position, + set_lnb_config as _set_lnb_config, + scan_i2c_bus as _scan_i2c_bus, + read_i2c_register as _read_i2c_register, + capture_transport_stream as _capture_transport_stream, + identify_frequency as _identify_frequency, + compare_surveys as _compare_surveys, + mcp, + MOTOR_WATCHDOG_SECS, +) + +# Unwrap FastMCP Tool objects → raw async functions for direct testing. +# @mcp.tool() wraps each function as a Tool(fn=...) Pydantic model; +# .fn gives us the original async def we can call with MockContext. +get_device_status = _get_device_status.fn +get_signal_quality = _get_signal_quality.fn +get_stream_diagnostics = _get_stream_diagnostics.fn +sweep_spectrum = _sweep_spectrum.fn +tune_frequency = _tune_frequency.fn +run_blind_scan = _run_blind_scan.fn +move_dish = _move_dish.fn +jog_dish = _jog_dish.fn +store_position = _store_position.fn +set_lnb_config = _set_lnb_config.fn +scan_i2c_bus = _scan_i2c_bus.fn +read_i2c_register = _read_i2c_register.fn +capture_transport_stream = _capture_transport_stream.fn +identify_frequency = _identify_frequency.fn +compare_surveys = _compare_surveys.fn + + +# ───────────────────────────────────────────── +# Tool Registration +# ───────────────────────────────────────────── + +def test_tool_count(): + """17 tools should be registered.""" + assert len(mcp._tool_manager._tools) == 17 + + +def test_tool_names(): + """All expected tool names are present.""" + names = set(mcp._tool_manager._tools.keys()) + expected = { + "get_device_status", "get_signal_quality", "get_stream_diagnostics", + "sweep_spectrum", "tune_frequency", "run_blind_scan", + "run_carrier_survey", "compare_surveys", "list_surveys", + "move_dish", "jog_dish", "store_position", + "set_lnb_config", "scan_i2c_bus", "read_i2c_register", + "capture_transport_stream", "identify_frequency", + } + assert expected == names + + +def test_resource_count(): + """4 resources registered.""" + assert len(mcp._resource_manager._resources) == 4 + + +def test_prompt_count(): + """2 prompts registered.""" + assert len(mcp._prompt_manager._prompts) == 2 + + +# ───────────────────────────────────────────── +# Device Status Tools +# ───────────────────────────────────────────── + +async def test_get_device_status(ctx): + result = await get_device_status(ctx) + assert "3.05.0-test" in result["firmware"]["version"] + assert result["usb_speed"] == "High (480 Mbps)" + assert "de ad be ef" in result["serial"] + + +async def test_get_signal_quality(ctx): + result = await get_signal_quality(ctx) + assert result["snr_db"] == 8.5 + assert result["locked"] is True + assert result["agc1"] == 1200 + + +async def test_get_stream_diagnostics(ctx): + result = await get_stream_diagnostics(ctx) + assert result["poll_count"] == 100 + assert result["overflow_count"] == 0 + + +# ───────────────────────────────────────────── +# Spectrum Sweep Validation +# ───────────────────────────────────────────── + +async def test_sweep_defaults(ctx): + result = await sweep_spectrum(ctx) + assert result["start_mhz"] == 950.0 + assert result["stop_mhz"] == 2150.0 + assert result["num_points"] > 0 + assert len(result["frequencies_mhz"]) == result["num_points"] + assert len(result["powers_db"]) == result["num_points"] + + +async def test_sweep_narrow_band(ctx): + result = await sweep_spectrum(ctx, start_mhz=1418.0, stop_mhz=1423.0, step_mhz=0.5) + assert result["num_points"] == 11 + assert result["step_mhz"] == 0.5 + + +async def test_sweep_freq_below_range(ctx): + result = await sweep_spectrum(ctx, start_mhz=800.0) + assert "error" in result + assert "950" in result["error"] + + +async def test_sweep_freq_above_range(ctx): + result = await sweep_spectrum(ctx, stop_mhz=3000.0) + assert "error" in result + assert "2150" in result["error"] + + +async def test_sweep_start_gt_stop(ctx): + result = await sweep_spectrum(ctx, start_mhz=1500.0, stop_mhz=1000.0) + assert "error" in result + assert "less than" in result["error"] + + +async def test_sweep_bad_step(ctx): + result = await sweep_spectrum(ctx, step_mhz=0.01) + assert "error" in result + assert "step_mhz" in result["error"] + + +async def test_sweep_step_too_large(ctx): + result = await sweep_spectrum(ctx, step_mhz=200.0) + assert "error" in result + + +async def test_sweep_bad_dwell(ctx): + result = await sweep_spectrum(ctx, dwell_ms=0) + assert "error" in result + assert "dwell_ms" in result["error"] + + +async def test_sweep_dwell_too_high(ctx): + result = await sweep_spectrum(ctx, dwell_ms=300) + assert "error" in result + + +# ───────────────────────────────────────────── +# Tune Frequency Validation +# ───────────────────────────────────────────── + +async def test_tune_valid(ctx): + result = await tune_frequency(ctx, freq_mhz=1420.0, symbol_rate_ksps=5000) + assert result["locked"] is True + assert result["freq_mhz"] == 1420.0 + assert result["modulation"] == "qpsk" + + +async def test_tune_below_range(ctx): + result = await tune_frequency(ctx, freq_mhz=500.0) + assert "error" in result + assert "950" in result["error"] + + +async def test_tune_above_range(ctx): + result = await tune_frequency(ctx, freq_mhz=2200.0) + assert "error" in result + + +async def test_tune_bad_sr_low(ctx): + result = await tune_frequency(ctx, freq_mhz=1200.0, symbol_rate_ksps=100) + assert "error" in result + assert "256" in result["error"] + + +async def test_tune_bad_sr_high(ctx): + result = await tune_frequency(ctx, freq_mhz=1200.0, symbol_rate_ksps=50000) + assert "error" in result + assert "30000" in result["error"] + + +async def test_tune_bad_modulation(ctx): + result = await tune_frequency(ctx, freq_mhz=1200.0, modulation="dvb-s2") + assert "error" in result + assert "dvb-s2" in result["error"] + + +async def test_tune_bad_dwell(ctx): + result = await tune_frequency(ctx, freq_mhz=1200.0, dwell_ms=0) + assert "error" in result + + +# ───────────────────────────────────────────── +# Blind Scan Validation +# ───────────────────────────────────────────── + +async def test_blind_scan_valid(ctx): + result = await run_blind_scan(ctx, freq_mhz=1200.0) + assert result["locked"] is True + assert result["sr_ksps"] == 20000.0 + + +async def test_blind_scan_freq_below(ctx): + result = await run_blind_scan(ctx, freq_mhz=500.0) + assert "error" in result + + +async def test_blind_scan_sr_below_min(ctx): + result = await run_blind_scan(ctx, freq_mhz=1200.0, sr_min_ksps=100) + assert "error" in result + assert "256" in result["error"] + + +async def test_blind_scan_sr_above_max(ctx): + result = await run_blind_scan(ctx, freq_mhz=1200.0, sr_max_ksps=50000) + assert "error" in result + assert "30000" in result["error"] + + +# ───────────────────────────────────────────── +# Motor Safety Tests +# ───────────────────────────────────────────── + +async def test_motor_halt(ctx, mock_device): + result = await move_dish(ctx, action="halt") + assert result["action"] == "halt" + assert result["status"] == "stopped" + assert mock_device._motor_halted is True + + +async def test_motor_east_stepped(ctx): + result = await move_dish(ctx, action="east", value=10) + assert result["steps"] == 10 + assert result["mode"] == "stepped" + + +async def test_motor_west_stepped(ctx): + result = await move_dish(ctx, action="west", value=5) + assert result["steps"] == 5 + assert result["action"] == "west" + + +async def test_motor_continuous_rejected(ctx): + """Continuous drive (steps=0) without explicit flag is rejected.""" + result = await move_dish(ctx, action="east", value=0) + assert "error" in result + assert "CONTINUOUS" in result["error"] + assert "continuous=True" in result["error"] + + +async def test_motor_continuous_with_flag(ctx): + """Continuous drive with explicit flag succeeds and starts watchdog.""" + result = await move_dish(ctx, action="west", value=0, continuous=True) + assert result["status"] == "driving" + assert result["continuous"] is True + assert result["watchdog_secs"] == MOTOR_WATCHDOG_SECS + assert "warning" in result + + +async def test_motor_steps_negative(ctx): + result = await move_dish(ctx, action="east", value=-5) + assert "error" in result + assert "0-127" in result["error"] + + +async def test_motor_steps_too_high(ctx): + result = await move_dish(ctx, action="east", value=200) + assert "error" in result + assert "0-127" in result["error"] + + +async def test_motor_gotox(ctx, mock_device): + result = await move_dish(ctx, action="gotox", value=-97.0, observer_lon=-96.8) + assert result["action"] == "gotox" + assert result["satellite_lon"] == -97.0 + assert "motor_angle_deg" in result + assert ("motor_goto_x", (-96.8, -97.0), {}) in mock_device._calls + + +async def test_motor_goto_slot(ctx, mock_device): + result = await move_dish(ctx, action="goto", value=5) + assert result["slot"] == 5 + assert result["action"] == "goto" + + +async def test_motor_goto_slot_out_of_range(ctx): + result = await move_dish(ctx, action="goto", value=300) + assert "error" in result + assert "0-255" in result["error"] + + +async def test_motor_invalid_action(ctx): + result = await move_dish(ctx, action="spin") + assert "error" in result + assert "spin" in result["error"] + + +# ───────────────────────────────────────────── +# Motor Watchdog Tests +# ───────────────────────────────────────────── + +async def test_watchdog_starts_on_continuous(ctx, bridge): + """Watchdog task is created when continuous drive starts.""" + await move_dish(ctx, action="west", value=0, continuous=True) + assert bridge._motor_watchdog is not None + assert not bridge._motor_watchdog.done() + bridge.cancel_motor_watchdog() + + +async def test_watchdog_cancelled_on_halt(ctx, bridge): + """Halt cancels the watchdog.""" + await move_dish(ctx, action="east", value=0, continuous=True) + assert bridge._motor_watchdog is not None + await move_dish(ctx, action="halt") + assert bridge._motor_watchdog is None or bridge._motor_watchdog.cancelled() + + +async def test_watchdog_fires_and_halts(ctx, bridge, mock_device): + """Watchdog auto-halts after timeout.""" + bridge.start_motor_watchdog(timeout=0.1) # 100ms for test speed + await asyncio.sleep(0.3) # Wait for watchdog to fire + assert mock_device._motor_halted is True + + +# ───────────────────────────────────────────── +# Jog Dish Tests +# ───────────────────────────────────────────── + +async def test_jog_valid(ctx): + result = await jog_dish(ctx, direction="east", steps=5) + assert result["direction"] == "east" + assert result["steps"] == 5 + assert "snr_db" in result + + +async def test_jog_too_many_steps(ctx): + result = await jog_dish(ctx, direction="east", steps=50) + assert "error" in result + assert "1-30" in result["error"] + + +async def test_jog_zero_steps(ctx): + result = await jog_dish(ctx, direction="east", steps=0) + assert "error" in result + + +async def test_jog_bad_direction(ctx): + result = await jog_dish(ctx, direction="up") + assert "error" in result + assert "east" in result["error"] + + +# ───────────────────────────────────────────── +# Store Position Tests +# ───────────────────────────────────────────── + +async def test_store_position_valid(ctx, mock_device): + result = await store_position(ctx, slot=5) + assert result["stored"] is True + assert result["slot"] == 5 + + +async def test_store_position_slot_zero(ctx): + result = await store_position(ctx, slot=0) + assert "error" in result + + +async def test_store_position_slot_too_high(ctx): + result = await store_position(ctx, slot=300) + assert "error" in result + + +# ───────────────────────────────────────────── +# LNB & I2C Tests +# ───────────────────────────────────────────── + +async def test_lnb_disable(ctx, mock_device): + result = await set_lnb_config(ctx, disable_lnb=True) + assert result["lnb_power"] == "off" + assert mock_device._lnb_on is False + + +async def test_lnb_voltage(ctx, mock_device): + result = await set_lnb_config(ctx, voltage="18V") + assert result["voltage"] == "18V" + + +async def test_lnb_tone(ctx, mock_device): + result = await set_lnb_config(ctx, tone_22khz=True) + assert result["tone_22khz"] is True + + +async def test_i2c_scan(ctx): + result = await scan_i2c_bus(ctx) + assert result["device_count"] == 3 + addresses = [d["address"] for d in result["devices"]] + assert "0x08" in addresses + assert "0x61" in addresses + assert "0x51" in addresses + + +async def test_i2c_read(ctx): + result = await read_i2c_register(ctx, slave_address=0x08, register=0x00) + assert result["value"] == 0xAB + assert result["hex"] == "0xAB" + assert "0b" in result["binary"] + + +# ───────────────────────────────────────────── +# Transport Stream Validation +# ───────────────────────────────────────────── + +async def test_ts_capture_duration_too_short(ctx): + result = await capture_transport_stream(ctx, duration_secs=0.1) + assert "error" in result + assert "0.5-30" in result["error"] + + +async def test_ts_capture_duration_too_long(ctx): + result = await capture_transport_stream(ctx, duration_secs=60.0) + assert "error" in result + + +async def test_ts_capture_valid(ctx): + """Valid TS capture returns packet count (mock device is locked).""" + result = await capture_transport_stream(ctx, duration_secs=0.5) + assert result["bytes_captured"] > 0 + assert result["packets"] > 0 + + +# ───────────────────────────────────────────── +# Frequency Identification +# ───────────────────────────────────────────── + +async def test_identify_hydrogen(ctx): + result = await identify_frequency(ctx, freq_mhz=1420.405) + assert result["in_if_range"] is True + signals = [m.get("signal", "") for m in result["matches"]] + assert any("Hydrogen" in s for s in signals) + + +async def test_identify_gps_l1(ctx): + result = await identify_frequency(ctx, freq_mhz=1575.42) + signals = [m.get("signal", "") for m in result["matches"]] + assert any("GPS L1" in s for s in signals) + + +async def test_identify_gps_l5(ctx): + result = await identify_frequency(ctx, freq_mhz=1176.45) + signals = [m.get("signal", "") for m in result["matches"]] + assert any("GPS L5" in s or "Galileo E5a" in s for s in signals) + + +async def test_identify_with_lnb(ctx): + result = await identify_frequency(ctx, freq_mhz=1200.0, lnb_lo_mhz=9750.0) + assert result["rf_freq_mhz"] == 10950.0 + assert result["lnb_lo_mhz"] == 9750.0 + + +async def test_identify_no_lnb(ctx): + result = await identify_frequency(ctx, freq_mhz=1200.0) + assert result["rf_freq_mhz"] is None + assert result["lnb_lo_mhz"] is None + + +# ───────────────────────────────────────────── +# Path Traversal Protection +# ───────────────────────────────────────────── + +async def test_compare_path_traversal(ctx): + result = await compare_surveys(ctx, old_filename="../../../etc/passwd", new_filename="ok.json") + assert "error" in result + assert "plain filename" in result["error"] + + +async def test_compare_dotdot_in_name(ctx): + result = await compare_surveys(ctx, old_filename="..hidden.json", new_filename="ok.json") + assert "error" in result + + +async def test_compare_slash_in_name(ctx): + result = await compare_surveys(ctx, old_filename="subdir/file.json", new_filename="ok.json") + assert "error" in result + assert "plain filename" in result["error"]