Bridge Linux D-Bus IPC into MCP with tools for service discovery (list_services, introspect, list_objects), method calls and property access, plus convenience shortcuts for notifications, systemd units, and MPRIS media player control. All 25 tests passing.
79 lines
2.4 KiB
Python
79 lines
2.4 KiB
Python
"""Tests for interaction tools — requires a running session bus."""
|
|
|
|
from mcdbus._bus import BusManager, call_bus_method
|
|
|
|
|
|
async def test_ping(bus_manager: BusManager):
|
|
"""Ping the DBus daemon — universal no-arg method."""
|
|
bus = await bus_manager.get_bus("session")
|
|
result = await call_bus_method(
|
|
bus,
|
|
destination="org.freedesktop.DBus",
|
|
path="/org/freedesktop/DBus",
|
|
interface="org.freedesktop.DBus.Peer",
|
|
member="Ping",
|
|
)
|
|
# Ping returns void (None)
|
|
assert result is None
|
|
|
|
|
|
async def test_get_id(bus_manager: BusManager):
|
|
"""GetId returns the bus unique ID as a hex string."""
|
|
bus = await bus_manager.get_bus("session")
|
|
result = await call_bus_method(
|
|
bus,
|
|
destination="org.freedesktop.DBus",
|
|
path="/org/freedesktop/DBus",
|
|
interface="org.freedesktop.DBus",
|
|
member="GetId",
|
|
)
|
|
bus_id = result[0]
|
|
assert isinstance(bus_id, str)
|
|
assert len(bus_id) > 10
|
|
|
|
|
|
async def test_get_name_owner(bus_manager: BusManager):
|
|
"""GetNameOwner returns the unique connection for a well-known name."""
|
|
bus = await bus_manager.get_bus("session")
|
|
result = await call_bus_method(
|
|
bus,
|
|
destination="org.freedesktop.DBus",
|
|
path="/org/freedesktop/DBus",
|
|
interface="org.freedesktop.DBus",
|
|
member="GetNameOwner",
|
|
signature="s",
|
|
body=["org.freedesktop.DBus"],
|
|
)
|
|
owner = result[0]
|
|
assert isinstance(owner, str)
|
|
assert len(owner) > 0
|
|
|
|
|
|
async def test_method_call_error(bus_manager: BusManager):
|
|
"""Calling a non-existent method should raise RuntimeError."""
|
|
import pytest
|
|
|
|
bus = await bus_manager.get_bus("session")
|
|
with pytest.raises(RuntimeError, match="D-Bus error"):
|
|
await call_bus_method(
|
|
bus,
|
|
destination="org.freedesktop.DBus",
|
|
path="/org/freedesktop/DBus",
|
|
interface="org.freedesktop.DBus",
|
|
member="NonExistentMethod",
|
|
)
|
|
|
|
|
|
async def test_system_bus_accessible(bus_manager: BusManager):
|
|
"""Verify system bus is accessible and can list units (if systemd is running)."""
|
|
bus = await bus_manager.get_bus("system")
|
|
result = await call_bus_method(
|
|
bus,
|
|
destination="org.freedesktop.DBus",
|
|
path="/org/freedesktop/DBus",
|
|
interface="org.freedesktop.DBus",
|
|
member="ListNames",
|
|
)
|
|
names = result[0]
|
|
assert "org.freedesktop.DBus" in names
|