Compare commits

...

2 Commits

Author SHA1 Message Date
9427e3d4df route_plan + client: device_grep + AXL anti-pattern error hints
Two complementary additions from cucx-docs's prompt-suggestions handoff
(see axl/agent-threads/cucx-prompt-suggestions/ for the source thread).

device_grep(pattern, classes=None) — fuzzy device discovery by name OR
description, optionally filtered by tkclass.name. Surfaces "wait, there
are TWO of these?" findings (parallel fax servers, duplicate CUBEs,
vestigial conference bridges) by grouping matches by class so the
structure of what matched is visible at a glance. CUCM-style % wildcards
work; case-insensitive matching via UPPER(); single quotes properly
escaped via _esc.

axl_sql error hints — when AXL returns an error AND the query contains
the trigger phrase, append a path-correction hint to the error message.
Two patterns shipped:

  - "Column (fkdevice) not found" + numplan in query → suggest the
    devicenumplanmap M:N join (the literal multi-attempt schema-discovery
    experience cucx-docs hit at Bingham — numplan has no direct fkdevice)

  - "not in database" + sipdestination in query → suggest sipdestinationgroup
    + sipprofile + axl_list_tables(pattern='sip%') for discovery (the
    `sipdestination` table is reasonable-sounding but doesn't exist)

Hints are surgical (both error fragment AND query trigger must match)
to keep false-positive risk near zero. Validator behavior unchanged —
this is post-execution error augmentation, not gate enhancement. Failing
queries now raise RuntimeError(augmented) when a hint applies; otherwise
the original exception passes through unchanged.

Tests: +19 (8 device_grep + 11 error-hints with end-to-end mock through
execute_sql_query). Full suite 219 → 238 passing.

Live-cluster smoke test still pending (TLS handshake intermittent
this session). Sequencing nit from cucx-docs's msg 003 (move error-hint
earlier) honored — bundled with device_grep in this single commit.
2026-05-05 17:41:15 -06:00
cd08a7ec76 route_plan: add patterns_targeting_device + wildcard count/enumerate
Inverse of list_route_lists_and_groups — given a destination device,
return every numplan whose direct target is that device. Closes the
highest-priority gap from cucx-docs's prompt-suggestions handoff
(see axl/agent-threads/cucx-prompt-suggestions/001 + 003 for the
multi-message context).

Schema walk: device → devicenumplanmap → numplan, with LEFT JOINs
to routepartition + typepatternusage for friendly output. M:N is the
landmark — numplan does NOT have a direct fkdevice column, which was
cucx-docs's literal multi-attempt schema-discovery experience that
motivated the tool.

Three wildcard-expansion modes per cucx-docs Q2:
  - False (default) — patterns intact
  - "count" — per-pattern digit-string estimate + total surface;
    unbounded patterns (`!`, `@`) reported as None count and force
    the total to None so an auditor sees the partial measurement;
    per-pattern caps at 10,000 to prevent runaway estimation
  - "enumerate" — actual digit-string list, only for tightly-bounded
    patterns (no `!`, no `@`, no `.`, ≤ 1,000 expansions); patterns
    that violate any constraint return null with a skip reason

Direct-target only per Q1 — full transitive reachability composes
with route_lists_and_groups + route_translation_chain, called out
both in the docstring and in the response's _note field.

37 new tests cover the math layer (count/enumerate helpers in
isolation), the bounds/cap behavior, the unbounded-pattern flagging,
empty-result handling, SQL injection escaping, and the integration
through a FakeAxlClient. Full suite: 219/219 passing.

Live-cluster smoke test pending — cluster TLS intermittently failing
this session; will re-verify once stable.
2026-05-05 17:38:07 -06:00
6 changed files with 912 additions and 1 deletions

View File

@ -254,7 +254,19 @@ class AxlClient:
if cached is not None:
return {**cached, "_cache": "hit"}
self._ensure_connected()
resp = self._service.executeSQLQuery(sql=cleaned)
try:
resp = self._service.executeSQLQuery(sql=cleaned)
except Exception as e:
# Anti-pattern hints — if the AXL error matches a known operator
# mistake (numplan.fkdevice, FROM sipdestination, etc.), append
# the right-path hint to the error message. Saves the 3-5 turns
# an LLM would otherwise spend rediscovering the schema. See
# axl/agent-threads/cucx-prompt-suggestions/ for the source
# observations that motivated each hint.
augmented = _augment_axl_error(str(e), cleaned)
if augmented != str(e):
raise RuntimeError(augmented) from e
raise
rows = _parse_sql_rows(resp)
result = {"row_count": len(rows), "rows": rows, "query": cleaned}
self._response_cache.set("executeSQLQuery", {"sql": cleaned}, result)
@ -431,3 +443,52 @@ def _stringify(v: Any) -> Any:
if v is None or isinstance(v, (str, int, float, bool)):
return v
return str(v)
# ─── AXL anti-pattern error hints ──────────────────────────────────────
#
# When AXL returns a specific class of error AND the query contains the
# trigger phrase, append a hint to the error explaining the right path.
# These are surgical (not fuzzy) because targeted hints have lower false-
# positive risk than generic "did you mean ..." suggestions.
#
# Source: cucx-docs handoff (msg 003 in
# axl/agent-threads/cucx-prompt-suggestions/) — three patterns recurring
# enough during cucx-docs CUCM 15 audits to warrant a structural fix.
_AXL_ERROR_HINTS: list[dict[str, str]] = [
{
"error_fragment": "Column (fkdevice) not found",
"query_must_contain": "numplan",
"hint": (
"Hint: numplan has no fkdevice column. The numplan-to-device "
"link is M:N through devicenumplanmap. Try:\n"
" JOIN devicenumplanmap m ON m.fknumplan = numplan.pkid\n"
" JOIN device d ON m.fkdevice = d.pkid"
),
},
{
"error_fragment": "not in database",
"query_must_contain": "sipdestination",
"hint": (
"Hint: `sipdestination` does not exist as a table. SIP trunk "
"destinations live on `device` joined with `sipdestinationgroup` "
"and `sipprofile`. Try `axl_list_tables(pattern='sip%')` to see "
"the actual SIP-related tables."
),
},
]
def _augment_axl_error(error_text: str, query: str) -> str:
"""Append a hint to the AXL error message if any anti-pattern matches.
Returns `error_text` unchanged when no hint applies caller can compare
identity to decide whether to wrap the original exception.
"""
upper_query = query.upper()
for entry in _AXL_ERROR_HINTS:
if (entry["error_fragment"] in error_text
and entry["query_must_contain"].upper() in upper_query):
return f"{error_text}\n\n{entry['hint']}"
return error_text

View File

@ -261,6 +261,332 @@ def inspect_pattern(
}
def device_grep(
client: "AxlClient",
pattern: str,
classes: list[str] | None = None,
) -> dict:
"""Fuzzy device discovery — match `pattern` against name OR description,
optionally filtered by device class.
Surfaces the kind of "wait, there are TWO of these?" findings that
cucx-docs hit at Bingham (parallel ZetaFax + RightFax fax servers, etc.):
devices whose name or description contains a vendor/role keyword,
grouped by class so trunks-vs-phones-vs-route-lists is visible at a
glance.
Args:
pattern: substring to match (case-insensitive) against `device.name`
OR `device.description`. CUCM-style `%` wildcards work pass
`"FAX"` for substring, `"FAX%"` for prefix-only, etc.
classes: optional `tkclass.name` filter e.g.
`["Phone", "Trunk"]`, `["Route List"]`, `["Gateway", "H323 Gateway"]`.
If None, all classes returned.
Returns:
`{pattern, match_count, groups: {class_name: [{name, description, type, pool}, ...]}}`
"""
if not pattern or not pattern.strip():
raise ValueError("pattern must be a non-empty string")
safe_pat = _esc(pattern)
class_filter = ""
if classes:
escaped = ", ".join(f"'{_esc(c)}'" for c in classes)
class_filter = f"AND tc.name IN ({escaped})"
sql = f"""
SELECT
d.name,
d.description,
tc.name AS class_name,
dt.name AS device_type,
dp.name AS pool_name
FROM device d
LEFT OUTER JOIN typeclass tc ON d.tkclass = tc.enum
LEFT OUTER JOIN typemodel dt ON d.tkmodel = dt.enum
LEFT OUTER JOIN devicepool dp ON d.fkdevicepool = dp.pkid
WHERE (UPPER(d.name) LIKE UPPER('%{safe_pat}%')
OR UPPER(d.description) LIKE UPPER('%{safe_pat}%'))
{class_filter}
ORDER BY tc.name, d.name
"""
result = client.execute_sql_query(sql)
rows = result["rows"]
groups: dict[str, list] = {}
for row in rows:
cls = row.get("class_name") or "Unknown"
groups.setdefault(cls, []).append({
"name": row.get("name"),
"description": row.get("description"),
"type": row.get("device_type"),
"pool": row.get("pool_name"),
})
return {
"pattern": pattern,
"classes_filter": classes,
"match_count": len(rows),
"groups": groups,
}
def patterns_targeting_device(
client: "AxlClient",
device_name: str,
expand_wildcards: str | bool = False,
) -> dict:
"""Inverse of `list_route_lists_and_groups` — given a destination device,
return every numplan whose direct target is that device.
Walks `device devicenumplanmap numplan` (the M:N join `numplan` does
NOT have a direct `fkdevice`; that's a common schema gotcha).
For "what numbers can ultimately reach this device" (including
translation-pattern hops and route-list intermediaries), compose this
tool with `list_route_lists_and_groups` and `route_translation_chain`.
This returns only direct-target patterns.
Args:
device_name: exact device name. Route Lists, SIP Trunks, CTI Route
Points, Phones any device that can be a direct numplan target.
expand_wildcards: how to handle wildcard patterns in the result.
- `False` (default): patterns returned as-is, syntax intact
- `"count"`: each pattern annotated with a per-pattern digit-string
count estimate; total surface across all patterns also returned.
Estimates capped at `_COUNT_CAP` per pattern; unbounded patterns
(`!`, `@`) reported as `None` count with `unbounded: true`
- `"enumerate"`: actually list digit strings only safe for
tightly bounded patterns (no `!`, no `@`, no `.`,
`_ENUMERATE_CAP` total digit strings). Patterns that violate
either constraint are returned with `enumerated_digits: null` +
`enumeration_skipped: <reason>`
"""
if expand_wildcards not in (False, "count", "enumerate"):
raise ValueError(
f"expand_wildcards must be False, 'count', or 'enumerate'; "
f"got {expand_wildcards!r}"
)
sql = f"""
SELECT
n.dnorpattern AS pattern,
rp.name AS partition,
n.description,
n.calledpartytransformationmask AS xform_mask,
n.prefixdigitsout AS prefix,
tp.name AS pattern_type,
d.name AS destination_device,
tc.name AS destination_class
FROM device d
JOIN typeclass tc ON d.tkclass = tc.enum
JOIN devicenumplanmap m ON m.fkdevice = d.pkid
JOIN numplan n ON m.fknumplan = n.pkid
LEFT OUTER JOIN routepartition rp ON n.fkroutepartition = rp.pkid
LEFT OUTER JOIN typepatternusage tp ON n.tkpatternusage = tp.enum
WHERE d.name = '{_esc(device_name)}'
ORDER BY n.dnorpattern
"""
result = client.execute_sql_query(sql)
rows = result["rows"]
patterns: list[dict] = []
total_count: int | None = 0 # None once we hit an unbounded pattern in 'count' mode
truncated = False
for row in rows:
entry: dict = {
"pattern": row.get("pattern"),
"partition": row.get("partition"),
"description": row.get("description"),
"pattern_type": row.get("pattern_type"),
"xform_mask": row.get("xform_mask"),
"prefix": row.get("prefix"),
}
pattern = entry["pattern"] or ""
if expand_wildcards == "count":
count, unbounded, capped = _count_pattern_digit_strings(pattern)
entry["digit_count"] = count
if unbounded:
entry["unbounded"] = True
total_count = None
if capped:
entry["count_capped"] = True
truncated = True
if total_count is not None and count is not None:
total_count += count
elif expand_wildcards == "enumerate":
digits, reason = _enumerate_pattern_digit_strings(pattern)
entry["enumerated_digits"] = digits
if digits is None:
entry["enumeration_skipped"] = reason
patterns.append(entry)
out = {
"device_name": device_name,
"destination_class": rows[0]["destination_class"] if rows else None,
"pattern_count": len(patterns),
"patterns": patterns,
"_note": (
"Direct-target patterns only. For transitive reachability "
"(translation patterns, route-list intermediaries), compose "
"with route_lists_and_groups + route_translation_chain."
),
}
if expand_wildcards == "count":
out["total_digit_count"] = total_count
if truncated:
out["truncated"] = True
return out
# Wildcard expansion bounds — per cucx-docs Q2 reply (msg 003 in
# axl/agent-threads/cucx-prompt-suggestions/). 'count' caps per-pattern
# estimates at _COUNT_CAP to avoid runaway computation on patterns like
# X{8} (1e8); 'enumerate' caps total digit strings at _ENUMERATE_CAP and
# refuses unbounded or `.`-containing patterns entirely.
_COUNT_CAP = 10_000
_ENUMERATE_CAP = 1_000
def _count_pattern_digit_strings(pattern: str) -> tuple[int | None, bool, bool]:
"""Estimate the number of distinct digit strings a pattern can match.
Returns (count, unbounded, capped):
- count: estimate, or None if unbounded
- unbounded: True if pattern contains `!` or `@` (treated as
infinite the @ NANP/international dial plan is bounded but
too large to enumerate meaningfully)
- capped: True if the estimate hit _COUNT_CAP and was truncated
"""
if not pattern:
return 0, False, False
# `+` is a literal in CUCM (E.164 prefix); strip for analysis but it
# doesn't change the count. Same for `.` (purely visual separator).
p = pattern.replace("+", "").replace(".", "")
if "!" in p or "@" in p:
return None, True, False
count = 1
i = 0
while i < len(p):
ch = p[i]
if ch == "X":
count *= 10
elif ch == "[":
close = p.find("]", i)
if close == -1:
# Malformed — be conservative
return None, False, False
spec = p[i + 1 : close]
count *= _count_charclass(spec)
i = close
# other characters are literals contributing factor 1
if count > _COUNT_CAP:
return _COUNT_CAP, False, True
i += 1
return count, False, False
def _count_charclass(spec: str) -> int:
"""Count the size of a CUCM character class like '0-9' or 'abc' or '2-9a-c'.
Negation (`^`) is technically supported in CUCM but rare; treat as
its set size (the negated set has size 10 - len(set)).
"""
if not spec:
return 1
negated = spec.startswith("^")
body = spec[1:] if negated else spec
chars: set[str] = set()
i = 0
while i < len(body):
if i + 2 < len(body) and body[i + 1] == "-":
lo, hi = body[i], body[i + 2]
if lo.isdigit() and hi.isdigit() and lo <= hi:
chars.update(chr(c) for c in range(ord(lo), ord(hi) + 1))
i += 3
else:
chars.add(body[i])
i += 1
size = len(chars) or 1
return (10 - size) if negated else size
def _enumerate_pattern_digit_strings(pattern: str) -> tuple[list[str] | None, str | None]:
"""Actually enumerate the digit strings a pattern matches.
Returns (digits, skip_reason):
- digits: list of digit strings, or None if skipped
- skip_reason: human-readable explanation when skipped
Only safe for tightly-bounded patterns. Per cucx-docs:
no `!`, no `@`, no `.`, total _ENUMERATE_CAP.
"""
if not pattern:
return [""], None
if "!" in pattern:
return None, "contains '!' (unbounded)"
if "@" in pattern:
return None, "contains '@' (NANP/international dial plan; too large)"
if "." in pattern:
return None, "contains '.' (separator semantics; not enumerable per spec)"
p = pattern.replace("+", "")
count, _, _ = _count_pattern_digit_strings(p)
if count is None:
return None, "estimated count is unbounded"
if count > _ENUMERATE_CAP:
return None, f"estimated count {count} exceeds cap {_ENUMERATE_CAP}; use 'count' mode"
out = [""]
i = 0
while i < len(p):
ch = p[i]
if ch == "X":
out = [s + d for s in out for d in "0123456789"]
i += 1
elif ch == "[":
close = p.find("]", i)
if close == -1:
return None, "malformed character class"
spec = p[i + 1 : close]
chars = _expand_charclass(spec)
out = [s + c for s in out for c in chars]
i = close + 1
else:
out = [s + ch for s in out]
i += 1
return out, None
def _expand_charclass(spec: str) -> list[str]:
"""Expand a character class like '0-9' or 'abc' to its member list."""
if not spec:
return [""]
negated = spec.startswith("^")
body = spec[1:] if negated else spec
chars: set[str] = set()
i = 0
while i < len(body):
if i + 2 < len(body) and body[i + 1] == "-":
lo, hi = body[i], body[i + 2]
if lo.isdigit() and hi.isdigit() and lo <= hi:
chars.update(chr(c) for c in range(ord(lo), ord(hi) + 1))
i += 3
else:
chars.add(body[i])
i += 1
if negated:
chars = set("0123456789") - chars
return sorted(chars)
def list_route_lists_and_groups(client: "AxlClient", name: str | None = None) -> dict:
"""Route lists with their ordered route groups and member gateways/trunks.

View File

@ -239,6 +239,63 @@ def route_inspect_pattern(pattern: str, partition: str | None = None) -> dict:
return route_plan.inspect_pattern(_client(), pattern, partition)
@mcp.tool
def device_grep(
pattern: str,
classes: list[str] | None = None,
) -> dict:
"""Fuzzy device discovery — match `pattern` against name OR description,
optionally filtered by device class.
Surfaces "wait, there are TWO of these?" findings vendor systems
deployed twice and forgotten once (parallel fax servers, duplicate
CUBEs, vestigial conference bridges, etc.). Grouped by class so the
structure of what matched is visible at a glance.
Args:
pattern: substring (case-insensitive) matched against device name
OR description. CUCM-style `%` wildcards work `"FAX"` is
substring, `"FAX%"` is prefix-only.
classes: optional list of `tkclass.name` values to filter e.g.
`["Phone", "Trunk"]`, `["Route List"]`, `["Gateway", "H323 Gateway"]`.
"""
return route_plan.device_grep(_client(), pattern, classes)
@mcp.tool
def route_patterns_targeting(
device_name: str,
expand_wildcards: str | bool = False,
) -> dict:
"""Inverse of `route_lists_and_groups`: given a destination device name,
return every numplan whose direct target is that device.
Answers questions like *"which DIDs route to this fax server?"* in a
single call instead of a multi-attempt schema-discovery walk through
`numplan` `devicenumplanmap` `device`.
Direct-target only does NOT walk through translation patterns or
route-list intermediaries. For full transitive reachability, compose
with `route_lists_and_groups` and `route_translation_chain`.
Args:
device_name: exact device name (case-sensitive). Route Lists, SIP
trunks, CTI Route Points, Phones any device that can be a
direct numplan target.
expand_wildcards: how to handle wildcard patterns:
False (default) patterns returned as-is.
"count" annotate each pattern with a digit-string count
estimate; total surface across all patterns also returned.
Patterns containing `!` or `@` are flagged unbounded.
"enumerate" actually list digit strings. Only safe for
tightly-bounded patterns; returns a skip reason for any
pattern with `!`, `@`, `.`, or estimated >1000 expansions.
"""
return route_plan.patterns_targeting_device(
_client(), device_name, expand_wildcards
)
@mcp.tool
def route_lists_and_groups(name: str | None = None) -> dict:
"""Route lists with their ordered route groups and member devices.

View File

@ -0,0 +1,126 @@
"""Tests for the AXL anti-pattern error-hint enhancement.
Source: cucx-docs handoff (msg 003 in
axl/agent-threads/cucx-prompt-suggestions/) three recurring operator
mistakes with cluster-side error messages that don't suggest the right
path. Hints are surgical: only fire when both the error fragment AND
the query trigger phrase match.
"""
import pytest
from mcaxl.client import _augment_axl_error
class TestNumplanFkdeviceHint:
def test_fires_on_matching_error_and_query(self):
err = "Column (fkdevice) not found in any table in the query (or SLV is undefined)."
query = "SELECT name FROM numplan WHERE fkdevice = 'foo'"
out = _augment_axl_error(err, query)
assert err in out
assert "devicenumplanmap" in out
assert "M:N" in out
def test_no_fire_on_error_alone(self):
# Same error fragment but query doesn't mention numplan
err = "Column (fkdevice) not found in any table in the query."
query = "SELECT name FROM device WHERE fkdevice = 'foo'"
assert _augment_axl_error(err, query) == err
def test_no_fire_on_unrelated_error(self):
err = "Some completely different error about a different column."
query = "SELECT name FROM numplan"
assert _augment_axl_error(err, query) == err
def test_case_insensitive_query_match(self):
# User's query in mixed case should still trigger
err = "Column (fkdevice) not found in any table in the query."
query = "SELECT name FROM NumPlan np WHERE np.fkdevice IS NOT NULL"
out = _augment_axl_error(err, query)
assert "devicenumplanmap" in out
class TestSipDestinationHint:
def test_fires_on_matching_error_and_query(self):
err = "Table (sipdestination) is not in database."
query = "SELECT * FROM sipdestination"
out = _augment_axl_error(err, query)
assert "sipdestinationgroup" in out
assert "axl_list_tables" in out
def test_no_fire_when_query_doesnt_reference_table(self):
err = "Table (xyz) is not in database."
query = "SELECT * FROM xyz"
assert _augment_axl_error(err, query) == err
class TestNoMatch:
def test_unrelated_error_returns_unchanged(self):
err = "Permission denied for user 'CCMSysUser'."
query = "SELECT * FROM device"
assert _augment_axl_error(err, query) == err
def test_empty_error_returns_empty(self):
assert _augment_axl_error("", "SELECT 1") == ""
def test_identity_check_for_no_hint(self):
"""Caller compares identity to decide whether to wrap the original
exception. Make sure no-hint path returns the literal input."""
err = "Random AXL error"
query = "SELECT 1"
out = _augment_axl_error(err, query)
assert out is err # not just equal — same object
class TestEndToEnd:
"""Confirm the augmentation propagates through execute_sql_query.
Uses a mock that raises the AXL exception from inside zeep's call;
the augmenter should wrap it as a RuntimeError with the hint
appended.
"""
def test_execute_sql_query_wraps_with_hint(self):
from unittest.mock import MagicMock
from mcaxl.client import AxlClient
from mcaxl.cache import AxlCache
# Real cache, but in a temp location so tests don't pollute
import tempfile
from pathlib import Path
with tempfile.TemporaryDirectory() as td:
cache = AxlCache(Path(td) / "test.sqlite", default_ttl=0, cluster_id="test")
client = AxlClient(cache)
# Bypass _ensure_connected by setting service directly
mock_service = MagicMock()
mock_service.executeSQLQuery.side_effect = RuntimeError(
"Server raised fault: Column (fkdevice) not found in any table"
)
client._service = mock_service
client._connected_at = 0.0
with pytest.raises(RuntimeError, match="devicenumplanmap"):
client.execute_sql_query(
"SELECT name FROM numplan WHERE fkdevice IS NOT NULL"
)
def test_execute_sql_query_passes_through_unrelated_error(self):
from unittest.mock import MagicMock
from mcaxl.client import AxlClient
from mcaxl.cache import AxlCache
import tempfile
from pathlib import Path
with tempfile.TemporaryDirectory() as td:
cache = AxlCache(Path(td) / "test.sqlite", default_ttl=0, cluster_id="test")
client = AxlClient(cache)
mock_service = MagicMock()
mock_service.executeSQLQuery.side_effect = RuntimeError(
"Some unrelated cluster error"
)
client._service = mock_service
client._connected_at = 0.0
# Original exception type + message preserved when no hint applies
with pytest.raises(RuntimeError, match="Some unrelated cluster error"):
client.execute_sql_query("SELECT * FROM device")

94
tests/test_device_grep.py Normal file
View File

@ -0,0 +1,94 @@
"""Tests for device_grep — fuzzy device discovery by name/description.
The "wait there are TWO of these?" finding shape from cucx-docs's
ZetaFax-vs-RightFax discovery (msg 001 in
axl/agent-threads/cucx-prompt-suggestions/).
"""
import pytest
from mcaxl.route_plan import device_grep
class FakeAxlClient:
def __init__(self, rows):
self._rows = rows
self.queries = []
def execute_sql_query(self, sql):
self.queries.append(sql)
return {"row_count": len(self._rows), "rows": self._rows}
def _make_row(name, description, class_name, device_type="SIP Trunk", pool="DP-1"):
return {
"name": name,
"description": description,
"class_name": class_name,
"device_type": device_type,
"pool_name": pool,
}
class TestDeviceGrepBasics:
def test_groups_by_class(self):
rows = [
_make_row("RightFax-Trunk", "RightFax inbound", "Trunk"),
_make_row("ZetaFax-Trunk", "ZetaFax internal", "Trunk"),
_make_row("SEPABCDFAXX01", "RightFax desk test", "Phone", "Cisco 8841"),
_make_row("RightFax-RL", "RightFax route list", "Route List"),
]
client = FakeAxlClient(rows)
result = device_grep(client, "FAX")
assert result["match_count"] == 4
assert set(result["groups"].keys()) == {"Trunk", "Phone", "Route List"}
assert len(result["groups"]["Trunk"]) == 2
def test_class_filter_passed_to_sql(self):
client = FakeAxlClient([])
device_grep(client, "FAX", classes=["Trunk", "Route List"])
# Both class names appear escaped in the SQL IN clause
sql = client.queries[0]
assert "tc.name IN" in sql
assert "'Trunk'" in sql
assert "'Route List'" in sql
def test_no_class_filter_omits_in_clause(self):
client = FakeAxlClient([])
device_grep(client, "FAX")
assert "tc.name IN" not in client.queries[0]
def test_empty_pattern_raises(self):
client = FakeAxlClient([])
with pytest.raises(ValueError, match="non-empty"):
device_grep(client, "")
with pytest.raises(ValueError, match="non-empty"):
device_grep(client, " ")
def test_pattern_quote_escaped(self):
client = FakeAxlClient([])
device_grep(client, "fake'; DROP TABLE device --")
# SQL injection via pattern is escaped (doubled single quotes)
assert "fake''" in client.queries[0]
def test_class_filter_quote_escaped(self):
client = FakeAxlClient([])
device_grep(client, "FAX", classes=["Phone'; DROP TABLE device --"])
assert "Phone''" in client.queries[0]
def test_unknown_class_grouped_separately(self):
# If tkclass enum doesn't resolve (LEFT JOIN miss), class_name is NULL
rows = [
{"name": "WeirdDevice", "description": "?", "class_name": None,
"device_type": None, "pool_name": None},
]
client = FakeAxlClient(rows)
result = device_grep(client, "weird")
assert "Unknown" in result["groups"]
assert result["groups"]["Unknown"][0]["name"] == "WeirdDevice"
def test_response_includes_filter_metadata(self):
client = FakeAxlClient([])
result = device_grep(client, "FAX", classes=["Trunk"])
assert result["pattern"] == "FAX"
assert result["classes_filter"] == ["Trunk"]

View File

@ -0,0 +1,247 @@
"""Tests for patterns_targeting_device + wildcard count/enumerate helpers.
Single-call tool that answers "what numplan rows directly target this
device" — the inverse of `list_route_lists_and_groups`. Direct-target
only per cucx-docs Q1 (msg 003 in axl/agent-threads/cucx-prompt-suggestions/).
Three wildcard-expansion modes per cucx-docs Q2:
- False: patterns returned as-is
- "count": digit-string estimates per pattern + total
- "enumerate": actual digit-string list, only for tightly-bounded patterns
"""
import pytest
from mcaxl.route_plan import (
_COUNT_CAP,
_ENUMERATE_CAP,
_count_pattern_digit_strings,
_enumerate_pattern_digit_strings,
patterns_targeting_device,
)
class FakeAxlClient:
"""Returns canned SQL results — no network, no zeep."""
def __init__(self, rows: list[dict]):
self._rows = rows
self.queries: list[str] = []
def execute_sql_query(self, sql: str) -> dict:
self.queries.append(sql)
return {"row_count": len(self._rows), "rows": self._rows}
# ─── Wildcard counting (the math layer) ────────────────────────────────
class TestCountPatternDigitStrings:
"""The estimator for how many distinct digit strings a pattern matches."""
@pytest.mark.parametrize("pattern,expected", [
("1234", 1), # all literals
("X", 10), # one wildcard digit
("XX", 100),
("XXX", 1_000),
("20878594XX", 100), # the real BMH RightFax block
("9.20878594XX", 100), # `.` is a separator, ignored
("+12085243000", 1), # E.164 literal
("[2-9]XXXXXX", 7_000_000), # NANP-ish, hits cap → returns _COUNT_CAP
("[2-9]", 8), # range
("[abc]", 3), # set
("[0-9]", 10), # equiv to X
("[02468]", 5), # explicit even digits
])
def test_bounded_patterns(self, pattern, expected):
count, unbounded, capped = _count_pattern_digit_strings(pattern)
assert unbounded is False
if expected > _COUNT_CAP:
assert count == _COUNT_CAP
assert capped is True
else:
assert count == expected
assert capped is False
@pytest.mark.parametrize("pattern", ["!", "9.!", "9.@", "@", "1!"])
def test_unbounded_patterns_flagged(self, pattern):
count, unbounded, capped = _count_pattern_digit_strings(pattern)
assert count is None
assert unbounded is True
assert capped is False
def test_empty_pattern_is_zero(self):
count, unbounded, capped = _count_pattern_digit_strings("")
assert count == 0
assert unbounded is False
assert capped is False
def test_malformed_charclass_is_unbounded(self):
# `[abc` with no closing bracket — be conservative
count, unbounded, capped = _count_pattern_digit_strings("12[abc")
assert count is None
# ─── Wildcard enumeration (the strict-bounds layer) ────────────────────
class TestEnumeratePatternDigitStrings:
"""The strict enumerator — only safe for tightly-bounded patterns."""
def test_literals_only_returns_self(self):
digits, reason = _enumerate_pattern_digit_strings("1234")
assert digits == ["1234"]
assert reason is None
def test_single_X_expands_to_ten(self):
digits, reason = _enumerate_pattern_digit_strings("12X")
assert digits == [f"12{d}" for d in "0123456789"]
assert reason is None
def test_charclass_expands(self):
digits, _ = _enumerate_pattern_digit_strings("[2-4]")
assert digits == ["2", "3", "4"]
def test_block_expansion(self):
# 100 DIDs in a /XX block
digits, _ = _enumerate_pattern_digit_strings("20878594XX")
assert len(digits) == 100
assert "2087859400" in digits
assert "2087859499" in digits
@pytest.mark.parametrize("pattern,expected_reason_fragment", [
("9.20878594XX", "'.'"), # cucx-docs explicit: no `.`
("9!", "'!'"), # unbounded
("9.@", "'@'"), # `@` is checked before `.` — either reason is valid
("@", "'@'"),
])
def test_unsafe_patterns_skipped(self, pattern, expected_reason_fragment):
digits, reason = _enumerate_pattern_digit_strings(pattern)
assert digits is None
assert expected_reason_fragment in reason
def test_oversized_pattern_skipped(self):
# 4 X's = 10,000 expansions; cap is 1,000
digits, reason = _enumerate_pattern_digit_strings("XXXX")
assert digits is None
assert "exceeds cap" in reason
# ─── The tool itself (integration with FakeAxlClient) ──────────────────
class TestPatternsTargetingDevice:
"""End-to-end through the FakeAxlClient layer."""
@pytest.fixture
def rightfax_rows(self):
"""Realistic-shape rows: 1 wildcard block + 2 internal carveouts."""
base = {
"destination_device": "RightFax-RL",
"destination_class": "Route List",
"xform_mask": None,
"prefix": None,
"pattern_type": "Route Pattern",
}
return [
{**base, "pattern": "20878594XX", "partition": "External-PT",
"description": "RightFax inbound block (100 DIDs)"},
{**base, "pattern": "5550100", "partition": "Internal-PT",
"description": "Reception → fax internal hop"},
{**base, "pattern": "5550101", "partition": "Internal-PT",
"description": "Front desk → fax internal hop"},
]
def test_default_returns_patterns_intact(self, rightfax_rows):
client = FakeAxlClient(rightfax_rows)
result = patterns_targeting_device(client, "RightFax-RL")
assert result["device_name"] == "RightFax-RL"
assert result["destination_class"] == "Route List"
assert result["pattern_count"] == 3
# No expansion fields present on default
assert "total_digit_count" not in result
for p in result["patterns"]:
assert "digit_count" not in p
assert "enumerated_digits" not in p
def test_count_mode_per_pattern_and_total(self, rightfax_rows):
client = FakeAxlClient(rightfax_rows)
result = patterns_targeting_device(
client, "RightFax-RL", expand_wildcards="count"
)
# Per-pattern counts inline (cucx-docs Q2 refinement)
counts = {p["pattern"]: p["digit_count"] for p in result["patterns"]}
assert counts == {
"20878594XX": 100,
"5550100": 1,
"5550101": 1,
}
assert result["total_digit_count"] == 102
def test_count_mode_with_unbounded_pattern(self):
rows = [
{"pattern": "9.@", "partition": "PSTN-PT", "description": "PSTN catchall",
"destination_device": "Gateway-1", "destination_class": "Gateway",
"xform_mask": None, "prefix": None, "pattern_type": "Route Pattern"},
{"pattern": "5550100", "partition": "Internal-PT", "description": "extension",
"destination_device": "Gateway-1", "destination_class": "Gateway",
"xform_mask": None, "prefix": None, "pattern_type": "Route Pattern"},
]
client = FakeAxlClient(rows)
result = patterns_targeting_device(client, "Gateway-1", expand_wildcards="count")
# Total goes None once any pattern is unbounded — auditor must see this
assert result["total_digit_count"] is None
unbounded = next(p for p in result["patterns"] if p["pattern"] == "9.@")
assert unbounded["unbounded"] is True
assert unbounded["digit_count"] is None
def test_enumerate_mode_for_safe_patterns(self, rightfax_rows):
client = FakeAxlClient(rightfax_rows)
result = patterns_targeting_device(
client, "RightFax-RL", expand_wildcards="enumerate"
)
block = next(p for p in result["patterns"] if p["pattern"] == "20878594XX")
assert len(block["enumerated_digits"]) == 100
# The internal-extension carveouts are 1:1 enumerations of themselves
carveout = next(p for p in result["patterns"] if p["pattern"] == "5550100")
assert carveout["enumerated_digits"] == ["5550100"]
def test_enumerate_mode_skips_unsafe_with_reason(self):
rows = [{
"pattern": "9.@", "partition": "PSTN", "description": "PSTN catchall",
"destination_device": "GW", "destination_class": "Gateway",
"xform_mask": None, "prefix": None, "pattern_type": "Route Pattern",
}]
client = FakeAxlClient(rows)
result = patterns_targeting_device(client, "GW", expand_wildcards="enumerate")
skipped = result["patterns"][0]
assert skipped["enumerated_digits"] is None
# `@` is checked before `.` in the impl; either is a valid reason
# for `9.@` since both render the pattern non-enumerable.
assert "'@'" in skipped["enumeration_skipped"]
def test_empty_result_returns_zero_patterns(self):
client = FakeAxlClient([])
result = patterns_targeting_device(client, "Nonexistent-Device")
assert result["pattern_count"] == 0
assert result["destination_class"] is None
assert result["patterns"] == []
def test_invalid_expand_wildcards_raises(self):
client = FakeAxlClient([])
with pytest.raises(ValueError, match="expand_wildcards"):
patterns_targeting_device(client, "Foo", expand_wildcards="enumarate") # typo
def test_device_name_is_quote_escaped(self):
# SQL injection attempt via device name; _esc should handle it
client = FakeAxlClient([])
patterns_targeting_device(client, "fake'; DROP TABLE device --")
# The query was issued; double-single-quote indicates proper escaping
assert "fake''" in client.queries[0]
def test_docstring_compose_note_present_in_response(self, rightfax_rows):
"""The transitive-reachability composition note (Q1 docstring requirement)
must be in the response, so an operator browsing JSON sees it without
reading the docstring."""
client = FakeAxlClient(rightfax_rows)
result = patterns_targeting_device(client, "RightFax-RL")
assert "_note" in result
assert "translation" in result["_note"].lower()
assert "route_translation_chain" in result["_note"]