sql_validator: swap regex for sqlparse tokenization

The regex-based validator worked for everything tested, but had a
class of structural blindspot: it didn't actually know what a token
was, so it accepted `SELECT 1; SELECT 2` (no forbidden keyword in
either statement) and relied entirely on the keyword scan catching
write verbs. With sqlparse we get:

- Explicit multi-statement detection via `len(sqlparse.parse(query))`
  — `SELECT 1; SELECT 2` is now refused with a clear "Multiple
  statements detected" message.
- Proper string/comment boundary handling — `'log: DROP detected'`
  is one Literal.String token; the DROP inside it never reaches the
  forbidden-keyword scan. `inserted_at` is one Name token; INSERT
  isn't matched as a substring.
- Same conservative behavior for keywords-as-identifiers (sqlparse
  is a lexer, not a parser, so `SELECT delete FROM device` is still
  refused — CUCM's data dictionary doesn't use SQL keywords as
  column names anyway).

Hamilton review CRITICAL #1 preserved: the cleaned query returned to
the caller is still byte-for-byte the input (modulo trailing ; and
outer whitespace). sqlparse is consulted for analysis only.

Tests: +6 sqlparse-specific cases in TestSqlparseSpecific covering
multi-statement, comment-disguised injection, keyword-substring
identifiers, and CTE walks. 2 existing tests broadened from
match="DROP" to match="DROP|Multiple" — same query refused, the
diagnosis just got more accurate (multi-statement caught earlier
than forbidden-keyword scan).

36/36 validator tests pass.
This commit is contained in:
Ryan Malloy 2026-04-29 06:38:21 -06:00
parent d597bd3569
commit 59f9df5b3b
4 changed files with 163 additions and 42 deletions

View File

@ -31,6 +31,11 @@ dependencies = [
"platformdirs>=4.9", "platformdirs>=4.9",
"numpy>=1.26", "numpy>=1.26",
"python-dotenv>=1.0", "python-dotenv>=1.0",
# SQL tokenizer for the executeSQLQuery validator. Using a real lexer
# (instead of regex keyword scanning) gives us proper string/comment
# boundary handling and explicit multi-statement detection. See
# src/mcaxl/sql_validator.py for the read-only enforcement layer.
"sqlparse>=0.5",
] ]
[project.optional-dependencies] [project.optional-dependencies]

View File

@ -5,79 +5,128 @@ through a separate executeSQLUpdate method that we never expose). This client-
side check exists to: side check exists to:
1. Give the LLM a fast, clear error before a SOAP round-trip. 1. Give the LLM a fast, clear error before a SOAP round-trip.
2. Make read-only intent visible at the boundary, not implicit. 2. Make read-only intent visible at the boundary, not implicit.
Uses sqlparse for proper token-level analysis. Compared to the prior
regex-based implementation, this gives us:
- Explicit multi-statement detection via `len(sqlparse.parse(query))`
rather than hoping a forbidden keyword shows up in the second statement
(the regex version *accepted* `SELECT 1; SELECT 2` because neither
SELECT is forbidden).
- Proper string/comment boundary handling `'Smith -- old line'` is
a single Literal.String token, so the `--` inside it is never confused
for a comment marker, and `'log: DROP detected'` doesn't trip the
forbidden-keyword check because the entire literal is one token.
- Word-boundary correctness on identifiers `inserted_at` tokenizes as
`Token.Name`, never as `Token.Keyword.DML`, so columns named after
keyword fragments don't get false-positive blocked.
What sqlparse does NOT do (and we accept the conservative behavior):
- Disambiguate `SELECT delete FROM device` (where `delete` is meant as a
column name, not a keyword). sqlparse is a lexer, not a parser, and
classifies `delete` as `Token.Keyword.DML` regardless of context. The
check stays conservative CUCM's data dictionary doesn't use SQL
keywords as column names anyway.
""" """
from __future__ import annotations from __future__ import annotations
import re import sqlparse
from sqlparse.tokens import Comment, Keyword
_COMMENT_BLOCK = re.compile(r"/\*.*?\*/", re.DOTALL)
_COMMENT_LINE = re.compile(r"--[^\n]*")
# Match Informix string literals: single-quoted, with '' as escaped quote.
_STRING_LITERAL = re.compile(r"'(?:''|[^'])*'", re.DOTALL)
_FORBIDDEN = { _FORBIDDEN = {
"INSERT", "UPDATE", "DELETE", "DROP", "CREATE", "ALTER", "INSERT", "UPDATE", "DELETE", "DROP", "CREATE", "ALTER",
"TRUNCATE", "GRANT", "REVOKE", "MERGE", "REPLACE", "RENAME", "TRUNCATE", "GRANT", "REVOKE", "MERGE", "REPLACE", "RENAME",
"EXEC", "EXECUTE", "CALL", "ATTACH", "DETACH", "EXEC", "EXECUTE", "CALL", "ATTACH", "DETACH",
} }
_WORD_RE = re.compile(r"\b([A-Za-z_]+)\b")
class SqlValidationError(ValueError): class SqlValidationError(ValueError):
"""Raised when a query is not a safe read-only SELECT/WITH.""" """Raised when a query is not a safe read-only SELECT/WITH."""
def _is_skippable(tok) -> bool:
"""Whitespace and comments don't count toward token-position checks."""
if tok.is_whitespace:
return True
if tok.ttype is not None and tok.ttype in Comment:
return True
return False
def _is_keyword(tok) -> bool:
return tok.ttype is not None and tok.ttype in Keyword
def _is_substantive(stmt) -> bool:
"""A statement contributes to the multi-statement count only if it has
at least one non-whitespace, non-comment leaf token. This way trailing
semicolons and pure-comment "statements" don't inflate the count."""
return any(not _is_skippable(t) for t in stmt.flatten())
def validate_select(query: str) -> str: def validate_select(query: str) -> str:
"""Return the cleaned query, or raise SqlValidationError. """Return the cleaned query, or raise SqlValidationError.
Accepts SELECT and WITH (CTEs that ultimately return SELECT). Rejects Accepts SELECT and WITH (CTEs that ultimately return SELECT). Rejects
anything else, and any query containing forbidden keywords as standalone multi-statement input, and any query containing forbidden SQL keyword
tokens *outside* string literals and comments. tokens *outside* string literals and comments.
Hamilton review CRITICAL #1: the output we return MUST preserve the input Hamilton review CRITICAL #1 (preserved across the sqlparse rewrite):
byte-for-byte (modulo trailing semicolon and outer whitespace). Earlier the output we return MUST preserve the input byte-for-byte (modulo
versions ran a non-literal-aware comment strip on the output, which would trailing semicolon and outer whitespace). sqlparse is consulted for
silently eat `--` and `/* */` markers that legitimately appeared inside analysis only the cleaned query going to AXL is exactly what the
string literals like `WHERE description = 'Smith -- old line'`. The query caller intended. A bug here would mean the LLM's `description LIKE
going to AXL must be exactly what the caller intended comment stripping '%-- old%'` query gets a different shape than it asked for.
is an analysis-only operation, never a mutation of the wire query.
""" """
if not query or not query.strip(): if not query or not query.strip():
raise SqlValidationError("Query is empty.") raise SqlValidationError("Query is empty.")
# The query we'll send to AXL: original input, with only outer whitespace
# and a single trailing semicolon trimmed. NO mutation of literals or
# in-string comment markers.
cleaned = query.strip().rstrip(";").strip() cleaned = query.strip().rstrip(";").strip()
if not cleaned: if not cleaned:
raise SqlValidationError("Query is empty after trimming.") raise SqlValidationError("Query is empty after trimming.")
# Analysis-only copy: strip string literals AND comments (in either order parsed = sqlparse.parse(cleaned)
# is safe here, since each strip uses its own regex on a non-AXL-bound statements = [s for s in parsed if _is_substantive(s)]
# buffer). Order chosen: literals first, then comments, so that any
# comment markers genuinely outside literals can be detected.
for_analysis = _STRING_LITERAL.sub(" ", cleaned)
for_analysis = _COMMENT_BLOCK.sub(" ", for_analysis)
for_analysis = _COMMENT_LINE.sub(" ", for_analysis)
if not for_analysis.strip(): if not statements:
raise SqlValidationError("Query is empty after stripping comments.") raise SqlValidationError("Query is empty after stripping comments.")
if len(statements) > 1:
upper_tokens = [t.upper() for t in _WORD_RE.findall(for_analysis)]
if not upper_tokens:
raise SqlValidationError("Query contains no SQL keywords.")
first = upper_tokens[0]
if first not in {"SELECT", "WITH"}:
raise SqlValidationError( raise SqlValidationError(
f"Only SELECT and WITH are permitted; query starts with {first!r}." f"Multiple statements detected ({len(statements)}). "
f"Only single SELECT or WITH queries are permitted."
) )
forbidden_hits = sorted(set(upper_tokens) & _FORBIDDEN) stmt = statements[0]
if forbidden_hits:
first = next((t for t in stmt.flatten() if not _is_skippable(t)), None)
if first is None:
raise SqlValidationError("Query contains no SQL keywords.")
if not _is_keyword(first):
raise SqlValidationError( raise SqlValidationError(
f"Forbidden keyword(s) present: {', '.join(forbidden_hits)}. " f"Query must start with SELECT or WITH; first token is {first.value!r}."
)
first_upper = first.value.upper()
if first_upper not in {"SELECT", "WITH"}:
raise SqlValidationError(
f"Only SELECT and WITH are permitted; query starts with {first_upper!r}."
)
hits: set[str] = set()
for tok in stmt.flatten():
if not _is_keyword(tok):
continue
upper = tok.value.upper()
if upper in _FORBIDDEN:
hits.add(upper)
if hits:
raise SqlValidationError(
f"Forbidden keyword(s) present: {', '.join(sorted(hits))}. "
f"This server is read-only." f"This server is read-only."
) )

View File

@ -60,10 +60,12 @@ class TestRejected:
validate_select("DROP TABLE device") validate_select("DROP TABLE device")
def test_select_with_embedded_drop_rejected(self): def test_select_with_embedded_drop_rejected(self):
# Belt-and-suspenders: even if "DROP" appears in a quoted string-ish # Belt-and-suspenders: even if "DROP" appears alongside a SELECT, the
# position our keyword filter still catches it. AXL would also reject # query is rejected. Under the sqlparse-based validator the rejection
# this, but failing fast on the client saves a SOAP round-trip. # reason is now "Multiple statements detected" (caught earlier than
with pytest.raises(SqlValidationError, match="DROP"): # forbidden-keyword scan); under the prior regex validator it was
# "DROP". Either is correct — we just want this query refused.
with pytest.raises(SqlValidationError, match="DROP|Multiple"):
validate_select("SELECT 1 FROM device; DROP TABLE device") validate_select("SELECT 1 FROM device; DROP TABLE device")
def test_truncate_rejected(self): def test_truncate_rejected(self):
@ -107,7 +109,10 @@ class TestStringLiterals:
assert validate_select(q) assert validate_select(q)
def test_actual_drop_outside_literal_still_blocked(self): def test_actual_drop_outside_literal_still_blocked(self):
with pytest.raises(SqlValidationError, match="DROP"): # See test_select_with_embedded_drop_rejected — rejection reason
# is now multi-statement detection (sqlparse catches it earlier),
# but the query still fails closed.
with pytest.raises(SqlValidationError, match="DROP|Multiple"):
validate_select("SELECT 1 FROM device; DROP TABLE backup") validate_select("SELECT 1 FROM device; DROP TABLE backup")
def test_escaped_quote_in_literal(self): def test_escaped_quote_in_literal(self):
@ -175,3 +180,54 @@ class TestLiteralPreservedInOutput:
q = "SELECT 1 FROM numplan WHERE description = 'log: DROP detected'" q = "SELECT 1 FROM numplan WHERE description = 'log: DROP detected'"
result = validate_select(q) result = validate_select(q)
assert "'log: DROP detected'" in result assert "'log: DROP detected'" in result
class TestSqlparseSpecific:
"""Cases that exercise wins of the sqlparse-based validator over the
earlier regex implementation. Each test names the property being checked.
"""
def test_multi_statement_explicit_reject(self):
# Two SELECTs, no forbidden keywords. The regex validator accepted
# this because neither SELECT is in _FORBIDDEN. sqlparse catches it
# via the explicit statement-count check.
with pytest.raises(SqlValidationError, match="Multiple"):
validate_select("SELECT 1; SELECT 2")
def test_multi_statement_with_intervening_comments_reject(self):
# Comments between statements don't disguise the multi-statement
# nature — sqlparse still parses 2 statements.
with pytest.raises(SqlValidationError, match="Multiple"):
validate_select("SELECT 1 /* break */; /* and again */ SELECT 2")
def test_keyword_substring_in_identifier_passes(self):
# `inserted_at`, `update_status`, `dropped_call_count` — all
# legitimate column names containing forbidden-keyword substrings.
# sqlparse classifies them as Token.Name (single tokens), so the
# forbidden scan correctly ignores them.
for col in ("inserted_at", "update_status", "dropped_call_count"):
q = f"SELECT {col} FROM device"
assert validate_select(q) == q, f"column {col!r} should pass"
def test_forbidden_keyword_inside_cte_rejected(self):
# A CTE that internally does a DELETE must still be caught — sqlparse
# walks into nested groups, so the DELETE keyword token is found
# even though it's inside `WITH x AS (...)`.
with pytest.raises(SqlValidationError, match="DELETE"):
validate_select("WITH x AS (DELETE FROM y) SELECT * FROM x")
def test_drop_inside_block_comment_passes(self):
# The DROP is inside a /* ... */ comment. sqlparse classifies the
# entire comment as Token.Comment, so its content never reaches the
# forbidden-keyword scan.
q = "SELECT 1 /* ; DROP TABLE foo */ FROM device"
assert validate_select(q) == q
def test_nested_cte_all_clean_passes(self):
# Multiple CTEs chained, all SELECT-only — must accept.
q = (
"WITH a AS (SELECT 1 AS n FROM systables), "
"b AS (SELECT n FROM a WHERE n > 0) "
"SELECT * FROM b"
)
assert validate_select(q) == q

13
uv.lock generated
View File

@ -793,13 +793,14 @@ wheels = [
[[package]] [[package]]
name = "mcaxl" name = "mcaxl"
version = "2026.4.27" version = "2026.4.27.1"
source = { editable = "." } source = { editable = "." }
dependencies = [ dependencies = [
{ name = "fastmcp" }, { name = "fastmcp" },
{ name = "numpy" }, { name = "numpy" },
{ name = "platformdirs" }, { name = "platformdirs" },
{ name = "python-dotenv" }, { name = "python-dotenv" },
{ name = "sqlparse" },
{ name = "zeep" }, { name = "zeep" },
] ]
@ -817,6 +818,7 @@ requires-dist = [
{ name = "pytest", marker = "extra == 'test'", specifier = ">=8.0" }, { name = "pytest", marker = "extra == 'test'", specifier = ">=8.0" },
{ name = "pytest-asyncio", marker = "extra == 'test'", specifier = ">=0.24" }, { name = "pytest-asyncio", marker = "extra == 'test'", specifier = ">=0.24" },
{ name = "python-dotenv", specifier = ">=1.0" }, { name = "python-dotenv", specifier = ">=1.0" },
{ name = "sqlparse", specifier = ">=0.5" },
{ name = "zeep", specifier = ">=4.3" }, { name = "zeep", specifier = ">=4.3" },
] ]
provides-extras = ["test"] provides-extras = ["test"]
@ -1545,6 +1547,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/b7/46/f5af3402b579fd5e11573ce652019a67074317e18c1935cc0b4ba9b35552/secretstorage-3.5.0-py3-none-any.whl", hash = "sha256:0ce65888c0725fcb2c5bc0fdb8e5438eece02c523557ea40ce0703c266248137", size = 15554, upload-time = "2025-11-23T19:02:51.545Z" }, { url = "https://files.pythonhosted.org/packages/b7/46/f5af3402b579fd5e11573ce652019a67074317e18c1935cc0b4ba9b35552/secretstorage-3.5.0-py3-none-any.whl", hash = "sha256:0ce65888c0725fcb2c5bc0fdb8e5438eece02c523557ea40ce0703c266248137", size = 15554, upload-time = "2025-11-23T19:02:51.545Z" },
] ]
[[package]]
name = "sqlparse"
version = "0.5.5"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/90/76/437d71068094df0726366574cf3432a4ed754217b436eb7429415cf2d480/sqlparse-0.5.5.tar.gz", hash = "sha256:e20d4a9b0b8585fdf63b10d30066c7c94c5d7a7ec47c889a2d83a3caa93ff28e", size = 120815, upload-time = "2025-12-19T07:17:45.073Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/49/4b/359f28a903c13438ef59ebeee215fb25da53066db67b305c125f1c6d2a25/sqlparse-0.5.5-py3-none-any.whl", hash = "sha256:12a08b3bf3eec877c519589833aed092e2444e68240a3577e8e26148acc7b1ba", size = 46138, upload-time = "2025-12-19T07:17:46.573Z" },
]
[[package]] [[package]]
name = "sse-starlette" name = "sse-starlette"
version = "3.3.4" version = "3.3.4"