From 59f9df5b3bb2d047e03c9e9054191a2066cd8c5f Mon Sep 17 00:00:00 2001 From: Ryan Malloy Date: Wed, 29 Apr 2026 06:38:21 -0600 Subject: [PATCH] sql_validator: swap regex for sqlparse tokenization MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The regex-based validator worked for everything tested, but had a class of structural blindspot: it didn't actually know what a token was, so it accepted `SELECT 1; SELECT 2` (no forbidden keyword in either statement) and relied entirely on the keyword scan catching write verbs. With sqlparse we get: - Explicit multi-statement detection via `len(sqlparse.parse(query))` — `SELECT 1; SELECT 2` is now refused with a clear "Multiple statements detected" message. - Proper string/comment boundary handling — `'log: DROP detected'` is one Literal.String token; the DROP inside it never reaches the forbidden-keyword scan. `inserted_at` is one Name token; INSERT isn't matched as a substring. - Same conservative behavior for keywords-as-identifiers (sqlparse is a lexer, not a parser, so `SELECT delete FROM device` is still refused — CUCM's data dictionary doesn't use SQL keywords as column names anyway). Hamilton review CRITICAL #1 preserved: the cleaned query returned to the caller is still byte-for-byte the input (modulo trailing ; and outer whitespace). sqlparse is consulted for analysis only. Tests: +6 sqlparse-specific cases in TestSqlparseSpecific covering multi-statement, comment-disguised injection, keyword-substring identifiers, and CTE walks. 2 existing tests broadened from match="DROP" to match="DROP|Multiple" — same query refused, the diagnosis just got more accurate (multi-statement caught earlier than forbidden-keyword scan). 36/36 validator tests pass. --- pyproject.toml | 5 ++ src/mcaxl/sql_validator.py | 121 +++++++++++++++++++++++++----------- tests/test_sql_validator.py | 66 ++++++++++++++++++-- uv.lock | 13 +++- 4 files changed, 163 insertions(+), 42 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index cc28080..a394a85 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,6 +31,11 @@ dependencies = [ "platformdirs>=4.9", "numpy>=1.26", "python-dotenv>=1.0", + # SQL tokenizer for the executeSQLQuery validator. Using a real lexer + # (instead of regex keyword scanning) gives us proper string/comment + # boundary handling and explicit multi-statement detection. See + # src/mcaxl/sql_validator.py for the read-only enforcement layer. + "sqlparse>=0.5", ] [project.optional-dependencies] diff --git a/src/mcaxl/sql_validator.py b/src/mcaxl/sql_validator.py index 080d004..b80d301 100644 --- a/src/mcaxl/sql_validator.py +++ b/src/mcaxl/sql_validator.py @@ -5,79 +5,128 @@ through a separate executeSQLUpdate method that we never expose). This client- side check exists to: 1. Give the LLM a fast, clear error before a SOAP round-trip. 2. Make read-only intent visible at the boundary, not implicit. + +Uses sqlparse for proper token-level analysis. Compared to the prior +regex-based implementation, this gives us: + + - Explicit multi-statement detection via `len(sqlparse.parse(query))` + rather than hoping a forbidden keyword shows up in the second statement + (the regex version *accepted* `SELECT 1; SELECT 2` because neither + SELECT is forbidden). + - Proper string/comment boundary handling — `'Smith -- old line'` is + a single Literal.String token, so the `--` inside it is never confused + for a comment marker, and `'log: DROP detected'` doesn't trip the + forbidden-keyword check because the entire literal is one token. + - Word-boundary correctness on identifiers — `inserted_at` tokenizes as + `Token.Name`, never as `Token.Keyword.DML`, so columns named after + keyword fragments don't get false-positive blocked. + +What sqlparse does NOT do (and we accept the conservative behavior): + + - Disambiguate `SELECT delete FROM device` (where `delete` is meant as a + column name, not a keyword). sqlparse is a lexer, not a parser, and + classifies `delete` as `Token.Keyword.DML` regardless of context. The + check stays conservative — CUCM's data dictionary doesn't use SQL + keywords as column names anyway. """ from __future__ import annotations -import re +import sqlparse +from sqlparse.tokens import Comment, Keyword -_COMMENT_BLOCK = re.compile(r"/\*.*?\*/", re.DOTALL) -_COMMENT_LINE = re.compile(r"--[^\n]*") -# Match Informix string literals: single-quoted, with '' as escaped quote. -_STRING_LITERAL = re.compile(r"'(?:''|[^'])*'", re.DOTALL) _FORBIDDEN = { "INSERT", "UPDATE", "DELETE", "DROP", "CREATE", "ALTER", "TRUNCATE", "GRANT", "REVOKE", "MERGE", "REPLACE", "RENAME", "EXEC", "EXECUTE", "CALL", "ATTACH", "DETACH", } -_WORD_RE = re.compile(r"\b([A-Za-z_]+)\b") class SqlValidationError(ValueError): """Raised when a query is not a safe read-only SELECT/WITH.""" +def _is_skippable(tok) -> bool: + """Whitespace and comments don't count toward token-position checks.""" + if tok.is_whitespace: + return True + if tok.ttype is not None and tok.ttype in Comment: + return True + return False + + +def _is_keyword(tok) -> bool: + return tok.ttype is not None and tok.ttype in Keyword + + +def _is_substantive(stmt) -> bool: + """A statement contributes to the multi-statement count only if it has + at least one non-whitespace, non-comment leaf token. This way trailing + semicolons and pure-comment "statements" don't inflate the count.""" + return any(not _is_skippable(t) for t in stmt.flatten()) + + def validate_select(query: str) -> str: """Return the cleaned query, or raise SqlValidationError. Accepts SELECT and WITH (CTEs that ultimately return SELECT). Rejects - anything else, and any query containing forbidden keywords as standalone + multi-statement input, and any query containing forbidden SQL keyword tokens *outside* string literals and comments. - Hamilton review CRITICAL #1: the output we return MUST preserve the input - byte-for-byte (modulo trailing semicolon and outer whitespace). Earlier - versions ran a non-literal-aware comment strip on the output, which would - silently eat `--` and `/* */` markers that legitimately appeared inside - string literals like `WHERE description = 'Smith -- old line'`. The query - going to AXL must be exactly what the caller intended — comment stripping - is an analysis-only operation, never a mutation of the wire query. + Hamilton review CRITICAL #1 (preserved across the sqlparse rewrite): + the output we return MUST preserve the input byte-for-byte (modulo + trailing semicolon and outer whitespace). sqlparse is consulted for + analysis only — the cleaned query going to AXL is exactly what the + caller intended. A bug here would mean the LLM's `description LIKE + '%-- old%'` query gets a different shape than it asked for. """ if not query or not query.strip(): raise SqlValidationError("Query is empty.") - # The query we'll send to AXL: original input, with only outer whitespace - # and a single trailing semicolon trimmed. NO mutation of literals or - # in-string comment markers. cleaned = query.strip().rstrip(";").strip() if not cleaned: raise SqlValidationError("Query is empty after trimming.") - # Analysis-only copy: strip string literals AND comments (in either order - # is safe here, since each strip uses its own regex on a non-AXL-bound - # buffer). Order chosen: literals first, then comments, so that any - # comment markers genuinely outside literals can be detected. - for_analysis = _STRING_LITERAL.sub(" ", cleaned) - for_analysis = _COMMENT_BLOCK.sub(" ", for_analysis) - for_analysis = _COMMENT_LINE.sub(" ", for_analysis) + parsed = sqlparse.parse(cleaned) + statements = [s for s in parsed if _is_substantive(s)] - if not for_analysis.strip(): + if not statements: raise SqlValidationError("Query is empty after stripping comments.") - - upper_tokens = [t.upper() for t in _WORD_RE.findall(for_analysis)] - if not upper_tokens: - raise SqlValidationError("Query contains no SQL keywords.") - - first = upper_tokens[0] - if first not in {"SELECT", "WITH"}: + if len(statements) > 1: raise SqlValidationError( - f"Only SELECT and WITH are permitted; query starts with {first!r}." + f"Multiple statements detected ({len(statements)}). " + f"Only single SELECT or WITH queries are permitted." ) - forbidden_hits = sorted(set(upper_tokens) & _FORBIDDEN) - if forbidden_hits: + stmt = statements[0] + + first = next((t for t in stmt.flatten() if not _is_skippable(t)), None) + if first is None: + raise SqlValidationError("Query contains no SQL keywords.") + + if not _is_keyword(first): raise SqlValidationError( - f"Forbidden keyword(s) present: {', '.join(forbidden_hits)}. " + f"Query must start with SELECT or WITH; first token is {first.value!r}." + ) + + first_upper = first.value.upper() + if first_upper not in {"SELECT", "WITH"}: + raise SqlValidationError( + f"Only SELECT and WITH are permitted; query starts with {first_upper!r}." + ) + + hits: set[str] = set() + for tok in stmt.flatten(): + if not _is_keyword(tok): + continue + upper = tok.value.upper() + if upper in _FORBIDDEN: + hits.add(upper) + + if hits: + raise SqlValidationError( + f"Forbidden keyword(s) present: {', '.join(sorted(hits))}. " f"This server is read-only." ) diff --git a/tests/test_sql_validator.py b/tests/test_sql_validator.py index 32f5822..1ebcef4 100644 --- a/tests/test_sql_validator.py +++ b/tests/test_sql_validator.py @@ -60,10 +60,12 @@ class TestRejected: validate_select("DROP TABLE device") def test_select_with_embedded_drop_rejected(self): - # Belt-and-suspenders: even if "DROP" appears in a quoted string-ish - # position our keyword filter still catches it. AXL would also reject - # this, but failing fast on the client saves a SOAP round-trip. - with pytest.raises(SqlValidationError, match="DROP"): + # Belt-and-suspenders: even if "DROP" appears alongside a SELECT, the + # query is rejected. Under the sqlparse-based validator the rejection + # reason is now "Multiple statements detected" (caught earlier than + # forbidden-keyword scan); under the prior regex validator it was + # "DROP". Either is correct — we just want this query refused. + with pytest.raises(SqlValidationError, match="DROP|Multiple"): validate_select("SELECT 1 FROM device; DROP TABLE device") def test_truncate_rejected(self): @@ -107,7 +109,10 @@ class TestStringLiterals: assert validate_select(q) def test_actual_drop_outside_literal_still_blocked(self): - with pytest.raises(SqlValidationError, match="DROP"): + # See test_select_with_embedded_drop_rejected — rejection reason + # is now multi-statement detection (sqlparse catches it earlier), + # but the query still fails closed. + with pytest.raises(SqlValidationError, match="DROP|Multiple"): validate_select("SELECT 1 FROM device; DROP TABLE backup") def test_escaped_quote_in_literal(self): @@ -175,3 +180,54 @@ class TestLiteralPreservedInOutput: q = "SELECT 1 FROM numplan WHERE description = 'log: DROP detected'" result = validate_select(q) assert "'log: DROP detected'" in result + + +class TestSqlparseSpecific: + """Cases that exercise wins of the sqlparse-based validator over the + earlier regex implementation. Each test names the property being checked. + """ + + def test_multi_statement_explicit_reject(self): + # Two SELECTs, no forbidden keywords. The regex validator accepted + # this because neither SELECT is in _FORBIDDEN. sqlparse catches it + # via the explicit statement-count check. + with pytest.raises(SqlValidationError, match="Multiple"): + validate_select("SELECT 1; SELECT 2") + + def test_multi_statement_with_intervening_comments_reject(self): + # Comments between statements don't disguise the multi-statement + # nature — sqlparse still parses 2 statements. + with pytest.raises(SqlValidationError, match="Multiple"): + validate_select("SELECT 1 /* break */; /* and again */ SELECT 2") + + def test_keyword_substring_in_identifier_passes(self): + # `inserted_at`, `update_status`, `dropped_call_count` — all + # legitimate column names containing forbidden-keyword substrings. + # sqlparse classifies them as Token.Name (single tokens), so the + # forbidden scan correctly ignores them. + for col in ("inserted_at", "update_status", "dropped_call_count"): + q = f"SELECT {col} FROM device" + assert validate_select(q) == q, f"column {col!r} should pass" + + def test_forbidden_keyword_inside_cte_rejected(self): + # A CTE that internally does a DELETE must still be caught — sqlparse + # walks into nested groups, so the DELETE keyword token is found + # even though it's inside `WITH x AS (...)`. + with pytest.raises(SqlValidationError, match="DELETE"): + validate_select("WITH x AS (DELETE FROM y) SELECT * FROM x") + + def test_drop_inside_block_comment_passes(self): + # The DROP is inside a /* ... */ comment. sqlparse classifies the + # entire comment as Token.Comment, so its content never reaches the + # forbidden-keyword scan. + q = "SELECT 1 /* ; DROP TABLE foo */ FROM device" + assert validate_select(q) == q + + def test_nested_cte_all_clean_passes(self): + # Multiple CTEs chained, all SELECT-only — must accept. + q = ( + "WITH a AS (SELECT 1 AS n FROM systables), " + "b AS (SELECT n FROM a WHERE n > 0) " + "SELECT * FROM b" + ) + assert validate_select(q) == q diff --git a/uv.lock b/uv.lock index 1a56fca..8b4b003 100644 --- a/uv.lock +++ b/uv.lock @@ -793,13 +793,14 @@ wheels = [ [[package]] name = "mcaxl" -version = "2026.4.27" +version = "2026.4.27.1" source = { editable = "." } dependencies = [ { name = "fastmcp" }, { name = "numpy" }, { name = "platformdirs" }, { name = "python-dotenv" }, + { name = "sqlparse" }, { name = "zeep" }, ] @@ -817,6 +818,7 @@ requires-dist = [ { name = "pytest", marker = "extra == 'test'", specifier = ">=8.0" }, { name = "pytest-asyncio", marker = "extra == 'test'", specifier = ">=0.24" }, { name = "python-dotenv", specifier = ">=1.0" }, + { name = "sqlparse", specifier = ">=0.5" }, { name = "zeep", specifier = ">=4.3" }, ] provides-extras = ["test"] @@ -1545,6 +1547,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b7/46/f5af3402b579fd5e11573ce652019a67074317e18c1935cc0b4ba9b35552/secretstorage-3.5.0-py3-none-any.whl", hash = "sha256:0ce65888c0725fcb2c5bc0fdb8e5438eece02c523557ea40ce0703c266248137", size = 15554, upload-time = "2025-11-23T19:02:51.545Z" }, ] +[[package]] +name = "sqlparse" +version = "0.5.5" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/90/76/437d71068094df0726366574cf3432a4ed754217b436eb7429415cf2d480/sqlparse-0.5.5.tar.gz", hash = "sha256:e20d4a9b0b8585fdf63b10d30066c7c94c5d7a7ec47c889a2d83a3caa93ff28e", size = 120815, upload-time = "2025-12-19T07:17:45.073Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/49/4b/359f28a903c13438ef59ebeee215fb25da53066db67b305c125f1c6d2a25/sqlparse-0.5.5-py3-none-any.whl", hash = "sha256:12a08b3bf3eec877c519589833aed092e2444e68240a3577e8e26148acc7b1ba", size = 46138, upload-time = "2025-12-19T07:17:46.573Z" }, +] + [[package]] name = "sse-starlette" version = "3.3.4"