diff --git a/src/mcdbus/_bus.py b/src/mcdbus/_bus.py index db66533..70bb4d4 100644 --- a/src/mcdbus/_bus.py +++ b/src/mcdbus/_bus.py @@ -76,7 +76,7 @@ class BusManager: def get_mgr(ctx: Context) -> "BusManager": """Extract the BusManager from a FastMCP tool context.""" - return ctx.request_context.lifespan_context + return ctx.lifespan_context def serialize_variant(value: Any) -> Any: diff --git a/src/mcdbus/_shortcuts.py b/src/mcdbus/_shortcuts.py index 1a83a22..a47491a 100644 --- a/src/mcdbus/_shortcuts.py +++ b/src/mcdbus/_shortcuts.py @@ -7,6 +7,9 @@ from fastmcp import Context from mcdbus._bus import call_bus_method, get_mgr from mcdbus._state import mcp +# --------------------------------------------------------------------------- +# Notifications +# --------------------------------------------------------------------------- @mcp.tool() async def send_notification( @@ -53,6 +56,10 @@ async def send_notification( return f"Notification sent (id: {nid})" +# --------------------------------------------------------------------------- +# Systemd +# --------------------------------------------------------------------------- + @mcp.tool() async def list_systemd_units( ctx: Context, @@ -98,6 +105,10 @@ async def list_systemd_units( return "\n".join(lines) +# --------------------------------------------------------------------------- +# MPRIS Media Player +# --------------------------------------------------------------------------- + @mcp.tool() async def media_player_control( action: str, @@ -194,3 +205,306 @@ async def media_player_control( title, artist = "Unknown", "Unknown" return f"Player: {player}\nAction: {action}\nStatus: {status}\nNow: {artist} — {title}" + + +# --------------------------------------------------------------------------- +# NetworkManager +# --------------------------------------------------------------------------- + +_NM_DEST = "org.freedesktop.NetworkManager" +_NM_PATH = "/org/freedesktop/NetworkManager" +_NM_IFACE = "org.freedesktop.NetworkManager" + +_NM_STATE_MAP = { + 0: "Unknown", 10: "Asleep", 20: "Disconnected", + 30: "Disconnecting", 40: "Connecting", + 50: "Connected (local)", 60: "Connected (site)", + 70: "Connected (global)", +} + +_NM_CONN_STATE_MAP = { + 0: "Unknown", 1: "Activating", 2: "Activated", + 3: "Deactivating", 4: "Deactivated", +} + + +@mcp.tool() +async def network_status(ctx: Context) -> str: + """Show NetworkManager connection status and active connections.""" + mgr = get_mgr(ctx) + connection = await mgr.get_bus("system") + + try: + state_result = await call_bus_method( + connection, + destination=_NM_DEST, + path=_NM_PATH, + interface="org.freedesktop.DBus.Properties", + member="GetAll", + signature="s", + body=[_NM_IFACE], + ) + except (RuntimeError, TimeoutError) as exc: + return f"NetworkManager not available: {exc}" + + if not state_result or not state_result[0]: + return "NetworkManager returned no properties." + + props = state_result[0] + state = _NM_STATE_MAP.get(props.get("State", 0), "Unknown") + wireless = "enabled" if props.get("WirelessEnabled") else "disabled" + + lines = [ + "## Network Status\n", + f"- **State:** {state}", + f"- **Wireless:** {wireless}", + ] + + # Active connections + active_paths = props.get("ActiveConnections", []) + if active_paths: + lines.append("\n| Connection | Type | State |") + lines.append("|------------|------|-------|") + + for ac_path in active_paths: + try: + ac_result = await call_bus_method( + connection, + destination=_NM_DEST, + path=ac_path, + interface="org.freedesktop.DBus.Properties", + member="GetAll", + signature="s", + body=["org.freedesktop.NetworkManager.Connection.Active"], + ) + if ac_result and ac_result[0]: + ac = ac_result[0] + name = ac.get("Id", "?") + conn_type = ac.get("Type", "?") + conn_state = _NM_CONN_STATE_MAP.get(ac.get("State", 0), "?") + lines.append(f"| {name} | {conn_type} | {conn_state} |") + except (RuntimeError, TimeoutError): + continue + + return "\n".join(lines) + + +# --------------------------------------------------------------------------- +# UPower (Battery) +# --------------------------------------------------------------------------- + +_UPOWER_DEST = "org.freedesktop.UPower" +_UPOWER_PATH = "/org/freedesktop/UPower" + +_UPOWER_STATE_MAP = { + 0: "Unknown", 1: "Charging", 2: "Discharging", + 3: "Empty", 4: "Fully charged", 5: "Pending charge", + 6: "Pending discharge", +} + + +@mcp.tool() +async def battery_status(ctx: Context) -> str: + """Show battery status from UPower (percentage, charging state, time remaining).""" + mgr = get_mgr(ctx) + connection = await mgr.get_bus("system") + + try: + devices_result = await call_bus_method( + connection, + destination=_UPOWER_DEST, + path=_UPOWER_PATH, + interface=_UPOWER_DEST, + member="EnumerateDevices", + ) + except (RuntimeError, TimeoutError) as exc: + return f"UPower not available: {exc}" + + if not devices_result or not devices_result[0]: + return "No power devices found." + + device_paths = devices_result[0] + batteries = [] + + for dev_path in device_paths: + try: + dev_result = await call_bus_method( + connection, + destination=_UPOWER_DEST, + path=dev_path, + interface="org.freedesktop.DBus.Properties", + member="GetAll", + signature="s", + body=["org.freedesktop.UPower.Device"], + ) + except (RuntimeError, TimeoutError): + continue + + if not dev_result or not dev_result[0]: + continue + + props = dev_result[0] + # Type 2 = Battery + if props.get("Type") != 2: + continue + + pct = props.get("Percentage", 0) + state = _UPOWER_STATE_MAP.get(props.get("State", 0), "Unknown") + tte = props.get("TimeToEmpty", 0) + ttf = props.get("TimeToFull", 0) + rate = props.get("EnergyRate", 0) + model = props.get("Model", "Battery") + + time_str = "" + if tte > 0: + h, m = divmod(int(tte) // 60, 60) + time_str = f"{h}h {m}m remaining" + elif ttf > 0: + h, m = divmod(int(ttf) // 60, 60) + time_str = f"{h}h {m}m to full" + + batteries.append({ + "model": model, "percentage": pct, "state": state, + "time": time_str, "rate": rate, + }) + + if not batteries: + return "No batteries found." + + lines = ["## Battery Status\n"] + for bat in batteries: + lines.append(f"### {bat['model']}") + lines.append(f"- **Charge:** {bat['percentage']:.0f}%") + lines.append(f"- **State:** {bat['state']}") + if bat["time"]: + lines.append(f"- **Time:** {bat['time']}") + if bat["rate"] > 0: + lines.append(f"- **Power draw:** {bat['rate']:.1f} W") + + return "\n".join(lines) + + +# --------------------------------------------------------------------------- +# Bluetooth (bluez) +# --------------------------------------------------------------------------- + +@mcp.tool() +async def bluetooth_devices(ctx: Context) -> str: + """List discovered and paired Bluetooth devices from bluez.""" + mgr = get_mgr(ctx) + connection = await mgr.get_bus("system") + + try: + objects_result = await call_bus_method( + connection, + destination="org.bluez", + path="/", + interface="org.freedesktop.DBus.ObjectManager", + member="GetManagedObjects", + ) + except (RuntimeError, TimeoutError) as exc: + return f"bluez not available: {exc}" + + if not objects_result or not objects_result[0]: + return "No Bluetooth objects found." + + managed = objects_result[0] + devices = [] + + for _obj_path, interfaces in managed.items(): + dev_props = interfaces.get("org.bluez.Device1") + if not dev_props: + continue + + devices.append({ + "name": dev_props.get("Name", dev_props.get("Alias", "Unknown")), + "address": dev_props.get("Address", "?"), + "connected": dev_props.get("Connected", False), + "paired": dev_props.get("Paired", False), + "trusted": dev_props.get("Trusted", False), + }) + + if not devices: + return "No Bluetooth devices found." + + devices.sort(key=lambda d: (not d["connected"], not d["paired"], d["name"])) + lines = [ + f"## Bluetooth Devices — {len(devices)} found\n", + "| Name | Address | Connected | Paired | Trusted |", + "|------|---------|-----------|--------|---------|", + ] + for d in devices: + conn = "✓" if d["connected"] else "—" + pair = "✓" if d["paired"] else "—" + trust = "✓" if d["trusted"] else "—" + lines.append(f"| {d['name']} | `{d['address']}` | {conn} | {pair} | {trust} |") + + return "\n".join(lines) + + +# --------------------------------------------------------------------------- +# KWin Windows (KDE Plasma) +# --------------------------------------------------------------------------- + +@mcp.tool() +async def kwin_windows(ctx: Context) -> str: + """List open windows from KDE KWin via the KRunner window runner. + + Returns window titles, application names, and virtual desktop assignments. + Requires a running KDE Plasma session. + """ + mgr = get_mgr(ctx) + connection = await mgr.get_bus("session") + + # KRunner's WindowsRunner lists all windows when queried with empty string + try: + result = await call_bus_method( + connection, + destination="org.kde.KWin", + path="/WindowsRunner", + interface="org.kde.krunner1", + member="Match", + signature="s", + body=[""], + ) + except (RuntimeError, TimeoutError) as exc: + return f"KWin not available: {exc}" + + if not result or not result[0]: + return "No windows found (KWin WindowsRunner returned empty)." + + matches = result[0] + # Each match: (id, caption, icon, type, relevance, properties) + # type=0 → window, type=8 → virtual desktop + + windows = [] + desktops = [] + seen_ids = set() + + for match in matches: + match_id, caption, icon = match[0], match[1], match[2] + match_type = match[3] + + if match_id in seen_ids: + continue + seen_ids.add(match_id) + + if match_type == 8: + desktops.append(caption) + else: + windows.append({"caption": caption, "app": icon or "unknown"}) + + if not windows: + return "No open windows found." + + # Group by app + lines = [f"## KWin Windows — {len(windows)} open\n"] + lines.append("| Window | Application |") + lines.append("|--------|-------------|") + for w in sorted(windows, key=lambda x: (x["app"], x["caption"])): + lines.append(f"| {w['caption']} | {w['app']} |") + + if desktops: + lines.append(f"\n**Virtual desktops:** {', '.join(sorted(desktops))}") + + return "\n".join(lines) diff --git a/tests/conftest.py b/tests/conftest.py index 2f65118..b53e3b4 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,8 +1,34 @@ """Test fixtures for mcdbus.""" +import asyncio + import pytest +from fastmcp import Client from mcdbus._bus import BusManager +from mcdbus.server import mcp + + +@pytest.fixture(autouse=True) +def _reset_mcp_singleton(): + """Reset event-loop-bound state on the mcp singleton. + + FastMCP creates an asyncio.Event() at construction time. pytest-asyncio + creates a fresh loop per test, so after test 1 the Event is bound to a + dead loop and test 2 would crash. Re-creating these objects keeps every + test isolated. + """ + mcp._started = asyncio.Event() + mcp._lifespan_result = None + mcp._lifespan_result_set = False + yield + + +@pytest.fixture +async def client(): + """MCP client connected via in-memory transport (no HTTP).""" + async with Client(mcp) as c: + yield c @pytest.fixture diff --git a/tests/test_tools.py b/tests/test_tools.py new file mode 100644 index 0000000..587537b --- /dev/null +++ b/tests/test_tools.py @@ -0,0 +1,238 @@ +"""Tool-level tests — every MCP tool exercised through Client(mcp) in-memory transport.""" + +import json + +import pytest +from fastmcp import Client +from fastmcp.exceptions import ToolError + +# --------------------------------------------------------------------------- +# Discovery tools +# --------------------------------------------------------------------------- + +class TestListServices: + async def test_returns_services(self, client: Client): + result = await client.call_tool("list_services", {"bus": "session"}) + text = result.content[0].text + assert "org.freedesktop.DBus" in text + assert "D-Bus session bus" in text + + async def test_include_unique(self, client: Client): + result = await client.call_tool( + "list_services", {"bus": "session", "include_unique": True} + ) + text = result.content[0].text + assert ":1." in text + + +class TestIntrospect: + async def test_dbus_daemon(self, client: Client): + result = await client.call_tool("introspect", { + "bus": "session", + "service": "org.freedesktop.DBus", + "object_path": "/org/freedesktop/DBus", + }) + text = result.content[0].text + assert "org.freedesktop.DBus" in text + assert "ListNames" in text + + async def test_invalid_path(self, client: Client): + with pytest.raises(ToolError, match="Invalid D-Bus object path"): + await client.call_tool("introspect", { + "bus": "session", + "service": "org.freedesktop.DBus", + "object_path": "not-a-path", + }) + + +class TestListObjects: + async def test_dbus_tree(self, client: Client): + result = await client.call_tool("list_objects", { + "bus": "session", + "service": "org.freedesktop.DBus", + }) + text = result.content[0].text + assert "Object tree" in text + assert "/org/freedesktop/DBus" in text + + +# --------------------------------------------------------------------------- +# Interaction tools +# --------------------------------------------------------------------------- + +class TestCallMethod: + async def test_ping_void(self, client: Client): + result = await client.call_tool("call_method", { + "bus": "session", + "service": "org.freedesktop.DBus", + "object_path": "/org/freedesktop/DBus", + "interface": "org.freedesktop.DBus.Peer", + "method": "Ping", + }) + text = result.content[0].text + assert "void" in text.lower() + + async def test_get_id_returns_hex(self, client: Client): + result = await client.call_tool("call_method", { + "bus": "session", + "service": "org.freedesktop.DBus", + "object_path": "/org/freedesktop/DBus", + "interface": "org.freedesktop.DBus", + "method": "GetId", + }) + text = result.content[0].text + # GetId returns a hex string, JSON-encoded with quotes + bus_id = json.loads(text) + assert isinstance(bus_id, str) + assert len(bus_id) > 10 + + async def test_bad_method_returns_error(self, client: Client): + result = await client.call_tool("call_method", { + "bus": "session", + "service": "org.freedesktop.DBus", + "object_path": "/org/freedesktop/DBus", + "interface": "org.freedesktop.DBus", + "method": "NoSuchMethod", + }) + text = result.content[0].text + assert "Error" in text or "error" in text + + +class TestGetProperty: + async def test_read_property(self, client: Client): + # Read the Features property from the DBus daemon + result = await client.call_tool("get_property", { + "bus": "session", + "service": "org.freedesktop.DBus", + "object_path": "/org/freedesktop/DBus", + "interface": "org.freedesktop.DBus", + "property_name": "Features", + }) + text = result.content[0].text + # Features returns an array of strings + features = json.loads(text) + assert isinstance(features, list) + + +class TestGetAllProperties: + async def test_dbus_daemon(self, client: Client): + result = await client.call_tool("get_all_properties", { + "bus": "session", + "service": "org.freedesktop.DBus", + "object_path": "/org/freedesktop/DBus", + "interface": "org.freedesktop.DBus", + }) + text = result.content[0].text + assert "Properties of" in text + assert "Features" in text + + +# --------------------------------------------------------------------------- +# Shortcut tools — existing +# --------------------------------------------------------------------------- + +class TestSendNotification: + async def test_sends(self, client: Client): + result = await client.call_tool("send_notification", { + "summary": "mcdbus test", + "body": "automated test notification", + "timeout": 1000, + }) + text = result.content[0].text + assert "Notification sent" in text + assert "id:" in text + + +class TestListSystemdUnits: + async def test_returns_table(self, client: Client): + result = await client.call_tool("list_systemd_units", {}) + text = result.content[0].text + assert "Systemd units" in text + assert "| Unit |" in text + + async def test_pattern_filter(self, client: Client): + result = await client.call_tool( + "list_systemd_units", {"pattern": "dbus*"} + ) + text = result.content[0].text + assert "dbus" in text.lower() + + +class TestMediaPlayerControl: + async def test_no_players_graceful(self, client: Client): + # With no explicit player, it auto-discovers; if none found, returns message + result = await client.call_tool( + "media_player_control", {"action": "play"} + ) + text = result.content[0].text + # Either "No MPRIS media players" or a player response — both are valid + assert "Player:" in text or "No MPRIS" in text + + async def test_bad_action(self, client: Client): + result = await client.call_tool( + "media_player_control", {"action": "invalid_action"} + ) + text = result.content[0].text + # auto-discovers player first; if none found, "No MPRIS" message + # if player found, "Unknown action" message + assert "Unknown action" in text or "No MPRIS" in text + + +# --------------------------------------------------------------------------- +# Shortcut tools — new +# --------------------------------------------------------------------------- + +class TestNetworkStatus: + async def test_returns_status(self, client: Client): + result = await client.call_tool("network_status", {}) + text = result.content[0].text + assert "Network Status" in text or "NetworkManager not available" in text + + async def test_has_state(self, client: Client): + result = await client.call_tool("network_status", {}) + text = result.content[0].text + if "not available" not in text: + assert "State:" in text + + +class TestBatteryStatus: + async def test_returns_result(self, client: Client): + result = await client.call_tool("battery_status", {}) + text = result.content[0].text + # Desktop may not have battery — both outcomes are valid + assert ( + "Battery Status" in text + or "No batteries found" in text + or "No power devices" in text + or "UPower not available" in text + ) + + +class TestBluetoothDevices: + async def test_returns_result(self, client: Client): + result = await client.call_tool("bluetooth_devices", {}) + text = result.content[0].text + assert ( + "Bluetooth Devices" in text + or "No Bluetooth devices" in text + or "No Bluetooth objects" in text + or "bluez not available" in text + ) + + +class TestKwinWindows: + async def test_returns_windows(self, client: Client): + result = await client.call_tool("kwin_windows", {}) + text = result.content[0].text + assert ( + "KWin Windows" in text + or "KWin not available" in text + or "No windows found" in text + or "No open windows" in text + ) + + async def test_has_table(self, client: Client): + result = await client.call_tool("kwin_windows", {}) + text = result.content[0].text + if "KWin Windows" in text: + assert "| Window |" in text