SQL validator: ignore string literals; CSS impact: add primary + 7 more
Two defects found during live-cluster audit shakedown.
1. SQL validator false-positives on string literals
The forbidden-keyword check tokenized the entire query, including
contents of single-quoted string literals. CSS names like
'Call Forward-CSS', DN descriptions containing 'DELETE', or partition
names with 'INSERT' all tripped the validator even though the SQL
itself was clean read-only. Found while running impact analysis on
"Call Forward-CSS".
Fix: strip string literals (single-quoted, with '' as escape) into
whitespace before the forbidden-keyword tokenization. The cleaned
query returned to the caller still contains the literals — they're
only invisible to the analysis pass.
7 new tests covering: words inside literals (Call/Drop/Delete/etc.),
escaped quotes, multiple literals, and the critical case where a
forbidden keyword appears immediately after a literal.
2. CSS impact analysis missed primary device CSS + 7 other refs
Running route_devices_using_css("E911CSS") returned total=0 even
though E911CSS is configured in the cluster. Root cause: our
enumeration covered device.fkcallingsearchspace_{reroute,restrict,
refer,rdntransform} but not the primary device.fkcallingsearchspace
itself — the column the GUI sets when assigning a CSS to a phone.
The simple unsuffixed name didn't match our earlier "%css%" schema
filter (the actual column spells out "callingsearchspace").
Added 8 new reference categories:
device_primary_css — the big one
device_cgpn_unknown_css — calling-party-unknown
line_monitoring_css — devicenumplanmap monitoring CSS
gateway_h323_called_xform_css — H.323 gateway transform
gateway_sip_called_xform_css — SIP trunk transform
huntpilot_max_wait_css — hunt pilot queue handling
huntpilot_no_agent_css — hunt pilot queue handling
huntpilot_queue_full_css — hunt pilot queue handling
Re-running on live cluster:
Internal-CSS: 146 -> 163 refs (16 new device_primary_css matches)
Call Forward-CSS: previously rejected by validator -> 150 refs
E911CSS: still 0 — high-confidence orphan finding now
This commit is contained in:
parent
e3fb10cb4b
commit
82d8fbe563
@ -517,6 +517,89 @@ _CSS_REFERENCE_QUERIES: dict[str, dict] = {
|
|||||||
WHERE rl.fkcallingsearchspace = '{pkid}'
|
WHERE rl.fkcallingsearchspace = '{pkid}'
|
||||||
""",
|
""",
|
||||||
},
|
},
|
||||||
|
# PRIMARY device CSS — the field the GUI sets when you assign "Calling
|
||||||
|
# Search Space" to a phone, gateway, or trunk. Not suffixed; spelled out
|
||||||
|
# as fkcallingsearchspace (no _xxx). This is where most CSS assignments
|
||||||
|
# actually live; missing it produced false-zero impact analyses.
|
||||||
|
"device_primary_css": {
|
||||||
|
"table": "device", "column": "fkcallingsearchspace",
|
||||||
|
"sql": """
|
||||||
|
SELECT d.name AS name, tc.name AS context, d.description AS description
|
||||||
|
FROM device d LEFT OUTER JOIN typeclass tc ON d.tkclass = tc.enum
|
||||||
|
WHERE d.fkcallingsearchspace = '{pkid}'
|
||||||
|
""",
|
||||||
|
},
|
||||||
|
"device_cgpn_unknown_css": {
|
||||||
|
"table": "device", "column": "fkcallingsearchspace_cgpnunknown",
|
||||||
|
"sql": """
|
||||||
|
SELECT d.name AS name, tc.name AS context, d.description AS description
|
||||||
|
FROM device d LEFT OUTER JOIN typeclass tc ON d.tkclass = tc.enum
|
||||||
|
WHERE d.fkcallingsearchspace_cgpnunknown = '{pkid}'
|
||||||
|
""",
|
||||||
|
},
|
||||||
|
# Line-level CSS for phone-line monitoring (BLF, presence)
|
||||||
|
"line_monitoring_css": {
|
||||||
|
"table": "devicenumplanmap", "column": "fkcallingsearchspace_monitoring",
|
||||||
|
"sql": """
|
||||||
|
SELECT d.name AS name, np.dnorpattern AS context, d.description AS description
|
||||||
|
FROM devicenumplanmap dnm
|
||||||
|
JOIN device d ON dnm.fkdevice = d.pkid
|
||||||
|
JOIN numplan np ON dnm.fknumplan = np.pkid
|
||||||
|
WHERE dnm.fkcallingsearchspace_monitoring = '{pkid}'
|
||||||
|
""",
|
||||||
|
},
|
||||||
|
# H.323 / SIP gateway called-party transformation CSSs
|
||||||
|
"gateway_h323_called_xform_css": {
|
||||||
|
"table": "h323device", "column": "fkcallingsearchspace_cntdpntransform",
|
||||||
|
"sql": """
|
||||||
|
SELECT d.name AS name, tc.name AS context, d.description AS description
|
||||||
|
FROM h323device h
|
||||||
|
JOIN device d ON h.fkdevice = d.pkid
|
||||||
|
LEFT OUTER JOIN typeclass tc ON d.tkclass = tc.enum
|
||||||
|
WHERE h.fkcallingsearchspace_cntdpntransform = '{pkid}'
|
||||||
|
""",
|
||||||
|
},
|
||||||
|
"gateway_sip_called_xform_css": {
|
||||||
|
"table": "sipdevice", "column": "fkcallingsearchspace_cntdpntransform",
|
||||||
|
"sql": """
|
||||||
|
SELECT d.name AS name, tc.name AS context, d.description AS description
|
||||||
|
FROM sipdevice s
|
||||||
|
JOIN device d ON s.fkdevice = d.pkid
|
||||||
|
LEFT OUTER JOIN typeclass tc ON d.tkclass = tc.enum
|
||||||
|
WHERE s.fkcallingsearchspace_cntdpntransform = '{pkid}'
|
||||||
|
""",
|
||||||
|
},
|
||||||
|
# Hunt pilot queue CSSs (max-wait, no-agent, queue-full destinations)
|
||||||
|
"huntpilot_max_wait_css": {
|
||||||
|
"table": "huntpilotqueue", "column": "fkcallingsearchspace_maxwaittime",
|
||||||
|
"sql": """
|
||||||
|
SELECT np.dnorpattern AS name, rp.name AS context, np.description AS description
|
||||||
|
FROM huntpilotqueue hpq
|
||||||
|
JOIN numplan np ON hpq.fknumplan = np.pkid
|
||||||
|
LEFT OUTER JOIN routepartition rp ON np.fkroutepartition = rp.pkid
|
||||||
|
WHERE hpq.fkcallingsearchspace_maxwaittime = '{pkid}'
|
||||||
|
""",
|
||||||
|
},
|
||||||
|
"huntpilot_no_agent_css": {
|
||||||
|
"table": "huntpilotqueue", "column": "fkcallingsearchspace_noagent",
|
||||||
|
"sql": """
|
||||||
|
SELECT np.dnorpattern AS name, rp.name AS context, np.description AS description
|
||||||
|
FROM huntpilotqueue hpq
|
||||||
|
JOIN numplan np ON hpq.fknumplan = np.pkid
|
||||||
|
LEFT OUTER JOIN routepartition rp ON np.fkroutepartition = rp.pkid
|
||||||
|
WHERE hpq.fkcallingsearchspace_noagent = '{pkid}'
|
||||||
|
""",
|
||||||
|
},
|
||||||
|
"huntpilot_queue_full_css": {
|
||||||
|
"table": "huntpilotqueue", "column": "fkcallingsearchspace_pilotqueuefull",
|
||||||
|
"sql": """
|
||||||
|
SELECT np.dnorpattern AS name, rp.name AS context, np.description AS description
|
||||||
|
FROM huntpilotqueue hpq
|
||||||
|
JOIN numplan np ON hpq.fknumplan = np.pkid
|
||||||
|
LEFT OUTER JOIN routepartition rp ON np.fkroutepartition = rp.pkid
|
||||||
|
WHERE hpq.fkcallingsearchspace_pilotqueuefull = '{pkid}'
|
||||||
|
""",
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -14,6 +14,8 @@ import re
|
|||||||
|
|
||||||
_COMMENT_BLOCK = re.compile(r"/\*.*?\*/", re.DOTALL)
|
_COMMENT_BLOCK = re.compile(r"/\*.*?\*/", re.DOTALL)
|
||||||
_COMMENT_LINE = re.compile(r"--[^\n]*")
|
_COMMENT_LINE = re.compile(r"--[^\n]*")
|
||||||
|
# Match Informix string literals: single-quoted, with '' as escaped quote.
|
||||||
|
_STRING_LITERAL = re.compile(r"'(?:''|[^'])*'", re.DOTALL)
|
||||||
_FORBIDDEN = {
|
_FORBIDDEN = {
|
||||||
"INSERT", "UPDATE", "DELETE", "DROP", "CREATE", "ALTER",
|
"INSERT", "UPDATE", "DELETE", "DROP", "CREATE", "ALTER",
|
||||||
"TRUNCATE", "GRANT", "REVOKE", "MERGE", "REPLACE", "RENAME",
|
"TRUNCATE", "GRANT", "REVOKE", "MERGE", "REPLACE", "RENAME",
|
||||||
@ -31,17 +33,30 @@ def validate_select(query: str) -> str:
|
|||||||
|
|
||||||
Accepts SELECT and WITH (CTEs that ultimately return SELECT). Rejects
|
Accepts SELECT and WITH (CTEs that ultimately return SELECT). Rejects
|
||||||
anything else, and any query containing forbidden keywords as standalone
|
anything else, and any query containing forbidden keywords as standalone
|
||||||
tokens.
|
tokens *outside* string literals.
|
||||||
|
|
||||||
|
The cleaned query (with comments stripped) is what gets returned and sent
|
||||||
|
to AXL — string literals are NOT modified, only ignored during keyword
|
||||||
|
tokenization. So a query selecting WHERE name = 'Call Forward-CSS' is
|
||||||
|
safe: the literal "Call" inside quotes is invisible to the keyword check,
|
||||||
|
while the actual SQL with the unmodified literal travels intact to AXL.
|
||||||
"""
|
"""
|
||||||
if not query or not query.strip():
|
if not query or not query.strip():
|
||||||
raise SqlValidationError("Query is empty.")
|
raise SqlValidationError("Query is empty.")
|
||||||
|
|
||||||
stripped = _COMMENT_BLOCK.sub(" ", query)
|
cleaned = _COMMENT_BLOCK.sub(" ", query)
|
||||||
stripped = _COMMENT_LINE.sub(" ", stripped).strip().rstrip(";").strip()
|
cleaned = _COMMENT_LINE.sub(" ", cleaned).strip().rstrip(";").strip()
|
||||||
if not stripped:
|
if not cleaned:
|
||||||
raise SqlValidationError("Query is empty after stripping comments.")
|
raise SqlValidationError("Query is empty after stripping comments.")
|
||||||
|
|
||||||
upper_tokens = [t.upper() for t in _WORD_RE.findall(stripped)]
|
# Strip string literals before tokenizing so that words inside quoted
|
||||||
|
# values (e.g. CSS names containing "Call", DN descriptions containing
|
||||||
|
# "DELETE") don't trip the forbidden-keyword check. The cleaned query
|
||||||
|
# we return still contains the literals — only the analysis copy strips
|
||||||
|
# them.
|
||||||
|
for_analysis = _STRING_LITERAL.sub(" ", cleaned)
|
||||||
|
|
||||||
|
upper_tokens = [t.upper() for t in _WORD_RE.findall(for_analysis)]
|
||||||
if not upper_tokens:
|
if not upper_tokens:
|
||||||
raise SqlValidationError("Query contains no SQL keywords.")
|
raise SqlValidationError("Query contains no SQL keywords.")
|
||||||
|
|
||||||
@ -58,4 +73,4 @@ def validate_select(query: str) -> str:
|
|||||||
f"This server is read-only."
|
f"This server is read-only."
|
||||||
)
|
)
|
||||||
|
|
||||||
return stripped
|
return cleaned
|
||||||
|
|||||||
@ -82,3 +82,45 @@ class TestEdgeCases:
|
|||||||
def test_select_with_subquery(self):
|
def test_select_with_subquery(self):
|
||||||
q = "SELECT name FROM device WHERE pkid IN (SELECT fkdevice FROM numplan)"
|
q = "SELECT name FROM device WHERE pkid IN (SELECT fkdevice FROM numplan)"
|
||||||
assert "SELECT name FROM device" in validate_select(q)
|
assert "SELECT name FROM device" in validate_select(q)
|
||||||
|
|
||||||
|
|
||||||
|
class TestStringLiterals:
|
||||||
|
"""Forbidden keywords inside string literals must be ignored.
|
||||||
|
|
||||||
|
Otherwise CSS names like 'Call Forward-CSS', DN descriptions containing
|
||||||
|
'DELETE' (e.g., 'Delete this voicemail line'), or partition names with
|
||||||
|
'INSERT' would all fail to query, even though the SQL itself is read-only.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def test_call_inside_string_literal_passes(self):
|
||||||
|
q = "SELECT pkid FROM callingsearchspace WHERE name = 'Call Forward-CSS'"
|
||||||
|
result = validate_select(q)
|
||||||
|
assert "Call Forward-CSS" in result # literal preserved
|
||||||
|
|
||||||
|
def test_delete_inside_string_literal_passes(self):
|
||||||
|
q = "SELECT pkid FROM numplan WHERE description = 'Delete after audit'"
|
||||||
|
result = validate_select(q)
|
||||||
|
assert "Delete after audit" in result
|
||||||
|
|
||||||
|
def test_drop_inside_string_literal_passes(self):
|
||||||
|
q = "SELECT pkid FROM numplan WHERE description = 'DROP table backup'"
|
||||||
|
assert validate_select(q)
|
||||||
|
|
||||||
|
def test_actual_drop_outside_literal_still_blocked(self):
|
||||||
|
with pytest.raises(SqlValidationError, match="DROP"):
|
||||||
|
validate_select("SELECT 1 FROM device; DROP TABLE backup")
|
||||||
|
|
||||||
|
def test_escaped_quote_in_literal(self):
|
||||||
|
# Informix uses '' (doubled) as escaped single quote within literals
|
||||||
|
q = "SELECT pkid FROM numplan WHERE description = 'O''Brien''s line'"
|
||||||
|
result = validate_select(q)
|
||||||
|
assert "O''Brien''s line" in result
|
||||||
|
|
||||||
|
def test_keyword_just_outside_literal_blocked(self):
|
||||||
|
# The literal 'safe text' is fine; the trailing DROP is not.
|
||||||
|
with pytest.raises(SqlValidationError, match="DROP"):
|
||||||
|
validate_select("SELECT 1 FROM device WHERE x = 'safe text' OR DROP")
|
||||||
|
|
||||||
|
def test_multiple_literals(self):
|
||||||
|
q = "SELECT 1 FROM numplan WHERE name = 'CALL' AND description = 'UPDATE pending'"
|
||||||
|
assert validate_select(q)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user