Add birdcage-mcp FastMCP server for satellite dish control

34 tools (connection, movement, signal, system, satellite, console),
5 resources, 3 prompts. Backed by DemoDevice for offline testing.
46 tests passing against the demo backend via run_server_async.
This commit is contained in:
Ryan Malloy 2026-02-17 16:01:51 -07:00
parent 16ca4892b3
commit 8a6b99bd8c
21 changed files with 3233 additions and 0 deletions

34
mcp/pyproject.toml Normal file
View File

@ -0,0 +1,34 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "birdcage-mcp"
version = "2026.02.17"
description = "FastMCP server for Winegard satellite dish control"
license = "MIT"
requires-python = ">=3.11"
authors = [{name = "Ryan Malloy", email = "ryan@supported.systems"}]
dependencies = [
"birdcage",
"fastmcp>=2.0",
]
[project.scripts]
birdcage-mcp = "birdcage_mcp.server:main"
[tool.uv.sources]
birdcage = { path = "..", editable = true }
[tool.ruff]
target-version = "py311"
src = ["src"]
[tool.ruff.lint]
select = ["E", "F", "I", "UP", "B", "SIM"]
[tool.hatch.build.targets.wheel]
packages = ["src/birdcage_mcp"]
[dependency-groups]
dev = ["pytest>=9.0.2", "pytest-asyncio>=1.3.0"]

View File

@ -0,0 +1 @@
"""birdcage-mcp: FastMCP server for Winegard satellite dish control."""

View File

@ -0,0 +1,78 @@
"""MCP prompts — guided workflows for common dish operations."""
def register(mcp):
"""Register MCP prompts on the FastMCP instance."""
@mcp.prompt()
async def setup_wizard() -> str:
"""Step-by-step guide to connect and initialize the satellite dish.
Walks through: check status -> connect to serial port -> verify
firmware -> home motors -> confirm position.
"""
return (
"Follow these steps to set up the satellite dish:\n\n"
"1. Call `status` to check the current connection state.\n"
"2. If not connected, call `connect` with the correct port "
"and firmware variant.\n"
"3. Call `get_firmware_id` to verify the firmware version.\n"
"4. Call `home_motor` with motor_id=0 (AZ) then motor_id=1 (EL) "
"to establish reference positions.\n"
"5. Call `get_position` to verify the dish is at the expected "
"home coordinates.\n"
"6. Call `get_el_limits` to confirm the elevation operating range.\n\n"
"The dish is now ready for tracking or manual positioning."
)
@mcp.prompt()
async def satellite_tracking_guide() -> str:
"""Guide for tracking a satellite pass with the dish.
Walks through: search -> get passes -> wait for AOS -> poll
position at 1 Hz -> move dish -> detect LOS.
"""
return (
"Follow these steps to track a satellite:\n\n"
"1. Call `search_satellites` with the satellite name "
"(e.g. 'ISS', 'NOAA 19').\n"
"2. Note the NORAD ID from the search results.\n"
"3. Call `get_passes` with the NORAD ID to see upcoming passes.\n"
"4. Choose a pass with good max elevation (>30 deg is ideal).\n"
"5. When the pass is about to start (AOS time), begin a tracking "
"loop:\n"
" a. Call `get_visible_targets` to get the satellite's current "
"AZ/EL.\n"
" b. Call `move_to` with those coordinates.\n"
" c. Wait ~1 second, then repeat from (a).\n"
"6. Stop tracking when the satellite drops below your minimum "
"elevation.\n\n"
"TIP: Enable the LNA first with `enable_lna` if you want to "
"measure signal strength during the pass."
)
@mcp.prompt()
async def rf_sweep_guide() -> str:
"""Guide for performing an RF sky sweep with the dish.
Walks through: enable LNA -> baseline RSSI -> configure sweep
-> run az_sweep -> analyze results.
"""
return (
"Follow these steps for an RF sky sweep:\n\n"
"1. Call `enable_lna` to power the low-noise amplifier.\n"
"2. Call `get_rssi` to establish a baseline noise floor.\n"
"3. Decide your sweep parameters:\n"
" - start_az: Starting azimuth\n"
" - span: How many degrees to sweep\n"
" - step_cdeg: Resolution in centidegrees (100 = 1 deg)\n"
" - num_xponders: Transponders per position (1 for basic)\n"
"4. Call `az_sweep` with those parameters.\n"
"5. Analyze the returned points:\n"
" - Look for RSSI peaks above the noise floor (~500)\n"
" - Check lock=1 points for strong signals\n"
" - SNR values indicate signal quality\n\n"
"For a 2D sky map, repeat the sweep at different elevations "
"(e.g. EL 20 to 60 in 5 deg steps) using `move_motor` to "
"set each elevation before sweeping."
)

View File

@ -0,0 +1,57 @@
"""MCP resources — live data endpoints for dish state."""
import json
from fastmcp import Context
def register(mcp):
"""Register MCP resources on the FastMCP instance."""
@mcp.resource("birdcage://config")
async def config_resource(ctx: Context) -> str:
"""Current connection configuration."""
state = ctx.request_context.lifespan_context
data = {
"demo_mode": state.demo_mode,
"serial_port": state.serial_port,
"firmware": state.firmware_name,
"connected": state.connected,
}
return json.dumps(data, indent=2)
@mcp.resource("birdcage://position")
async def position_resource(ctx: Context) -> str:
"""Live dish position (queries hardware)."""
state = ctx.request_context.lifespan_context
if state.device is None:
return json.dumps({"error": "not connected"})
pos = state.device.get_position()
return json.dumps(pos, indent=2)
@mcp.resource("birdcage://firmware")
async def firmware_resource(ctx: Context) -> str:
"""Firmware identification string."""
state = ctx.request_context.lifespan_context
if state.device is None:
return json.dumps({"error": "not connected"})
fw_id = state.device.get_firmware_id()
return fw_id
@mcp.resource("birdcage://motor-dynamics")
async def motor_dynamics_resource(ctx: Context) -> str:
"""Motor velocity and acceleration limits."""
state = ctx.request_context.lifespan_context
if state.device is None:
return json.dumps({"error": "not connected"})
dynamics = state.device.get_motor_dynamics()
return json.dumps(dynamics, indent=2)
@mcp.resource("birdcage://el-limits")
async def el_limits_resource(ctx: Context) -> str:
"""Elevation angle limits (min, max, home)."""
state = ctx.request_context.lifespan_context
if state.device is None:
return json.dumps({"error": "not connected"})
limits = state.device.get_el_limits()
return json.dumps(limits, indent=2)

View File

@ -0,0 +1,92 @@
"""Birdcage MCP server — FastMCP entry point with lifespan management."""
import logging
import os
import sys
from contextlib import asynccontextmanager
from fastmcp import FastMCP
from birdcage_mcp.state import BirdcageState
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(server: FastMCP):
"""Server lifespan: configure state from environment, optionally pre-connect."""
state = BirdcageState()
demo_env = os.environ.get("BIRDCAGE_DEMO", "false").lower()
state.demo_mode = demo_env in ("1", "true", "yes")
state.serial_port = os.environ.get("BIRDCAGE_PORT", "/dev/ttyUSB0")
state.firmware_name = os.environ.get("BIRDCAGE_FIRMWARE", "g2")
craft_url = os.environ.get(
"BIRDCAGE_CRAFT_URL", "https://space.warehack.ing"
)
if state.demo_mode:
from birdcage.demo import DemoCraftClient, DemoDevice
device = DemoDevice()
device.connect()
device.initialize()
state.device = device
state.craft_client = DemoCraftClient()
state.connected = True
logger.info("Demo mode: DemoDevice + DemoCraftClient ready")
else:
from birdcage.craft_client import CraftClient
state.craft_client = CraftClient(base_url=craft_url)
logger.info(
"Hardware mode: use 'connect' tool to open serial port %s",
state.serial_port,
)
try:
yield state
finally:
if state.device is not None and hasattr(state.device, "disconnect"):
try:
state.device.disconnect()
except Exception:
logger.debug("Disconnect error during shutdown", exc_info=True)
logger.info("Birdcage MCP server shut down")
mcp = FastMCP(
"birdcage",
instructions=(
"Control a Winegard satellite dish (Carryout G2 / Trav'ler) for "
"amateur radio satellite tracking. Provides motor control, signal "
"measurement, firmware management, and orbital tracking via the "
"Craft API."
),
lifespan=lifespan,
)
# Register tool modules
from birdcage_mcp import prompts, resources # noqa: E402, F401
from birdcage_mcp.tools import ( # noqa: E402
connection,
console,
movement,
satellite,
signal,
system,
)
connection.register(mcp)
movement.register(mcp)
signal.register(mcp)
system.register(mcp)
satellite.register(mcp)
console.register(mcp)
resources.register(mcp)
prompts.register(mcp)
def main():
print("birdcage-mcp v2026.02.17", file=sys.stderr)
mcp.run()

View File

@ -0,0 +1,29 @@
"""Session state for the Birdcage MCP server."""
from dataclasses import dataclass, field
@dataclass
class BirdcageState:
"""Mutable session state shared across all MCP tools.
Populated by the server lifespan and mutated by the ``connect``
and ``disconnect`` tools. Device and craft_client are typed as
``object`` to accept both real (SerialBridge/CraftClient) and
demo (DemoDevice/DemoCraftClient) implementations without
importing them at definition time.
"""
device: object = None
craft_client: object = None
demo_mode: bool = False
serial_port: str = ""
firmware_name: str = "g2"
connected: bool = False
_errors: list[str] = field(default_factory=list)
def record_error(self, msg: str) -> None:
"""Append an error message (capped at 50 entries)."""
self._errors.append(msg)
if len(self._errors) > 50:
self._errors = self._errors[-50:]

View File

@ -0,0 +1 @@
"""Birdcage MCP tool modules."""

View File

@ -0,0 +1,112 @@
"""Connection management tools — connect, disconnect, status."""
from fastmcp import Context
from birdcage_mcp.state import BirdcageState
def register(mcp):
"""Register connection tools on the FastMCP instance."""
@mcp.tool()
async def connect(
ctx: Context,
port: str = "",
firmware: str = "",
skip_init: bool = False,
) -> dict:
"""Connect to a Winegard satellite dish via RS-422/RS-485 serial.
In demo mode this is a no-op (already connected). In hardware mode,
opens the serial port and optionally runs the firmware initialization
sequence (kills TV satellite search, enters motor submenu).
Args:
port: Serial port path (e.g. /dev/ttyUSB0). Defaults to
BIRDCAGE_PORT env var.
firmware: Firmware variant: "g2", "hal205", or "hal000".
Defaults to BIRDCAGE_FIRMWARE env var.
skip_init: If True, skip firmware init (useful for
reconnecting to a running dish).
"""
state: BirdcageState = ctx.request_context.lifespan_context
if state.demo_mode:
return {
"status": "connected",
"mode": "demo",
"message": "Demo device already connected",
}
if state.connected and state.device is not None:
return {
"status": "already_connected",
"port": state.serial_port,
}
serial_port = port or state.serial_port
fw = firmware or state.firmware_name
from birdcage.bridge import SerialBridge
from birdcage.protocol import get_protocol
protocol = get_protocol(fw)
device = SerialBridge(protocol)
device.connect(serial_port)
if not skip_init:
device.initialize()
state.device = device
state.serial_port = serial_port
state.firmware_name = fw
state.connected = True
return {
"status": "connected",
"port": serial_port,
"firmware": fw,
"initialized": not skip_init,
}
@mcp.tool()
async def disconnect(ctx: Context) -> dict:
"""Disconnect from the satellite dish and release the serial port.
In demo mode this is a no-op. In hardware mode, returns to the
root menu and closes the serial connection cleanly.
"""
state: BirdcageState = ctx.request_context.lifespan_context
if state.demo_mode:
return {"status": "demo", "message": "Demo mode — nothing to disconnect"}
if state.device is None or not state.connected:
return {"status": "not_connected"}
state.device.disconnect()
state.device = None
state.connected = False
return {"status": "disconnected"}
@mcp.tool()
async def status(ctx: Context) -> dict:
"""Get current connection status and configuration.
Returns whether the dish is connected, the serial port, firmware
variant, demo mode flag, and current menu state (if connected).
"""
state: BirdcageState = ctx.request_context.lifespan_context
result = {
"connected": state.connected,
"demo_mode": state.demo_mode,
"serial_port": state.serial_port,
"firmware": state.firmware_name,
}
if state.device is not None and hasattr(state.device, "current_menu"):
result["current_menu"] = state.device.current_menu
return result

View File

@ -0,0 +1,49 @@
"""Raw console tool — safety-gated direct firmware access."""
from fastmcp import Context
from birdcage_mcp.state import BirdcageState
# Commands that are dangerous at the root menu level.
_BLOCKED_ROOT_COMMANDS = {"q"}
def register(mcp):
"""Register console tools on the FastMCP instance."""
@mcp.tool()
async def send_raw_command(ctx: Context, command: str) -> dict:
"""Send a raw command to the firmware console and return the response.
This provides direct access to the Winegard firmware shell. The
command is sent as-is and the raw response is returned.
SAFETY: The 'q' command at root menu is BLOCKED it kills the
UART shell and requires a hardware power cycle to recover.
HAZARDOUS COMMANDS (use with caution):
- ADC 'scan' without arguments on uncalibrated AZ: deadlocks shell
- NVS 'e <idx> <value>': writes to non-volatile storage
- 'reboot': reboots the microcontroller
- 'stow': folds dish flat (modified feeds may not survive)
After a raw command, the menu state is marked as unknown.
Args:
command: The firmware command string to send.
"""
state: BirdcageState = ctx.request_context.lifespan_context
if state.device is None:
raise RuntimeError("No device connected. Use the 'connect' tool first.")
cmd_stripped = command.strip().lower()
if cmd_stripped in _BLOCKED_ROOT_COMMANDS:
return {
"error": f"Command '{command}' is blocked for safety. "
"The 'q' command kills the UART shell and requires "
"a hardware power cycle to recover.",
"blocked": True,
}
raw = state.device.send_raw(command)
return {"response": raw}

View File

@ -0,0 +1,154 @@
"""Movement tools — motor positioning, homing, and engagement."""
from fastmcp import Context
from birdcage_mcp.state import BirdcageState
def _require_device(state: BirdcageState):
"""Raise if no device is connected."""
if state.device is None:
raise RuntimeError(
"No device connected. Use the 'connect' tool first."
)
def register(mcp):
"""Register movement tools on the FastMCP instance."""
@mcp.tool()
async def get_position(ctx: Context) -> dict:
"""Query the current azimuth and elevation of the dish.
Returns {"azimuth": float, "elevation": float} in degrees.
Azimuth range is 0-455 (before cable wrap). Elevation range
depends on firmware variant (typically 18-65 deg for Carryout G2).
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
return state.device.get_position()
@mcp.tool()
async def move_to(
ctx: Context,
azimuth: float,
elevation: float,
) -> dict:
"""Move the dish to an absolute azimuth and elevation.
Both motors are commanded sequentially (AZ then EL). The firmware
queues commands, so this returns immediately the dish moves in
the background. Poll get_position to track progress.
Args:
azimuth: Target azimuth in degrees (0-455).
elevation: Target elevation in degrees (18-65 for Carryout G2).
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
state.device.move_to(azimuth, elevation)
return {
"status": "moving",
"target_azimuth": azimuth,
"target_elevation": elevation,
}
@mcp.tool()
async def move_motor(
ctx: Context,
motor_id: int,
degrees: float,
) -> dict:
"""Move a single motor to an absolute position.
Args:
motor_id: 0 for azimuth, 1 for elevation.
degrees: Target angle in degrees.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
if motor_id not in (0, 1):
raise ValueError("motor_id must be 0 (azimuth) or 1 (elevation)")
state.device.move_motor(motor_id, degrees)
axis = "azimuth" if motor_id == 0 else "elevation"
return {"status": "moving", "axis": axis, "target": degrees}
@mcp.tool()
async def home_motor(ctx: Context, motor_id: int) -> dict:
"""Home a motor to its reference position using stall detection.
The dish uses motor stalling (not limit switches) to find the
mechanical home position. Expect audible grinding during homing.
Args:
motor_id: 0 for azimuth, 1 for elevation.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
if motor_id not in (0, 1):
raise ValueError("motor_id must be 0 (azimuth) or 1 (elevation)")
state.device.home_motor(motor_id)
axis = "azimuth" if motor_id == 0 else "elevation"
return {"status": "homing", "axis": axis}
@mcp.tool()
async def engage_motors(ctx: Context) -> dict:
"""Energize the stepper motors (apply holding torque).
Motors must be engaged before movement commands will work.
They draw power while engaged even when stationary.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
state.device.engage()
return {"status": "engaged"}
@mcp.tool()
async def release_motors(ctx: Context) -> dict:
"""De-energize the stepper motors (remove holding torque).
The dish will be free to move under wind/gravity load.
Use before manual repositioning or to save power.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
state.device.release()
return {"status": "released"}
@mcp.tool()
async def stow(ctx: Context) -> dict:
"""Move the dish to stow position (AZ=0, EL=65).
Stow position is safe for transport with the standard feed horn.
WARNING: Modified feeds may not survive the stow movement.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
state.device.move_to(0.0, 65.0)
return {
"status": "stowing",
"target_azimuth": 0.0,
"target_elevation": 65.0,
}
@mcp.tool()
async def get_step_positions(ctx: Context) -> dict:
"""Read raw stepper motor positions in step counts.
Returns {"az_steps": int, "el_steps": int}. The Carryout G2 has
40000 steps/rev for AZ and 24960 steps/rev for EL.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
return state.device.get_step_positions()
@mcp.tool()
async def get_el_limits(ctx: Context) -> dict:
"""Read the firmware elevation angle limits.
Returns {"min": float, "max": float, "home": float} in degrees.
The Carryout G2 defaults are min=18, max=65, home=65.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
return state.device.get_el_limits()

View File

@ -0,0 +1,102 @@
"""Satellite tracking tools — search, passes, visible targets."""
from dataclasses import asdict
from fastmcp import Context
from birdcage_mcp.state import BirdcageState
def _require_craft(state: BirdcageState):
if state.craft_client is None:
raise RuntimeError("No Craft API client configured.")
def register(mcp):
"""Register satellite tools on the FastMCP instance."""
@mcp.tool()
async def search_satellites(
ctx: Context,
query: str,
limit: int = 20,
) -> dict:
"""Search the orbital object catalog for satellites, planets, and stars.
Uses the Craft API (space.warehack.ing) to find objects by name.
Returns matches with name, type, NORAD ID (for satellites), and
current altitude if above horizon.
Args:
query: Search term (e.g. "ISS", "NOAA", "Moon").
limit: Maximum results to return.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_craft(state)
results = state.craft_client.search(query, limit)
return {
"results": [asdict(r) for r in results],
"count": len(results),
}
@mcp.tool()
async def get_passes(
ctx: Context,
norad_id: int,
hours: int = 24,
) -> dict:
"""Get upcoming pass predictions for a satellite.
Returns AOS/TCA/LOS times, azimuths, max elevation, duration,
and visibility for each predicted pass.
Args:
norad_id: NORAD catalog number (e.g. 25544 for ISS).
hours: How many hours ahead to predict (default 24).
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_craft(state)
passes = state.craft_client.get_passes(norad_id, hours)
return {
"passes": [asdict(p) for p in passes],
"count": len(passes),
}
@mcp.tool()
async def get_next_pass(ctx: Context, norad_id: int) -> dict:
"""Get the next upcoming pass for a satellite.
Returns the single soonest pass prediction, or null if none
are predicted in the near future.
Args:
norad_id: NORAD catalog number (e.g. 25544 for ISS).
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_craft(state)
p = state.craft_client.get_next_pass(norad_id)
if p is None:
return {"pass": None}
return {"pass": asdict(p)}
@mcp.tool()
async def get_visible_targets(
ctx: Context,
min_alt: float = 0.0,
) -> dict:
"""Get all objects currently above the horizon with real-time positions.
Returns satellites, planets, and stars with AZ/EL computed
server-side. This is the primary tracking endpoint use it
to find what's overhead right now.
Args:
min_alt: Minimum altitude in degrees (default 0 = horizon).
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_craft(state)
targets = state.craft_client.get_visible_targets(min_alt)
return {
"targets": [asdict(t) for t in targets],
"count": len(targets),
}

View File

@ -0,0 +1,122 @@
"""Signal measurement tools — RSSI, DVB config, LNA, and AZ sweep."""
from fastmcp import Context
from birdcage_mcp.state import BirdcageState
def _require_device(state: BirdcageState):
if state.device is None:
raise RuntimeError("No device connected. Use the 'connect' tool first.")
def register(mcp):
"""Register signal tools on the FastMCP instance."""
@mcp.tool()
async def get_rssi(ctx: Context, iterations: int = 10) -> dict:
"""Read averaged RSSI signal strength from the BCM4515 DVB tuner.
Returns {"reads": int, "average": int, "current": int}.
Noise floor is approximately 500 ADC counts.
Args:
iterations: Number of samples to average (default 10).
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
return state.device.get_rssi(iterations)
@mcp.tool()
async def get_adc_rssi(ctx: Context) -> dict:
"""Read single-shot ADC RSSI value (raw, unbounded).
Returns the raw ADC count from the hardware ADC subsystem,
bypassing the DVB tuner's averaging. Useful for quick signal checks.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
raw = state.device.get_adc_rssi()
return {"raw": raw}
@mcp.tool()
async def get_lock_status(ctx: Context) -> dict:
"""Check DVB signal lock status (single-shot).
Returns the raw firmware response string containing lock state,
RSSI, and glitch count.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
raw = state.device.get_lock_status()
return {"raw": raw}
@mcp.tool()
async def enable_lna(ctx: Context) -> dict:
"""Enable the LNA by setting LNB voltage to 13V (V-pol).
This powers the low-noise amplifier in the dish's outdoor unit.
Boot default is 18V (H-pol). Required before meaningful RSSI
readings when used for radio astronomy.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
state.device.enable_lna()
return {"status": "lna_enabled", "voltage": "13V"}
@mcp.tool()
async def get_dvb_config(ctx: Context) -> dict:
"""Read BCM4515 DVB tuner hardware and firmware version.
Returns chip ID, revision, firmware version, and strap config.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
raw = state.device.get_dvb_config()
return {"raw": raw}
@mcp.tool()
async def get_channel_params(ctx: Context) -> dict:
"""Read current DVB channel parameters.
Returns frequency, symbol rate, modulation, LNB polarity,
tone settings, and scan configuration.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
raw = state.device.get_channel_params()
return {"raw": raw}
@mcp.tool()
async def az_sweep(
ctx: Context,
start_az: float,
span: float,
step_cdeg: int = 100,
num_xponders: int = 1,
timeout: float = 120.0,
) -> dict:
"""Execute a firmware-accelerated azimuth sweep (azscanwxp).
Moves to start_az, then sweeps span degrees while measuring
RSSI/Lock/SNR at each step. The firmware handles motor movement
and measurement atomically no per-point serial round-trips.
Returns a list of measurement points, each containing:
az (degrees), rssi (ADC counts), lock (0/1), snr (dB).
SAFETY: Requires homed motors. Do NOT run on uncalibrated axes.
Args:
start_az: Starting azimuth in degrees.
span: Total sweep width in degrees.
step_cdeg: Step size in centidegrees (100 = 1.00 deg).
num_xponders: Transponders to cycle per position.
timeout: Max wait time for the sweep to complete.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
points = state.device.az_sweep_firmware(
start_az, span, step_cdeg, num_xponders, timeout
)
return {"points": points, "count": len(points)}

View File

@ -0,0 +1,178 @@
"""System and firmware tools — identification, motor dynamics, A3981, NVS."""
from fastmcp import Context
from birdcage_mcp.state import BirdcageState
def _require_device(state: BirdcageState):
if state.device is None:
raise RuntimeError("No device connected. Use the 'connect' tool first.")
def register(mcp):
"""Register system tools on the FastMCP instance."""
@mcp.tool()
async def get_firmware_id(ctx: Context) -> dict:
"""Read full MCU and firmware identification.
Returns NVS version, system ID, silicon info, board ID, antenna
ID, software version, clock frequencies, and flash layout.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
raw = state.device.get_firmware_id()
return {"raw": raw}
@mcp.tool()
async def get_motor_dynamics(ctx: Context) -> dict:
"""Read max velocity and acceleration for both motor axes.
Returns {"az_max_vel", "el_max_vel", "az_accel", "el_accel"}
in degrees/sec and degrees/sec^2 respectively.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
return state.device.get_motor_dynamics()
@mcp.tool()
async def set_max_velocity(
ctx: Context,
motor_id: int,
deg_per_sec: float,
) -> dict:
"""Set the maximum velocity for a motor axis.
Args:
motor_id: 0 for azimuth, 1 for elevation.
deg_per_sec: Maximum velocity in degrees per second.
Carryout G2 defaults: AZ=65, EL=45.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
if motor_id not in (0, 1):
raise ValueError("motor_id must be 0 (azimuth) or 1 (elevation)")
state.device.set_max_velocity(motor_id, deg_per_sec)
axis = "azimuth" if motor_id == 0 else "elevation"
return {"status": "set", "axis": axis, "max_velocity": deg_per_sec}
@mcp.tool()
async def set_max_acceleration(
ctx: Context,
motor_id: int,
accel: float,
) -> dict:
"""Set the maximum acceleration for a motor axis.
Args:
motor_id: 0 for azimuth, 1 for elevation.
accel: Maximum acceleration in degrees/sec^2.
Carryout G2 default: 400 for both axes.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
if motor_id not in (0, 1):
raise ValueError("motor_id must be 0 (azimuth) or 1 (elevation)")
state.device.set_max_acceleration(motor_id, accel)
axis = "azimuth" if motor_id == 0 else "elevation"
return {"status": "set", "axis": axis, "max_acceleration": accel}
@mcp.tool()
async def get_motor_life(ctx: Context) -> dict:
"""Read motor lifetime and usage statistics.
Returns total moves, total degrees traveled, and uptime hours
for both AZ and EL axes.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
raw = state.device.get_motor_life()
return {"raw": raw}
@mcp.tool()
async def get_pid_gains(ctx: Context) -> dict:
"""Read PID controller gains for both motor axes.
Returns {"az": {"kp", "kv", "ki"}, "el": {"kp", "kv", "ki"}}.
Carryout G2 defaults: AZ Kp=600/Kv=60/Ki=1, EL Kp=250/Kv=50/Ki=1.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
return state.device.get_pid_gains()
@mcp.tool()
async def set_pid_gains(
ctx: Context,
motor_id: int,
kp: float,
kv: float,
ki: float,
) -> dict:
"""Set PID controller gains for a motor axis.
CAUTION: Incorrect PID values can cause oscillation or motor damage.
Args:
motor_id: 0 for azimuth, 1 for elevation.
kp: Proportional gain.
kv: Velocity (derivative) gain.
ki: Integral gain.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
if motor_id not in (0, 1):
raise ValueError("motor_id must be 0 (azimuth) or 1 (elevation)")
state.device.set_pid_gains(motor_id, kp, kv, ki)
axis = "azimuth" if motor_id == 0 else "elevation"
return {"status": "set", "axis": axis, "kp": kp, "kv": kv, "ki": ki}
@mcp.tool()
async def get_a3981_diag(ctx: Context) -> dict:
"""Read Allegro A3981 stepper driver diagnostic status.
Returns fault status for both AZ and EL motor drivers.
Normal response is "AZ DIAG: OK / EL DIAG: OK".
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
raw = state.device.get_a3981_diag()
return {"raw": raw}
@mcp.tool()
async def get_a3981_modes(ctx: Context) -> dict:
"""Read A3981 stepper driver operating modes.
Returns step mode (AUTO/fixed), current mode (AUTO/HiZ/LoZ),
and step size (FULL=16 through SIXTEENTH=1) for both drivers.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
return state.device.get_a3981_modes()
@mcp.tool()
async def nvs_dump(ctx: Context) -> dict:
"""Dump all non-volatile storage (NVS) values.
Returns the complete NVS table with index, name, current value,
saved value, and default value for every setting.
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
raw = state.device.nvs_dump()
return {"raw": raw}
@mcp.tool()
async def nvs_read(ctx: Context, index: int) -> dict:
"""Read a single NVS value by index.
Key indices: 20=Disable Tracker, 80=AZ Max Vel, 85=EL Max Vel,
101=Min EL, 102=Max EL, 128-133=PID gains.
Args:
index: NVS index number (0-143).
"""
state: BirdcageState = ctx.request_context.lifespan_context
_require_device(state)
raw = state.device.nvs_read(index)
return {"index": index, "raw": raw}

74
mcp/tests/conftest.py Normal file
View File

@ -0,0 +1,74 @@
"""Shared fixtures for birdcage-mcp tests.
Uses FastMCP's run_server_async to spin up a real MCP server backed by
DemoDevice + DemoCraftClient. No serial hardware, no subprocesses.
"""
import json
from contextlib import asynccontextmanager
import pytest
from birdcage.demo import DemoCraftClient, DemoDevice
from fastmcp import Client, FastMCP
from fastmcp.client.transports import StreamableHttpTransport
from fastmcp.utilities.tests import run_server_async
from birdcage_mcp.state import BirdcageState
@asynccontextmanager
async def _test_lifespan(server: FastMCP):
"""Test lifespan that pre-connects a DemoDevice."""
device = DemoDevice()
device.connect()
device.initialize()
state = BirdcageState(
device=device,
craft_client=DemoCraftClient(),
demo_mode=True,
serial_port="/dev/demo",
firmware_name="g2",
connected=True,
)
yield state
def _build_server() -> FastMCP:
"""Build a FastMCP server with all tools registered."""
from birdcage_mcp import prompts, resources
from birdcage_mcp.tools import (
connection,
console,
movement,
satellite,
signal,
system,
)
mcp = FastMCP("birdcage-test", lifespan=_test_lifespan)
connection.register(mcp)
movement.register(mcp)
signal.register(mcp)
system.register(mcp)
satellite.register(mcp)
console.register(mcp)
resources.register(mcp)
prompts.register(mcp)
return mcp
@pytest.fixture
async def mcp_client():
"""Yield a connected MCP client backed by DemoDevice."""
server = _build_server()
async with (
run_server_async(server) as url,
Client(StreamableHttpTransport(url)) as client,
):
yield client
def parse_result(result) -> dict:
"""Extract the dict from a tool call result."""
return json.loads(result.content[0].text)

View File

@ -0,0 +1,29 @@
"""Tests for connection tools (connect, disconnect, status)."""
import pytest
from conftest import parse_result
from fastmcp import Client
@pytest.mark.anyio
async def test_status_shows_connected(mcp_client: Client):
result = await mcp_client.call_tool("status", {})
data = parse_result(result)
assert data["connected"] is True
assert data["demo_mode"] is True
assert data["firmware"] == "g2"
@pytest.mark.anyio
async def test_connect_demo_noop(mcp_client: Client):
result = await mcp_client.call_tool("connect", {})
data = parse_result(result)
assert data["mode"] == "demo"
assert data["status"] == "connected"
@pytest.mark.anyio
async def test_disconnect_demo_noop(mcp_client: Client):
result = await mcp_client.call_tool("disconnect", {})
data = parse_result(result)
assert data["status"] == "demo"

56
mcp/tests/test_console.py Normal file
View File

@ -0,0 +1,56 @@
"""Tests for raw console tool (safety-gated)."""
import pytest
from conftest import parse_result
from fastmcp import Client
@pytest.mark.anyio
async def test_send_raw_command(mcp_client: Client):
result = await mcp_client.call_tool(
"send_raw_command", {"command": "?"}
)
data = parse_result(result)
assert "response" in data
assert "Available commands" in data["response"]
@pytest.mark.anyio
async def test_q_command_blocked(mcp_client: Client):
result = await mcp_client.call_tool(
"send_raw_command", {"command": "q"}
)
data = parse_result(result)
assert data["blocked"] is True
assert "power cycle" in data["error"]
@pytest.mark.anyio
async def test_q_command_blocked_with_whitespace(mcp_client: Client):
result = await mcp_client.call_tool(
"send_raw_command", {"command": " Q "}
)
data = parse_result(result)
assert data["blocked"] is True
@pytest.mark.anyio
async def test_submenu_navigation(mcp_client: Client):
result = await mcp_client.call_tool(
"send_raw_command", {"command": "mot"}
)
data = parse_result(result)
assert "MOT>" in data["response"]
@pytest.mark.anyio
async def test_raw_motor_query(mcp_client: Client):
# Enter mot submenu first
await mcp_client.call_tool("send_raw_command", {"command": "mot"})
# Query position
result = await mcp_client.call_tool(
"send_raw_command", {"command": "a"}
)
data = parse_result(result)
assert "Angle[0]" in data["response"]
assert "Angle[1]" in data["response"]

100
mcp/tests/test_movement.py Normal file
View File

@ -0,0 +1,100 @@
"""Tests for movement tools."""
import pytest
from conftest import parse_result
from fastmcp import Client
from fastmcp.exceptions import ToolError
@pytest.mark.anyio
async def test_get_position(mcp_client: Client):
result = await mcp_client.call_tool("get_position", {})
data = parse_result(result)
assert "azimuth" in data
assert "elevation" in data
assert isinstance(data["azimuth"], float)
assert isinstance(data["elevation"], float)
@pytest.mark.anyio
async def test_move_to(mcp_client: Client):
result = await mcp_client.call_tool(
"move_to", {"azimuth": 200.0, "elevation": 40.0}
)
data = parse_result(result)
assert data["status"] == "moving"
assert data["target_azimuth"] == 200.0
assert data["target_elevation"] == 40.0
@pytest.mark.anyio
async def test_move_motor_az(mcp_client: Client):
result = await mcp_client.call_tool(
"move_motor", {"motor_id": 0, "degrees": 90.0}
)
data = parse_result(result)
assert data["axis"] == "azimuth"
assert data["target"] == 90.0
@pytest.mark.anyio
async def test_move_motor_el(mcp_client: Client):
result = await mcp_client.call_tool(
"move_motor", {"motor_id": 1, "degrees": 30.0}
)
data = parse_result(result)
assert data["axis"] == "elevation"
@pytest.mark.anyio
async def test_move_motor_invalid_id(mcp_client: Client):
with pytest.raises(ToolError):
await mcp_client.call_tool(
"move_motor", {"motor_id": 2, "degrees": 10.0}
)
@pytest.mark.anyio
async def test_home_motor(mcp_client: Client):
result = await mcp_client.call_tool("home_motor", {"motor_id": 1})
data = parse_result(result)
assert data["status"] == "homing"
assert data["axis"] == "elevation"
@pytest.mark.anyio
async def test_engage_release(mcp_client: Client):
result = await mcp_client.call_tool("release_motors", {})
data = parse_result(result)
assert data["status"] == "released"
result = await mcp_client.call_tool("engage_motors", {})
data = parse_result(result)
assert data["status"] == "engaged"
@pytest.mark.anyio
async def test_stow(mcp_client: Client):
result = await mcp_client.call_tool("stow", {})
data = parse_result(result)
assert data["status"] == "stowing"
assert data["target_azimuth"] == 0.0
assert data["target_elevation"] == 65.0
@pytest.mark.anyio
async def test_get_step_positions(mcp_client: Client):
result = await mcp_client.call_tool("get_step_positions", {})
data = parse_result(result)
assert "az_steps" in data
assert "el_steps" in data
assert isinstance(data["az_steps"], int)
@pytest.mark.anyio
async def test_get_el_limits(mcp_client: Client):
result = await mcp_client.call_tool("get_el_limits", {})
data = parse_result(result)
assert data["min"] == 18.0
assert data["max"] == 65.0
assert data["home"] == 65.0

View File

@ -0,0 +1,86 @@
"""Tests for satellite tracking tools."""
import pytest
from conftest import parse_result
from fastmcp import Client
@pytest.mark.anyio
async def test_search_satellites(mcp_client: Client):
result = await mcp_client.call_tool(
"search_satellites", {"query": "ISS"}
)
data = parse_result(result)
assert data["count"] >= 1
names = [r["name"] for r in data["results"]]
assert any("ISS" in n for n in names)
@pytest.mark.anyio
async def test_search_no_results(mcp_client: Client):
result = await mcp_client.call_tool(
"search_satellites", {"query": "nonexistent_xyz"}
)
data = parse_result(result)
assert data["count"] == 0
@pytest.mark.anyio
async def test_search_with_limit(mcp_client: Client):
result = await mcp_client.call_tool(
"search_satellites", {"query": "", "limit": 3}
)
data = parse_result(result)
# Empty query won't match any catalog entries
assert data["count"] >= 0
@pytest.mark.anyio
async def test_get_passes(mcp_client: Client):
result = await mcp_client.call_tool(
"get_passes", {"norad_id": 25544}
)
data = parse_result(result)
assert data["count"] > 0
p = data["passes"][0]
assert "aos_time" in p
assert "tca_time" in p
assert "los_time" in p
assert "max_elevation" in p
assert p["norad_id"] == 25544
@pytest.mark.anyio
async def test_get_next_pass(mcp_client: Client):
result = await mcp_client.call_tool(
"get_next_pass", {"norad_id": 25544}
)
data = parse_result(result)
assert data["pass"] is not None
assert data["pass"]["satellite_name"] == "ISS (ZARYA)"
@pytest.mark.anyio
async def test_get_visible_targets(mcp_client: Client):
result = await mcp_client.call_tool(
"get_visible_targets", {}
)
data = parse_result(result)
# Celestial bodies (Moon, Sun, Jupiter) are always visible
assert data["count"] >= 1
names = [t["name"] for t in data["targets"]]
# At least celestial bodies should be present
assert any(
n in names for n in ("Moon", "Sun", "Jupiter")
)
@pytest.mark.anyio
async def test_get_visible_targets_with_min_alt(mcp_client: Client):
result = await mcp_client.call_tool(
"get_visible_targets", {"min_alt": 80.0}
)
data = parse_result(result)
# Very high min_alt should filter out most targets
for t in data["targets"]:
assert t["altitude"] >= 80.0

80
mcp/tests/test_signal.py Normal file
View File

@ -0,0 +1,80 @@
"""Tests for signal measurement tools."""
import pytest
from conftest import parse_result
from fastmcp import Client
@pytest.mark.anyio
async def test_get_rssi_default(mcp_client: Client):
result = await mcp_client.call_tool("get_rssi", {})
data = parse_result(result)
assert data["reads"] == 10
assert isinstance(data["average"], int)
assert isinstance(data["current"], int)
@pytest.mark.anyio
async def test_get_rssi_custom_iterations(mcp_client: Client):
result = await mcp_client.call_tool("get_rssi", {"iterations": 5})
data = parse_result(result)
assert data["reads"] == 5
@pytest.mark.anyio
async def test_get_adc_rssi(mcp_client: Client):
result = await mcp_client.call_tool("get_adc_rssi", {})
data = parse_result(result)
assert "raw" in data
@pytest.mark.anyio
async def test_get_lock_status(mcp_client: Client):
result = await mcp_client.call_tool("get_lock_status", {})
data = parse_result(result)
assert "raw" in data
assert "Lock:" in data["raw"]
@pytest.mark.anyio
async def test_enable_lna(mcp_client: Client):
result = await mcp_client.call_tool("enable_lna", {})
data = parse_result(result)
assert data["status"] == "lna_enabled"
assert data["voltage"] == "13V"
@pytest.mark.anyio
async def test_get_dvb_config(mcp_client: Client):
result = await mcp_client.call_tool("get_dvb_config", {})
data = parse_result(result)
assert "BCM" in data["raw"]
assert "0x4515" in data["raw"]
@pytest.mark.anyio
async def test_get_channel_params(mcp_client: Client):
result = await mcp_client.call_tool("get_channel_params", {})
data = parse_result(result)
assert "Frequency" in data["raw"]
@pytest.mark.anyio
async def test_az_sweep(mcp_client: Client):
result = await mcp_client.call_tool(
"az_sweep",
{
"start_az": 195.0,
"span": 10.0,
"step_cdeg": 200,
"num_xponders": 1,
},
)
data = parse_result(result)
assert data["count"] > 0
assert len(data["points"]) == data["count"]
pt = data["points"][0]
assert "az" in pt
assert "rssi" in pt
assert "lock" in pt
assert "snr" in pt

121
mcp/tests/test_system.py Normal file
View File

@ -0,0 +1,121 @@
"""Tests for system and firmware tools."""
import pytest
from conftest import parse_result
from fastmcp import Client
from fastmcp.exceptions import ToolError
@pytest.mark.anyio
async def test_get_firmware_id(mcp_client: Client):
result = await mcp_client.call_tool("get_firmware_id", {})
data = parse_result(result)
assert "02.02.48" in data["raw"]
assert "TWELINCH" in data["raw"]
@pytest.mark.anyio
async def test_get_motor_dynamics(mcp_client: Client):
result = await mcp_client.call_tool("get_motor_dynamics", {})
data = parse_result(result)
assert data["az_max_vel"] == 65.0
assert data["el_max_vel"] == 45.0
assert data["az_accel"] == 400.0
assert data["el_accel"] == 400.0
@pytest.mark.anyio
async def test_set_max_velocity(mcp_client: Client):
result = await mcp_client.call_tool(
"set_max_velocity", {"motor_id": 0, "deg_per_sec": 30.0}
)
data = parse_result(result)
assert data["axis"] == "azimuth"
assert data["max_velocity"] == 30.0
# Verify it stuck
result = await mcp_client.call_tool("get_motor_dynamics", {})
data = parse_result(result)
assert data["az_max_vel"] == 30.0
@pytest.mark.anyio
async def test_set_max_velocity_invalid_motor(mcp_client: Client):
with pytest.raises(ToolError):
await mcp_client.call_tool(
"set_max_velocity", {"motor_id": 3, "deg_per_sec": 10.0}
)
@pytest.mark.anyio
async def test_set_max_acceleration(mcp_client: Client):
result = await mcp_client.call_tool(
"set_max_acceleration", {"motor_id": 1, "accel": 200.0}
)
data = parse_result(result)
assert data["axis"] == "elevation"
assert data["max_acceleration"] == 200.0
@pytest.mark.anyio
async def test_get_motor_life(mcp_client: Client):
result = await mcp_client.call_tool("get_motor_life", {})
data = parse_result(result)
assert "AZ total moves" in data["raw"]
@pytest.mark.anyio
async def test_get_pid_gains(mcp_client: Client):
result = await mcp_client.call_tool("get_pid_gains", {})
data = parse_result(result)
assert data["az"]["kp"] == 600.0
assert data["az"]["kv"] == 60.0
assert data["el"]["kp"] == 250.0
@pytest.mark.anyio
async def test_set_pid_gains(mcp_client: Client):
result = await mcp_client.call_tool(
"set_pid_gains",
{"motor_id": 0, "kp": 500.0, "kv": 55.0, "ki": 2.0},
)
data = parse_result(result)
assert data["status"] == "set"
assert data["kp"] == 500.0
@pytest.mark.anyio
async def test_get_a3981_diag(mcp_client: Client):
result = await mcp_client.call_tool("get_a3981_diag", {})
data = parse_result(result)
assert "OK" in data["raw"]
@pytest.mark.anyio
async def test_get_a3981_modes(mcp_client: Client):
result = await mcp_client.call_tool("get_a3981_modes", {})
data = parse_result(result)
assert "AUTO" in data["step_mode"]
@pytest.mark.anyio
async def test_nvs_dump(mcp_client: Client):
result = await mcp_client.call_tool("nvs_dump", {})
data = parse_result(result)
assert "Disable Tracker" in data["raw"]
assert "AZ Max Vel" in data["raw"]
@pytest.mark.anyio
async def test_nvs_read(mcp_client: Client):
result = await mcp_client.call_tool("nvs_read", {"index": 20})
data = parse_result(result)
assert data["index"] == 20
assert "Disable Tracker" in data["raw"]
@pytest.mark.anyio
async def test_nvs_read_missing_index(mcp_client: Client):
result = await mcp_client.call_tool("nvs_read", {"index": 999})
data = parse_result(result)
assert "not found" in data["raw"]

1678
mcp/uv.lock generated Normal file

File diff suppressed because it is too large Load Diff