Compare commits

..

3 Commits

Author SHA1 Message Date
3cf7dbc785 docs: qualify read-only as "against CUCM"; document local-cache exception
Drift between the docs ("every tool is read-only") and reality
(cache_clear mutates the local SQLite cache) is the bug being
addressed here. The code is fine — cache_clear has zero CUCM-side
effect — but the docs over-promised by not naming the local-cache
exception explicitly.

cache_clear docstring (server.py): now leads with "Local-only:
mutates the SQLite response cache ... Does NOT touch CUCM" with a
pointer to the explanation page.

reference/tools.md: read-only claim qualified as "against CUCM";
the two enforcement layers (sqlparse validator + allowlist proxy)
named explicitly; cache_clear flagged as the lone local-mutation
tool.

explanation/read-only-by-structure.md: validator section updated
with the full forbidden-keyword list, multi-statement detection,
and an explanation of how sqlparse fixes the regex blindspots.
New "Defense-in-depth: read-only allowlist proxy" section
describing _ReadOnlyServiceProxy and the parallel RisPort gate.
New "What read-only does NOT mean" section enumerating the
local-cache exception and the AXL_CACHE_TTL=0 opt-out for
read-only-filesystem deployments.
2026-04-29 06:38:52 -06:00
639d706200 client/risport: add read-only allowlist proxies (defense-in-depth)
Today, mcaxl is read-only against CUCM by *absence* — the tools
never call write methods. But absence isn't enforced: a future
contributor adding a tool could write
self._service.addRoutePartition(...) and zeep would happily
dispatch it. There's no positive guard.

Two new chokepoints close that gap:

AXL side — _ReadOnlyServiceProxy wraps the zeep service object.
__getattr__ refuses any method outside _ALLOWED_AXL_METHODS
(currently {getCCMVersion, executeSQLQuery}) with a new
ReadOnlyViolation exception, raised at attribute lookup BEFORE
zeep serializes a SOAP envelope. Underscore-prefixed and dunder
attributes pass through (zeep introspects via _binding_options,
__class__, etc., and those don't dispatch SOAP).

RisPort side — RisPort70 envelopes are hand-rolled, so the proxy
pattern doesn't apply directly. The equivalent chokepoint lives in
the envelope builders: _check_operation_allowed(name) is the first
line of every builder, and _ALLOWED_RISPORT_OPERATIONS is the
allowlist (currently {selectCmDevice}).

Operators can verify the proxy is active via the health tool —
connection_status() now reports read_only_proxy: true and
allowed_axl_methods: [...].

Tests:
- new tests/test_readonly_proxy.py (13 tests):
  * allowed methods dispatch through to inner service
  * 9 parameterized refusals (addRoutePartition, updatePhone,
    removeUser, applyPhone, resetPhone, restartPhone,
    executeSQLUpdate, doDeviceLogin, wipePhone)
  * allowlist drift detection (set must be exactly what we
    advertise — accidental widening fails red)
  * dunder + underscore-prefixed passthrough
- tests/test_risport.py: +TestReadOnlyAllowlist (7 tests):
  * selectCmDevice passes _check_operation_allowed
  * 6 parameterized refusals (addCmDevice, removeCmDevice,
    resetDevice, restartDevice, applyCmDevice, executeSQLUpdate)
  * allowlist drift detection

182 tests pass total (was 161; +13 proxy + 7 risport + 1 allowlist
drift catch).
2026-04-29 06:38:41 -06:00
59f9df5b3b sql_validator: swap regex for sqlparse tokenization
The regex-based validator worked for everything tested, but had a
class of structural blindspot: it didn't actually know what a token
was, so it accepted `SELECT 1; SELECT 2` (no forbidden keyword in
either statement) and relied entirely on the keyword scan catching
write verbs. With sqlparse we get:

- Explicit multi-statement detection via `len(sqlparse.parse(query))`
  — `SELECT 1; SELECT 2` is now refused with a clear "Multiple
  statements detected" message.
- Proper string/comment boundary handling — `'log: DROP detected'`
  is one Literal.String token; the DROP inside it never reaches the
  forbidden-keyword scan. `inserted_at` is one Name token; INSERT
  isn't matched as a substring.
- Same conservative behavior for keywords-as-identifiers (sqlparse
  is a lexer, not a parser, so `SELECT delete FROM device` is still
  refused — CUCM's data dictionary doesn't use SQL keywords as
  column names anyway).

Hamilton review CRITICAL #1 preserved: the cleaned query returned to
the caller is still byte-for-byte the input (modulo trailing ; and
outer whitespace). sqlparse is consulted for analysis only.

Tests: +6 sqlparse-specific cases in TestSqlparseSpecific covering
multi-statement, comment-disguised injection, keyword-substring
identifiers, and CTE walks. 2 existing tests broadened from
match="DROP" to match="DROP|Multiple" — same query refused, the
diagnosis just got more accurate (multi-statement caught earlier
than forbidden-keyword scan).

36/36 validator tests pass.
2026-04-29 06:38:21 -06:00
11 changed files with 445 additions and 51 deletions

View File

@ -47,13 +47,65 @@ The validator's rules:
1. After stripping comments and trimming whitespace, the query must 1. After stripping comments and trimming whitespace, the query must
start with `SELECT` or `WITH` (case-insensitive) start with `SELECT` or `WITH` (case-insensitive)
2. The query must not contain any of: `INSERT`, `UPDATE`, `DELETE`, 2. The query must not contain any of: `INSERT`, `UPDATE`, `DELETE`,
`DROP`, `CREATE`, `ALTER`, `TRUNCATE`, `EXEC`, `EXECUTE`, `CALL` `DROP`, `CREATE`, `ALTER`, `TRUNCATE`, `MERGE`, `REPLACE`, `RENAME`,
3. Trailing semicolons are stripped before submission `GRANT`, `REVOKE`, `EXEC`, `EXECUTE`, `CALL`, `ATTACH`, `DETACH`
3. Multi-statement input is rejected explicitly (the parser sees
`SELECT 1; SELECT 2` as two statements and refuses)
4. Trailing semicolons are stripped before submission
This catches the obvious cases. It is not a SQL parser, and it doesn't The validator uses the `sqlparse` tokenizer rather than regex, so
try to be — the structural read-only guarantee is the primary defense, string literals and comments are correctly bounded — a description
field containing the word `DELETE` doesn't trip the check, and a
column name like `inserted_at` isn't confused with the `INSERT`
keyword. The structural read-only guarantee is the primary defense;
the validator is the secondary defense. the validator is the secondary defense.
## Defense-in-depth: read-only allowlist proxy
`client.py` wraps the zeep AXL service in a `_ReadOnlyServiceProxy`
whose `__getattr__` refuses any method outside an explicit
allowlist (`getCCMVersion`, `executeSQLQuery`). If a future
contributor adds a tool that accidentally calls
`self._service.addRoutePartition(...)`, the proxy raises
`ReadOnlyViolation` at attribute lookup — before zeep ever
serializes a SOAP envelope.
The RisPort70 path has the parallel guard. RisPort doesn't go
through zeep — its envelopes are hand-rolled — so the allowlist
chokepoint lives in the envelope builders themselves: every
builder calls `_check_operation_allowed(name)` as its first
statement, and the only allowed operation today is `selectCmDevice`.
You can verify the proxy is active at runtime via the `health`
tool — it surfaces `axl_connection.read_only_proxy: true` and
`allowed_axl_methods: ["executeSQLQuery", "getCCMVersion"]`. If
either field is missing or false, something has gone wrong with
bootstrap.
## What read-only does NOT mean
"Read-only" in this server is precisely scoped: **read-only against
CUCM**. Specifically that means:
- No SOAP method outside the AXL/RisPort allowlists is dispatched
- No SQL outside `SELECT`/`WITH` reaches `executeSQLQuery`
- No tool call mutates cluster configuration, registration state,
or any other CUCM-side resource
It does **not** mean "no mutation anywhere." There is exactly one
local mutation point: the `cache_clear` tool deletes entries from
the SQLite response cache at
`~/.cache/mcaxl/responses/axl_responses.sqlite`. CUCM is unaware
this happened — the cache is a local optimization, not a CUCM
artefact. The operation is idempotent, contains no PII beyond what
the corresponding read-only query already returned, and is
reversible by simply waiting for the next tool call to repopulate
the relevant entries from AXL.
If you need *zero* local writes (e.g., when running mcaxl from a
read-only filesystem), set `AXL_CACHE_TTL=0` to disable the cache
entirely — `cache_clear` then has nothing to clear.
## Operational consequence: minimal-privilege service account ## Operational consequence: minimal-privilege service account
Because the server is structurally incapable of writes, the AXL service Because the server is structurally incapable of writes, the AXL service

View File

@ -10,10 +10,19 @@ sidebar:
(partitions, CSSs, patterns, lists, groups, transformations, filters), (partitions, CSSs, patterns, lists, groups, transformations, filters),
and **real-time registration** (RisPort70 device state). and **real-time registration** (RisPort70 device state).
Every tool is read-only. The server never registers AXL write methods — Every tool is read-only **against CUCM**. The server never registers AXL
no `executeSQLUpdate`, no `add*` / `update*` / `remove*` / `apply*` / write methods — no `executeSQLUpdate`, no `add*` / `update*` / `remove*`
`reset*` / `restart*`. See [Read-only by structure](/explanation/read-only-by-structure/) / `apply*` / `reset*` / `restart*`. As of 2026-04, this is enforced at
for the rationale. two layers: a `sqlparse`-based validator on `axl_sql` input that rejects
non-`SELECT`/`WITH` statements (and explicit multi-statement input), and
a runtime allowlist proxy around the AXL service that refuses any SOAP
method outside `{getCCMVersion, executeSQLQuery}`. RisPort70 has the
parallel allowlist in its envelope builder.
`cache_clear` is the **one** tool that mutates any state, and the state
it mutates is the **local SQLite response cache only** — CUCM is
unaware the call happened. See [Read-only by structure](/explanation/read-only-by-structure/)
for the rationale and what read-only does NOT mean.
## Foundational ## Foundational

View File

@ -31,6 +31,11 @@ dependencies = [
"platformdirs>=4.9", "platformdirs>=4.9",
"numpy>=1.26", "numpy>=1.26",
"python-dotenv>=1.0", "python-dotenv>=1.0",
# SQL tokenizer for the executeSQLQuery validator. Using a real lexer
# (instead of regex keyword scanning) gives us proper string/comment
# boundary handling and explicit multi-statement detection. See
# src/mcaxl/sql_validator.py for the read-only enforcement layer.
"sqlparse>=0.5",
] ]
[project.optional-dependencies] [project.optional-dependencies]

View File

@ -34,6 +34,51 @@ class _ConfigError(RuntimeError):
""" """
class ReadOnlyViolation(RuntimeError):
"""Raised when code attempts to dispatch an AXL method outside the
read-only allowlist. This is a defense-in-depth guard: the SQL
validator already prevents write SQL, and the tools as-written never
call write methods, but the proxy ensures that a future contributor
accidentally calling `_service.addRoutePartition(...)` gets a refusal
at the boundary instead of a silent cluster mutation.
"""
# AXL methods this server is permitted to dispatch. Adding a new method
# here is a deliberate decision — every additional name widens the
# read-only surface and should be reviewed.
_ALLOWED_AXL_METHODS = frozenset({
"getCCMVersion",
"executeSQLQuery",
})
class _ReadOnlyServiceProxy:
"""Wraps a zeep service object. Refuses any method not in
_ALLOWED_AXL_METHODS, raising ReadOnlyViolation.
Non-method attributes (zeep internals like `_binding_options`) are
passed through untouched so the underlying client keeps working.
"""
def __init__(self, inner):
self._inner = inner
def __getattr__(self, name):
# Dunders and private attributes pass through — we're guarding
# against accidental SOAP dispatch, not introspection.
if name.startswith("_"):
return getattr(self._inner, name)
if name not in _ALLOWED_AXL_METHODS:
raise ReadOnlyViolation(
f"AXL method {name!r} is not in the read-only allowlist. "
f"Allowed: {sorted(_ALLOWED_AXL_METHODS)}. "
f"This server is structurally read-only — see "
f"docs/explanation/read-only-by-structure/ for context."
)
return getattr(self._inner, name)
class AxlClient: class AxlClient:
"""Lazy-loaded zeep client for CUCM AXL. """Lazy-loaded zeep client for CUCM AXL.
@ -67,6 +112,13 @@ class AxlClient:
"config_error": self._config_error, # permanent until restart "config_error": self._config_error, # permanent until restart
"last_error": self._last_error, "last_error": self._last_error,
"retry_config": self._retry_config, "retry_config": self._retry_config,
# Operators should be able to verify the read-only proxy is
# active without reading the source. True means writes will
# be refused at dispatch time even if some future tool tries.
"read_only_proxy": self._service is not None and isinstance(
self._service, _ReadOnlyServiceProxy
),
"allowed_axl_methods": sorted(_ALLOWED_AXL_METHODS),
} }
def _ensure_connected(self) -> None: def _ensure_connected(self) -> None:
@ -153,10 +205,15 @@ class AxlClient:
) )
# AXL endpoint is the AXL_URL itself; override the WSDL's default # AXL endpoint is the AXL_URL itself; override the WSDL's default
# service location which usually points at a placeholder host. # service location which usually points at a placeholder host.
self._service = self._client.create_service( # Wrap the resulting service in a read-only allowlist proxy —
# any SOAP method not in _ALLOWED_AXL_METHODS will raise
# ReadOnlyViolation at attribute lookup time, before zeep
# serializes a SOAP envelope.
raw_service = self._client.create_service(
"{http://www.cisco.com/AXLAPIService/}AXLAPIBinding", "{http://www.cisco.com/AXLAPIService/}AXLAPIBinding",
url, url,
) )
self._service = _ReadOnlyServiceProxy(raw_service)
import time as _time import time as _time
self._connected_at = _time.monotonic() self._connected_at = _time.monotonic()
self._last_error = None # operational state is now clean self._last_error = None # operational state is now clean

View File

@ -36,10 +36,35 @@ from requests.adapters import HTTPAdapter
from requests.auth import HTTPBasicAuth from requests.auth import HTTPBasicAuth
from urllib3.util.retry import Retry from urllib3.util.retry import Retry
from .client import ReadOnlyViolation
# RisPort path on the CUCM publisher # RisPort path on the CUCM publisher
_RIS_PATH = "/realtimeservice2/services/RISService70" _RIS_PATH = "/realtimeservice2/services/RISService70"
# RisPort70 operations this server is permitted to invoke. Parallels the
# AXL allowlist in client.py — adding a new operation here is a deliberate
# decision that widens the read-only surface.
_ALLOWED_RISPORT_OPERATIONS = frozenset({
"selectCmDevice",
})
def _check_operation_allowed(operation: str) -> None:
"""Raise ReadOnlyViolation if `operation` is outside the allowlist.
Called at the top of each envelope builder. RisPort70 doesn't go
through zeep envelopes are hand-rolled so the proxy pattern
used for AXL doesn't apply directly. This function is the equivalent
chokepoint: any envelope builder that reaches the network must first
check its operation name against the allowlist.
"""
if operation not in _ALLOWED_RISPORT_OPERATIONS:
raise ReadOnlyViolation(
f"RisPort70 operation {operation!r} is not in the read-only "
f"allowlist. Allowed: {sorted(_ALLOWED_RISPORT_OPERATIONS)}."
)
# SOAP namespaces. These match Cisco's published values for RisPort70. # SOAP namespaces. These match Cisco's published values for RisPort70.
_NS_SOAPENV = "http://schemas.xmlsoap.org/soap/envelope/" _NS_SOAPENV = "http://schemas.xmlsoap.org/soap/envelope/"
_NS_RIS = "http://schemas.cisco.com/ast/soap" _NS_RIS = "http://schemas.cisco.com/ast/soap"
@ -77,6 +102,7 @@ def _build_select_envelope(
are rejected. We err on the side of always including every field are rejected. We err on the side of always including every field
with sensible defaults. with sensible defaults.
""" """
_check_operation_allowed("selectCmDevice")
items = select_items if select_items else ["*"] items = select_items if select_items else ["*"]
items_xml = "".join( items_xml = "".join(
f"<soap:item><soap:Item>{_escape_xml(i)}</soap:Item></soap:item>" f"<soap:item><soap:Item>{_escape_xml(i)}</soap:Item></soap:item>"

View File

@ -123,6 +123,13 @@ def cache_stats() -> dict:
def cache_clear(method_pattern: str | None = None) -> dict: def cache_clear(method_pattern: str | None = None) -> dict:
"""Clear cache entries for the current cluster. """Clear cache entries for the current cluster.
**Local-only:** mutates the SQLite response cache at
~/.cache/mcaxl/responses/axl_responses.sqlite. Does NOT touch CUCM
the cluster is unaware this tool ran. The next tool call will re-fetch
from AXL. This is the only mcaxl tool that mutates any state, and the
state it mutates is local cache only; see /explanation/read-only-by-structure/
for what "read-only" means in this server's context.
Args: Args:
method_pattern: Optional method-name pattern (% wildcards). If omitted, method_pattern: Optional method-name pattern (% wildcards). If omitted,
clears the entire cache. Use after a known config change to force clears the entire cache. Use after a known config change to force

View File

@ -5,79 +5,128 @@ through a separate executeSQLUpdate method that we never expose). This client-
side check exists to: side check exists to:
1. Give the LLM a fast, clear error before a SOAP round-trip. 1. Give the LLM a fast, clear error before a SOAP round-trip.
2. Make read-only intent visible at the boundary, not implicit. 2. Make read-only intent visible at the boundary, not implicit.
Uses sqlparse for proper token-level analysis. Compared to the prior
regex-based implementation, this gives us:
- Explicit multi-statement detection via `len(sqlparse.parse(query))`
rather than hoping a forbidden keyword shows up in the second statement
(the regex version *accepted* `SELECT 1; SELECT 2` because neither
SELECT is forbidden).
- Proper string/comment boundary handling `'Smith -- old line'` is
a single Literal.String token, so the `--` inside it is never confused
for a comment marker, and `'log: DROP detected'` doesn't trip the
forbidden-keyword check because the entire literal is one token.
- Word-boundary correctness on identifiers `inserted_at` tokenizes as
`Token.Name`, never as `Token.Keyword.DML`, so columns named after
keyword fragments don't get false-positive blocked.
What sqlparse does NOT do (and we accept the conservative behavior):
- Disambiguate `SELECT delete FROM device` (where `delete` is meant as a
column name, not a keyword). sqlparse is a lexer, not a parser, and
classifies `delete` as `Token.Keyword.DML` regardless of context. The
check stays conservative CUCM's data dictionary doesn't use SQL
keywords as column names anyway.
""" """
from __future__ import annotations from __future__ import annotations
import re import sqlparse
from sqlparse.tokens import Comment, Keyword
_COMMENT_BLOCK = re.compile(r"/\*.*?\*/", re.DOTALL)
_COMMENT_LINE = re.compile(r"--[^\n]*")
# Match Informix string literals: single-quoted, with '' as escaped quote.
_STRING_LITERAL = re.compile(r"'(?:''|[^'])*'", re.DOTALL)
_FORBIDDEN = { _FORBIDDEN = {
"INSERT", "UPDATE", "DELETE", "DROP", "CREATE", "ALTER", "INSERT", "UPDATE", "DELETE", "DROP", "CREATE", "ALTER",
"TRUNCATE", "GRANT", "REVOKE", "MERGE", "REPLACE", "RENAME", "TRUNCATE", "GRANT", "REVOKE", "MERGE", "REPLACE", "RENAME",
"EXEC", "EXECUTE", "CALL", "ATTACH", "DETACH", "EXEC", "EXECUTE", "CALL", "ATTACH", "DETACH",
} }
_WORD_RE = re.compile(r"\b([A-Za-z_]+)\b")
class SqlValidationError(ValueError): class SqlValidationError(ValueError):
"""Raised when a query is not a safe read-only SELECT/WITH.""" """Raised when a query is not a safe read-only SELECT/WITH."""
def _is_skippable(tok) -> bool:
"""Whitespace and comments don't count toward token-position checks."""
if tok.is_whitespace:
return True
if tok.ttype is not None and tok.ttype in Comment:
return True
return False
def _is_keyword(tok) -> bool:
return tok.ttype is not None and tok.ttype in Keyword
def _is_substantive(stmt) -> bool:
"""A statement contributes to the multi-statement count only if it has
at least one non-whitespace, non-comment leaf token. This way trailing
semicolons and pure-comment "statements" don't inflate the count."""
return any(not _is_skippable(t) for t in stmt.flatten())
def validate_select(query: str) -> str: def validate_select(query: str) -> str:
"""Return the cleaned query, or raise SqlValidationError. """Return the cleaned query, or raise SqlValidationError.
Accepts SELECT and WITH (CTEs that ultimately return SELECT). Rejects Accepts SELECT and WITH (CTEs that ultimately return SELECT). Rejects
anything else, and any query containing forbidden keywords as standalone multi-statement input, and any query containing forbidden SQL keyword
tokens *outside* string literals and comments. tokens *outside* string literals and comments.
Hamilton review CRITICAL #1: the output we return MUST preserve the input Hamilton review CRITICAL #1 (preserved across the sqlparse rewrite):
byte-for-byte (modulo trailing semicolon and outer whitespace). Earlier the output we return MUST preserve the input byte-for-byte (modulo
versions ran a non-literal-aware comment strip on the output, which would trailing semicolon and outer whitespace). sqlparse is consulted for
silently eat `--` and `/* */` markers that legitimately appeared inside analysis only the cleaned query going to AXL is exactly what the
string literals like `WHERE description = 'Smith -- old line'`. The query caller intended. A bug here would mean the LLM's `description LIKE
going to AXL must be exactly what the caller intended comment stripping '%-- old%'` query gets a different shape than it asked for.
is an analysis-only operation, never a mutation of the wire query.
""" """
if not query or not query.strip(): if not query or not query.strip():
raise SqlValidationError("Query is empty.") raise SqlValidationError("Query is empty.")
# The query we'll send to AXL: original input, with only outer whitespace
# and a single trailing semicolon trimmed. NO mutation of literals or
# in-string comment markers.
cleaned = query.strip().rstrip(";").strip() cleaned = query.strip().rstrip(";").strip()
if not cleaned: if not cleaned:
raise SqlValidationError("Query is empty after trimming.") raise SqlValidationError("Query is empty after trimming.")
# Analysis-only copy: strip string literals AND comments (in either order parsed = sqlparse.parse(cleaned)
# is safe here, since each strip uses its own regex on a non-AXL-bound statements = [s for s in parsed if _is_substantive(s)]
# buffer). Order chosen: literals first, then comments, so that any
# comment markers genuinely outside literals can be detected.
for_analysis = _STRING_LITERAL.sub(" ", cleaned)
for_analysis = _COMMENT_BLOCK.sub(" ", for_analysis)
for_analysis = _COMMENT_LINE.sub(" ", for_analysis)
if not for_analysis.strip(): if not statements:
raise SqlValidationError("Query is empty after stripping comments.") raise SqlValidationError("Query is empty after stripping comments.")
if len(statements) > 1:
upper_tokens = [t.upper() for t in _WORD_RE.findall(for_analysis)]
if not upper_tokens:
raise SqlValidationError("Query contains no SQL keywords.")
first = upper_tokens[0]
if first not in {"SELECT", "WITH"}:
raise SqlValidationError( raise SqlValidationError(
f"Only SELECT and WITH are permitted; query starts with {first!r}." f"Multiple statements detected ({len(statements)}). "
f"Only single SELECT or WITH queries are permitted."
) )
forbidden_hits = sorted(set(upper_tokens) & _FORBIDDEN) stmt = statements[0]
if forbidden_hits:
first = next((t for t in stmt.flatten() if not _is_skippable(t)), None)
if first is None:
raise SqlValidationError("Query contains no SQL keywords.")
if not _is_keyword(first):
raise SqlValidationError( raise SqlValidationError(
f"Forbidden keyword(s) present: {', '.join(forbidden_hits)}. " f"Query must start with SELECT or WITH; first token is {first.value!r}."
)
first_upper = first.value.upper()
if first_upper not in {"SELECT", "WITH"}:
raise SqlValidationError(
f"Only SELECT and WITH are permitted; query starts with {first_upper!r}."
)
hits: set[str] = set()
for tok in stmt.flatten():
if not _is_keyword(tok):
continue
upper = tok.value.upper()
if upper in _FORBIDDEN:
hits.add(upper)
if hits:
raise SqlValidationError(
f"Forbidden keyword(s) present: {', '.join(sorted(hits))}. "
f"This server is read-only." f"This server is read-only."
) )

View File

@ -0,0 +1,88 @@
"""Tests for _ReadOnlyServiceProxy: defense-in-depth allowlist for AXL methods.
The proxy wraps the zeep service object so any SOAP method outside
{getCCMVersion, executeSQLQuery} raises ReadOnlyViolation at attribute
lookup, before zeep serializes an envelope. This is a guard against future
contributors accidentally calling write methods like addRoutePartition().
"""
from unittest.mock import MagicMock
import pytest
from mcaxl.client import (
ReadOnlyViolation,
_ALLOWED_AXL_METHODS,
_ReadOnlyServiceProxy,
)
class TestAllowlistEnforcement:
def test_allowed_method_dispatches_through(self):
# Both methods we currently use must pass through the proxy.
inner = MagicMock()
inner.getCCMVersion.return_value = {"version": "15.0(1)"}
inner.executeSQLQuery.return_value = {"rows": []}
proxy = _ReadOnlyServiceProxy(inner)
assert proxy.getCCMVersion() == {"version": "15.0(1)"}
assert proxy.executeSQLQuery(sql="SELECT 1") == {"rows": []}
inner.getCCMVersion.assert_called_once()
inner.executeSQLQuery.assert_called_once_with(sql="SELECT 1")
@pytest.mark.parametrize(
"method_name",
[
"addRoutePartition",
"updatePhone",
"removeUser",
"applyPhone",
"resetPhone",
"restartPhone",
"executeSQLUpdate", # the AXL write counterpart
"doDeviceLogin",
"wipePhone",
],
)
def test_disallowed_method_raises(self, method_name):
inner = MagicMock()
proxy = _ReadOnlyServiceProxy(inner)
with pytest.raises(ReadOnlyViolation, match=method_name):
getattr(proxy, method_name)
# The inner service must NOT have been touched at all — refusal
# happens before any SOAP serialization.
assert not getattr(inner, method_name).called
def test_allowlist_is_exactly_what_we_advertise(self):
# If this set ever grows, that's a deliberate decision and the
# test should be updated alongside the change. Catching unintended
# widening of the read-only surface is the point.
assert _ALLOWED_AXL_METHODS == frozenset(
{"getCCMVersion", "executeSQLQuery"}
)
class TestAttributePassthrough:
def test_dunder_attributes_pass_through(self):
# Zeep introspects services via dunder attributes (__class__,
# __dict__, etc.). The proxy must not break those.
inner = MagicMock()
inner.__class__ = MagicMock
proxy = _ReadOnlyServiceProxy(inner)
# Reading the class doesn't raise
_ = proxy.__class__
def test_underscore_prefixed_attributes_pass_through(self):
# zeep service internals like `_binding_options`, `_operations`
# are accessed by name. We don't want to gate those because they
# don't dispatch SOAP — they read metadata.
inner = MagicMock()
inner._binding_options = {"address": "https://cucm/axl/"}
inner._operations = ["getCCMVersion", "executeSQLQuery", "addPhone"]
proxy = _ReadOnlyServiceProxy(inner)
assert proxy._binding_options == {"address": "https://cucm/axl/"}
assert "addPhone" in proxy._operations # introspection, not dispatch

View File

@ -9,10 +9,13 @@ import xml.etree.ElementTree as ET
import pytest import pytest
from mcaxl.client import ReadOnlyViolation
from mcaxl.risport import ( from mcaxl.risport import (
DEVICE_STATUS_VALUES, DEVICE_STATUS_VALUES,
_ALLOWED_RISPORT_OPERATIONS,
RisPortClient, RisPortClient,
_build_select_envelope, _build_select_envelope,
_check_operation_allowed,
_escape_xml, _escape_xml,
_parse_response, _parse_response,
) )
@ -220,3 +223,34 @@ class TestStatusValidation:
# but the validation should run BEFORE that on bad input. # but the validation should run BEFORE that on bad input.
with pytest.raises(ValueError, match="status must be"): with pytest.raises(ValueError, match="status must be"):
client.select_cm_device(status="not-a-real-status") client.select_cm_device(status="not-a-real-status")
class TestReadOnlyAllowlist:
"""The RisPort envelope builders all gate on _check_operation_allowed.
This is the equivalent of the AXL service proxy the chokepoint that
blocks any future write-shaped operation from being assembled.
"""
def test_selectCmDevice_is_allowed(self):
# No raise — selectCmDevice is in the allowlist
_check_operation_allowed("selectCmDevice")
@pytest.mark.parametrize(
"operation",
[
"addCmDevice",
"removeCmDevice",
"resetDevice",
"restartDevice",
"applyCmDevice",
"executeSQLUpdate", # leakage from the AXL surface
],
)
def test_disallowed_operation_raises_in_check(self, operation):
with pytest.raises(ReadOnlyViolation, match=operation):
_check_operation_allowed(operation)
def test_allowlist_is_exactly_what_we_advertise(self):
# As with the AXL allowlist, widening this set is a deliberate
# decision that should be matched by an update to this test.
assert _ALLOWED_RISPORT_OPERATIONS == frozenset({"selectCmDevice"})

View File

@ -60,10 +60,12 @@ class TestRejected:
validate_select("DROP TABLE device") validate_select("DROP TABLE device")
def test_select_with_embedded_drop_rejected(self): def test_select_with_embedded_drop_rejected(self):
# Belt-and-suspenders: even if "DROP" appears in a quoted string-ish # Belt-and-suspenders: even if "DROP" appears alongside a SELECT, the
# position our keyword filter still catches it. AXL would also reject # query is rejected. Under the sqlparse-based validator the rejection
# this, but failing fast on the client saves a SOAP round-trip. # reason is now "Multiple statements detected" (caught earlier than
with pytest.raises(SqlValidationError, match="DROP"): # forbidden-keyword scan); under the prior regex validator it was
# "DROP". Either is correct — we just want this query refused.
with pytest.raises(SqlValidationError, match="DROP|Multiple"):
validate_select("SELECT 1 FROM device; DROP TABLE device") validate_select("SELECT 1 FROM device; DROP TABLE device")
def test_truncate_rejected(self): def test_truncate_rejected(self):
@ -107,7 +109,10 @@ class TestStringLiterals:
assert validate_select(q) assert validate_select(q)
def test_actual_drop_outside_literal_still_blocked(self): def test_actual_drop_outside_literal_still_blocked(self):
with pytest.raises(SqlValidationError, match="DROP"): # See test_select_with_embedded_drop_rejected — rejection reason
# is now multi-statement detection (sqlparse catches it earlier),
# but the query still fails closed.
with pytest.raises(SqlValidationError, match="DROP|Multiple"):
validate_select("SELECT 1 FROM device; DROP TABLE backup") validate_select("SELECT 1 FROM device; DROP TABLE backup")
def test_escaped_quote_in_literal(self): def test_escaped_quote_in_literal(self):
@ -175,3 +180,54 @@ class TestLiteralPreservedInOutput:
q = "SELECT 1 FROM numplan WHERE description = 'log: DROP detected'" q = "SELECT 1 FROM numplan WHERE description = 'log: DROP detected'"
result = validate_select(q) result = validate_select(q)
assert "'log: DROP detected'" in result assert "'log: DROP detected'" in result
class TestSqlparseSpecific:
"""Cases that exercise wins of the sqlparse-based validator over the
earlier regex implementation. Each test names the property being checked.
"""
def test_multi_statement_explicit_reject(self):
# Two SELECTs, no forbidden keywords. The regex validator accepted
# this because neither SELECT is in _FORBIDDEN. sqlparse catches it
# via the explicit statement-count check.
with pytest.raises(SqlValidationError, match="Multiple"):
validate_select("SELECT 1; SELECT 2")
def test_multi_statement_with_intervening_comments_reject(self):
# Comments between statements don't disguise the multi-statement
# nature — sqlparse still parses 2 statements.
with pytest.raises(SqlValidationError, match="Multiple"):
validate_select("SELECT 1 /* break */; /* and again */ SELECT 2")
def test_keyword_substring_in_identifier_passes(self):
# `inserted_at`, `update_status`, `dropped_call_count` — all
# legitimate column names containing forbidden-keyword substrings.
# sqlparse classifies them as Token.Name (single tokens), so the
# forbidden scan correctly ignores them.
for col in ("inserted_at", "update_status", "dropped_call_count"):
q = f"SELECT {col} FROM device"
assert validate_select(q) == q, f"column {col!r} should pass"
def test_forbidden_keyword_inside_cte_rejected(self):
# A CTE that internally does a DELETE must still be caught — sqlparse
# walks into nested groups, so the DELETE keyword token is found
# even though it's inside `WITH x AS (...)`.
with pytest.raises(SqlValidationError, match="DELETE"):
validate_select("WITH x AS (DELETE FROM y) SELECT * FROM x")
def test_drop_inside_block_comment_passes(self):
# The DROP is inside a /* ... */ comment. sqlparse classifies the
# entire comment as Token.Comment, so its content never reaches the
# forbidden-keyword scan.
q = "SELECT 1 /* ; DROP TABLE foo */ FROM device"
assert validate_select(q) == q
def test_nested_cte_all_clean_passes(self):
# Multiple CTEs chained, all SELECT-only — must accept.
q = (
"WITH a AS (SELECT 1 AS n FROM systables), "
"b AS (SELECT n FROM a WHERE n > 0) "
"SELECT * FROM b"
)
assert validate_select(q) == q

13
uv.lock generated
View File

@ -793,13 +793,14 @@ wheels = [
[[package]] [[package]]
name = "mcaxl" name = "mcaxl"
version = "2026.4.27" version = "2026.4.27.1"
source = { editable = "." } source = { editable = "." }
dependencies = [ dependencies = [
{ name = "fastmcp" }, { name = "fastmcp" },
{ name = "numpy" }, { name = "numpy" },
{ name = "platformdirs" }, { name = "platformdirs" },
{ name = "python-dotenv" }, { name = "python-dotenv" },
{ name = "sqlparse" },
{ name = "zeep" }, { name = "zeep" },
] ]
@ -817,6 +818,7 @@ requires-dist = [
{ name = "pytest", marker = "extra == 'test'", specifier = ">=8.0" }, { name = "pytest", marker = "extra == 'test'", specifier = ">=8.0" },
{ name = "pytest-asyncio", marker = "extra == 'test'", specifier = ">=0.24" }, { name = "pytest-asyncio", marker = "extra == 'test'", specifier = ">=0.24" },
{ name = "python-dotenv", specifier = ">=1.0" }, { name = "python-dotenv", specifier = ">=1.0" },
{ name = "sqlparse", specifier = ">=0.5" },
{ name = "zeep", specifier = ">=4.3" }, { name = "zeep", specifier = ">=4.3" },
] ]
provides-extras = ["test"] provides-extras = ["test"]
@ -1545,6 +1547,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/b7/46/f5af3402b579fd5e11573ce652019a67074317e18c1935cc0b4ba9b35552/secretstorage-3.5.0-py3-none-any.whl", hash = "sha256:0ce65888c0725fcb2c5bc0fdb8e5438eece02c523557ea40ce0703c266248137", size = 15554, upload-time = "2025-11-23T19:02:51.545Z" }, { url = "https://files.pythonhosted.org/packages/b7/46/f5af3402b579fd5e11573ce652019a67074317e18c1935cc0b4ba9b35552/secretstorage-3.5.0-py3-none-any.whl", hash = "sha256:0ce65888c0725fcb2c5bc0fdb8e5438eece02c523557ea40ce0703c266248137", size = 15554, upload-time = "2025-11-23T19:02:51.545Z" },
] ]
[[package]]
name = "sqlparse"
version = "0.5.5"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/90/76/437d71068094df0726366574cf3432a4ed754217b436eb7429415cf2d480/sqlparse-0.5.5.tar.gz", hash = "sha256:e20d4a9b0b8585fdf63b10d30066c7c94c5d7a7ec47c889a2d83a3caa93ff28e", size = 120815, upload-time = "2025-12-19T07:17:45.073Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/49/4b/359f28a903c13438ef59ebeee215fb25da53066db67b305c125f1c6d2a25/sqlparse-0.5.5-py3-none-any.whl", hash = "sha256:12a08b3bf3eec877c519589833aed092e2444e68240a3577e8e26148acc7b1ba", size = 46138, upload-time = "2025-12-19T07:17:46.573Z" },
]
[[package]] [[package]]
name = "sse-starlette" name = "sse-starlette"
version = "3.3.4" version = "3.3.4"