Compare commits

..

No commits in common. "3cf7dbc785b2a63fdaf3aae8dd79f2fc9f04595d" and "d597bd3569adc3fc57af0a729eac026ecb598bd0" have entirely different histories.

11 changed files with 48 additions and 442 deletions

View File

@ -47,65 +47,13 @@ The validator's rules:
1. After stripping comments and trimming whitespace, the query must
start with `SELECT` or `WITH` (case-insensitive)
2. The query must not contain any of: `INSERT`, `UPDATE`, `DELETE`,
`DROP`, `CREATE`, `ALTER`, `TRUNCATE`, `MERGE`, `REPLACE`, `RENAME`,
`GRANT`, `REVOKE`, `EXEC`, `EXECUTE`, `CALL`, `ATTACH`, `DETACH`
3. Multi-statement input is rejected explicitly (the parser sees
`SELECT 1; SELECT 2` as two statements and refuses)
4. Trailing semicolons are stripped before submission
`DROP`, `CREATE`, `ALTER`, `TRUNCATE`, `EXEC`, `EXECUTE`, `CALL`
3. Trailing semicolons are stripped before submission
The validator uses the `sqlparse` tokenizer rather than regex, so
string literals and comments are correctly bounded — a description
field containing the word `DELETE` doesn't trip the check, and a
column name like `inserted_at` isn't confused with the `INSERT`
keyword. The structural read-only guarantee is the primary defense;
This catches the obvious cases. It is not a SQL parser, and it doesn't
try to be — the structural read-only guarantee is the primary defense,
the validator is the secondary defense.
## Defense-in-depth: read-only allowlist proxy
`client.py` wraps the zeep AXL service in a `_ReadOnlyServiceProxy`
whose `__getattr__` refuses any method outside an explicit
allowlist (`getCCMVersion`, `executeSQLQuery`). If a future
contributor adds a tool that accidentally calls
`self._service.addRoutePartition(...)`, the proxy raises
`ReadOnlyViolation` at attribute lookup — before zeep ever
serializes a SOAP envelope.
The RisPort70 path has the parallel guard. RisPort doesn't go
through zeep — its envelopes are hand-rolled — so the allowlist
chokepoint lives in the envelope builders themselves: every
builder calls `_check_operation_allowed(name)` as its first
statement, and the only allowed operation today is `selectCmDevice`.
You can verify the proxy is active at runtime via the `health`
tool — it surfaces `axl_connection.read_only_proxy: true` and
`allowed_axl_methods: ["executeSQLQuery", "getCCMVersion"]`. If
either field is missing or false, something has gone wrong with
bootstrap.
## What read-only does NOT mean
"Read-only" in this server is precisely scoped: **read-only against
CUCM**. Specifically that means:
- No SOAP method outside the AXL/RisPort allowlists is dispatched
- No SQL outside `SELECT`/`WITH` reaches `executeSQLQuery`
- No tool call mutates cluster configuration, registration state,
or any other CUCM-side resource
It does **not** mean "no mutation anywhere." There is exactly one
local mutation point: the `cache_clear` tool deletes entries from
the SQLite response cache at
`~/.cache/mcaxl/responses/axl_responses.sqlite`. CUCM is unaware
this happened — the cache is a local optimization, not a CUCM
artefact. The operation is idempotent, contains no PII beyond what
the corresponding read-only query already returned, and is
reversible by simply waiting for the next tool call to repopulate
the relevant entries from AXL.
If you need *zero* local writes (e.g., when running mcaxl from a
read-only filesystem), set `AXL_CACHE_TTL=0` to disable the cache
entirely — `cache_clear` then has nothing to clear.
## Operational consequence: minimal-privilege service account
Because the server is structurally incapable of writes, the AXL service

View File

@ -10,19 +10,10 @@ sidebar:
(partitions, CSSs, patterns, lists, groups, transformations, filters),
and **real-time registration** (RisPort70 device state).
Every tool is read-only **against CUCM**. The server never registers AXL
write methods — no `executeSQLUpdate`, no `add*` / `update*` / `remove*`
/ `apply*` / `reset*` / `restart*`. As of 2026-04, this is enforced at
two layers: a `sqlparse`-based validator on `axl_sql` input that rejects
non-`SELECT`/`WITH` statements (and explicit multi-statement input), and
a runtime allowlist proxy around the AXL service that refuses any SOAP
method outside `{getCCMVersion, executeSQLQuery}`. RisPort70 has the
parallel allowlist in its envelope builder.
`cache_clear` is the **one** tool that mutates any state, and the state
it mutates is the **local SQLite response cache only** — CUCM is
unaware the call happened. See [Read-only by structure](/explanation/read-only-by-structure/)
for the rationale and what read-only does NOT mean.
Every tool is read-only. The server never registers AXL write methods —
no `executeSQLUpdate`, no `add*` / `update*` / `remove*` / `apply*` /
`reset*` / `restart*`. See [Read-only by structure](/explanation/read-only-by-structure/)
for the rationale.
## Foundational

View File

@ -31,11 +31,6 @@ dependencies = [
"platformdirs>=4.9",
"numpy>=1.26",
"python-dotenv>=1.0",
# SQL tokenizer for the executeSQLQuery validator. Using a real lexer
# (instead of regex keyword scanning) gives us proper string/comment
# boundary handling and explicit multi-statement detection. See
# src/mcaxl/sql_validator.py for the read-only enforcement layer.
"sqlparse>=0.5",
]
[project.optional-dependencies]

View File

@ -34,51 +34,6 @@ class _ConfigError(RuntimeError):
"""
class ReadOnlyViolation(RuntimeError):
"""Raised when code attempts to dispatch an AXL method outside the
read-only allowlist. This is a defense-in-depth guard: the SQL
validator already prevents write SQL, and the tools as-written never
call write methods, but the proxy ensures that a future contributor
accidentally calling `_service.addRoutePartition(...)` gets a refusal
at the boundary instead of a silent cluster mutation.
"""
# AXL methods this server is permitted to dispatch. Adding a new method
# here is a deliberate decision — every additional name widens the
# read-only surface and should be reviewed.
_ALLOWED_AXL_METHODS = frozenset({
"getCCMVersion",
"executeSQLQuery",
})
class _ReadOnlyServiceProxy:
"""Wraps a zeep service object. Refuses any method not in
_ALLOWED_AXL_METHODS, raising ReadOnlyViolation.
Non-method attributes (zeep internals like `_binding_options`) are
passed through untouched so the underlying client keeps working.
"""
def __init__(self, inner):
self._inner = inner
def __getattr__(self, name):
# Dunders and private attributes pass through — we're guarding
# against accidental SOAP dispatch, not introspection.
if name.startswith("_"):
return getattr(self._inner, name)
if name not in _ALLOWED_AXL_METHODS:
raise ReadOnlyViolation(
f"AXL method {name!r} is not in the read-only allowlist. "
f"Allowed: {sorted(_ALLOWED_AXL_METHODS)}. "
f"This server is structurally read-only — see "
f"docs/explanation/read-only-by-structure/ for context."
)
return getattr(self._inner, name)
class AxlClient:
"""Lazy-loaded zeep client for CUCM AXL.
@ -112,13 +67,6 @@ class AxlClient:
"config_error": self._config_error, # permanent until restart
"last_error": self._last_error,
"retry_config": self._retry_config,
# Operators should be able to verify the read-only proxy is
# active without reading the source. True means writes will
# be refused at dispatch time even if some future tool tries.
"read_only_proxy": self._service is not None and isinstance(
self._service, _ReadOnlyServiceProxy
),
"allowed_axl_methods": sorted(_ALLOWED_AXL_METHODS),
}
def _ensure_connected(self) -> None:
@ -205,15 +153,10 @@ class AxlClient:
)
# AXL endpoint is the AXL_URL itself; override the WSDL's default
# service location which usually points at a placeholder host.
# Wrap the resulting service in a read-only allowlist proxy —
# any SOAP method not in _ALLOWED_AXL_METHODS will raise
# ReadOnlyViolation at attribute lookup time, before zeep
# serializes a SOAP envelope.
raw_service = self._client.create_service(
self._service = self._client.create_service(
"{http://www.cisco.com/AXLAPIService/}AXLAPIBinding",
url,
)
self._service = _ReadOnlyServiceProxy(raw_service)
import time as _time
self._connected_at = _time.monotonic()
self._last_error = None # operational state is now clean

View File

@ -36,35 +36,10 @@ from requests.adapters import HTTPAdapter
from requests.auth import HTTPBasicAuth
from urllib3.util.retry import Retry
from .client import ReadOnlyViolation
# RisPort path on the CUCM publisher
_RIS_PATH = "/realtimeservice2/services/RISService70"
# RisPort70 operations this server is permitted to invoke. Parallels the
# AXL allowlist in client.py — adding a new operation here is a deliberate
# decision that widens the read-only surface.
_ALLOWED_RISPORT_OPERATIONS = frozenset({
"selectCmDevice",
})
def _check_operation_allowed(operation: str) -> None:
"""Raise ReadOnlyViolation if `operation` is outside the allowlist.
Called at the top of each envelope builder. RisPort70 doesn't go
through zeep envelopes are hand-rolled so the proxy pattern
used for AXL doesn't apply directly. This function is the equivalent
chokepoint: any envelope builder that reaches the network must first
check its operation name against the allowlist.
"""
if operation not in _ALLOWED_RISPORT_OPERATIONS:
raise ReadOnlyViolation(
f"RisPort70 operation {operation!r} is not in the read-only "
f"allowlist. Allowed: {sorted(_ALLOWED_RISPORT_OPERATIONS)}."
)
# SOAP namespaces. These match Cisco's published values for RisPort70.
_NS_SOAPENV = "http://schemas.xmlsoap.org/soap/envelope/"
_NS_RIS = "http://schemas.cisco.com/ast/soap"
@ -102,7 +77,6 @@ def _build_select_envelope(
are rejected. We err on the side of always including every field
with sensible defaults.
"""
_check_operation_allowed("selectCmDevice")
items = select_items if select_items else ["*"]
items_xml = "".join(
f"<soap:item><soap:Item>{_escape_xml(i)}</soap:Item></soap:item>"

View File

@ -123,13 +123,6 @@ def cache_stats() -> dict:
def cache_clear(method_pattern: str | None = None) -> dict:
"""Clear cache entries for the current cluster.
**Local-only:** mutates the SQLite response cache at
~/.cache/mcaxl/responses/axl_responses.sqlite. Does NOT touch CUCM
the cluster is unaware this tool ran. The next tool call will re-fetch
from AXL. This is the only mcaxl tool that mutates any state, and the
state it mutates is local cache only; see /explanation/read-only-by-structure/
for what "read-only" means in this server's context.
Args:
method_pattern: Optional method-name pattern (% wildcards). If omitted,
clears the entire cache. Use after a known config change to force

View File

@ -5,128 +5,79 @@ through a separate executeSQLUpdate method that we never expose). This client-
side check exists to:
1. Give the LLM a fast, clear error before a SOAP round-trip.
2. Make read-only intent visible at the boundary, not implicit.
Uses sqlparse for proper token-level analysis. Compared to the prior
regex-based implementation, this gives us:
- Explicit multi-statement detection via `len(sqlparse.parse(query))`
rather than hoping a forbidden keyword shows up in the second statement
(the regex version *accepted* `SELECT 1; SELECT 2` because neither
SELECT is forbidden).
- Proper string/comment boundary handling `'Smith -- old line'` is
a single Literal.String token, so the `--` inside it is never confused
for a comment marker, and `'log: DROP detected'` doesn't trip the
forbidden-keyword check because the entire literal is one token.
- Word-boundary correctness on identifiers `inserted_at` tokenizes as
`Token.Name`, never as `Token.Keyword.DML`, so columns named after
keyword fragments don't get false-positive blocked.
What sqlparse does NOT do (and we accept the conservative behavior):
- Disambiguate `SELECT delete FROM device` (where `delete` is meant as a
column name, not a keyword). sqlparse is a lexer, not a parser, and
classifies `delete` as `Token.Keyword.DML` regardless of context. The
check stays conservative CUCM's data dictionary doesn't use SQL
keywords as column names anyway.
"""
from __future__ import annotations
import sqlparse
from sqlparse.tokens import Comment, Keyword
import re
_COMMENT_BLOCK = re.compile(r"/\*.*?\*/", re.DOTALL)
_COMMENT_LINE = re.compile(r"--[^\n]*")
# Match Informix string literals: single-quoted, with '' as escaped quote.
_STRING_LITERAL = re.compile(r"'(?:''|[^'])*'", re.DOTALL)
_FORBIDDEN = {
"INSERT", "UPDATE", "DELETE", "DROP", "CREATE", "ALTER",
"TRUNCATE", "GRANT", "REVOKE", "MERGE", "REPLACE", "RENAME",
"EXEC", "EXECUTE", "CALL", "ATTACH", "DETACH",
}
_WORD_RE = re.compile(r"\b([A-Za-z_]+)\b")
class SqlValidationError(ValueError):
"""Raised when a query is not a safe read-only SELECT/WITH."""
def _is_skippable(tok) -> bool:
"""Whitespace and comments don't count toward token-position checks."""
if tok.is_whitespace:
return True
if tok.ttype is not None and tok.ttype in Comment:
return True
return False
def _is_keyword(tok) -> bool:
return tok.ttype is not None and tok.ttype in Keyword
def _is_substantive(stmt) -> bool:
"""A statement contributes to the multi-statement count only if it has
at least one non-whitespace, non-comment leaf token. This way trailing
semicolons and pure-comment "statements" don't inflate the count."""
return any(not _is_skippable(t) for t in stmt.flatten())
def validate_select(query: str) -> str:
"""Return the cleaned query, or raise SqlValidationError.
Accepts SELECT and WITH (CTEs that ultimately return SELECT). Rejects
multi-statement input, and any query containing forbidden SQL keyword
anything else, and any query containing forbidden keywords as standalone
tokens *outside* string literals and comments.
Hamilton review CRITICAL #1 (preserved across the sqlparse rewrite):
the output we return MUST preserve the input byte-for-byte (modulo
trailing semicolon and outer whitespace). sqlparse is consulted for
analysis only the cleaned query going to AXL is exactly what the
caller intended. A bug here would mean the LLM's `description LIKE
'%-- old%'` query gets a different shape than it asked for.
Hamilton review CRITICAL #1: the output we return MUST preserve the input
byte-for-byte (modulo trailing semicolon and outer whitespace). Earlier
versions ran a non-literal-aware comment strip on the output, which would
silently eat `--` and `/* */` markers that legitimately appeared inside
string literals like `WHERE description = 'Smith -- old line'`. The query
going to AXL must be exactly what the caller intended comment stripping
is an analysis-only operation, never a mutation of the wire query.
"""
if not query or not query.strip():
raise SqlValidationError("Query is empty.")
# The query we'll send to AXL: original input, with only outer whitespace
# and a single trailing semicolon trimmed. NO mutation of literals or
# in-string comment markers.
cleaned = query.strip().rstrip(";").strip()
if not cleaned:
raise SqlValidationError("Query is empty after trimming.")
parsed = sqlparse.parse(cleaned)
statements = [s for s in parsed if _is_substantive(s)]
# Analysis-only copy: strip string literals AND comments (in either order
# is safe here, since each strip uses its own regex on a non-AXL-bound
# buffer). Order chosen: literals first, then comments, so that any
# comment markers genuinely outside literals can be detected.
for_analysis = _STRING_LITERAL.sub(" ", cleaned)
for_analysis = _COMMENT_BLOCK.sub(" ", for_analysis)
for_analysis = _COMMENT_LINE.sub(" ", for_analysis)
if not statements:
if not for_analysis.strip():
raise SqlValidationError("Query is empty after stripping comments.")
if len(statements) > 1:
raise SqlValidationError(
f"Multiple statements detected ({len(statements)}). "
f"Only single SELECT or WITH queries are permitted."
)
stmt = statements[0]
first = next((t for t in stmt.flatten() if not _is_skippable(t)), None)
if first is None:
upper_tokens = [t.upper() for t in _WORD_RE.findall(for_analysis)]
if not upper_tokens:
raise SqlValidationError("Query contains no SQL keywords.")
if not _is_keyword(first):
first = upper_tokens[0]
if first not in {"SELECT", "WITH"}:
raise SqlValidationError(
f"Query must start with SELECT or WITH; first token is {first.value!r}."
f"Only SELECT and WITH are permitted; query starts with {first!r}."
)
first_upper = first.value.upper()
if first_upper not in {"SELECT", "WITH"}:
forbidden_hits = sorted(set(upper_tokens) & _FORBIDDEN)
if forbidden_hits:
raise SqlValidationError(
f"Only SELECT and WITH are permitted; query starts with {first_upper!r}."
)
hits: set[str] = set()
for tok in stmt.flatten():
if not _is_keyword(tok):
continue
upper = tok.value.upper()
if upper in _FORBIDDEN:
hits.add(upper)
if hits:
raise SqlValidationError(
f"Forbidden keyword(s) present: {', '.join(sorted(hits))}. "
f"Forbidden keyword(s) present: {', '.join(forbidden_hits)}. "
f"This server is read-only."
)

View File

@ -1,88 +0,0 @@
"""Tests for _ReadOnlyServiceProxy: defense-in-depth allowlist for AXL methods.
The proxy wraps the zeep service object so any SOAP method outside
{getCCMVersion, executeSQLQuery} raises ReadOnlyViolation at attribute
lookup, before zeep serializes an envelope. This is a guard against future
contributors accidentally calling write methods like addRoutePartition().
"""
from unittest.mock import MagicMock
import pytest
from mcaxl.client import (
ReadOnlyViolation,
_ALLOWED_AXL_METHODS,
_ReadOnlyServiceProxy,
)
class TestAllowlistEnforcement:
def test_allowed_method_dispatches_through(self):
# Both methods we currently use must pass through the proxy.
inner = MagicMock()
inner.getCCMVersion.return_value = {"version": "15.0(1)"}
inner.executeSQLQuery.return_value = {"rows": []}
proxy = _ReadOnlyServiceProxy(inner)
assert proxy.getCCMVersion() == {"version": "15.0(1)"}
assert proxy.executeSQLQuery(sql="SELECT 1") == {"rows": []}
inner.getCCMVersion.assert_called_once()
inner.executeSQLQuery.assert_called_once_with(sql="SELECT 1")
@pytest.mark.parametrize(
"method_name",
[
"addRoutePartition",
"updatePhone",
"removeUser",
"applyPhone",
"resetPhone",
"restartPhone",
"executeSQLUpdate", # the AXL write counterpart
"doDeviceLogin",
"wipePhone",
],
)
def test_disallowed_method_raises(self, method_name):
inner = MagicMock()
proxy = _ReadOnlyServiceProxy(inner)
with pytest.raises(ReadOnlyViolation, match=method_name):
getattr(proxy, method_name)
# The inner service must NOT have been touched at all — refusal
# happens before any SOAP serialization.
assert not getattr(inner, method_name).called
def test_allowlist_is_exactly_what_we_advertise(self):
# If this set ever grows, that's a deliberate decision and the
# test should be updated alongside the change. Catching unintended
# widening of the read-only surface is the point.
assert _ALLOWED_AXL_METHODS == frozenset(
{"getCCMVersion", "executeSQLQuery"}
)
class TestAttributePassthrough:
def test_dunder_attributes_pass_through(self):
# Zeep introspects services via dunder attributes (__class__,
# __dict__, etc.). The proxy must not break those.
inner = MagicMock()
inner.__class__ = MagicMock
proxy = _ReadOnlyServiceProxy(inner)
# Reading the class doesn't raise
_ = proxy.__class__
def test_underscore_prefixed_attributes_pass_through(self):
# zeep service internals like `_binding_options`, `_operations`
# are accessed by name. We don't want to gate those because they
# don't dispatch SOAP — they read metadata.
inner = MagicMock()
inner._binding_options = {"address": "https://cucm/axl/"}
inner._operations = ["getCCMVersion", "executeSQLQuery", "addPhone"]
proxy = _ReadOnlyServiceProxy(inner)
assert proxy._binding_options == {"address": "https://cucm/axl/"}
assert "addPhone" in proxy._operations # introspection, not dispatch

View File

@ -9,13 +9,10 @@ import xml.etree.ElementTree as ET
import pytest
from mcaxl.client import ReadOnlyViolation
from mcaxl.risport import (
DEVICE_STATUS_VALUES,
_ALLOWED_RISPORT_OPERATIONS,
RisPortClient,
_build_select_envelope,
_check_operation_allowed,
_escape_xml,
_parse_response,
)
@ -223,34 +220,3 @@ class TestStatusValidation:
# but the validation should run BEFORE that on bad input.
with pytest.raises(ValueError, match="status must be"):
client.select_cm_device(status="not-a-real-status")
class TestReadOnlyAllowlist:
"""The RisPort envelope builders all gate on _check_operation_allowed.
This is the equivalent of the AXL service proxy the chokepoint that
blocks any future write-shaped operation from being assembled.
"""
def test_selectCmDevice_is_allowed(self):
# No raise — selectCmDevice is in the allowlist
_check_operation_allowed("selectCmDevice")
@pytest.mark.parametrize(
"operation",
[
"addCmDevice",
"removeCmDevice",
"resetDevice",
"restartDevice",
"applyCmDevice",
"executeSQLUpdate", # leakage from the AXL surface
],
)
def test_disallowed_operation_raises_in_check(self, operation):
with pytest.raises(ReadOnlyViolation, match=operation):
_check_operation_allowed(operation)
def test_allowlist_is_exactly_what_we_advertise(self):
# As with the AXL allowlist, widening this set is a deliberate
# decision that should be matched by an update to this test.
assert _ALLOWED_RISPORT_OPERATIONS == frozenset({"selectCmDevice"})

View File

@ -60,12 +60,10 @@ class TestRejected:
validate_select("DROP TABLE device")
def test_select_with_embedded_drop_rejected(self):
# Belt-and-suspenders: even if "DROP" appears alongside a SELECT, the
# query is rejected. Under the sqlparse-based validator the rejection
# reason is now "Multiple statements detected" (caught earlier than
# forbidden-keyword scan); under the prior regex validator it was
# "DROP". Either is correct — we just want this query refused.
with pytest.raises(SqlValidationError, match="DROP|Multiple"):
# Belt-and-suspenders: even if "DROP" appears in a quoted string-ish
# position our keyword filter still catches it. AXL would also reject
# this, but failing fast on the client saves a SOAP round-trip.
with pytest.raises(SqlValidationError, match="DROP"):
validate_select("SELECT 1 FROM device; DROP TABLE device")
def test_truncate_rejected(self):
@ -109,10 +107,7 @@ class TestStringLiterals:
assert validate_select(q)
def test_actual_drop_outside_literal_still_blocked(self):
# See test_select_with_embedded_drop_rejected — rejection reason
# is now multi-statement detection (sqlparse catches it earlier),
# but the query still fails closed.
with pytest.raises(SqlValidationError, match="DROP|Multiple"):
with pytest.raises(SqlValidationError, match="DROP"):
validate_select("SELECT 1 FROM device; DROP TABLE backup")
def test_escaped_quote_in_literal(self):
@ -180,54 +175,3 @@ class TestLiteralPreservedInOutput:
q = "SELECT 1 FROM numplan WHERE description = 'log: DROP detected'"
result = validate_select(q)
assert "'log: DROP detected'" in result
class TestSqlparseSpecific:
"""Cases that exercise wins of the sqlparse-based validator over the
earlier regex implementation. Each test names the property being checked.
"""
def test_multi_statement_explicit_reject(self):
# Two SELECTs, no forbidden keywords. The regex validator accepted
# this because neither SELECT is in _FORBIDDEN. sqlparse catches it
# via the explicit statement-count check.
with pytest.raises(SqlValidationError, match="Multiple"):
validate_select("SELECT 1; SELECT 2")
def test_multi_statement_with_intervening_comments_reject(self):
# Comments between statements don't disguise the multi-statement
# nature — sqlparse still parses 2 statements.
with pytest.raises(SqlValidationError, match="Multiple"):
validate_select("SELECT 1 /* break */; /* and again */ SELECT 2")
def test_keyword_substring_in_identifier_passes(self):
# `inserted_at`, `update_status`, `dropped_call_count` — all
# legitimate column names containing forbidden-keyword substrings.
# sqlparse classifies them as Token.Name (single tokens), so the
# forbidden scan correctly ignores them.
for col in ("inserted_at", "update_status", "dropped_call_count"):
q = f"SELECT {col} FROM device"
assert validate_select(q) == q, f"column {col!r} should pass"
def test_forbidden_keyword_inside_cte_rejected(self):
# A CTE that internally does a DELETE must still be caught — sqlparse
# walks into nested groups, so the DELETE keyword token is found
# even though it's inside `WITH x AS (...)`.
with pytest.raises(SqlValidationError, match="DELETE"):
validate_select("WITH x AS (DELETE FROM y) SELECT * FROM x")
def test_drop_inside_block_comment_passes(self):
# The DROP is inside a /* ... */ comment. sqlparse classifies the
# entire comment as Token.Comment, so its content never reaches the
# forbidden-keyword scan.
q = "SELECT 1 /* ; DROP TABLE foo */ FROM device"
assert validate_select(q) == q
def test_nested_cte_all_clean_passes(self):
# Multiple CTEs chained, all SELECT-only — must accept.
q = (
"WITH a AS (SELECT 1 AS n FROM systables), "
"b AS (SELECT n FROM a WHERE n > 0) "
"SELECT * FROM b"
)
assert validate_select(q) == q

13
uv.lock generated
View File

@ -793,14 +793,13 @@ wheels = [
[[package]]
name = "mcaxl"
version = "2026.4.27.1"
version = "2026.4.27"
source = { editable = "." }
dependencies = [
{ name = "fastmcp" },
{ name = "numpy" },
{ name = "platformdirs" },
{ name = "python-dotenv" },
{ name = "sqlparse" },
{ name = "zeep" },
]
@ -818,7 +817,6 @@ requires-dist = [
{ name = "pytest", marker = "extra == 'test'", specifier = ">=8.0" },
{ name = "pytest-asyncio", marker = "extra == 'test'", specifier = ">=0.24" },
{ name = "python-dotenv", specifier = ">=1.0" },
{ name = "sqlparse", specifier = ">=0.5" },
{ name = "zeep", specifier = ">=4.3" },
]
provides-extras = ["test"]
@ -1547,15 +1545,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/b7/46/f5af3402b579fd5e11573ce652019a67074317e18c1935cc0b4ba9b35552/secretstorage-3.5.0-py3-none-any.whl", hash = "sha256:0ce65888c0725fcb2c5bc0fdb8e5438eece02c523557ea40ce0703c266248137", size = 15554, upload-time = "2025-11-23T19:02:51.545Z" },
]
[[package]]
name = "sqlparse"
version = "0.5.5"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/90/76/437d71068094df0726366574cf3432a4ed754217b436eb7429415cf2d480/sqlparse-0.5.5.tar.gz", hash = "sha256:e20d4a9b0b8585fdf63b10d30066c7c94c5d7a7ec47c889a2d83a3caa93ff28e", size = 120815, upload-time = "2025-12-19T07:17:45.073Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/49/4b/359f28a903c13438ef59ebeee215fb25da53066db67b305c125f1c6d2a25/sqlparse-0.5.5-py3-none-any.whl", hash = "sha256:12a08b3bf3eec877c519589833aed092e2444e68240a3577e8e26148acc7b1ba", size = 46138, upload-time = "2025-12-19T07:17:46.573Z" },
]
[[package]]
name = "sse-starlette"
version = "3.3.4"