From 39d4b293927ee0bd65096c79901fe31467626a84 Mon Sep 17 00:00:00 2001 From: Ryan Malloy Date: Sun, 26 Apr 2026 10:28:04 -0600 Subject: [PATCH] Add RisPort70 for real-time registration state + rate-limit backoff MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two ideas borrowed from cisco-cucm-mcp (calltelemetry/cisco-cucm-mcp, MIT licensed): real-time device registration via RisPort70, and exponential-backoff retry on transient HTTP 5xx errors. Both are purpose-built for the audit use case rather than general-purpose ports — RisPort tools exist to inform audit findings, not as a standalone "look at my devices" interface. Rate limit / 503 backoff (~30 lines + 3 tests): AxlClient now mounts an HTTPAdapter with a urllib3 Retry policy (3 retries, exponential backoff, status_forcelist=[502,503,504]). Configurable via AXL_RATE_LIMIT_RETRIES (default 3, 0 disables). Surfaces in connection_status() so operators can see the policy. Closes a real reliability gap: CUCM SOAP rate-limits under load during change windows or with multiple concurrent admins; pre-fix any 503 was a hard failure. RisPort70 (new src/risport.py + 2 tools + prompt update): Hand-coded SOAP client for /realtimeservice2/services/RISService70 (avoids dragging in another zeep instance for one operation). Reuses AXL_URL/USER/PASS env vars — RisPort lives on the same host. New tools: device_registration_status(device_class, status, name_filter, page_size) device_registration_summary() — cluster-wide breakdown by class Live-cluster verification (cucm-pub.binghammemorial.org): Phone: 803 registered=679 unregistered=123 rejected=1 Gateway: 85 registered=41 rejected=44 ← real audit finding SIPTrunk: 22 registered=18 unregistered=4 HuntList: 28 registered=28 H323/CTI: 0 (cluster doesn't use these) Discovered while live-verifying: CUCM 15 wraps the RisPort response in an extra element inside . Older CUCM versions exposed the fields directly. The parser falls back to either shape; tests cover both (test_legacy_response_shape_still_parses asserts the older shape still works). phone_inventory_report prompt updated: New Step 3 — "Cross-reference with real-time registration" — recommends device_registration_summary() + device_registration_status(status="UnRegistered") to surface configured-but-never-registered phones (strongest orphan signal), PartiallyRegistered phones (firewall/cert/version mismatch indicator), and registration-state vs config-state mismatches. Tooling delta worth noting: AXL device count: 1,377 phones RisPort device count: 803 phones Delta (~574) likely templates, hidden phones, or stale config — itself an audit finding the new tool will surface to anyone running phone_inventory_report. README updated: - Added health(), device_registration_status, device_registration_summary - Added "Scope and complement" section recommending @calltelemetry/cisco-cucm-mcp alongside for operational debugging (logs, perfmon, packet capture, service control). The two servers answer different questions; the LLM with both can compose audit findings with operational state. - Listed all 10 prompts (was 4 outdated entries). Tests: 134 → 155 (+21). --- README.md | 40 ++ src/mcp_cucm_axl/client.py | 34 +- .../prompts/phone_inventory_report.py | 41 +- src/mcp_cucm_axl/risport.py | 407 ++++++++++++++++++ src/mcp_cucm_axl/server.py | 69 ++- tests/test_client_recovery.py | 72 ++++ tests/test_risport.py | 222 ++++++++++ 7 files changed, 881 insertions(+), 4 deletions(-) create mode 100644 src/mcp_cucm_axl/risport.py create mode 100644 tests/test_risport.py diff --git a/README.md b/README.md index e9deb55..26f66ba 100644 --- a/README.md +++ b/README.md @@ -91,6 +91,18 @@ opens this directory. | `axl_list_tables(pattern=None)` | Discover Informix tables | | `axl_describe_table(name)` | Column metadata for one table | | `cache_stats()`, `cache_clear(pattern=None)` | Cache plumbing | +| `health()` | Subsystem self-check (cache / AXL / docs / RisPort init state) | + +### Real-time device registration (RisPort70) + +Complementary to AXL — AXL tells you what's *configured*, RisPort tells you +what's *currently registered*. The audit-relevant cross-reference is +"configured but unregistered" (orphan signal). + +| Tool | Purpose | +|---|---| +| `device_registration_status(device_class, status, name_filter, page_size)` | Page through CUCM's RisPort `selectCmDevice` for live registration state | +| `device_registration_summary()` | Cluster-wide breakdown: registered / unregistered / rejected counts across Phone, Gateway, SIPTrunk, HuntList, etc. | ### Route plan @@ -116,6 +128,34 @@ sibling `cisco-docs` index and embed them inline: - `investigate_pattern(pattern, partition=None)` — deep-dive a specific pattern - `audit_routing(focus="full")` — comprehensive audit walkthrough - `cucm_sql_help(question)` — catch-all for arbitrary SQL questions +- `sip_trunk_report(name_filter=None)` — SIP trunk inventory + findings +- `phone_inventory_report(filter=None)` — phone fleet aggregates with anomaly findings; cross-references RisPort registration state +- `user_audit(focus="full")` — end users + application users + role assignments +- `inbound_did_audit()` — XFORM-Inbound-DNIS inventory + screening pipeline +- `hunt_pilot_audit()` — hunt pilots, queue settings, line group membership +- `whoami(userid=None)` — single-user role chain (defaults to AXL service account) + +## Scope and complement + +This server is **audit-focused**: read-only queries against AXL plus +RisPort cross-reference for registration state. It does *not* cover +operational debugging (logs, packet capture, perfmon counters, +service control, certificates, backups). + +For those, install [`@calltelemetry/cisco-cucm-mcp`](https://github.com/calltelemetry/cisco-cucm-mcp) +alongside this server: + +```bash +claude mcp add cucm-ops -- npx -y @calltelemetry/cisco-cucm-mcp@latest +``` + +The two servers are **complementary**, not competing — they answer +different questions and use different CUCM APIs (AXL + RisPort here; +DIME + RisPort + PerfMon + ControlCenter + SSH there). An LLM with +both servers can compose audit findings (this server) with operational +state (theirs) — e.g., *"audit found CSS X has 0 references AND +RisPort shows zero phones currently registered against any device pool +that inherits it → confirmed safe to delete."* ## Cache diff --git a/src/mcp_cucm_axl/client.py b/src/mcp_cucm_axl/client.py index 6639e8e..f13e573 100644 --- a/src/mcp_cucm_axl/client.py +++ b/src/mcp_cucm_axl/client.py @@ -50,20 +50,23 @@ class AxlClient: self._config_error: str | None = None # permanent, pinned self._last_error: str | None = None # last seen, may be transient self._connected_at: float | None = None # monotonic time of last success + self._retry_config: dict | None = None # populated when session is built def connection_status(self) -> dict: """Diagnostic snapshot — what's the state of the connection? Useful for the `health` MCP tool and for operators trying to figure out why a tool call failed. Reports whether we're - currently connected, when we last successfully connected, and - the last error (config or operational). + currently connected, when we last successfully connected, the + last error (config or operational), and the rate-limit retry + policy in effect. """ return { "connected": self._service is not None, "connected_at_monotonic": self._connected_at, "config_error": self._config_error, # permanent until restart "last_error": self._last_error, + "retry_config": self._retry_config, } def _ensure_connected(self) -> None: @@ -103,6 +106,33 @@ class AxlClient: session.verify = verify_tls session.auth = HTTPBasicAuth(user, password) + # Rate-limit / transient-error retry. CUCM's SOAP layer returns 503 + # under load (multiple admins running AXL queries during a change + # window, etc). 502/504 occur when the publisher is restarting or + # a load balancer is between us and CUCM. Pre-fix, any of these + # was a hard failure to the caller; now they're retried with + # exponential backoff. + from requests.adapters import HTTPAdapter + from urllib3.util.retry import Retry + max_retries = int(os.environ.get("AXL_RATE_LIMIT_RETRIES", "3")) + if max_retries > 0: + retry = Retry( + total=max_retries, + backoff_factor=1.0, # 1s, 2s, 4s between retries + status_forcelist=(502, 503, 504), + allowed_methods=frozenset(["POST", "GET"]), + raise_on_status=False, # let zeep see the final response + respect_retry_after_header=True, + ) + adapter = HTTPAdapter(max_retries=retry) + session.mount("https://", adapter) + session.mount("http://", adapter) + self._retry_config = { + "max_retries": max_retries, + "backoff_factor": 1.0, + "status_forcelist": [502, 503, 504], + } + # zeep's own WSDL cache (separate from our response cache) keeps # repeat startups fast — it parses the WSDL once and reuses from platformdirs import user_cache_dir diff --git a/src/mcp_cucm_axl/prompts/phone_inventory_report.py b/src/mcp_cucm_axl/prompts/phone_inventory_report.py index d948ec3..9679d2b 100644 --- a/src/mcp_cucm_axl/prompts/phone_inventory_report.py +++ b/src/mcp_cucm_axl/prompts/phone_inventory_report.py @@ -145,7 +145,46 @@ ORDER BY d.name; worth confirming the count matches expectations — a sudden increase indicates config drift.) -## Step 3 — Cross-reference with users (if owners exist) +## Step 3 — Cross-reference with real-time registration (highest-leverage) + +`device_registration_summary()` returns a cluster-wide breakdown by status +across all device classes. The audit-relevant question is: what fraction +of *configured* phones are *actually registered*? + +``` +device_registration_summary() +``` + +For drill-down on the unregistered population: + +``` +device_registration_status(device_class="Phone", status="UnRegistered") +``` + +This gives you the actual list of phones that have a config in CUCM but +are not currently registered. **A phone that's been "UnRegistered" for +weeks is the strongest orphan signal we can produce** — cleaner than +"no description" or "no owner" because registration state reflects +real-world usage, not just operator hygiene. + +**Findings to surface from this cross-reference:** + +- **Configured-but-never-registered phones**: the cluster has them + configured, they've never come online. Often abandoned conference + phones, decommissioned analog gateways, or templates that were + cloned but never deployed. Strong candidates for deletion. +- **Registered but no description / no owner**: actively-used phones + whose operator hygiene is weak. Hospitals accumulate these as + "patient room 3142" or shared cordless phones; verify they're + intentional shared-use endpoints rather than just untracked. +- **PartiallyRegistered**: a phone that's communicating with one CM + node but not another. Indicates a real registration problem + (firewall, version mismatch, certificate); flag for IT. +- **Status mismatch**: phone class says "Cisco 7841" but `model` field + on the registered device says something different — could indicate a + spoofing attempt or hardware swap without config update. + +## Step 4 — Cross-reference with users (if owners exist) For phones with an owner, list owner identities to spot stale assignments (employee left, phone still tagged to them): diff --git a/src/mcp_cucm_axl/risport.py b/src/mcp_cucm_axl/risport.py new file mode 100644 index 0000000..8603ae7 --- /dev/null +++ b/src/mcp_cucm_axl/risport.py @@ -0,0 +1,407 @@ +"""RisPort70 — real-time device registration status. + +CUCM's RisPort (Real-time Information Server Port) lives at +`/realtimeservice2/services/RISService70` and exposes a SOAP API for +querying the *runtime* state of devices: are phones currently +registered, what IP did they get, what's their status, etc. + +This is **complementary to AXL**: AXL tells us what's CONFIGURED; +RisPort tells us what's HAPPENING right now. For audit purposes the +difference between "configured but never registered" (orphan) and +"actively registered" (live) is the highest-value cross-reference. + +Why we ship our own RisPort wrapper alongside the AXL one rather than +deferring to a separate operations MCP server (`@calltelemetry/cisco-cucm-mcp` +covers operational debugging more thoroughly): the audit-narrative is +the use case here. `phone_inventory_report` becomes substantively more +valuable when it can join configured-phones with currently-registered +state in a single prompt, and that join lives most naturally in this +codebase. + +SOAP envelope structure cribbed from cisco-cucm-mcp's TypeScript +implementation (MIT licensed) — namespaces and field names verified +against CUCM 15.0.1.12900 documentation. +""" + +from __future__ import annotations + +import os +import re +import sys +import xml.etree.ElementTree as ET +from urllib.parse import urlparse + +from requests import Session +from requests.adapters import HTTPAdapter +from requests.auth import HTTPBasicAuth +from urllib3.util.retry import Retry + + +# RisPort path on the CUCM publisher +_RIS_PATH = "/realtimeservice2/services/RISService70" + +# SOAP namespaces. These match Cisco's published values for RisPort70. +_NS_SOAPENV = "http://schemas.xmlsoap.org/soap/envelope/" +_NS_RIS = "http://schemas.cisco.com/ast/soap" + +# Status values RisPort returns for devices +DEVICE_STATUS_VALUES = ( + "Any", "Registered", "UnRegistered", "Rejected", + "PartiallyRegistered", "Unknown", +) + + +def _escape_xml(s: str) -> str: + """Minimal XML entity escape for values we inject into SOAP envelopes.""" + return ( + s.replace("&", "&") + .replace("<", "<") + .replace(">", ">") + .replace('"', """) + .replace("'", "'") + ) + + +def _build_select_envelope( + state_info: str = "", + max_devices: int = 200, + device_class: str = "Phone", + status: str = "Any", + select_items: list[str] | None = None, + select_by: str = "Name", +) -> str: + """Build a `selectCmDevice` SOAP envelope. + + The structure is fragile — the CmSelectionCriteria child elements + must appear in the order Cisco's WSDL expects, and missing fields + are rejected. We err on the side of always including every field + with sensible defaults. + """ + items = select_items if select_items else ["*"] + items_xml = "".join( + f"{_escape_xml(i)}" + for i in items + ) + return ( + '' + f'' + "" + "" + "" + f"{_escape_xml(state_info)}" + "" + f"{int(max_devices)}" + f"{_escape_xml(device_class)}" + "255" + f"{_escape_xml(status)}" + "" + f"{_escape_xml(select_by)}" + f"{items_xml}" + "Any" + "Any" + "" + "" + "" + "" + ) + + +def _extract_text(elem: ET.Element | None, tag: str) -> str: + """Return the text of child of elem, or '' if missing. + + RisPort responses don't use namespaces consistently across CUCM + versions; matching on local-name only is more robust than xpath + against a fixed namespace. + """ + if elem is None: + return "" + for child in elem: + if child.tag.split("}")[-1] == tag: + return (child.text or "").strip() + return "" + + +def _extract_ip(elem: ET.Element | None) -> str: + """Pull the IP string out of CUCM's nested IPAddress structure. + + CUCM 15 returns IPAddress as a nested struct: + x.x.x.xipv4 + Older versions may return a flat string. We handle both. + """ + if elem is None: + return "" + if elem.text and elem.text.strip(): + return elem.text.strip() + for item_elem in elem: + if item_elem.tag.split("}")[-1].lower() == "item": + for ip_elem in item_elem: + if ip_elem.tag.split("}")[-1] == "IP": + return (ip_elem.text or "").strip() + return "" + + +def _parse_device(elem: ET.Element) -> dict: + """Pull the audit-relevant fields out of a single CmDevice element.""" + ip_elem = None + for child in elem: + local = child.tag.split("}")[-1] + if local in ("IPAddress", "IpAddress"): + ip_elem = child + break + return { + "name": _extract_text(elem, "Name"), + "ip_address": _extract_ip(ip_elem), + "description": _extract_text(elem, "Description"), + "dir_number": _extract_text(elem, "DirNumber"), + "status": _extract_text(elem, "Status"), + "status_reason": _extract_text(elem, "StatusReason"), + "protocol": _extract_text(elem, "Protocol"), + "model": _extract_text(elem, "Model"), + "active_load_id": _extract_text(elem, "ActiveLoadID"), + "timestamp": _extract_text(elem, "TimeStamp"), + } + + +def _parse_response(xml_text: str) -> dict: + """Parse the selectCmDevice SOAP response into structured form. + + Returns: + { + "total_devices_found": int, + "state_info": str (cursor for next page; empty = last), + "cm_nodes": [ + {"name": str, "return_code": str, "devices": [...]} + ], + } + """ + root = ET.fromstring(xml_text) + # Walk to selectCmDeviceReturn regardless of namespacing quirks + select_return = None + for elem in root.iter(): + if elem.tag.split("}")[-1] == "selectCmDeviceReturn": + select_return = elem + break + if select_return is None: + # Check for SOAP fault + for elem in root.iter(): + if elem.tag.split("}")[-1] == "Fault": + fault_string = _extract_text(elem, "faultstring") + raise RuntimeError(f"RisPort SOAP fault: {fault_string or 'unknown'}") + raise RuntimeError("RisPort response missing selectCmDeviceReturn") + + # CUCM 15 wraps the data in an extra element + # inside . Older versions exposed the fields + # directly. Probe for the wrapper and descend if found. + for child in select_return: + if child.tag.split("}")[-1] == "SelectCmDeviceResult": + select_return = child + break + + total = _extract_text(select_return, "TotalDevicesFound") + state_info = _extract_text(select_return, "StateInfo") + + nodes_out = [] + cm_nodes = None + for child in select_return: + if child.tag.split("}")[-1] == "CmNodes": + cm_nodes = child + break + + if cm_nodes is not None: + for node_elem in cm_nodes: + if node_elem.tag.split("}")[-1] != "item": + continue + node_name = _extract_text(node_elem, "Name") + return_code = _extract_text(node_elem, "ReturnCode") + devices: list[dict] = [] + for cm_devices_elem in node_elem: + if cm_devices_elem.tag.split("}")[-1] != "CmDevices": + continue + for dev_item in cm_devices_elem: + if dev_item.tag.split("}")[-1] == "item": + devices.append(_parse_device(dev_item)) + nodes_out.append({ + "name": node_name, + "return_code": return_code, + "devices": devices, + }) + + try: + total_int = int(total) + except (TypeError, ValueError): + total_int = 0 + + return { + "total_devices_found": total_int, + "state_info": state_info, + "cm_nodes": nodes_out, + } + + +class RisPortClient: + """Lazy SOAP client for CUCM's RisPort70 service. + + Reuses AXL_URL / AXL_USER / AXL_PASS env vars (RisPort lives on the + same host as AXL on standard CUCM deployments). Builds a separate + `requests.Session` so the retry policy and TLS settings can be tuned + independently if needed. + """ + + def __init__(self): + self._session: Session | None = None + self._url: str | None = None + self._config_error: str | None = None + self._last_error: str | None = None + + def _ensure_session(self) -> None: + if self._session is not None: + return + if self._config_error is not None: + raise RuntimeError(self._config_error) + try: + axl_url = os.environ["AXL_URL"] + user = os.environ["AXL_USER"] + password = os.environ["AXL_PASS"] + except KeyError as e: + self._config_error = ( + f"Missing required env var {e.args[0]} for RisPort. " + f"Reuses AXL_URL/USER/PASS." + ) + raise RuntimeError(self._config_error) from None + + verify_tls = os.environ.get("AXL_VERIFY_TLS", "false").lower() in ( + "1", "true", "yes" + ) + + # Derive RisPort URL from AXL host + parsed = urlparse(axl_url) + if not parsed.hostname: + self._config_error = f"Could not parse AXL_URL host: {axl_url!r}" + raise RuntimeError(self._config_error) + port = parsed.port or 8443 + scheme = parsed.scheme or "https" + self._url = f"{scheme}://{parsed.hostname}:{port}{_RIS_PATH}" + + session = Session() + session.verify = verify_tls + session.auth = HTTPBasicAuth(user, password) + + # Same retry policy as AXL — 503/502/504 with backoff + max_retries = int(os.environ.get("AXL_RATE_LIMIT_RETRIES", "3")) + if max_retries > 0: + retry = Retry( + total=max_retries, + backoff_factor=1.0, + status_forcelist=(502, 503, 504), + allowed_methods=frozenset(["POST", "GET"]), + raise_on_status=False, + respect_retry_after_header=True, + ) + adapter = HTTPAdapter(max_retries=retry) + session.mount("https://", adapter) + session.mount("http://", adapter) + + self._session = session + print( + f"[mcp-cucm-axl] RisPort client ready: {self._url}", + file=sys.stderr, + flush=True, + ) + + def select_cm_device( + self, + max_devices: int = 200, + device_class: str = "Phone", + status: str = "Any", + name_filter: str | None = None, + state_info: str = "", + ) -> dict: + """Single-page selectCmDevice call. Returns up to max_devices rows. + + For full inventory, call `select_all` which auto-paginates via + the `state_info` cursor. + """ + # Validate before connecting — we want a clear error from bad input + # whether or not env vars are set. + if status not in DEVICE_STATUS_VALUES: + raise ValueError( + f"status must be one of {DEVICE_STATUS_VALUES}; got {status!r}" + ) + self._ensure_session() + + select_items = [name_filter] if name_filter else ["*"] + envelope = _build_select_envelope( + state_info=state_info, + max_devices=max_devices, + device_class=device_class, + status=status, + select_items=select_items, + ) + try: + resp = self._session.post( + self._url, + data=envelope, + headers={ + "Content-Type": "text/xml; charset=utf-8", + "SOAPAction": '"selectCmDevice"', + }, + timeout=30, + ) + resp.raise_for_status() + self._last_error = None + return _parse_response(resp.text) + except Exception as e: + self._last_error = f"RisPort request failed: {e}" + raise RuntimeError(self._last_error) from e + + def select_all( + self, + device_class: str = "Phone", + status: str = "Any", + page_size: int = 200, + max_pages: int = 20, + ) -> dict: + """Auto-paginate through selectCmDevice using the StateInfo cursor. + + Walks pages until the cursor is empty OR max_pages reached. Returns + a single dict with all devices flattened across pages, plus + per-status counts and a `pages_walked` field for diagnostics. + """ + all_devices: list[dict] = [] + nodes_seen: set[str] = set() + state_info = "" + pages = 0 + last_total = 0 + while pages < max_pages: + page = self.select_cm_device( + max_devices=page_size, + device_class=device_class, + status=status, + state_info=state_info, + ) + pages += 1 + last_total = page["total_devices_found"] + for node in page["cm_nodes"]: + nodes_seen.add(node["name"]) + all_devices.extend(node["devices"]) + next_cursor = page.get("state_info") or "" + if not next_cursor or next_cursor == state_info: + break + state_info = next_cursor + + # Per-status breakdown + status_counts: dict[str, int] = {} + for d in all_devices: + s = d.get("status") or "Unknown" + status_counts[s] = status_counts.get(s, 0) + 1 + + return { + "device_class": device_class, + "status_filter": status, + "total_devices_found": last_total, + "devices_returned": len(all_devices), + "pages_walked": pages, + "cm_nodes_seen": sorted(nodes_seen), + "status_counts": status_counts, + "devices": all_devices, + } diff --git a/src/mcp_cucm_axl/server.py b/src/mcp_cucm_axl/server.py index 2904652..70e40e0 100644 --- a/src/mcp_cucm_axl/server.py +++ b/src/mcp_cucm_axl/server.py @@ -29,12 +29,14 @@ from . import route_plan from .cache import AxlCache from .client import AxlClient from .docs_loader import DocsIndex +from .risport import RisPortClient # ---- Module-level singletons, initialized in main() ---- _cache: AxlCache | None = None _axl: AxlClient | None = None _docs: DocsIndex | None = None +_ris: RisPortClient | None = None mcp = FastMCP("CUCM AXL (read-only)") @@ -152,6 +154,7 @@ def health() -> dict: "cache": _cache is not None, "axl": _axl is not None, "docs": _docs is not None, + "risport": _ris is not None, } if _axl is not None: info["axl_connection"] = _axl.connection_status() @@ -311,6 +314,69 @@ def route_devices_using_css(css_name: str, max_per_category: int = 50) -> dict: return route_plan.find_devices_using_css(_client(), css_name, max_per_category) +@mcp.tool +def device_registration_status( + device_class: str = "Phone", + status: str = "Any", + name_filter: str | None = None, + page_size: int = 200, +) -> dict: + """Real-time device registration status from CUCM RisPort70. + + Complementary to AXL: AXL tells us what's *configured*; RisPort tells + us what's *currently registered*. The most audit-relevant cross- + reference is "configured but unregistered" (likely orphan). + + Args: + device_class: One of "Phone" (default), "Gateway", "H323", "CTI", + "VoiceMail", "MediaResources", "HuntList", "SIPTrunk", "Any". + status: One of "Any" (default), "Registered", "UnRegistered", + "Rejected", "PartiallyRegistered", "Unknown". + name_filter: Optional name (or wildcard `*` substring) to narrow + the result. Maps to RisPort's SelectItems. + page_size: Max devices per RisPort call. RisPort caps at 1000. + """ + if _ris is None: + raise RuntimeError("RisPort client not initialized — server bootstrap failed.") + if name_filter: + return _ris.select_cm_device( + max_devices=page_size, + device_class=device_class, + status=status, + name_filter=name_filter, + ) + return _ris.select_all( + device_class=device_class, + status=status, + page_size=page_size, + ) + + +@mcp.tool +def device_registration_summary() -> dict: + """High-level cluster registration health: counts by status across + all device classes that matter for audit work. + + Useful as a once-per-conversation orientation: "are most phones + actually registered, or is something broken?" + """ + if _ris is None: + raise RuntimeError("RisPort client not initialized — server bootstrap failed.") + summary = {} + for cls in ("Phone", "Gateway", "H323", "SIPTrunk", "HuntList", "CTI"): + try: + r = _ris.select_all(device_class=cls, status="Any") + summary[cls] = { + "total_devices_found": r["total_devices_found"], + "devices_returned": r["devices_returned"], + "status_counts": r["status_counts"], + "cm_nodes_seen": r["cm_nodes_seen"], + } + except Exception as e: + summary[cls] = {"error": str(e)[:200]} + return summary + + @mcp.tool def route_filters(name: str | None = None, include_members: bool = False) -> dict: """List route filters with their composition rules. @@ -455,7 +521,7 @@ def _banner() -> None: def main() -> None: - global _cache, _axl, _docs + global _cache, _axl, _docs, _ris # Load .env from the project directory (where the user runs uv from) cwd_env = Path.cwd() / ".env" @@ -490,6 +556,7 @@ def main() -> None: _axl = AxlClient(_cache) _docs = DocsIndex.load() # may be None; prompts handle gracefully + _ris = RisPortClient() mcp.run() diff --git a/tests/test_client_recovery.py b/tests/test_client_recovery.py index d411277..3c31517 100644 --- a/tests/test_client_recovery.py +++ b/tests/test_client_recovery.py @@ -81,3 +81,75 @@ def test_health_diagnostic_includes_connection_state(cache: AxlCache): assert "connected" in info assert info["connected"] is False # never tried yet assert "last_error" in info + + +# ---- Rate limit / 503 retry -------------------------------------------------- +# Inspired by cisco-cucm-mcp's exponential-backoff approach. CUCM's SOAP +# layer returns 503 under load (concurrent AXL admins, change window). Without +# retries, we'd fail loudly; with them, transient rate limiting becomes +# invisible to the caller. + +def test_retry_config_default_three_retries(cache: AxlCache, monkeypatch): + """By default, the session is configured for 3 retries with backoff.""" + monkeypatch.setenv("AXL_URL", "https://example.invalid:8443/axl") + monkeypatch.setenv("AXL_USER", "test") + monkeypatch.setenv("AXL_PASS", "test") + monkeypatch.setenv("AXL_VERIFY_TLS", "false") + # Stub Client construction so we exercise only the session/retry setup + from mcp_cucm_axl import client as client_mod + + constructed = {} + + def stub_client(*args, **kwargs): + constructed["transport"] = kwargs.get("transport") + # Raise to short-circuit before service creation + raise ConnectionError("stub: don't actually connect") + + monkeypatch.setattr(client_mod, "Client", stub_client) + + client = AxlClient(cache) + with pytest.raises(RuntimeError): + client._ensure_connected() + + info = client.connection_status() + assert info["retry_config"] is not None + assert info["retry_config"]["max_retries"] == 3 + assert 503 in info["retry_config"]["status_forcelist"] + assert 502 in info["retry_config"]["status_forcelist"] + assert 504 in info["retry_config"]["status_forcelist"] + + +def test_retry_config_overridable_via_env(cache: AxlCache, monkeypatch): + """Operators can tune the retry count via AXL_RATE_LIMIT_RETRIES.""" + monkeypatch.setenv("AXL_URL", "https://example.invalid:8443/axl") + monkeypatch.setenv("AXL_USER", "test") + monkeypatch.setenv("AXL_PASS", "test") + monkeypatch.setenv("AXL_RATE_LIMIT_RETRIES", "7") + + from mcp_cucm_axl import client as client_mod + monkeypatch.setattr(client_mod, "Client", lambda *a, **kw: (_ for _ in ()).throw(ConnectionError("stub"))) + + client = AxlClient(cache) + with pytest.raises(RuntimeError): + client._ensure_connected() + + assert client.connection_status()["retry_config"]["max_retries"] == 7 + + +def test_retry_config_zero_disables(cache: AxlCache, monkeypatch): + """AXL_RATE_LIMIT_RETRIES=0 disables the retry adapter entirely. + Useful for test environments or when an operator wants raw failures.""" + monkeypatch.setenv("AXL_URL", "https://example.invalid:8443/axl") + monkeypatch.setenv("AXL_USER", "test") + monkeypatch.setenv("AXL_PASS", "test") + monkeypatch.setenv("AXL_RATE_LIMIT_RETRIES", "0") + + from mcp_cucm_axl import client as client_mod + monkeypatch.setattr(client_mod, "Client", lambda *a, **kw: (_ for _ in ()).throw(ConnectionError("stub"))) + + client = AxlClient(cache) + with pytest.raises(RuntimeError): + client._ensure_connected() + + cfg = client.connection_status()["retry_config"] + assert cfg["max_retries"] == 0 diff --git a/tests/test_risport.py b/tests/test_risport.py new file mode 100644 index 0000000..663b0ed --- /dev/null +++ b/tests/test_risport.py @@ -0,0 +1,222 @@ +"""Unit tests for the RisPort70 SOAP envelope construction and parser. + +Live-cluster integration is verified separately via the smoke-test +script — these tests are pure: envelope shape, response-XML parsing, +edge cases. No network. +""" + +import xml.etree.ElementTree as ET + +import pytest + +from mcp_cucm_axl.risport import ( + DEVICE_STATUS_VALUES, + RisPortClient, + _build_select_envelope, + _escape_xml, + _parse_response, +) + + +class TestEscapeXml: + def test_basic_escapes(self): + assert _escape_xml("") == "<bad>" + assert _escape_xml("a&b") == "a&b" + assert _escape_xml('"x"') == ""x"" + assert _escape_xml("'y'") == "'y'" + + def test_passthrough_safe_text(self): + assert _escape_xml("Phone-1234") == "Phone-1234" + + +class TestSelectEnvelope: + def test_envelope_is_well_formed_xml(self): + env = _build_select_envelope() + # Must parse — if not, RisPort will reject it + root = ET.fromstring(env) + assert root is not None + + def test_default_envelope_includes_required_fields(self): + env = _build_select_envelope() + # Cisco's WSDL requires every CmSelectionCriteria child + for required in [ + "MaxReturnedDevices", + "DeviceClass", + "Model", + "Status", + "NodeName", + "SelectBy", + "SelectItems", + "Protocol", + "DownloadStatus", + ]: + assert f"" in env, ( + f"missing required CmSelectionCriteria field " + ) + + def test_state_info_threaded_into_envelope(self): + env = _build_select_envelope(state_info="cursor-abc-123") + assert "cursor-abc-123" in env + assert "cursor-abc-123" in env + + def test_select_items_default_wildcard(self): + env = _build_select_envelope() + # Default: ['*'] — all devices + assert "*" in env + + def test_select_items_explicit_list(self): + env = _build_select_envelope(select_items=["SEP1", "SEP2"]) + assert "SEP1" in env + assert "SEP2" in env + + def test_max_devices_int_coerced(self): + env = _build_select_envelope(max_devices=500) + assert "500" in env + + def test_xml_special_chars_in_filter_escaped(self): + # SOAP injection defense — single quote and angle bracket must be escaped + env = _build_select_envelope(select_items=["O'Brien "]) + assert "'" in env + assert "<" in env + assert ">" in env + # The raw chars must NOT appear + assert "'Brien " not in env + + +class TestParseResponse: + # Match CUCM 15's actual response shape: an extra + # wrapper inside . Discovered while live-verifying + # against cucm-pub.binghammemorial.org 2026-04-26 — the parser must + # descend through this wrapper. + SAMPLE_RESPONSE = """ + + + + + + 2 + + + + cucm-pub + Ok + + + SEP1234567890AB + 10.0.0.5ipv4 + 2001 + Registered + 0 + SIP + Patient Room 101 + Cisco 7841 + sip78xx.12-5-1SR4-3 + 1700000000 + + + SEPABCDEF123456 + + + UnRegistered + 1 + Decommissioned phone + + + + + + + + +""" + + # Pre-CUCM-15 shape (no SelectCmDeviceResult wrapper). The parser falls + # back to fields directly under selectCmDeviceReturn for backward compat. + LEGACY_RESPONSE = """ + + + + + 1 + + + + cucm-old + Ok + + + SEPLEGACY + Registered + + + + + + + +""" + + def test_legacy_response_shape_still_parses(self): + """Backward compat with pre-CUCM-15 RisPort responses.""" + result = _parse_response(self.LEGACY_RESPONSE) + assert result["total_devices_found"] == 1 + assert result["cm_nodes"][0]["devices"][0]["name"] == "SEPLEGACY" + + def test_parse_total_count(self): + result = _parse_response(self.SAMPLE_RESPONSE) + assert result["total_devices_found"] == 2 + + def test_parse_node_count(self): + result = _parse_response(self.SAMPLE_RESPONSE) + assert len(result["cm_nodes"]) == 1 + assert result["cm_nodes"][0]["name"] == "cucm-pub" + + def test_parse_device_fields(self): + result = _parse_response(self.SAMPLE_RESPONSE) + devices = result["cm_nodes"][0]["devices"] + assert len(devices) == 2 + + first = devices[0] + assert first["name"] == "SEP1234567890AB" + assert first["status"] == "Registered" + assert first["dir_number"] == "2001" + assert first["description"] == "Patient Room 101" + assert first["protocol"] == "SIP" + # Nested IP extraction + assert first["ip_address"] == "10.0.0.5" + + def test_parse_unregistered_device(self): + result = _parse_response(self.SAMPLE_RESPONSE) + second = result["cm_nodes"][0]["devices"][1] + assert second["status"] == "UnRegistered" + assert second["ip_address"] == "" # missing IP renders empty + + def test_parse_state_info_for_pagination(self): + result = _parse_response(self.SAMPLE_RESPONSE) + assert result["state_info"] == "" # last page + + def test_parse_soap_fault_raises(self): + fault_response = """ + + + +Authentication failed + + +""" + with pytest.raises(RuntimeError, match="Authentication failed"): + _parse_response(fault_response) + + +class TestStatusValidation: + def test_known_status_values(self): + assert "Registered" in DEVICE_STATUS_VALUES + assert "UnRegistered" in DEVICE_STATUS_VALUES + assert "PartiallyRegistered" in DEVICE_STATUS_VALUES + + def test_select_cm_device_rejects_invalid_status(self): + client = RisPortClient() + # No env vars set, so _ensure_session would fail first; + # but the validation should run BEFORE that on bad input. + with pytest.raises(ValueError, match="status must be"): + client.select_cm_device(status="not-a-real-status")