Bridge Linux D-Bus IPC into MCP with tools for service discovery (list_services, introspect, list_objects), method calls and property access, plus convenience shortcuts for notifications, systemd units, and MPRIS media player control. All 25 tests passing.
71 lines
2.7 KiB
Python
71 lines
2.7 KiB
Python
"""Tests for discovery tools — requires a running session bus."""
|
|
|
|
from mcdbus._bus import BusManager, call_bus_method
|
|
|
|
|
|
async def test_list_names_raw(bus_manager: BusManager):
|
|
"""Verify we can list service names on the session bus via raw D-Bus call."""
|
|
bus = await bus_manager.get_bus("session")
|
|
result = await call_bus_method(
|
|
bus,
|
|
destination="org.freedesktop.DBus",
|
|
path="/org/freedesktop/DBus",
|
|
interface="org.freedesktop.DBus",
|
|
member="ListNames",
|
|
)
|
|
names = result[0]
|
|
assert isinstance(names, list)
|
|
assert "org.freedesktop.DBus" in names
|
|
assert len(names) > 1
|
|
|
|
|
|
async def test_introspect_raw(bus_manager: BusManager):
|
|
"""Verify introspection returns parseable interface data."""
|
|
bus = await bus_manager.get_bus("session")
|
|
node = await bus.introspect("org.freedesktop.DBus", "/org/freedesktop/DBus")
|
|
iface_names = [i.name for i in node.interfaces]
|
|
assert "org.freedesktop.DBus" in iface_names
|
|
# Should have standard interfaces too
|
|
assert "org.freedesktop.DBus.Introspectable" in iface_names
|
|
|
|
|
|
async def test_introspect_methods(bus_manager: BusManager):
|
|
"""Verify we can see method signatures from introspection."""
|
|
bus = await bus_manager.get_bus("session")
|
|
node = await bus.introspect("org.freedesktop.DBus", "/org/freedesktop/DBus")
|
|
dbus_iface = next(i for i in node.interfaces if i.name == "org.freedesktop.DBus")
|
|
method_names = [m.name for m in dbus_iface.methods]
|
|
assert "ListNames" in method_names
|
|
assert "GetId" in method_names
|
|
|
|
|
|
async def test_walk_object_tree(bus_manager: BusManager):
|
|
"""Walk the DBus daemon object tree — should have at least root."""
|
|
bus = await bus_manager.get_bus("session")
|
|
node = await bus.introspect("org.freedesktop.DBus", "/")
|
|
|
|
# The root should exist and may have child nodes
|
|
assert node is not None
|
|
# DBus daemon typically has /org/freedesktop/DBus
|
|
child_names = [c.name for c in node.nodes]
|
|
assert len(child_names) >= 0 # May be empty at root, that's ok
|
|
|
|
|
|
async def test_list_names_filters_unique(bus_manager: BusManager):
|
|
"""Verify unique names (:1.xx) can be filtered out."""
|
|
bus = await bus_manager.get_bus("session")
|
|
result = await call_bus_method(
|
|
bus,
|
|
destination="org.freedesktop.DBus",
|
|
path="/org/freedesktop/DBus",
|
|
interface="org.freedesktop.DBus",
|
|
member="ListNames",
|
|
)
|
|
all_names = result[0]
|
|
well_known = [n for n in all_names if not n.startswith(":")]
|
|
unique = [n for n in all_names if n.startswith(":")]
|
|
|
|
assert len(well_known) > 0
|
|
assert len(unique) > 0 # Should have at least one unique connection
|
|
assert "org.freedesktop.DBus" in well_known
|