Compare commits
3 Commits
d597bd3569
...
3cf7dbc785
| Author | SHA1 | Date | |
|---|---|---|---|
| 3cf7dbc785 | |||
| 639d706200 | |||
| 59f9df5b3b |
@ -47,13 +47,65 @@ The validator's rules:
|
||||
1. After stripping comments and trimming whitespace, the query must
|
||||
start with `SELECT` or `WITH` (case-insensitive)
|
||||
2. The query must not contain any of: `INSERT`, `UPDATE`, `DELETE`,
|
||||
`DROP`, `CREATE`, `ALTER`, `TRUNCATE`, `EXEC`, `EXECUTE`, `CALL`
|
||||
3. Trailing semicolons are stripped before submission
|
||||
`DROP`, `CREATE`, `ALTER`, `TRUNCATE`, `MERGE`, `REPLACE`, `RENAME`,
|
||||
`GRANT`, `REVOKE`, `EXEC`, `EXECUTE`, `CALL`, `ATTACH`, `DETACH`
|
||||
3. Multi-statement input is rejected explicitly (the parser sees
|
||||
`SELECT 1; SELECT 2` as two statements and refuses)
|
||||
4. Trailing semicolons are stripped before submission
|
||||
|
||||
This catches the obvious cases. It is not a SQL parser, and it doesn't
|
||||
try to be — the structural read-only guarantee is the primary defense,
|
||||
The validator uses the `sqlparse` tokenizer rather than regex, so
|
||||
string literals and comments are correctly bounded — a description
|
||||
field containing the word `DELETE` doesn't trip the check, and a
|
||||
column name like `inserted_at` isn't confused with the `INSERT`
|
||||
keyword. The structural read-only guarantee is the primary defense;
|
||||
the validator is the secondary defense.
|
||||
|
||||
## Defense-in-depth: read-only allowlist proxy
|
||||
|
||||
`client.py` wraps the zeep AXL service in a `_ReadOnlyServiceProxy`
|
||||
whose `__getattr__` refuses any method outside an explicit
|
||||
allowlist (`getCCMVersion`, `executeSQLQuery`). If a future
|
||||
contributor adds a tool that accidentally calls
|
||||
`self._service.addRoutePartition(...)`, the proxy raises
|
||||
`ReadOnlyViolation` at attribute lookup — before zeep ever
|
||||
serializes a SOAP envelope.
|
||||
|
||||
The RisPort70 path has the parallel guard. RisPort doesn't go
|
||||
through zeep — its envelopes are hand-rolled — so the allowlist
|
||||
chokepoint lives in the envelope builders themselves: every
|
||||
builder calls `_check_operation_allowed(name)` as its first
|
||||
statement, and the only allowed operation today is `selectCmDevice`.
|
||||
|
||||
You can verify the proxy is active at runtime via the `health`
|
||||
tool — it surfaces `axl_connection.read_only_proxy: true` and
|
||||
`allowed_axl_methods: ["executeSQLQuery", "getCCMVersion"]`. If
|
||||
either field is missing or false, something has gone wrong with
|
||||
bootstrap.
|
||||
|
||||
## What read-only does NOT mean
|
||||
|
||||
"Read-only" in this server is precisely scoped: **read-only against
|
||||
CUCM**. Specifically that means:
|
||||
|
||||
- No SOAP method outside the AXL/RisPort allowlists is dispatched
|
||||
- No SQL outside `SELECT`/`WITH` reaches `executeSQLQuery`
|
||||
- No tool call mutates cluster configuration, registration state,
|
||||
or any other CUCM-side resource
|
||||
|
||||
It does **not** mean "no mutation anywhere." There is exactly one
|
||||
local mutation point: the `cache_clear` tool deletes entries from
|
||||
the SQLite response cache at
|
||||
`~/.cache/mcaxl/responses/axl_responses.sqlite`. CUCM is unaware
|
||||
this happened — the cache is a local optimization, not a CUCM
|
||||
artefact. The operation is idempotent, contains no PII beyond what
|
||||
the corresponding read-only query already returned, and is
|
||||
reversible by simply waiting for the next tool call to repopulate
|
||||
the relevant entries from AXL.
|
||||
|
||||
If you need *zero* local writes (e.g., when running mcaxl from a
|
||||
read-only filesystem), set `AXL_CACHE_TTL=0` to disable the cache
|
||||
entirely — `cache_clear` then has nothing to clear.
|
||||
|
||||
## Operational consequence: minimal-privilege service account
|
||||
|
||||
Because the server is structurally incapable of writes, the AXL service
|
||||
|
||||
@ -10,10 +10,19 @@ sidebar:
|
||||
(partitions, CSSs, patterns, lists, groups, transformations, filters),
|
||||
and **real-time registration** (RisPort70 device state).
|
||||
|
||||
Every tool is read-only. The server never registers AXL write methods —
|
||||
no `executeSQLUpdate`, no `add*` / `update*` / `remove*` / `apply*` /
|
||||
`reset*` / `restart*`. See [Read-only by structure](/explanation/read-only-by-structure/)
|
||||
for the rationale.
|
||||
Every tool is read-only **against CUCM**. The server never registers AXL
|
||||
write methods — no `executeSQLUpdate`, no `add*` / `update*` / `remove*`
|
||||
/ `apply*` / `reset*` / `restart*`. As of 2026-04, this is enforced at
|
||||
two layers: a `sqlparse`-based validator on `axl_sql` input that rejects
|
||||
non-`SELECT`/`WITH` statements (and explicit multi-statement input), and
|
||||
a runtime allowlist proxy around the AXL service that refuses any SOAP
|
||||
method outside `{getCCMVersion, executeSQLQuery}`. RisPort70 has the
|
||||
parallel allowlist in its envelope builder.
|
||||
|
||||
`cache_clear` is the **one** tool that mutates any state, and the state
|
||||
it mutates is the **local SQLite response cache only** — CUCM is
|
||||
unaware the call happened. See [Read-only by structure](/explanation/read-only-by-structure/)
|
||||
for the rationale and what read-only does NOT mean.
|
||||
|
||||
## Foundational
|
||||
|
||||
|
||||
@ -31,6 +31,11 @@ dependencies = [
|
||||
"platformdirs>=4.9",
|
||||
"numpy>=1.26",
|
||||
"python-dotenv>=1.0",
|
||||
# SQL tokenizer for the executeSQLQuery validator. Using a real lexer
|
||||
# (instead of regex keyword scanning) gives us proper string/comment
|
||||
# boundary handling and explicit multi-statement detection. See
|
||||
# src/mcaxl/sql_validator.py for the read-only enforcement layer.
|
||||
"sqlparse>=0.5",
|
||||
]
|
||||
|
||||
[project.optional-dependencies]
|
||||
|
||||
@ -34,6 +34,51 @@ class _ConfigError(RuntimeError):
|
||||
"""
|
||||
|
||||
|
||||
class ReadOnlyViolation(RuntimeError):
|
||||
"""Raised when code attempts to dispatch an AXL method outside the
|
||||
read-only allowlist. This is a defense-in-depth guard: the SQL
|
||||
validator already prevents write SQL, and the tools as-written never
|
||||
call write methods, but the proxy ensures that a future contributor
|
||||
accidentally calling `_service.addRoutePartition(...)` gets a refusal
|
||||
at the boundary instead of a silent cluster mutation.
|
||||
"""
|
||||
|
||||
|
||||
# AXL methods this server is permitted to dispatch. Adding a new method
|
||||
# here is a deliberate decision — every additional name widens the
|
||||
# read-only surface and should be reviewed.
|
||||
_ALLOWED_AXL_METHODS = frozenset({
|
||||
"getCCMVersion",
|
||||
"executeSQLQuery",
|
||||
})
|
||||
|
||||
|
||||
class _ReadOnlyServiceProxy:
|
||||
"""Wraps a zeep service object. Refuses any method not in
|
||||
_ALLOWED_AXL_METHODS, raising ReadOnlyViolation.
|
||||
|
||||
Non-method attributes (zeep internals like `_binding_options`) are
|
||||
passed through untouched so the underlying client keeps working.
|
||||
"""
|
||||
|
||||
def __init__(self, inner):
|
||||
self._inner = inner
|
||||
|
||||
def __getattr__(self, name):
|
||||
# Dunders and private attributes pass through — we're guarding
|
||||
# against accidental SOAP dispatch, not introspection.
|
||||
if name.startswith("_"):
|
||||
return getattr(self._inner, name)
|
||||
if name not in _ALLOWED_AXL_METHODS:
|
||||
raise ReadOnlyViolation(
|
||||
f"AXL method {name!r} is not in the read-only allowlist. "
|
||||
f"Allowed: {sorted(_ALLOWED_AXL_METHODS)}. "
|
||||
f"This server is structurally read-only — see "
|
||||
f"docs/explanation/read-only-by-structure/ for context."
|
||||
)
|
||||
return getattr(self._inner, name)
|
||||
|
||||
|
||||
class AxlClient:
|
||||
"""Lazy-loaded zeep client for CUCM AXL.
|
||||
|
||||
@ -67,6 +112,13 @@ class AxlClient:
|
||||
"config_error": self._config_error, # permanent until restart
|
||||
"last_error": self._last_error,
|
||||
"retry_config": self._retry_config,
|
||||
# Operators should be able to verify the read-only proxy is
|
||||
# active without reading the source. True means writes will
|
||||
# be refused at dispatch time even if some future tool tries.
|
||||
"read_only_proxy": self._service is not None and isinstance(
|
||||
self._service, _ReadOnlyServiceProxy
|
||||
),
|
||||
"allowed_axl_methods": sorted(_ALLOWED_AXL_METHODS),
|
||||
}
|
||||
|
||||
def _ensure_connected(self) -> None:
|
||||
@ -153,10 +205,15 @@ class AxlClient:
|
||||
)
|
||||
# AXL endpoint is the AXL_URL itself; override the WSDL's default
|
||||
# service location which usually points at a placeholder host.
|
||||
self._service = self._client.create_service(
|
||||
# Wrap the resulting service in a read-only allowlist proxy —
|
||||
# any SOAP method not in _ALLOWED_AXL_METHODS will raise
|
||||
# ReadOnlyViolation at attribute lookup time, before zeep
|
||||
# serializes a SOAP envelope.
|
||||
raw_service = self._client.create_service(
|
||||
"{http://www.cisco.com/AXLAPIService/}AXLAPIBinding",
|
||||
url,
|
||||
)
|
||||
self._service = _ReadOnlyServiceProxy(raw_service)
|
||||
import time as _time
|
||||
self._connected_at = _time.monotonic()
|
||||
self._last_error = None # operational state is now clean
|
||||
|
||||
@ -36,10 +36,35 @@ from requests.adapters import HTTPAdapter
|
||||
from requests.auth import HTTPBasicAuth
|
||||
from urllib3.util.retry import Retry
|
||||
|
||||
from .client import ReadOnlyViolation
|
||||
|
||||
|
||||
# RisPort path on the CUCM publisher
|
||||
_RIS_PATH = "/realtimeservice2/services/RISService70"
|
||||
|
||||
# RisPort70 operations this server is permitted to invoke. Parallels the
|
||||
# AXL allowlist in client.py — adding a new operation here is a deliberate
|
||||
# decision that widens the read-only surface.
|
||||
_ALLOWED_RISPORT_OPERATIONS = frozenset({
|
||||
"selectCmDevice",
|
||||
})
|
||||
|
||||
|
||||
def _check_operation_allowed(operation: str) -> None:
|
||||
"""Raise ReadOnlyViolation if `operation` is outside the allowlist.
|
||||
|
||||
Called at the top of each envelope builder. RisPort70 doesn't go
|
||||
through zeep — envelopes are hand-rolled — so the proxy pattern
|
||||
used for AXL doesn't apply directly. This function is the equivalent
|
||||
chokepoint: any envelope builder that reaches the network must first
|
||||
check its operation name against the allowlist.
|
||||
"""
|
||||
if operation not in _ALLOWED_RISPORT_OPERATIONS:
|
||||
raise ReadOnlyViolation(
|
||||
f"RisPort70 operation {operation!r} is not in the read-only "
|
||||
f"allowlist. Allowed: {sorted(_ALLOWED_RISPORT_OPERATIONS)}."
|
||||
)
|
||||
|
||||
# SOAP namespaces. These match Cisco's published values for RisPort70.
|
||||
_NS_SOAPENV = "http://schemas.xmlsoap.org/soap/envelope/"
|
||||
_NS_RIS = "http://schemas.cisco.com/ast/soap"
|
||||
@ -77,6 +102,7 @@ def _build_select_envelope(
|
||||
are rejected. We err on the side of always including every field
|
||||
with sensible defaults.
|
||||
"""
|
||||
_check_operation_allowed("selectCmDevice")
|
||||
items = select_items if select_items else ["*"]
|
||||
items_xml = "".join(
|
||||
f"<soap:item><soap:Item>{_escape_xml(i)}</soap:Item></soap:item>"
|
||||
|
||||
@ -123,6 +123,13 @@ def cache_stats() -> dict:
|
||||
def cache_clear(method_pattern: str | None = None) -> dict:
|
||||
"""Clear cache entries for the current cluster.
|
||||
|
||||
**Local-only:** mutates the SQLite response cache at
|
||||
~/.cache/mcaxl/responses/axl_responses.sqlite. Does NOT touch CUCM —
|
||||
the cluster is unaware this tool ran. The next tool call will re-fetch
|
||||
from AXL. This is the only mcaxl tool that mutates any state, and the
|
||||
state it mutates is local cache only; see /explanation/read-only-by-structure/
|
||||
for what "read-only" means in this server's context.
|
||||
|
||||
Args:
|
||||
method_pattern: Optional method-name pattern (% wildcards). If omitted,
|
||||
clears the entire cache. Use after a known config change to force
|
||||
|
||||
@ -5,79 +5,128 @@ through a separate executeSQLUpdate method that we never expose). This client-
|
||||
side check exists to:
|
||||
1. Give the LLM a fast, clear error before a SOAP round-trip.
|
||||
2. Make read-only intent visible at the boundary, not implicit.
|
||||
|
||||
Uses sqlparse for proper token-level analysis. Compared to the prior
|
||||
regex-based implementation, this gives us:
|
||||
|
||||
- Explicit multi-statement detection via `len(sqlparse.parse(query))`
|
||||
rather than hoping a forbidden keyword shows up in the second statement
|
||||
(the regex version *accepted* `SELECT 1; SELECT 2` because neither
|
||||
SELECT is forbidden).
|
||||
- Proper string/comment boundary handling — `'Smith -- old line'` is
|
||||
a single Literal.String token, so the `--` inside it is never confused
|
||||
for a comment marker, and `'log: DROP detected'` doesn't trip the
|
||||
forbidden-keyword check because the entire literal is one token.
|
||||
- Word-boundary correctness on identifiers — `inserted_at` tokenizes as
|
||||
`Token.Name`, never as `Token.Keyword.DML`, so columns named after
|
||||
keyword fragments don't get false-positive blocked.
|
||||
|
||||
What sqlparse does NOT do (and we accept the conservative behavior):
|
||||
|
||||
- Disambiguate `SELECT delete FROM device` (where `delete` is meant as a
|
||||
column name, not a keyword). sqlparse is a lexer, not a parser, and
|
||||
classifies `delete` as `Token.Keyword.DML` regardless of context. The
|
||||
check stays conservative — CUCM's data dictionary doesn't use SQL
|
||||
keywords as column names anyway.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
import sqlparse
|
||||
from sqlparse.tokens import Comment, Keyword
|
||||
|
||||
|
||||
_COMMENT_BLOCK = re.compile(r"/\*.*?\*/", re.DOTALL)
|
||||
_COMMENT_LINE = re.compile(r"--[^\n]*")
|
||||
# Match Informix string literals: single-quoted, with '' as escaped quote.
|
||||
_STRING_LITERAL = re.compile(r"'(?:''|[^'])*'", re.DOTALL)
|
||||
_FORBIDDEN = {
|
||||
"INSERT", "UPDATE", "DELETE", "DROP", "CREATE", "ALTER",
|
||||
"TRUNCATE", "GRANT", "REVOKE", "MERGE", "REPLACE", "RENAME",
|
||||
"EXEC", "EXECUTE", "CALL", "ATTACH", "DETACH",
|
||||
}
|
||||
_WORD_RE = re.compile(r"\b([A-Za-z_]+)\b")
|
||||
|
||||
|
||||
class SqlValidationError(ValueError):
|
||||
"""Raised when a query is not a safe read-only SELECT/WITH."""
|
||||
|
||||
|
||||
def _is_skippable(tok) -> bool:
|
||||
"""Whitespace and comments don't count toward token-position checks."""
|
||||
if tok.is_whitespace:
|
||||
return True
|
||||
if tok.ttype is not None and tok.ttype in Comment:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def _is_keyword(tok) -> bool:
|
||||
return tok.ttype is not None and tok.ttype in Keyword
|
||||
|
||||
|
||||
def _is_substantive(stmt) -> bool:
|
||||
"""A statement contributes to the multi-statement count only if it has
|
||||
at least one non-whitespace, non-comment leaf token. This way trailing
|
||||
semicolons and pure-comment "statements" don't inflate the count."""
|
||||
return any(not _is_skippable(t) for t in stmt.flatten())
|
||||
|
||||
|
||||
def validate_select(query: str) -> str:
|
||||
"""Return the cleaned query, or raise SqlValidationError.
|
||||
|
||||
Accepts SELECT and WITH (CTEs that ultimately return SELECT). Rejects
|
||||
anything else, and any query containing forbidden keywords as standalone
|
||||
multi-statement input, and any query containing forbidden SQL keyword
|
||||
tokens *outside* string literals and comments.
|
||||
|
||||
Hamilton review CRITICAL #1: the output we return MUST preserve the input
|
||||
byte-for-byte (modulo trailing semicolon and outer whitespace). Earlier
|
||||
versions ran a non-literal-aware comment strip on the output, which would
|
||||
silently eat `--` and `/* */` markers that legitimately appeared inside
|
||||
string literals like `WHERE description = 'Smith -- old line'`. The query
|
||||
going to AXL must be exactly what the caller intended — comment stripping
|
||||
is an analysis-only operation, never a mutation of the wire query.
|
||||
Hamilton review CRITICAL #1 (preserved across the sqlparse rewrite):
|
||||
the output we return MUST preserve the input byte-for-byte (modulo
|
||||
trailing semicolon and outer whitespace). sqlparse is consulted for
|
||||
analysis only — the cleaned query going to AXL is exactly what the
|
||||
caller intended. A bug here would mean the LLM's `description LIKE
|
||||
'%-- old%'` query gets a different shape than it asked for.
|
||||
"""
|
||||
if not query or not query.strip():
|
||||
raise SqlValidationError("Query is empty.")
|
||||
|
||||
# The query we'll send to AXL: original input, with only outer whitespace
|
||||
# and a single trailing semicolon trimmed. NO mutation of literals or
|
||||
# in-string comment markers.
|
||||
cleaned = query.strip().rstrip(";").strip()
|
||||
if not cleaned:
|
||||
raise SqlValidationError("Query is empty after trimming.")
|
||||
|
||||
# Analysis-only copy: strip string literals AND comments (in either order
|
||||
# is safe here, since each strip uses its own regex on a non-AXL-bound
|
||||
# buffer). Order chosen: literals first, then comments, so that any
|
||||
# comment markers genuinely outside literals can be detected.
|
||||
for_analysis = _STRING_LITERAL.sub(" ", cleaned)
|
||||
for_analysis = _COMMENT_BLOCK.sub(" ", for_analysis)
|
||||
for_analysis = _COMMENT_LINE.sub(" ", for_analysis)
|
||||
parsed = sqlparse.parse(cleaned)
|
||||
statements = [s for s in parsed if _is_substantive(s)]
|
||||
|
||||
if not for_analysis.strip():
|
||||
if not statements:
|
||||
raise SqlValidationError("Query is empty after stripping comments.")
|
||||
|
||||
upper_tokens = [t.upper() for t in _WORD_RE.findall(for_analysis)]
|
||||
if not upper_tokens:
|
||||
raise SqlValidationError("Query contains no SQL keywords.")
|
||||
|
||||
first = upper_tokens[0]
|
||||
if first not in {"SELECT", "WITH"}:
|
||||
if len(statements) > 1:
|
||||
raise SqlValidationError(
|
||||
f"Only SELECT and WITH are permitted; query starts with {first!r}."
|
||||
f"Multiple statements detected ({len(statements)}). "
|
||||
f"Only single SELECT or WITH queries are permitted."
|
||||
)
|
||||
|
||||
forbidden_hits = sorted(set(upper_tokens) & _FORBIDDEN)
|
||||
if forbidden_hits:
|
||||
stmt = statements[0]
|
||||
|
||||
first = next((t for t in stmt.flatten() if not _is_skippable(t)), None)
|
||||
if first is None:
|
||||
raise SqlValidationError("Query contains no SQL keywords.")
|
||||
|
||||
if not _is_keyword(first):
|
||||
raise SqlValidationError(
|
||||
f"Forbidden keyword(s) present: {', '.join(forbidden_hits)}. "
|
||||
f"Query must start with SELECT or WITH; first token is {first.value!r}."
|
||||
)
|
||||
|
||||
first_upper = first.value.upper()
|
||||
if first_upper not in {"SELECT", "WITH"}:
|
||||
raise SqlValidationError(
|
||||
f"Only SELECT and WITH are permitted; query starts with {first_upper!r}."
|
||||
)
|
||||
|
||||
hits: set[str] = set()
|
||||
for tok in stmt.flatten():
|
||||
if not _is_keyword(tok):
|
||||
continue
|
||||
upper = tok.value.upper()
|
||||
if upper in _FORBIDDEN:
|
||||
hits.add(upper)
|
||||
|
||||
if hits:
|
||||
raise SqlValidationError(
|
||||
f"Forbidden keyword(s) present: {', '.join(sorted(hits))}. "
|
||||
f"This server is read-only."
|
||||
)
|
||||
|
||||
|
||||
88
tests/test_readonly_proxy.py
Normal file
88
tests/test_readonly_proxy.py
Normal file
@ -0,0 +1,88 @@
|
||||
"""Tests for _ReadOnlyServiceProxy: defense-in-depth allowlist for AXL methods.
|
||||
|
||||
The proxy wraps the zeep service object so any SOAP method outside
|
||||
{getCCMVersion, executeSQLQuery} raises ReadOnlyViolation at attribute
|
||||
lookup, before zeep serializes an envelope. This is a guard against future
|
||||
contributors accidentally calling write methods like addRoutePartition().
|
||||
"""
|
||||
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from mcaxl.client import (
|
||||
ReadOnlyViolation,
|
||||
_ALLOWED_AXL_METHODS,
|
||||
_ReadOnlyServiceProxy,
|
||||
)
|
||||
|
||||
|
||||
class TestAllowlistEnforcement:
|
||||
def test_allowed_method_dispatches_through(self):
|
||||
# Both methods we currently use must pass through the proxy.
|
||||
inner = MagicMock()
|
||||
inner.getCCMVersion.return_value = {"version": "15.0(1)"}
|
||||
inner.executeSQLQuery.return_value = {"rows": []}
|
||||
proxy = _ReadOnlyServiceProxy(inner)
|
||||
|
||||
assert proxy.getCCMVersion() == {"version": "15.0(1)"}
|
||||
assert proxy.executeSQLQuery(sql="SELECT 1") == {"rows": []}
|
||||
inner.getCCMVersion.assert_called_once()
|
||||
inner.executeSQLQuery.assert_called_once_with(sql="SELECT 1")
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"method_name",
|
||||
[
|
||||
"addRoutePartition",
|
||||
"updatePhone",
|
||||
"removeUser",
|
||||
"applyPhone",
|
||||
"resetPhone",
|
||||
"restartPhone",
|
||||
"executeSQLUpdate", # the AXL write counterpart
|
||||
"doDeviceLogin",
|
||||
"wipePhone",
|
||||
],
|
||||
)
|
||||
def test_disallowed_method_raises(self, method_name):
|
||||
inner = MagicMock()
|
||||
proxy = _ReadOnlyServiceProxy(inner)
|
||||
|
||||
with pytest.raises(ReadOnlyViolation, match=method_name):
|
||||
getattr(proxy, method_name)
|
||||
|
||||
# The inner service must NOT have been touched at all — refusal
|
||||
# happens before any SOAP serialization.
|
||||
assert not getattr(inner, method_name).called
|
||||
|
||||
def test_allowlist_is_exactly_what_we_advertise(self):
|
||||
# If this set ever grows, that's a deliberate decision and the
|
||||
# test should be updated alongside the change. Catching unintended
|
||||
# widening of the read-only surface is the point.
|
||||
assert _ALLOWED_AXL_METHODS == frozenset(
|
||||
{"getCCMVersion", "executeSQLQuery"}
|
||||
)
|
||||
|
||||
|
||||
class TestAttributePassthrough:
|
||||
def test_dunder_attributes_pass_through(self):
|
||||
# Zeep introspects services via dunder attributes (__class__,
|
||||
# __dict__, etc.). The proxy must not break those.
|
||||
inner = MagicMock()
|
||||
inner.__class__ = MagicMock
|
||||
proxy = _ReadOnlyServiceProxy(inner)
|
||||
|
||||
# Reading the class doesn't raise
|
||||
_ = proxy.__class__
|
||||
|
||||
def test_underscore_prefixed_attributes_pass_through(self):
|
||||
# zeep service internals like `_binding_options`, `_operations`
|
||||
# are accessed by name. We don't want to gate those because they
|
||||
# don't dispatch SOAP — they read metadata.
|
||||
inner = MagicMock()
|
||||
inner._binding_options = {"address": "https://cucm/axl/"}
|
||||
inner._operations = ["getCCMVersion", "executeSQLQuery", "addPhone"]
|
||||
proxy = _ReadOnlyServiceProxy(inner)
|
||||
|
||||
assert proxy._binding_options == {"address": "https://cucm/axl/"}
|
||||
assert "addPhone" in proxy._operations # introspection, not dispatch
|
||||
@ -9,10 +9,13 @@ import xml.etree.ElementTree as ET
|
||||
|
||||
import pytest
|
||||
|
||||
from mcaxl.client import ReadOnlyViolation
|
||||
from mcaxl.risport import (
|
||||
DEVICE_STATUS_VALUES,
|
||||
_ALLOWED_RISPORT_OPERATIONS,
|
||||
RisPortClient,
|
||||
_build_select_envelope,
|
||||
_check_operation_allowed,
|
||||
_escape_xml,
|
||||
_parse_response,
|
||||
)
|
||||
@ -220,3 +223,34 @@ class TestStatusValidation:
|
||||
# but the validation should run BEFORE that on bad input.
|
||||
with pytest.raises(ValueError, match="status must be"):
|
||||
client.select_cm_device(status="not-a-real-status")
|
||||
|
||||
|
||||
class TestReadOnlyAllowlist:
|
||||
"""The RisPort envelope builders all gate on _check_operation_allowed.
|
||||
This is the equivalent of the AXL service proxy — the chokepoint that
|
||||
blocks any future write-shaped operation from being assembled.
|
||||
"""
|
||||
|
||||
def test_selectCmDevice_is_allowed(self):
|
||||
# No raise — selectCmDevice is in the allowlist
|
||||
_check_operation_allowed("selectCmDevice")
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"operation",
|
||||
[
|
||||
"addCmDevice",
|
||||
"removeCmDevice",
|
||||
"resetDevice",
|
||||
"restartDevice",
|
||||
"applyCmDevice",
|
||||
"executeSQLUpdate", # leakage from the AXL surface
|
||||
],
|
||||
)
|
||||
def test_disallowed_operation_raises_in_check(self, operation):
|
||||
with pytest.raises(ReadOnlyViolation, match=operation):
|
||||
_check_operation_allowed(operation)
|
||||
|
||||
def test_allowlist_is_exactly_what_we_advertise(self):
|
||||
# As with the AXL allowlist, widening this set is a deliberate
|
||||
# decision that should be matched by an update to this test.
|
||||
assert _ALLOWED_RISPORT_OPERATIONS == frozenset({"selectCmDevice"})
|
||||
|
||||
@ -60,10 +60,12 @@ class TestRejected:
|
||||
validate_select("DROP TABLE device")
|
||||
|
||||
def test_select_with_embedded_drop_rejected(self):
|
||||
# Belt-and-suspenders: even if "DROP" appears in a quoted string-ish
|
||||
# position our keyword filter still catches it. AXL would also reject
|
||||
# this, but failing fast on the client saves a SOAP round-trip.
|
||||
with pytest.raises(SqlValidationError, match="DROP"):
|
||||
# Belt-and-suspenders: even if "DROP" appears alongside a SELECT, the
|
||||
# query is rejected. Under the sqlparse-based validator the rejection
|
||||
# reason is now "Multiple statements detected" (caught earlier than
|
||||
# forbidden-keyword scan); under the prior regex validator it was
|
||||
# "DROP". Either is correct — we just want this query refused.
|
||||
with pytest.raises(SqlValidationError, match="DROP|Multiple"):
|
||||
validate_select("SELECT 1 FROM device; DROP TABLE device")
|
||||
|
||||
def test_truncate_rejected(self):
|
||||
@ -107,7 +109,10 @@ class TestStringLiterals:
|
||||
assert validate_select(q)
|
||||
|
||||
def test_actual_drop_outside_literal_still_blocked(self):
|
||||
with pytest.raises(SqlValidationError, match="DROP"):
|
||||
# See test_select_with_embedded_drop_rejected — rejection reason
|
||||
# is now multi-statement detection (sqlparse catches it earlier),
|
||||
# but the query still fails closed.
|
||||
with pytest.raises(SqlValidationError, match="DROP|Multiple"):
|
||||
validate_select("SELECT 1 FROM device; DROP TABLE backup")
|
||||
|
||||
def test_escaped_quote_in_literal(self):
|
||||
@ -175,3 +180,54 @@ class TestLiteralPreservedInOutput:
|
||||
q = "SELECT 1 FROM numplan WHERE description = 'log: DROP detected'"
|
||||
result = validate_select(q)
|
||||
assert "'log: DROP detected'" in result
|
||||
|
||||
|
||||
class TestSqlparseSpecific:
|
||||
"""Cases that exercise wins of the sqlparse-based validator over the
|
||||
earlier regex implementation. Each test names the property being checked.
|
||||
"""
|
||||
|
||||
def test_multi_statement_explicit_reject(self):
|
||||
# Two SELECTs, no forbidden keywords. The regex validator accepted
|
||||
# this because neither SELECT is in _FORBIDDEN. sqlparse catches it
|
||||
# via the explicit statement-count check.
|
||||
with pytest.raises(SqlValidationError, match="Multiple"):
|
||||
validate_select("SELECT 1; SELECT 2")
|
||||
|
||||
def test_multi_statement_with_intervening_comments_reject(self):
|
||||
# Comments between statements don't disguise the multi-statement
|
||||
# nature — sqlparse still parses 2 statements.
|
||||
with pytest.raises(SqlValidationError, match="Multiple"):
|
||||
validate_select("SELECT 1 /* break */; /* and again */ SELECT 2")
|
||||
|
||||
def test_keyword_substring_in_identifier_passes(self):
|
||||
# `inserted_at`, `update_status`, `dropped_call_count` — all
|
||||
# legitimate column names containing forbidden-keyword substrings.
|
||||
# sqlparse classifies them as Token.Name (single tokens), so the
|
||||
# forbidden scan correctly ignores them.
|
||||
for col in ("inserted_at", "update_status", "dropped_call_count"):
|
||||
q = f"SELECT {col} FROM device"
|
||||
assert validate_select(q) == q, f"column {col!r} should pass"
|
||||
|
||||
def test_forbidden_keyword_inside_cte_rejected(self):
|
||||
# A CTE that internally does a DELETE must still be caught — sqlparse
|
||||
# walks into nested groups, so the DELETE keyword token is found
|
||||
# even though it's inside `WITH x AS (...)`.
|
||||
with pytest.raises(SqlValidationError, match="DELETE"):
|
||||
validate_select("WITH x AS (DELETE FROM y) SELECT * FROM x")
|
||||
|
||||
def test_drop_inside_block_comment_passes(self):
|
||||
# The DROP is inside a /* ... */ comment. sqlparse classifies the
|
||||
# entire comment as Token.Comment, so its content never reaches the
|
||||
# forbidden-keyword scan.
|
||||
q = "SELECT 1 /* ; DROP TABLE foo */ FROM device"
|
||||
assert validate_select(q) == q
|
||||
|
||||
def test_nested_cte_all_clean_passes(self):
|
||||
# Multiple CTEs chained, all SELECT-only — must accept.
|
||||
q = (
|
||||
"WITH a AS (SELECT 1 AS n FROM systables), "
|
||||
"b AS (SELECT n FROM a WHERE n > 0) "
|
||||
"SELECT * FROM b"
|
||||
)
|
||||
assert validate_select(q) == q
|
||||
|
||||
13
uv.lock
generated
13
uv.lock
generated
@ -793,13 +793,14 @@ wheels = [
|
||||
|
||||
[[package]]
|
||||
name = "mcaxl"
|
||||
version = "2026.4.27"
|
||||
version = "2026.4.27.1"
|
||||
source = { editable = "." }
|
||||
dependencies = [
|
||||
{ name = "fastmcp" },
|
||||
{ name = "numpy" },
|
||||
{ name = "platformdirs" },
|
||||
{ name = "python-dotenv" },
|
||||
{ name = "sqlparse" },
|
||||
{ name = "zeep" },
|
||||
]
|
||||
|
||||
@ -817,6 +818,7 @@ requires-dist = [
|
||||
{ name = "pytest", marker = "extra == 'test'", specifier = ">=8.0" },
|
||||
{ name = "pytest-asyncio", marker = "extra == 'test'", specifier = ">=0.24" },
|
||||
{ name = "python-dotenv", specifier = ">=1.0" },
|
||||
{ name = "sqlparse", specifier = ">=0.5" },
|
||||
{ name = "zeep", specifier = ">=4.3" },
|
||||
]
|
||||
provides-extras = ["test"]
|
||||
@ -1545,6 +1547,15 @@ wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/b7/46/f5af3402b579fd5e11573ce652019a67074317e18c1935cc0b4ba9b35552/secretstorage-3.5.0-py3-none-any.whl", hash = "sha256:0ce65888c0725fcb2c5bc0fdb8e5438eece02c523557ea40ce0703c266248137", size = 15554, upload-time = "2025-11-23T19:02:51.545Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sqlparse"
|
||||
version = "0.5.5"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/90/76/437d71068094df0726366574cf3432a4ed754217b436eb7429415cf2d480/sqlparse-0.5.5.tar.gz", hash = "sha256:e20d4a9b0b8585fdf63b10d30066c7c94c5d7a7ec47c889a2d83a3caa93ff28e", size = 120815, upload-time = "2025-12-19T07:17:45.073Z" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/49/4b/359f28a903c13438ef59ebeee215fb25da53066db67b305c125f1c6d2a25/sqlparse-0.5.5-py3-none-any.whl", hash = "sha256:12a08b3bf3eec877c519589833aed092e2444e68240a3577e8e26148acc7b1ba", size = 46138, upload-time = "2025-12-19T07:17:46.573Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sse-starlette"
|
||||
version = "3.3.4"
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user