mcdbus/tests/test_interaction.py
Ryan Malloy 4d7b73f6ee Implement D-Bus MCP server with introspection-first discovery
Bridge Linux D-Bus IPC into MCP with tools for service discovery
(list_services, introspect, list_objects), method calls and property
access, plus convenience shortcuts for notifications, systemd units,
and MPRIS media player control. All 25 tests passing.
2026-03-05 20:20:55 -07:00

79 lines
2.4 KiB
Python

"""Tests for interaction tools — requires a running session bus."""
from mcdbus._bus import BusManager, call_bus_method
async def test_ping(bus_manager: BusManager):
    """Invoke the no-argument ``Ping`` method on the session bus daemon.

    The call targets ``org.freedesktop.DBus.Peer`` at ``/org/freedesktop/DBus``
    and the test expects ``call_bus_method`` to hand back ``None`` for it.
    """
    session_bus = await bus_manager.get_bus("session")
    ping_call = {
        "destination": "org.freedesktop.DBus",
        "path": "/org/freedesktop/DBus",
        "interface": "org.freedesktop.DBus.Peer",
        "member": "Ping",
    }
    reply = await call_bus_method(session_bus, **ping_call)
    # A void reply is expected to surface as None rather than a container.
    assert reply is None
async def test_get_id(bus_manager: BusManager):
    """Call ``GetId`` on the session bus daemon and sanity-check the reply.

    The first element of the reply is taken as the bus ID; the test only
    requires it to be a string longer than 10 characters.
    """
    session_bus = await bus_manager.get_bus("session")
    reply = await call_bus_method(
        session_bus,
        destination="org.freedesktop.DBus",
        path="/org/freedesktop/DBus",
        interface="org.freedesktop.DBus",
        member="GetId",
    )
    # The ID is the first (index 0) value of the reply.
    bus_id = reply[0]
    assert isinstance(bus_id, str)
    assert 10 < len(bus_id)
async def test_get_name_owner(bus_manager: BusManager):
    """Ask the session bus daemon who owns ``org.freedesktop.DBus``.

    ``GetNameOwner`` is called with signature ``"s"`` and the well-known
    name as its single argument; the first reply value must be a
    non-empty string.
    """
    session_bus = await bus_manager.get_bus("session")
    well_known_name = "org.freedesktop.DBus"
    reply = await call_bus_method(
        session_bus,
        destination="org.freedesktop.DBus",
        path="/org/freedesktop/DBus",
        interface="org.freedesktop.DBus",
        member="GetNameOwner",
        signature="s",
        body=[well_known_name],
    )
    owner = reply[0]
    assert isinstance(owner, str)
    # Already known to be a str here, so a non-empty check is a plain comparison.
    assert owner != ""
async def test_method_call_error(bus_manager: BusManager):
    """Expect ``RuntimeError`` when calling a method the daemon does not have.

    The raised error's message must match the pattern ``"D-Bus error"``.
    """
    import pytest

    session_bus = await bus_manager.get_bus("session")
    expect_dbus_failure = pytest.raises(RuntimeError, match="D-Bus error")
    with expect_dbus_failure:
        # "NonExistentMethod" is deliberately bogus to provoke the error path.
        await call_bus_method(
            session_bus,
            destination="org.freedesktop.DBus",
            path="/org/freedesktop/DBus",
            interface="org.freedesktop.DBus",
            member="NonExistentMethod",
        )
async def test_system_bus_accessible(bus_manager: BusManager):
    """Check that the system bus can be reached and queried.

    Calls ``ListNames`` on the system bus daemon and asserts that the
    daemon's own name, ``org.freedesktop.DBus``, appears in the first
    reply value.
    """
    system_bus = await bus_manager.get_bus("system")
    list_names_call = {
        "destination": "org.freedesktop.DBus",
        "path": "/org/freedesktop/DBus",
        "interface": "org.freedesktop.DBus",
        "member": "ListNames",
    }
    reply = await call_bus_method(system_bus, **list_names_call)
    # The collection of names is the first (index 0) value of the reply.
    registered_names = reply[0]
    assert "org.freedesktop.DBus" in registered_names