Today, mcaxl is read-only against CUCM by *absence* — the tools
never call write methods. But absence isn't enforced: a future
contributor adding a tool could write
self._service.addRoutePartition(...) and zeep would happily
dispatch it. There's no positive guard.
Two new chokepoints close that gap:
AXL side — _ReadOnlyServiceProxy wraps the zeep service object.
__getattr__ refuses any method outside _ALLOWED_AXL_METHODS
(currently {getCCMVersion, executeSQLQuery}) with a new
ReadOnlyViolation exception, raised at attribute lookup BEFORE
zeep serializes a SOAP envelope. Underscore-prefixed and dunder
attributes pass through (zeep introspects via _binding_options,
__class__, etc., and those don't dispatch SOAP).
RisPort side — RisPort70 envelopes are hand-rolled, so the proxy
pattern doesn't apply directly. The equivalent chokepoint lives in
the envelope builders: _check_operation_allowed(name) is the first
line of every builder, and _ALLOWED_RISPORT_OPERATIONS is the
allowlist (currently {selectCmDevice}).
Operators can verify the proxy is active via the health tool —
connection_status() now reports read_only_proxy: true and
allowed_axl_methods: [...].
Tests:
- new tests/test_readonly_proxy.py (13 tests):
* allowed methods dispatch through to inner service
* 9 parameterized refusals (addRoutePartition, updatePhone,
removeUser, applyPhone, resetPhone, restartPhone,
executeSQLUpdate, doDeviceLogin, wipePhone)
* allowlist drift detection (set must be exactly what we
advertise — accidental widening fails red)
* dunder + underscore-prefixed passthrough
- tests/test_risport.py: +TestReadOnlyAllowlist (8 tests):
* selectCmDevice passes _check_operation_allowed
* 6 parameterized refusals (addCmDevice, removeCmDevice,
resetDevice, restartDevice, applyCmDevice, executeSQLUpdate)
* allowlist drift detection
182 tests pass total (was 161; +13 proxy + 8 risport, the latter
counting the RisPort allowlist drift check).
257 lines
8.9 KiB
Python
257 lines
8.9 KiB
Python
"""Unit tests for the RisPort70 SOAP envelope construction and parser.
|
|
|
|
Live-cluster integration is verified separately via the smoke-test
|
|
script — these tests are pure: envelope shape, response-XML parsing,
|
|
edge cases. No network.
|
|
"""
|
|
|
|
import xml.etree.ElementTree as ET
|
|
|
|
import pytest
|
|
|
|
from mcaxl.client import ReadOnlyViolation
|
|
from mcaxl.risport import (
|
|
DEVICE_STATUS_VALUES,
|
|
_ALLOWED_RISPORT_OPERATIONS,
|
|
RisPortClient,
|
|
_build_select_envelope,
|
|
_check_operation_allowed,
|
|
_escape_xml,
|
|
_parse_response,
|
|
)
|
|
|
|
|
|
class TestEscapeXml:
    """Checks that ``_escape_xml`` neutralises XML metacharacters."""

    def test_basic_escapes(self):
        """Angle brackets, ampersand and both quote styles become entities."""
        # NOTE(review): these expected values were reconstructed from a copy
        # of the file whose entities had been decoded. The spellings chosen
        # for the quote characters (&quot; / &apos;) should be confirmed
        # against the _escape_xml implementation.
        assert _escape_xml("<bad>") == "&lt;bad&gt;"
        assert _escape_xml("a&b") == "a&amp;b"
        assert _escape_xml('"x"') == "&quot;x&quot;"
        assert _escape_xml("'y'") == "&apos;y&apos;"

    def test_passthrough_safe_text(self):
        """Text without metacharacters comes back unchanged."""
        assert _escape_xml("Phone-1234") == "Phone-1234"
|
|
|
|
|
|
class TestSelectEnvelope:
|
|
def test_envelope_is_well_formed_xml(self):
|
|
env = _build_select_envelope()
|
|
# Must parse — if not, RisPort will reject it
|
|
root = ET.fromstring(env)
|
|
assert root is not None
|
|
|
|
def test_default_envelope_includes_required_fields(self):
|
|
env = _build_select_envelope()
|
|
# Cisco's WSDL requires every CmSelectionCriteria child
|
|
for required in [
|
|
"MaxReturnedDevices",
|
|
"DeviceClass",
|
|
"Model",
|
|
"Status",
|
|
"NodeName",
|
|
"SelectBy",
|
|
"SelectItems",
|
|
"Protocol",
|
|
"DownloadStatus",
|
|
]:
|
|
assert f"<soap:{required}>" in env, (
|
|
f"missing required CmSelectionCriteria field <soap:{required}>"
|
|
)
|
|
|
|
def test_state_info_threaded_into_envelope(self):
|
|
env = _build_select_envelope(state_info="cursor-abc-123")
|
|
assert "cursor-abc-123" in env
|
|
assert "<soap:StateInfo>cursor-abc-123</soap:StateInfo>" in env
|
|
|
|
def test_select_items_default_wildcard(self):
|
|
env = _build_select_envelope()
|
|
# Default: ['*'] — all devices
|
|
assert "<soap:Item>*</soap:Item>" in env
|
|
|
|
def test_select_items_explicit_list(self):
|
|
env = _build_select_envelope(select_items=["SEP1", "SEP2"])
|
|
assert "<soap:Item>SEP1</soap:Item>" in env
|
|
assert "<soap:Item>SEP2</soap:Item>" in env
|
|
|
|
def test_max_devices_int_coerced(self):
|
|
env = _build_select_envelope(max_devices=500)
|
|
assert "<soap:MaxReturnedDevices>500</soap:MaxReturnedDevices>" in env
|
|
|
|
def test_xml_special_chars_in_filter_escaped(self):
|
|
# SOAP injection defense — single quote and angle bracket must be escaped
|
|
env = _build_select_envelope(select_items=["O'Brien <test>"])
|
|
assert "'" in env
|
|
assert "<" in env
|
|
assert ">" in env
|
|
# The raw chars must NOT appear
|
|
assert "'Brien <test>" not in env
|
|
|
|
|
|
class TestParseResponse:
    """Exercises ``_parse_response`` against canned RisPort SOAP payloads."""

    # CUCM 15 response shape: <selectCmDeviceReturn> contains an additional
    # <SelectCmDeviceResult> wrapper that the parser has to descend through.
    # Found during live verification against cucm-pub.binghammemorial.org on
    # 2026-04-26.
    SAMPLE_RESPONSE = """<?xml version="1.0"?>
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
<soapenv:Body>
<ns:selectCmDeviceResponse xmlns:ns="http://schemas.cisco.com/ast/soap">
<selectCmDeviceReturn>
<SelectCmDeviceResult>
<TotalDevicesFound>2</TotalDevicesFound>
<StateInfo></StateInfo>
<CmNodes>
<item>
<Name>cucm-pub</Name>
<ReturnCode>Ok</ReturnCode>
<CmDevices>
<item>
<Name>SEP1234567890AB</Name>
<IpAddress><item><IP>10.0.0.5</IP><IPAddrType>ipv4</IPAddrType></item></IpAddress>
<DirNumber>2001</DirNumber>
<Status>Registered</Status>
<StatusReason>0</StatusReason>
<Protocol>SIP</Protocol>
<Description>Patient Room 101</Description>
<Model>Cisco 7841</Model>
<ActiveLoadID>sip78xx.12-5-1SR4-3</ActiveLoadID>
<TimeStamp>1700000000</TimeStamp>
</item>
<item>
<Name>SEPABCDEF123456</Name>
<IpAddress></IpAddress>
<DirNumber></DirNumber>
<Status>UnRegistered</Status>
<StatusReason>1</StatusReason>
<Description>Decommissioned phone</Description>
</item>
</CmDevices>
</item>
</CmNodes>
</SelectCmDeviceResult>
</selectCmDeviceReturn>
</ns:selectCmDeviceResponse>
</soapenv:Body>
</soapenv:Envelope>"""

    # Older (pre-CUCM-15) shape: the result fields sit directly under
    # <selectCmDeviceReturn> with no <SelectCmDeviceResult> wrapper. Kept so
    # the parser's backward-compatible fallback stays covered.
    LEGACY_RESPONSE = """<?xml version="1.0"?>
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
<soapenv:Body>
<ns:selectCmDeviceResponse xmlns:ns="http://schemas.cisco.com/ast/soap">
<selectCmDeviceReturn>
<TotalDevicesFound>1</TotalDevicesFound>
<StateInfo></StateInfo>
<CmNodes>
<item>
<Name>cucm-old</Name>
<ReturnCode>Ok</ReturnCode>
<CmDevices>
<item>
<Name>SEPLEGACY</Name>
<Status>Registered</Status>
</item>
</CmDevices>
</item>
</CmNodes>
</selectCmDeviceReturn>
</ns:selectCmDeviceResponse>
</soapenv:Body>
</soapenv:Envelope>"""

    def test_legacy_response_shape_still_parses(self):
        """Backward compat with pre-CUCM-15 RisPort responses."""
        parsed = _parse_response(self.LEGACY_RESPONSE)
        assert parsed["total_devices_found"] == 1
        legacy_device = parsed["cm_nodes"][0]["devices"][0]
        assert legacy_device["name"] == "SEPLEGACY"

    def test_parse_total_count(self):
        """TotalDevicesFound is surfaced as an integer."""
        parsed = _parse_response(self.SAMPLE_RESPONSE)
        assert parsed["total_devices_found"] == 2

    def test_parse_node_count(self):
        """The single CmNodes item is returned with its name."""
        nodes = _parse_response(self.SAMPLE_RESPONSE)["cm_nodes"]
        assert len(nodes) == 1
        assert nodes[0]["name"] == "cucm-pub"

    def test_parse_device_fields(self):
        """Per-device fields, including the nested IP, are extracted."""
        parsed = _parse_response(self.SAMPLE_RESPONSE)
        device_list = parsed["cm_nodes"][0]["devices"]
        assert len(device_list) == 2

        registered = device_list[0]
        expected_fields = {
            "name": "SEP1234567890AB",
            "status": "Registered",
            "dir_number": "2001",
            "description": "Patient Room 101",
            "protocol": "SIP",
            # Pulled out of the nested <IpAddress><item><IP> structure
            "ip_address": "10.0.0.5",
        }
        for key, value in expected_fields.items():
            assert registered[key] == value

    def test_parse_unregistered_device(self):
        """A device with an empty <IpAddress> reports an empty-string IP."""
        parsed = _parse_response(self.SAMPLE_RESPONSE)
        unregistered = parsed["cm_nodes"][0]["devices"][1]
        assert unregistered["status"] == "UnRegistered"
        assert unregistered["ip_address"] == ""  # missing IP renders empty

    def test_parse_state_info_for_pagination(self):
        """An empty <StateInfo> is returned as an empty string."""
        parsed = _parse_response(self.SAMPLE_RESPONSE)
        assert parsed["state_info"] == ""  # last page

    def test_parse_soap_fault_raises(self):
        """A SOAP Fault body raises RuntimeError carrying the faultstring."""
        fault_response = """<?xml version="1.0"?>
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
<soapenv:Body>
<soapenv:Fault>
<faultstring>Authentication failed</faultstring>
</soapenv:Fault>
</soapenv:Body>
</soapenv:Envelope>"""
        with pytest.raises(RuntimeError, match="Authentication failed"):
            _parse_response(fault_response)
|
|
|
|
|
|
class TestStatusValidation:
    """Covers the status vocabulary and its validation in ``RisPortClient``."""

    def test_known_status_values(self):
        """The three registration states are part of DEVICE_STATUS_VALUES."""
        for known in ("Registered", "UnRegistered", "PartiallyRegistered"):
            assert known in DEVICE_STATUS_VALUES

    def test_select_cm_device_rejects_invalid_status(self):
        """A bogus status is rejected with ValueError."""
        ris = RisPortClient()
        # With no env vars configured, _ensure_session would fail; the status
        # check is expected to fire first and raise ValueError instead.
        with pytest.raises(ValueError, match="status must be"):
            ris.select_cm_device(status="not-a-real-status")
|
|
|
|
|
|
class TestReadOnlyAllowlist:
    """Guards the RisPort read-only allowlist.

    Every RisPort envelope builder gates on _check_operation_allowed, which
    plays the same role as the AXL service proxy: it is the chokepoint that
    stops any future write-shaped operation from being assembled.
    """

    # Operation names that must be refused by the allowlist check.
    _REFUSED_OPERATIONS = (
        "addCmDevice",
        "removeCmDevice",
        "resetDevice",
        "restartDevice",
        "applyCmDevice",
        "executeSQLUpdate",  # leakage from the AXL surface
    )

    def test_selectCmDevice_is_allowed(self):
        """The one allowlisted operation passes the check without raising."""
        _check_operation_allowed("selectCmDevice")

    @pytest.mark.parametrize("operation", _REFUSED_OPERATIONS)
    def test_disallowed_operation_raises_in_check(self, operation):
        """Anything off the allowlist raises ReadOnlyViolation naming it."""
        with pytest.raises(ReadOnlyViolation, match=operation):
            _check_operation_allowed(operation)

    def test_allowlist_is_exactly_what_we_advertise(self):
        """Pin the allowlist so that widening it forces a test update."""
        # Same policy as the AXL allowlist: growing this set is a deliberate
        # decision and must be accompanied by a change here.
        advertised = frozenset({"selectCmDevice"})
        assert _ALLOWED_RISPORT_OPERATIONS == advertised
|