Implement D-Bus MCP server with introspection-first discovery
Bridge Linux D-Bus IPC into MCP with tools for service discovery (list_services, introspect, list_objects), method calls and property access, plus convenience shortcuts for notifications, systemd units, and MPRIS media player control. All 25 tests passing.
This commit is contained in:
parent
b383f36872
commit
4d7b73f6ee
17
README.md
Normal file
17
README.md
Normal file
@ -0,0 +1,17 @@
|
||||
# mcdbus
|
||||
|
||||
D-Bus MCP server — bridge Linux IPC into the Model Context Protocol.
|
||||
|
||||
Gives Claude native tool access to session and system D-Bus services through introspection-first discovery.
|
||||
|
||||
## Install
|
||||
|
||||
```bash
|
||||
uv run mcdbus
|
||||
```
|
||||
|
||||
## Add to Claude Code
|
||||
|
||||
```bash
|
||||
claude mcp add mcdbus -- uv run --directory /path/to/mcdbus mcdbus
|
||||
```
|
||||
126
src/mcdbus/_bus.py
Normal file
126
src/mcdbus/_bus.py
Normal file
@ -0,0 +1,126 @@
|
||||
"""Bus connection management and D-Bus type serialization."""
|
||||
|
||||
import json
|
||||
from typing import Any
|
||||
|
||||
from dbus_fast import BusType, Message, MessageType
|
||||
from dbus_fast.aio import MessageBus
|
||||
from dbus_fast.unpack import unpack_variants
|
||||
|
||||
|
||||
class BusManager:
|
||||
"""Lazy-connecting, caching manager for session and system D-Bus connections."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._buses: dict[str, MessageBus] = {}
|
||||
|
||||
async def get_bus(self, bus_type: str) -> MessageBus:
|
||||
"""Get a connected bus by type name ("session" or "system")."""
|
||||
if bus_type not in ("session", "system"):
|
||||
raise ValueError(f"bus_type must be 'session' or 'system', got {bus_type!r}")
|
||||
|
||||
if bus_type in self._buses:
|
||||
bus = self._buses[bus_type]
|
||||
if bus.connected:
|
||||
return bus
|
||||
# stale — drop and reconnect
|
||||
del self._buses[bus_type]
|
||||
|
||||
bt = BusType.SESSION if bus_type == "session" else BusType.SYSTEM
|
||||
bus = await MessageBus(bus_type=bt).connect()
|
||||
self._buses[bus_type] = bus
|
||||
return bus
|
||||
|
||||
async def disconnect_all(self) -> None:
|
||||
for bus in self._buses.values():
|
||||
if bus.connected:
|
||||
bus.disconnect()
|
||||
self._buses.clear()
|
||||
|
||||
|
||||
def serialize_variant(value: Any) -> Any:
|
||||
"""Recursively unwrap dbus-fast Variant objects into JSON-safe Python types."""
|
||||
unpacked = unpack_variants(value)
|
||||
return _make_json_safe(unpacked)
|
||||
|
||||
|
||||
def _make_json_safe(value: Any) -> Any:
|
||||
"""Ensure all values are JSON-serializable (handle bytes, tuples, etc.)."""
|
||||
if isinstance(value, bytes):
|
||||
try:
|
||||
return value.decode("utf-8")
|
||||
except UnicodeDecodeError:
|
||||
return list(value)
|
||||
if isinstance(value, dict):
|
||||
return {str(k): _make_json_safe(v) for k, v in value.items()}
|
||||
if isinstance(value, (list, tuple)):
|
||||
return [_make_json_safe(item) for item in value]
|
||||
return value
|
||||
|
||||
|
||||
def deserialize_args(args_json: str, signature: str) -> list[Any]:
|
||||
"""Parse JSON args string into a Python list suitable for D-Bus call body.
|
||||
|
||||
For 'v' (Variant) signatures, wraps values in dbus_fast.signature.Variant.
|
||||
"""
|
||||
from dbus_fast.signature import SignatureTree, Variant
|
||||
|
||||
if not args_json or args_json.strip() in ("", "[]", "null"):
|
||||
return []
|
||||
|
||||
args = json.loads(args_json)
|
||||
if not isinstance(args, list):
|
||||
args = [args]
|
||||
|
||||
if not signature:
|
||||
return args
|
||||
|
||||
tree = SignatureTree(signature)
|
||||
result = []
|
||||
for i, sig_type in enumerate(tree.types):
|
||||
if i >= len(args):
|
||||
break
|
||||
val = args[i]
|
||||
if sig_type.signature == "v":
|
||||
# Caller must wrap variant values; we auto-wrap strings as "s"
|
||||
if isinstance(val, str):
|
||||
val = Variant("s", val)
|
||||
elif isinstance(val, bool):
|
||||
val = Variant("b", val)
|
||||
elif isinstance(val, int):
|
||||
val = Variant("i", val)
|
||||
elif isinstance(val, float):
|
||||
val = Variant("d", val)
|
||||
elif isinstance(val, list):
|
||||
val = Variant("as", val)
|
||||
elif isinstance(val, dict):
|
||||
val = Variant("a{sv}", val)
|
||||
result.append(val)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
async def call_bus_method(
|
||||
bus: MessageBus,
|
||||
destination: str,
|
||||
path: str,
|
||||
interface: str,
|
||||
member: str,
|
||||
signature: str = "",
|
||||
body: list[Any] | None = None,
|
||||
) -> Any:
|
||||
"""Send a D-Bus method call and return the unpacked response body."""
|
||||
msg = Message(
|
||||
destination=destination,
|
||||
path=path,
|
||||
interface=interface,
|
||||
member=member,
|
||||
signature=signature,
|
||||
body=body or [],
|
||||
)
|
||||
reply = await bus.call(msg)
|
||||
if reply.message_type == MessageType.ERROR:
|
||||
raise RuntimeError(f"D-Bus error: {reply.error_name}: {reply.body}")
|
||||
if reply.body:
|
||||
return serialize_variant(reply.body)
|
||||
return None
|
||||
174
src/mcdbus/_discovery.py
Normal file
174
src/mcdbus/_discovery.py
Normal file
@ -0,0 +1,174 @@
|
||||
"""Discovery tools — list services, introspect objects, walk object trees."""
|
||||
|
||||
from fastmcp import Context
|
||||
|
||||
from mcdbus._bus import BusManager, call_bus_method
|
||||
from mcdbus._state import mcp
|
||||
|
||||
|
||||
def _get_mgr(ctx: Context) -> BusManager:
|
||||
return ctx.request_context.lifespan_context
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def list_services(
|
||||
bus: str,
|
||||
ctx: Context,
|
||||
include_unique: bool = False,
|
||||
) -> str:
|
||||
"""List well-known D-Bus service names on a bus.
|
||||
|
||||
Args:
|
||||
bus: "session" or "system"
|
||||
include_unique: include unique connection names like :1.42 (default False)
|
||||
"""
|
||||
mgr = _get_mgr(ctx)
|
||||
await ctx.info(f"Connecting to {bus} bus...")
|
||||
connection = await mgr.get_bus(bus)
|
||||
|
||||
result = await call_bus_method(
|
||||
connection,
|
||||
destination="org.freedesktop.DBus",
|
||||
path="/org/freedesktop/DBus",
|
||||
interface="org.freedesktop.DBus",
|
||||
member="ListNames",
|
||||
)
|
||||
|
||||
names = sorted(result[0]) if result else []
|
||||
if not include_unique:
|
||||
names = [n for n in names if not n.startswith(":")]
|
||||
|
||||
lines = [f"## D-Bus {bus} bus — {len(names)} services\n"]
|
||||
for name in names:
|
||||
lines.append(f"- `{name}`")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def introspect(
|
||||
bus: str,
|
||||
service: str,
|
||||
object_path: str,
|
||||
ctx: Context,
|
||||
include_standard: bool = False,
|
||||
) -> str:
|
||||
"""Introspect a D-Bus object to see its interfaces, methods, properties, and signals.
|
||||
|
||||
Args:
|
||||
bus: "session" or "system"
|
||||
service: service name (e.g. "org.freedesktop.Notifications")
|
||||
object_path: object path (e.g. "/org/freedesktop/Notifications")
|
||||
include_standard: include standard D-Bus interfaces (Peer, Introspectable, Properties)
|
||||
"""
|
||||
mgr = _get_mgr(ctx)
|
||||
await ctx.report_progress(0, 4)
|
||||
connection = await mgr.get_bus(bus)
|
||||
|
||||
await ctx.report_progress(1, 4)
|
||||
node = await connection.introspect(service, object_path)
|
||||
|
||||
await ctx.report_progress(2, 4)
|
||||
|
||||
standard_ifaces = {
|
||||
"org.freedesktop.DBus.Peer",
|
||||
"org.freedesktop.DBus.Introspectable",
|
||||
"org.freedesktop.DBus.Properties",
|
||||
"org.freedesktop.DBus.ObjectManager",
|
||||
}
|
||||
|
||||
lines = [f"## `{service}` at `{object_path}`\n"]
|
||||
|
||||
for iface in node.interfaces:
|
||||
if not include_standard and iface.name in standard_ifaces:
|
||||
continue
|
||||
|
||||
lines.append(f"### Interface: `{iface.name}`\n")
|
||||
|
||||
if iface.methods:
|
||||
lines.append("**Methods:**")
|
||||
for method in iface.methods:
|
||||
in_args = ", ".join(
|
||||
f"{a.name}: {a.signature}" for a in method.in_args
|
||||
)
|
||||
out_args = ", ".join(
|
||||
f"{a.name}: {a.signature}" for a in method.out_args
|
||||
)
|
||||
out_str = f" -> {out_args}" if out_args else ""
|
||||
lines.append(f"- `{method.name}({in_args}){out_str}`")
|
||||
lines.append("")
|
||||
|
||||
if iface.properties:
|
||||
lines.append("**Properties:**")
|
||||
for prop in iface.properties:
|
||||
access = getattr(prop.access, "name", str(prop.access)).lower()
|
||||
lines.append(f"- `{prop.name}`: `{prop.signature}` ({access})")
|
||||
lines.append("")
|
||||
|
||||
if iface.signals:
|
||||
lines.append("**Signals:**")
|
||||
for signal in iface.signals:
|
||||
sig_args = ", ".join(
|
||||
f"{a.name}: {a.signature}" for a in signal.args
|
||||
)
|
||||
lines.append(f"- `{signal.name}({sig_args})`")
|
||||
lines.append("")
|
||||
|
||||
# Show child nodes if any
|
||||
if node.nodes:
|
||||
lines.append("**Child nodes:**")
|
||||
for child in node.nodes:
|
||||
child_path = object_path.rstrip("/") + "/" + child.name
|
||||
lines.append(f"- `{child_path}`")
|
||||
lines.append("")
|
||||
|
||||
await ctx.report_progress(4, 4)
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def list_objects(
|
||||
bus: str,
|
||||
service: str,
|
||||
ctx: Context,
|
||||
root_path: str = "/",
|
||||
) -> str:
|
||||
"""Recursively walk the D-Bus object tree for a service.
|
||||
|
||||
Args:
|
||||
bus: "session" or "system"
|
||||
service: service name
|
||||
root_path: starting path (default "/")
|
||||
"""
|
||||
mgr = _get_mgr(ctx)
|
||||
connection = await mgr.get_bus(bus)
|
||||
|
||||
paths: list[tuple[str, list[str]]] = []
|
||||
|
||||
async def _walk(path: str) -> None:
|
||||
try:
|
||||
node = await connection.introspect(service, path)
|
||||
except Exception as exc:
|
||||
await ctx.warning(f"Could not introspect {path}: {exc}")
|
||||
return
|
||||
|
||||
iface_names = [i.name for i in node.interfaces]
|
||||
paths.append((path, iface_names))
|
||||
await ctx.report_progress(len(paths), len(paths))
|
||||
|
||||
for child in node.nodes:
|
||||
child_path = path.rstrip("/") + "/" + child.name
|
||||
await _walk(child_path)
|
||||
|
||||
await _walk(root_path)
|
||||
|
||||
lines = [f"## Object tree for `{service}` on {bus} bus — {len(paths)} objects\n"]
|
||||
for path, ifaces in sorted(paths):
|
||||
std_prefix = "org.freedesktop.DBus."
|
||||
iface_str = ", ".join(f"`{i}`" for i in ifaces if not i.startswith(std_prefix))
|
||||
if iface_str:
|
||||
lines.append(f"- `{path}` — {iface_str}")
|
||||
else:
|
||||
lines.append(f"- `{path}`")
|
||||
|
||||
return "\n".join(lines)
|
||||
188
src/mcdbus/_interaction.py
Normal file
188
src/mcdbus/_interaction.py
Normal file
@ -0,0 +1,188 @@
|
||||
"""Interaction tools — method calls, property get/set."""
|
||||
|
||||
import json
|
||||
|
||||
from dbus_fast.signature import Variant
|
||||
from fastmcp import Context
|
||||
|
||||
from mcdbus._bus import BusManager, call_bus_method, deserialize_args
|
||||
from mcdbus._state import mcp
|
||||
|
||||
|
||||
def _get_mgr(ctx: Context) -> BusManager:
|
||||
return ctx.request_context.lifespan_context
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def call_method(
|
||||
bus: str,
|
||||
service: str,
|
||||
object_path: str,
|
||||
interface: str,
|
||||
method: str,
|
||||
ctx: Context,
|
||||
args: str = "[]",
|
||||
signature: str = "",
|
||||
) -> str:
|
||||
"""Call a D-Bus method and return the result.
|
||||
|
||||
Args:
|
||||
bus: "session" or "system"
|
||||
service: service name (e.g. "org.freedesktop.Notifications")
|
||||
object_path: object path (e.g. "/org/freedesktop/Notifications")
|
||||
interface: interface name (e.g. "org.freedesktop.Notifications")
|
||||
method: method name (e.g. "GetServerInformation")
|
||||
args: JSON array of arguments (default "[]")
|
||||
signature: D-Bus type signature for args (e.g. "su"); leave empty for no-arg methods
|
||||
"""
|
||||
mgr = _get_mgr(ctx)
|
||||
connection = await mgr.get_bus(bus)
|
||||
|
||||
body = deserialize_args(args, signature)
|
||||
await ctx.info(f"Calling {interface}.{method} on {service}...")
|
||||
|
||||
try:
|
||||
result = await call_bus_method(
|
||||
connection,
|
||||
destination=service,
|
||||
path=object_path,
|
||||
interface=interface,
|
||||
member=method,
|
||||
signature=signature,
|
||||
body=body,
|
||||
)
|
||||
except RuntimeError as exc:
|
||||
await ctx.error(str(exc))
|
||||
return f"Error: {exc}"
|
||||
|
||||
if result is None:
|
||||
return "Method returned no data (void)."
|
||||
|
||||
# If single-element response, unwrap for cleaner output
|
||||
if isinstance(result, list) and len(result) == 1:
|
||||
result = result[0]
|
||||
|
||||
return json.dumps(result, indent=2, default=str)
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def get_property(
|
||||
bus: str,
|
||||
service: str,
|
||||
object_path: str,
|
||||
interface: str,
|
||||
property_name: str,
|
||||
ctx: Context,
|
||||
) -> str:
|
||||
"""Read a single D-Bus property value.
|
||||
|
||||
Args:
|
||||
bus: "session" or "system"
|
||||
service: service name
|
||||
object_path: object path
|
||||
interface: interface that owns the property
|
||||
property_name: property name to read
|
||||
"""
|
||||
mgr = _get_mgr(ctx)
|
||||
connection = await mgr.get_bus(bus)
|
||||
|
||||
result = await call_bus_method(
|
||||
connection,
|
||||
destination=service,
|
||||
path=object_path,
|
||||
interface="org.freedesktop.DBus.Properties",
|
||||
member="Get",
|
||||
signature="ss",
|
||||
body=[interface, property_name],
|
||||
)
|
||||
|
||||
if result:
|
||||
value = result[0]
|
||||
return json.dumps(value, indent=2, default=str)
|
||||
return "No value returned."
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def set_property(
|
||||
bus: str,
|
||||
service: str,
|
||||
object_path: str,
|
||||
interface: str,
|
||||
property_name: str,
|
||||
value: str,
|
||||
signature: str,
|
||||
ctx: Context,
|
||||
) -> str:
|
||||
"""Set a D-Bus property value.
|
||||
|
||||
Args:
|
||||
bus: "session" or "system"
|
||||
service: service name
|
||||
object_path: object path
|
||||
interface: interface that owns the property
|
||||
property_name: property name to set
|
||||
value: JSON-encoded value to set
|
||||
signature: D-Bus type signature of the property value (e.g. "b" for boolean)
|
||||
"""
|
||||
mgr = _get_mgr(ctx)
|
||||
connection = await mgr.get_bus(bus)
|
||||
|
||||
parsed_value = json.loads(value)
|
||||
variant = Variant(signature, parsed_value)
|
||||
|
||||
await call_bus_method(
|
||||
connection,
|
||||
destination=service,
|
||||
path=object_path,
|
||||
interface="org.freedesktop.DBus.Properties",
|
||||
member="Set",
|
||||
signature="ssv",
|
||||
body=[interface, property_name, variant],
|
||||
)
|
||||
|
||||
return f"Set {interface}.{property_name} = {value}"
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def get_all_properties(
|
||||
bus: str,
|
||||
service: str,
|
||||
object_path: str,
|
||||
interface: str,
|
||||
ctx: Context,
|
||||
) -> str:
|
||||
"""Read all properties on a D-Bus interface.
|
||||
|
||||
Args:
|
||||
bus: "session" or "system"
|
||||
service: service name
|
||||
object_path: object path
|
||||
interface: interface to read properties from
|
||||
"""
|
||||
mgr = _get_mgr(ctx)
|
||||
connection = await mgr.get_bus(bus)
|
||||
|
||||
result = await call_bus_method(
|
||||
connection,
|
||||
destination=service,
|
||||
path=object_path,
|
||||
interface="org.freedesktop.DBus.Properties",
|
||||
member="GetAll",
|
||||
signature="s",
|
||||
body=[interface],
|
||||
)
|
||||
|
||||
if not result or not result[0]:
|
||||
return "No properties found."
|
||||
|
||||
props = result[0]
|
||||
lines = [f"## Properties of `{interface}` at `{object_path}`\n"]
|
||||
lines.append("| Property | Value |")
|
||||
lines.append("|----------|-------|")
|
||||
for name, value in sorted(props.items()):
|
||||
val_str = json.dumps(value, default=str)
|
||||
if len(val_str) > 80:
|
||||
val_str = val_str[:77] + "..."
|
||||
lines.append(f"| `{name}` | `{val_str}` |")
|
||||
|
||||
return "\n".join(lines)
|
||||
37
src/mcdbus/_prompts.py
Normal file
37
src/mcdbus/_prompts.py
Normal file
@ -0,0 +1,37 @@
|
||||
"""Prompt templates for guided D-Bus exploration."""
|
||||
|
||||
from mcdbus._state import mcp
|
||||
|
||||
|
||||
@mcp.prompt()
|
||||
def explore_service(bus: str = "session", service: str = "") -> str:
|
||||
"""Guide through exploring a D-Bus service step by step."""
|
||||
if service:
|
||||
return (
|
||||
f"Explore the D-Bus {bus} bus service: {service}\n\n"
|
||||
f'1. Use list_objects(bus="{bus}", service="{service}") to see the object tree\n'
|
||||
f"2. Use introspect() on interesting objects to see methods/properties/signals\n"
|
||||
f"3. Use get_all_properties() to read current state\n"
|
||||
f"4. Use call_method() to interact with methods\n"
|
||||
)
|
||||
return (
|
||||
f"Explore the D-Bus {bus} bus:\n\n"
|
||||
f'1. Start with list_services(bus="{bus}") to see available services\n'
|
||||
f"2. Pick a service and use list_objects() to see its object tree\n"
|
||||
f"3. Use introspect() on interesting objects to see methods/properties/signals\n"
|
||||
f"4. Use get_all_properties() to read current state\n"
|
||||
f"5. Use call_method() to interact with methods\n"
|
||||
)
|
||||
|
||||
|
||||
@mcp.prompt()
|
||||
def debug_service(service: str, bus: str = "session") -> str:
|
||||
"""Diagnose issues with a D-Bus service."""
|
||||
return (
|
||||
f"Debug the D-Bus service: {service} on the {bus} bus\n\n"
|
||||
f'1. Check if the service is registered: list_services(bus="{bus}")\n'
|
||||
f'2. Introspect root: introspect(bus="{bus}", service="{service}", object_path="/")\n'
|
||||
f"3. Walk the object tree to find all endpoints\n"
|
||||
f"4. Read all properties on key interfaces to check state\n"
|
||||
f"5. Try calling Ping on org.freedesktop.DBus.Peer to verify responsiveness\n"
|
||||
)
|
||||
67
src/mcdbus/_resources.py
Normal file
67
src/mcdbus/_resources.py
Normal file
@ -0,0 +1,67 @@
|
||||
"""Dynamic MCP resources for browsing D-Bus services."""
|
||||
|
||||
from urllib.parse import unquote
|
||||
|
||||
from mcdbus._bus import BusManager, call_bus_method
|
||||
from mcdbus._state import mcp
|
||||
|
||||
|
||||
@mcp.resource("dbus://{bus}/services")
|
||||
async def bus_services(bus: str) -> str:
|
||||
"""Live list of well-known service names on a D-Bus bus."""
|
||||
# Resources don't get Context — we need a fresh connection
|
||||
mgr = BusManager()
|
||||
try:
|
||||
connection = await mgr.get_bus(bus)
|
||||
result = await call_bus_method(
|
||||
connection,
|
||||
destination="org.freedesktop.DBus",
|
||||
path="/org/freedesktop/DBus",
|
||||
interface="org.freedesktop.DBus",
|
||||
member="ListNames",
|
||||
)
|
||||
names = sorted(n for n in (result[0] if result else []) if not n.startswith(":"))
|
||||
return "\n".join(names)
|
||||
finally:
|
||||
await mgr.disconnect_all()
|
||||
|
||||
|
||||
@mcp.resource("dbus://{bus}/{service}/objects")
|
||||
async def service_objects(bus: str, service: str) -> str:
|
||||
"""Object tree for a D-Bus service."""
|
||||
mgr = BusManager()
|
||||
try:
|
||||
connection = await mgr.get_bus(bus)
|
||||
paths: list[str] = []
|
||||
|
||||
async def _walk(path: str) -> None:
|
||||
try:
|
||||
node = await connection.introspect(service, path)
|
||||
paths.append(path)
|
||||
for child in node.nodes:
|
||||
child_path = path.rstrip("/") + "/" + child.name
|
||||
await _walk(child_path)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
await _walk("/")
|
||||
return "\n".join(sorted(paths))
|
||||
finally:
|
||||
await mgr.disconnect_all()
|
||||
|
||||
|
||||
@mcp.resource("dbus://{bus}/{service}/{path}/interfaces")
|
||||
async def object_interfaces(bus: str, service: str, path: str) -> str:
|
||||
"""Interfaces available at a D-Bus object path."""
|
||||
# path comes URL-encoded (slashes → %2F), decode it
|
||||
decoded_path = unquote(path)
|
||||
if not decoded_path.startswith("/"):
|
||||
decoded_path = "/" + decoded_path
|
||||
|
||||
mgr = BusManager()
|
||||
try:
|
||||
connection = await mgr.get_bus(bus)
|
||||
node = await connection.introspect(service, decoded_path)
|
||||
return "\n".join(i.name for i in node.interfaces)
|
||||
finally:
|
||||
await mgr.disconnect_all()
|
||||
185
src/mcdbus/_shortcuts.py
Normal file
185
src/mcdbus/_shortcuts.py
Normal file
@ -0,0 +1,185 @@
|
||||
"""High-level convenience tools for common D-Bus operations."""
|
||||
|
||||
from fastmcp import Context
|
||||
|
||||
from mcdbus._bus import BusManager, call_bus_method
|
||||
from mcdbus._state import mcp
|
||||
|
||||
|
||||
def _get_mgr(ctx: Context) -> BusManager:
|
||||
return ctx.request_context.lifespan_context
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def send_notification(
|
||||
summary: str,
|
||||
ctx: Context,
|
||||
body: str = "",
|
||||
icon: str = "",
|
||||
timeout: int = 5000,
|
||||
) -> str:
|
||||
"""Send a desktop notification via D-Bus.
|
||||
|
||||
Args:
|
||||
summary: notification title
|
||||
body: notification body text
|
||||
icon: icon name or path (empty for default)
|
||||
timeout: display duration in milliseconds (default 5000)
|
||||
"""
|
||||
mgr = _get_mgr(ctx)
|
||||
connection = await mgr.get_bus("session")
|
||||
|
||||
result = await call_bus_method(
|
||||
connection,
|
||||
destination="org.freedesktop.Notifications",
|
||||
path="/org/freedesktop/Notifications",
|
||||
interface="org.freedesktop.Notifications",
|
||||
member="Notify",
|
||||
signature="susssasa{sv}i",
|
||||
body=[
|
||||
"mcdbus", # app_name
|
||||
0, # replaces_id (0 = new)
|
||||
icon, # app_icon
|
||||
summary, # summary
|
||||
body, # body
|
||||
[], # actions
|
||||
{}, # hints
|
||||
timeout, # expire_timeout
|
||||
],
|
||||
)
|
||||
|
||||
nid = result[0] if result else "unknown"
|
||||
return f"Notification sent (id: {nid})"
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def list_systemd_units(
|
||||
ctx: Context,
|
||||
bus: str = "system",
|
||||
pattern: str = "",
|
||||
) -> str:
|
||||
"""List systemd units, optionally filtered by glob pattern.
|
||||
|
||||
Args:
|
||||
bus: "session" (user units) or "system" (system units, default)
|
||||
pattern: optional glob filter (e.g. "docker*", "*.service")
|
||||
"""
|
||||
mgr = _get_mgr(ctx)
|
||||
connection = await mgr.get_bus(bus)
|
||||
|
||||
result = await call_bus_method(
|
||||
connection,
|
||||
destination="org.freedesktop.systemd1",
|
||||
path="/org/freedesktop/systemd1",
|
||||
interface="org.freedesktop.systemd1.Manager",
|
||||
member="ListUnits",
|
||||
)
|
||||
|
||||
if not result or not result[0]:
|
||||
return "No units found."
|
||||
|
||||
units = result[0]
|
||||
|
||||
# Each unit is (name, description, load_state, active_state, sub_state, ...)
|
||||
import fnmatch
|
||||
if pattern:
|
||||
units = [u for u in units if fnmatch.fnmatch(u[0], pattern)]
|
||||
|
||||
lines = [f"## Systemd units on {bus} bus — {len(units)} matches\n"]
|
||||
lines.append("| Unit | Load | Active | Sub | Description |")
|
||||
lines.append("|------|------|--------|-----|-------------|")
|
||||
for unit in sorted(units, key=lambda u: u[0]):
|
||||
name, desc, load, active, sub = unit[0], unit[1], unit[2], unit[3], unit[4]
|
||||
lines.append(f"| `{name}` | {load} | {active} | {sub} | {desc} |")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def media_player_control(
|
||||
action: str,
|
||||
ctx: Context,
|
||||
player: str = "",
|
||||
) -> str:
|
||||
"""Control an MPRIS2 media player.
|
||||
|
||||
Args:
|
||||
action: one of "play", "pause", "next", "previous", "stop", "play-pause"
|
||||
player: MPRIS player name (auto-discovers first player if empty)
|
||||
"""
|
||||
mgr = _get_mgr(ctx)
|
||||
connection = await mgr.get_bus("session")
|
||||
|
||||
if not player:
|
||||
# Discover MPRIS players
|
||||
result = await call_bus_method(
|
||||
connection,
|
||||
destination="org.freedesktop.DBus",
|
||||
path="/org/freedesktop/DBus",
|
||||
interface="org.freedesktop.DBus",
|
||||
member="ListNames",
|
||||
)
|
||||
names = result[0] if result else []
|
||||
mpris_services = [n for n in names if n.startswith("org.mpris.MediaPlayer2.")]
|
||||
|
||||
if not mpris_services:
|
||||
return "No MPRIS media players found on session bus."
|
||||
player = mpris_services[0]
|
||||
await ctx.info(f"Auto-discovered player: {player}")
|
||||
|
||||
action_map = {
|
||||
"play": "Play",
|
||||
"pause": "Pause",
|
||||
"stop": "Stop",
|
||||
"next": "Next",
|
||||
"previous": "Previous",
|
||||
"play-pause": "PlayPause",
|
||||
}
|
||||
|
||||
dbus_method = action_map.get(action.lower())
|
||||
if not dbus_method:
|
||||
return f"Unknown action: {action}. Use: {', '.join(action_map.keys())}"
|
||||
|
||||
await call_bus_method(
|
||||
connection,
|
||||
destination=player,
|
||||
path="/org/mpris/MediaPlayer2",
|
||||
interface="org.mpris.MediaPlayer2.Player",
|
||||
member=dbus_method,
|
||||
)
|
||||
|
||||
# Read current playback status
|
||||
try:
|
||||
status_result = await call_bus_method(
|
||||
connection,
|
||||
destination=player,
|
||||
path="/org/mpris/MediaPlayer2",
|
||||
interface="org.freedesktop.DBus.Properties",
|
||||
member="Get",
|
||||
signature="ss",
|
||||
body=["org.mpris.MediaPlayer2.Player", "PlaybackStatus"],
|
||||
)
|
||||
status = status_result[0] if status_result else "unknown"
|
||||
except Exception:
|
||||
status = "unknown"
|
||||
|
||||
# Try to get current track metadata
|
||||
try:
|
||||
meta_result = await call_bus_method(
|
||||
connection,
|
||||
destination=player,
|
||||
path="/org/mpris/MediaPlayer2",
|
||||
interface="org.freedesktop.DBus.Properties",
|
||||
member="Get",
|
||||
signature="ss",
|
||||
body=["org.mpris.MediaPlayer2.Player", "Metadata"],
|
||||
)
|
||||
metadata = meta_result[0] if meta_result else {}
|
||||
title = metadata.get("xesam:title", "Unknown")
|
||||
artist = metadata.get("xesam:artist", ["Unknown"])
|
||||
if isinstance(artist, list):
|
||||
artist = ", ".join(artist)
|
||||
except Exception:
|
||||
title, artist = "Unknown", "Unknown"
|
||||
|
||||
return f"Player: {player}\nAction: {action}\nStatus: {status}\nNow: {artist} — {title}"
|
||||
38
src/mcdbus/_state.py
Normal file
38
src/mcdbus/_state.py
Normal file
@ -0,0 +1,38 @@
|
||||
"""FastMCP server instance and lifespan management."""
|
||||
|
||||
import sys
|
||||
from contextlib import asynccontextmanager
|
||||
|
||||
from fastmcp import FastMCP
|
||||
|
||||
from mcdbus._bus import BusManager
|
||||
|
||||
INSTRUCTIONS = """\
|
||||
D-Bus bridge for Linux IPC. Discover and interact with session and system bus \
|
||||
services through introspection.
|
||||
|
||||
Workflow: list_services -> list_objects -> introspect -> call_method / get_property
|
||||
|
||||
Two buses are available:
|
||||
- "session" — user desktop services (notifications, media players, KDE, portals)
|
||||
- "system" — system services (systemd, NetworkManager, UDisks2, UPower, bluez)
|
||||
"""
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(server: FastMCP):
|
||||
"""Manage BusManager lifecycle — connections are lazy but cleaned up on shutdown."""
|
||||
mgr = BusManager()
|
||||
print("mcdbus: D-Bus MCP server starting", file=sys.stderr)
|
||||
try:
|
||||
yield mgr
|
||||
finally:
|
||||
await mgr.disconnect_all()
|
||||
print("mcdbus: D-Bus connections closed", file=sys.stderr)
|
||||
|
||||
|
||||
mcp = FastMCP(
|
||||
name="mcdbus",
|
||||
instructions=INSTRUCTIONS,
|
||||
lifespan=lifespan,
|
||||
)
|
||||
12
src/mcdbus/server.py
Normal file
12
src/mcdbus/server.py
Normal file
@ -0,0 +1,12 @@
|
||||
"""mcdbus entry point — D-Bus MCP server."""
|
||||
|
||||
import mcdbus._discovery # noqa: F401
|
||||
import mcdbus._interaction # noqa: F401
|
||||
import mcdbus._prompts # noqa: F401
|
||||
import mcdbus._resources # noqa: F401
|
||||
import mcdbus._shortcuts # noqa: F401
|
||||
from mcdbus._state import mcp # noqa: F401
|
||||
|
||||
|
||||
def main():
|
||||
mcp.run()
|
||||
19
tests/conftest.py
Normal file
19
tests/conftest.py
Normal file
@ -0,0 +1,19 @@
|
||||
"""Test fixtures for mcdbus."""
|
||||
|
||||
import pytest
|
||||
|
||||
from mcdbus._bus import BusManager
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def bus_manager():
|
||||
"""Provide a BusManager instance, cleaned up after test."""
|
||||
mgr = BusManager()
|
||||
yield mgr
|
||||
await mgr.disconnect_all()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def session_bus(bus_manager: BusManager):
|
||||
"""Provide a connected session bus."""
|
||||
return await bus_manager.get_bus("session")
|
||||
73
tests/test_bus.py
Normal file
73
tests/test_bus.py
Normal file
@ -0,0 +1,73 @@
|
||||
"""Tests for BusManager and serialization utilities."""
|
||||
|
||||
import pytest
|
||||
|
||||
from mcdbus._bus import BusManager, deserialize_args, serialize_variant
|
||||
|
||||
|
||||
class TestBusManager:
|
||||
async def test_connect_session(self, bus_manager: BusManager):
|
||||
bus = await bus_manager.get_bus("session")
|
||||
assert bus.connected
|
||||
|
||||
async def test_connect_system(self, bus_manager: BusManager):
|
||||
bus = await bus_manager.get_bus("system")
|
||||
assert bus.connected
|
||||
|
||||
async def test_cached_connection(self, bus_manager: BusManager):
|
||||
bus1 = await bus_manager.get_bus("session")
|
||||
bus2 = await bus_manager.get_bus("session")
|
||||
assert bus1 is bus2
|
||||
|
||||
async def test_invalid_bus_type(self, bus_manager: BusManager):
|
||||
with pytest.raises(ValueError, match="must be 'session' or 'system'"):
|
||||
await bus_manager.get_bus("invalid")
|
||||
|
||||
async def test_disconnect_all(self, bus_manager: BusManager):
|
||||
bus = await bus_manager.get_bus("session")
|
||||
assert bus.connected
|
||||
await bus_manager.disconnect_all()
|
||||
assert not bus.connected
|
||||
|
||||
|
||||
class TestSerializeVariant:
|
||||
def test_plain_values(self):
|
||||
assert serialize_variant("hello") == "hello"
|
||||
assert serialize_variant(42) == 42
|
||||
assert serialize_variant(True) is True
|
||||
|
||||
def test_list(self):
|
||||
assert serialize_variant([1, 2, 3]) == [1, 2, 3]
|
||||
|
||||
def test_dict(self):
|
||||
assert serialize_variant({"a": 1}) == {"a": 1}
|
||||
|
||||
def test_nested(self):
|
||||
result = serialize_variant({"a": [1, {"b": "c"}]})
|
||||
assert result == {"a": [1, {"b": "c"}]}
|
||||
|
||||
def test_bytes_utf8(self):
|
||||
assert serialize_variant(b"hello") == "hello"
|
||||
|
||||
def test_bytes_binary(self):
|
||||
result = serialize_variant(b"\xff\xfe")
|
||||
assert result == [255, 254]
|
||||
|
||||
|
||||
class TestDeserializeArgs:
|
||||
def test_empty(self):
|
||||
assert deserialize_args("", "") == []
|
||||
assert deserialize_args("[]", "") == []
|
||||
assert deserialize_args("null", "") == []
|
||||
|
||||
def test_simple_args(self):
|
||||
result = deserialize_args('["hello", 42]', "su")
|
||||
assert result == ["hello", 42]
|
||||
|
||||
def test_no_signature(self):
|
||||
result = deserialize_args('["hello"]', "")
|
||||
assert result == ["hello"]
|
||||
|
||||
def test_single_value_wraps_in_list(self):
|
||||
result = deserialize_args('"hello"', "s")
|
||||
assert result == ["hello"]
|
||||
70
tests/test_discovery.py
Normal file
70
tests/test_discovery.py
Normal file
@ -0,0 +1,70 @@
|
||||
"""Tests for discovery tools — requires a running session bus."""
|
||||
|
||||
from mcdbus._bus import BusManager, call_bus_method
|
||||
|
||||
|
||||
async def test_list_names_raw(bus_manager: BusManager):
|
||||
"""Verify we can list service names on the session bus via raw D-Bus call."""
|
||||
bus = await bus_manager.get_bus("session")
|
||||
result = await call_bus_method(
|
||||
bus,
|
||||
destination="org.freedesktop.DBus",
|
||||
path="/org/freedesktop/DBus",
|
||||
interface="org.freedesktop.DBus",
|
||||
member="ListNames",
|
||||
)
|
||||
names = result[0]
|
||||
assert isinstance(names, list)
|
||||
assert "org.freedesktop.DBus" in names
|
||||
assert len(names) > 1
|
||||
|
||||
|
||||
async def test_introspect_raw(bus_manager: BusManager):
|
||||
"""Verify introspection returns parseable interface data."""
|
||||
bus = await bus_manager.get_bus("session")
|
||||
node = await bus.introspect("org.freedesktop.DBus", "/org/freedesktop/DBus")
|
||||
iface_names = [i.name for i in node.interfaces]
|
||||
assert "org.freedesktop.DBus" in iface_names
|
||||
# Should have standard interfaces too
|
||||
assert "org.freedesktop.DBus.Introspectable" in iface_names
|
||||
|
||||
|
||||
async def test_introspect_methods(bus_manager: BusManager):
|
||||
"""Verify we can see method signatures from introspection."""
|
||||
bus = await bus_manager.get_bus("session")
|
||||
node = await bus.introspect("org.freedesktop.DBus", "/org/freedesktop/DBus")
|
||||
dbus_iface = next(i for i in node.interfaces if i.name == "org.freedesktop.DBus")
|
||||
method_names = [m.name for m in dbus_iface.methods]
|
||||
assert "ListNames" in method_names
|
||||
assert "GetId" in method_names
|
||||
|
||||
|
||||
async def test_walk_object_tree(bus_manager: BusManager):
|
||||
"""Walk the DBus daemon object tree — should have at least root."""
|
||||
bus = await bus_manager.get_bus("session")
|
||||
node = await bus.introspect("org.freedesktop.DBus", "/")
|
||||
|
||||
# The root should exist and may have child nodes
|
||||
assert node is not None
|
||||
# DBus daemon typically has /org/freedesktop/DBus
|
||||
child_names = [c.name for c in node.nodes]
|
||||
assert len(child_names) >= 0 # May be empty at root, that's ok
|
||||
|
||||
|
||||
async def test_list_names_filters_unique(bus_manager: BusManager):
|
||||
"""Verify unique names (:1.xx) can be filtered out."""
|
||||
bus = await bus_manager.get_bus("session")
|
||||
result = await call_bus_method(
|
||||
bus,
|
||||
destination="org.freedesktop.DBus",
|
||||
path="/org/freedesktop/DBus",
|
||||
interface="org.freedesktop.DBus",
|
||||
member="ListNames",
|
||||
)
|
||||
all_names = result[0]
|
||||
well_known = [n for n in all_names if not n.startswith(":")]
|
||||
unique = [n for n in all_names if n.startswith(":")]
|
||||
|
||||
assert len(well_known) > 0
|
||||
assert len(unique) > 0 # Should have at least one unique connection
|
||||
assert "org.freedesktop.DBus" in well_known
|
||||
78
tests/test_interaction.py
Normal file
78
tests/test_interaction.py
Normal file
@ -0,0 +1,78 @@
|
||||
"""Tests for interaction tools — requires a running session bus."""
|
||||
|
||||
from mcdbus._bus import BusManager, call_bus_method
|
||||
|
||||
|
||||
async def test_ping(bus_manager: BusManager):
|
||||
"""Ping the DBus daemon — universal no-arg method."""
|
||||
bus = await bus_manager.get_bus("session")
|
||||
result = await call_bus_method(
|
||||
bus,
|
||||
destination="org.freedesktop.DBus",
|
||||
path="/org/freedesktop/DBus",
|
||||
interface="org.freedesktop.DBus.Peer",
|
||||
member="Ping",
|
||||
)
|
||||
# Ping returns void (None)
|
||||
assert result is None
|
||||
|
||||
|
||||
async def test_get_id(bus_manager: BusManager):
|
||||
"""GetId returns the bus unique ID as a hex string."""
|
||||
bus = await bus_manager.get_bus("session")
|
||||
result = await call_bus_method(
|
||||
bus,
|
||||
destination="org.freedesktop.DBus",
|
||||
path="/org/freedesktop/DBus",
|
||||
interface="org.freedesktop.DBus",
|
||||
member="GetId",
|
||||
)
|
||||
bus_id = result[0]
|
||||
assert isinstance(bus_id, str)
|
||||
assert len(bus_id) > 10
|
||||
|
||||
|
||||
async def test_get_name_owner(bus_manager: BusManager):
|
||||
"""GetNameOwner returns the unique connection for a well-known name."""
|
||||
bus = await bus_manager.get_bus("session")
|
||||
result = await call_bus_method(
|
||||
bus,
|
||||
destination="org.freedesktop.DBus",
|
||||
path="/org/freedesktop/DBus",
|
||||
interface="org.freedesktop.DBus",
|
||||
member="GetNameOwner",
|
||||
signature="s",
|
||||
body=["org.freedesktop.DBus"],
|
||||
)
|
||||
owner = result[0]
|
||||
assert isinstance(owner, str)
|
||||
assert len(owner) > 0
|
||||
|
||||
|
||||
async def test_method_call_error(bus_manager: BusManager):
|
||||
"""Calling a non-existent method should raise RuntimeError."""
|
||||
import pytest
|
||||
|
||||
bus = await bus_manager.get_bus("session")
|
||||
with pytest.raises(RuntimeError, match="D-Bus error"):
|
||||
await call_bus_method(
|
||||
bus,
|
||||
destination="org.freedesktop.DBus",
|
||||
path="/org/freedesktop/DBus",
|
||||
interface="org.freedesktop.DBus",
|
||||
member="NonExistentMethod",
|
||||
)
|
||||
|
||||
|
||||
async def test_system_bus_accessible(bus_manager: BusManager):
|
||||
"""Verify system bus is accessible and can list units (if systemd is running)."""
|
||||
bus = await bus_manager.get_bus("system")
|
||||
result = await call_bus_method(
|
||||
bus,
|
||||
destination="org.freedesktop.DBus",
|
||||
path="/org/freedesktop/DBus",
|
||||
interface="org.freedesktop.DBus",
|
||||
member="ListNames",
|
||||
)
|
||||
names = result[0]
|
||||
assert "org.freedesktop.DBus" in names
|
||||
Loading…
x
Reference in New Issue
Block a user