Compare commits
2 Commits
38307aad67
...
9427e3d4df
| Author | SHA1 | Date | |
|---|---|---|---|
| 9427e3d4df | |||
| cd08a7ec76 |
@ -254,7 +254,19 @@ class AxlClient:
|
|||||||
if cached is not None:
|
if cached is not None:
|
||||||
return {**cached, "_cache": "hit"}
|
return {**cached, "_cache": "hit"}
|
||||||
self._ensure_connected()
|
self._ensure_connected()
|
||||||
resp = self._service.executeSQLQuery(sql=cleaned)
|
try:
|
||||||
|
resp = self._service.executeSQLQuery(sql=cleaned)
|
||||||
|
except Exception as e:
|
||||||
|
# Anti-pattern hints — if the AXL error matches a known operator
|
||||||
|
# mistake (numplan.fkdevice, FROM sipdestination, etc.), append
|
||||||
|
# the right-path hint to the error message. Saves the 3-5 turns
|
||||||
|
# an LLM would otherwise spend rediscovering the schema. See
|
||||||
|
# axl/agent-threads/cucx-prompt-suggestions/ for the source
|
||||||
|
# observations that motivated each hint.
|
||||||
|
augmented = _augment_axl_error(str(e), cleaned)
|
||||||
|
if augmented != str(e):
|
||||||
|
raise RuntimeError(augmented) from e
|
||||||
|
raise
|
||||||
rows = _parse_sql_rows(resp)
|
rows = _parse_sql_rows(resp)
|
||||||
result = {"row_count": len(rows), "rows": rows, "query": cleaned}
|
result = {"row_count": len(rows), "rows": rows, "query": cleaned}
|
||||||
self._response_cache.set("executeSQLQuery", {"sql": cleaned}, result)
|
self._response_cache.set("executeSQLQuery", {"sql": cleaned}, result)
|
||||||
@ -431,3 +443,52 @@ def _stringify(v: Any) -> Any:
|
|||||||
if v is None or isinstance(v, (str, int, float, bool)):
|
if v is None or isinstance(v, (str, int, float, bool)):
|
||||||
return v
|
return v
|
||||||
return str(v)
|
return str(v)
|
||||||
|
|
||||||
|
|
||||||
|
# ─── AXL anti-pattern error hints ──────────────────────────────────────
|
||||||
|
#
|
||||||
|
# When AXL returns a specific class of error AND the query contains the
|
||||||
|
# trigger phrase, append a hint to the error explaining the right path.
|
||||||
|
# These are surgical (not fuzzy) because targeted hints have lower false-
|
||||||
|
# positive risk than generic "did you mean ..." suggestions.
|
||||||
|
#
|
||||||
|
# Source: cucx-docs handoff (msg 003 in
|
||||||
|
# axl/agent-threads/cucx-prompt-suggestions/) — three patterns recurring
|
||||||
|
# enough during cucx-docs CUCM 15 audits to warrant a structural fix.
|
||||||
|
|
||||||
|
_AXL_ERROR_HINTS: list[dict[str, str]] = [
|
||||||
|
{
|
||||||
|
"error_fragment": "Column (fkdevice) not found",
|
||||||
|
"query_must_contain": "numplan",
|
||||||
|
"hint": (
|
||||||
|
"Hint: numplan has no fkdevice column. The numplan-to-device "
|
||||||
|
"link is M:N through devicenumplanmap. Try:\n"
|
||||||
|
" JOIN devicenumplanmap m ON m.fknumplan = numplan.pkid\n"
|
||||||
|
" JOIN device d ON m.fkdevice = d.pkid"
|
||||||
|
),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"error_fragment": "not in database",
|
||||||
|
"query_must_contain": "sipdestination",
|
||||||
|
"hint": (
|
||||||
|
"Hint: `sipdestination` does not exist as a table. SIP trunk "
|
||||||
|
"destinations live on `device` joined with `sipdestinationgroup` "
|
||||||
|
"and `sipprofile`. Try `axl_list_tables(pattern='sip%')` to see "
|
||||||
|
"the actual SIP-related tables."
|
||||||
|
),
|
||||||
|
},
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def _augment_axl_error(error_text: str, query: str) -> str:
|
||||||
|
"""Append a hint to the AXL error message if any anti-pattern matches.
|
||||||
|
|
||||||
|
Returns `error_text` unchanged when no hint applies — caller can compare
|
||||||
|
identity to decide whether to wrap the original exception.
|
||||||
|
"""
|
||||||
|
upper_query = query.upper()
|
||||||
|
for entry in _AXL_ERROR_HINTS:
|
||||||
|
if (entry["error_fragment"] in error_text
|
||||||
|
and entry["query_must_contain"].upper() in upper_query):
|
||||||
|
return f"{error_text}\n\n{entry['hint']}"
|
||||||
|
return error_text
|
||||||
|
|||||||
@ -261,6 +261,332 @@ def inspect_pattern(
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def device_grep(
|
||||||
|
client: "AxlClient",
|
||||||
|
pattern: str,
|
||||||
|
classes: list[str] | None = None,
|
||||||
|
) -> dict:
|
||||||
|
"""Fuzzy device discovery — match `pattern` against name OR description,
|
||||||
|
optionally filtered by device class.
|
||||||
|
|
||||||
|
Surfaces the kind of "wait, there are TWO of these?" findings that
|
||||||
|
cucx-docs hit at Bingham (parallel ZetaFax + RightFax fax servers, etc.):
|
||||||
|
devices whose name or description contains a vendor/role keyword,
|
||||||
|
grouped by class so trunks-vs-phones-vs-route-lists is visible at a
|
||||||
|
glance.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
pattern: substring to match (case-insensitive) against `device.name`
|
||||||
|
OR `device.description`. CUCM-style `%` wildcards work — pass
|
||||||
|
`"FAX"` for substring, `"FAX%"` for prefix-only, etc.
|
||||||
|
classes: optional `tkclass.name` filter — e.g.
|
||||||
|
`["Phone", "Trunk"]`, `["Route List"]`, `["Gateway", "H323 Gateway"]`.
|
||||||
|
If None, all classes returned.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
`{pattern, match_count, groups: {class_name: [{name, description, type, pool}, ...]}}`
|
||||||
|
"""
|
||||||
|
if not pattern or not pattern.strip():
|
||||||
|
raise ValueError("pattern must be a non-empty string")
|
||||||
|
|
||||||
|
safe_pat = _esc(pattern)
|
||||||
|
class_filter = ""
|
||||||
|
if classes:
|
||||||
|
escaped = ", ".join(f"'{_esc(c)}'" for c in classes)
|
||||||
|
class_filter = f"AND tc.name IN ({escaped})"
|
||||||
|
|
||||||
|
sql = f"""
|
||||||
|
SELECT
|
||||||
|
d.name,
|
||||||
|
d.description,
|
||||||
|
tc.name AS class_name,
|
||||||
|
dt.name AS device_type,
|
||||||
|
dp.name AS pool_name
|
||||||
|
FROM device d
|
||||||
|
LEFT OUTER JOIN typeclass tc ON d.tkclass = tc.enum
|
||||||
|
LEFT OUTER JOIN typemodel dt ON d.tkmodel = dt.enum
|
||||||
|
LEFT OUTER JOIN devicepool dp ON d.fkdevicepool = dp.pkid
|
||||||
|
WHERE (UPPER(d.name) LIKE UPPER('%{safe_pat}%')
|
||||||
|
OR UPPER(d.description) LIKE UPPER('%{safe_pat}%'))
|
||||||
|
{class_filter}
|
||||||
|
ORDER BY tc.name, d.name
|
||||||
|
"""
|
||||||
|
result = client.execute_sql_query(sql)
|
||||||
|
rows = result["rows"]
|
||||||
|
|
||||||
|
groups: dict[str, list] = {}
|
||||||
|
for row in rows:
|
||||||
|
cls = row.get("class_name") or "Unknown"
|
||||||
|
groups.setdefault(cls, []).append({
|
||||||
|
"name": row.get("name"),
|
||||||
|
"description": row.get("description"),
|
||||||
|
"type": row.get("device_type"),
|
||||||
|
"pool": row.get("pool_name"),
|
||||||
|
})
|
||||||
|
|
||||||
|
return {
|
||||||
|
"pattern": pattern,
|
||||||
|
"classes_filter": classes,
|
||||||
|
"match_count": len(rows),
|
||||||
|
"groups": groups,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def patterns_targeting_device(
|
||||||
|
client: "AxlClient",
|
||||||
|
device_name: str,
|
||||||
|
expand_wildcards: str | bool = False,
|
||||||
|
) -> dict:
|
||||||
|
"""Inverse of `list_route_lists_and_groups` — given a destination device,
|
||||||
|
return every numplan whose direct target is that device.
|
||||||
|
|
||||||
|
Walks `device → devicenumplanmap → numplan` (the M:N join — `numplan` does
|
||||||
|
NOT have a direct `fkdevice`; that's a common schema gotcha).
|
||||||
|
|
||||||
|
For "what numbers can ultimately reach this device" (including
|
||||||
|
translation-pattern hops and route-list intermediaries), compose this
|
||||||
|
tool with `list_route_lists_and_groups` and `route_translation_chain`.
|
||||||
|
This returns only direct-target patterns.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
device_name: exact device name. Route Lists, SIP Trunks, CTI Route
|
||||||
|
Points, Phones — any device that can be a direct numplan target.
|
||||||
|
expand_wildcards: how to handle wildcard patterns in the result.
|
||||||
|
- `False` (default): patterns returned as-is, syntax intact
|
||||||
|
- `"count"`: each pattern annotated with a per-pattern digit-string
|
||||||
|
count estimate; total surface across all patterns also returned.
|
||||||
|
Estimates capped at `_COUNT_CAP` per pattern; unbounded patterns
|
||||||
|
(`!`, `@`) reported as `None` count with `unbounded: true`
|
||||||
|
- `"enumerate"`: actually list digit strings — only safe for
|
||||||
|
tightly bounded patterns (no `!`, no `@`, no `.`, ≤
|
||||||
|
`_ENUMERATE_CAP` total digit strings). Patterns that violate
|
||||||
|
either constraint are returned with `enumerated_digits: null` +
|
||||||
|
`enumeration_skipped: <reason>`
|
||||||
|
"""
|
||||||
|
if expand_wildcards not in (False, "count", "enumerate"):
|
||||||
|
raise ValueError(
|
||||||
|
f"expand_wildcards must be False, 'count', or 'enumerate'; "
|
||||||
|
f"got {expand_wildcards!r}"
|
||||||
|
)
|
||||||
|
|
||||||
|
sql = f"""
|
||||||
|
SELECT
|
||||||
|
n.dnorpattern AS pattern,
|
||||||
|
rp.name AS partition,
|
||||||
|
n.description,
|
||||||
|
n.calledpartytransformationmask AS xform_mask,
|
||||||
|
n.prefixdigitsout AS prefix,
|
||||||
|
tp.name AS pattern_type,
|
||||||
|
d.name AS destination_device,
|
||||||
|
tc.name AS destination_class
|
||||||
|
FROM device d
|
||||||
|
JOIN typeclass tc ON d.tkclass = tc.enum
|
||||||
|
JOIN devicenumplanmap m ON m.fkdevice = d.pkid
|
||||||
|
JOIN numplan n ON m.fknumplan = n.pkid
|
||||||
|
LEFT OUTER JOIN routepartition rp ON n.fkroutepartition = rp.pkid
|
||||||
|
LEFT OUTER JOIN typepatternusage tp ON n.tkpatternusage = tp.enum
|
||||||
|
WHERE d.name = '{_esc(device_name)}'
|
||||||
|
ORDER BY n.dnorpattern
|
||||||
|
"""
|
||||||
|
result = client.execute_sql_query(sql)
|
||||||
|
rows = result["rows"]
|
||||||
|
|
||||||
|
patterns: list[dict] = []
|
||||||
|
total_count: int | None = 0 # None once we hit an unbounded pattern in 'count' mode
|
||||||
|
truncated = False
|
||||||
|
|
||||||
|
for row in rows:
|
||||||
|
entry: dict = {
|
||||||
|
"pattern": row.get("pattern"),
|
||||||
|
"partition": row.get("partition"),
|
||||||
|
"description": row.get("description"),
|
||||||
|
"pattern_type": row.get("pattern_type"),
|
||||||
|
"xform_mask": row.get("xform_mask"),
|
||||||
|
"prefix": row.get("prefix"),
|
||||||
|
}
|
||||||
|
pattern = entry["pattern"] or ""
|
||||||
|
|
||||||
|
if expand_wildcards == "count":
|
||||||
|
count, unbounded, capped = _count_pattern_digit_strings(pattern)
|
||||||
|
entry["digit_count"] = count
|
||||||
|
if unbounded:
|
||||||
|
entry["unbounded"] = True
|
||||||
|
total_count = None
|
||||||
|
if capped:
|
||||||
|
entry["count_capped"] = True
|
||||||
|
truncated = True
|
||||||
|
if total_count is not None and count is not None:
|
||||||
|
total_count += count
|
||||||
|
elif expand_wildcards == "enumerate":
|
||||||
|
digits, reason = _enumerate_pattern_digit_strings(pattern)
|
||||||
|
entry["enumerated_digits"] = digits
|
||||||
|
if digits is None:
|
||||||
|
entry["enumeration_skipped"] = reason
|
||||||
|
|
||||||
|
patterns.append(entry)
|
||||||
|
|
||||||
|
out = {
|
||||||
|
"device_name": device_name,
|
||||||
|
"destination_class": rows[0]["destination_class"] if rows else None,
|
||||||
|
"pattern_count": len(patterns),
|
||||||
|
"patterns": patterns,
|
||||||
|
"_note": (
|
||||||
|
"Direct-target patterns only. For transitive reachability "
|
||||||
|
"(translation patterns, route-list intermediaries), compose "
|
||||||
|
"with route_lists_and_groups + route_translation_chain."
|
||||||
|
),
|
||||||
|
}
|
||||||
|
if expand_wildcards == "count":
|
||||||
|
out["total_digit_count"] = total_count
|
||||||
|
if truncated:
|
||||||
|
out["truncated"] = True
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
# Wildcard expansion bounds — per cucx-docs Q2 reply (msg 003 in
|
||||||
|
# axl/agent-threads/cucx-prompt-suggestions/). 'count' caps per-pattern
|
||||||
|
# estimates at _COUNT_CAP to avoid runaway computation on patterns like
|
||||||
|
# X{8} (1e8); 'enumerate' caps total digit strings at _ENUMERATE_CAP and
|
||||||
|
# refuses unbounded or `.`-containing patterns entirely.
|
||||||
|
_COUNT_CAP = 10_000
|
||||||
|
_ENUMERATE_CAP = 1_000
|
||||||
|
|
||||||
|
|
||||||
|
def _count_pattern_digit_strings(pattern: str) -> tuple[int | None, bool, bool]:
|
||||||
|
"""Estimate the number of distinct digit strings a pattern can match.
|
||||||
|
|
||||||
|
Returns (count, unbounded, capped):
|
||||||
|
- count: estimate, or None if unbounded
|
||||||
|
- unbounded: True if pattern contains `!` or `@` (treated as
|
||||||
|
infinite — the @ NANP/international dial plan is bounded but
|
||||||
|
too large to enumerate meaningfully)
|
||||||
|
- capped: True if the estimate hit _COUNT_CAP and was truncated
|
||||||
|
"""
|
||||||
|
if not pattern:
|
||||||
|
return 0, False, False
|
||||||
|
|
||||||
|
# `+` is a literal in CUCM (E.164 prefix); strip for analysis but it
|
||||||
|
# doesn't change the count. Same for `.` (purely visual separator).
|
||||||
|
p = pattern.replace("+", "").replace(".", "")
|
||||||
|
|
||||||
|
if "!" in p or "@" in p:
|
||||||
|
return None, True, False
|
||||||
|
|
||||||
|
count = 1
|
||||||
|
i = 0
|
||||||
|
while i < len(p):
|
||||||
|
ch = p[i]
|
||||||
|
if ch == "X":
|
||||||
|
count *= 10
|
||||||
|
elif ch == "[":
|
||||||
|
close = p.find("]", i)
|
||||||
|
if close == -1:
|
||||||
|
# Malformed — be conservative
|
||||||
|
return None, False, False
|
||||||
|
spec = p[i + 1 : close]
|
||||||
|
count *= _count_charclass(spec)
|
||||||
|
i = close
|
||||||
|
# other characters are literals contributing factor 1
|
||||||
|
if count > _COUNT_CAP:
|
||||||
|
return _COUNT_CAP, False, True
|
||||||
|
i += 1
|
||||||
|
return count, False, False
|
||||||
|
|
||||||
|
|
||||||
|
def _count_charclass(spec: str) -> int:
|
||||||
|
"""Count the size of a CUCM character class like '0-9' or 'abc' or '2-9a-c'.
|
||||||
|
|
||||||
|
Negation (`^`) is technically supported in CUCM but rare; treat as
|
||||||
|
its set size (the negated set has size 10 - len(set)).
|
||||||
|
"""
|
||||||
|
if not spec:
|
||||||
|
return 1
|
||||||
|
negated = spec.startswith("^")
|
||||||
|
body = spec[1:] if negated else spec
|
||||||
|
chars: set[str] = set()
|
||||||
|
i = 0
|
||||||
|
while i < len(body):
|
||||||
|
if i + 2 < len(body) and body[i + 1] == "-":
|
||||||
|
lo, hi = body[i], body[i + 2]
|
||||||
|
if lo.isdigit() and hi.isdigit() and lo <= hi:
|
||||||
|
chars.update(chr(c) for c in range(ord(lo), ord(hi) + 1))
|
||||||
|
i += 3
|
||||||
|
else:
|
||||||
|
chars.add(body[i])
|
||||||
|
i += 1
|
||||||
|
size = len(chars) or 1
|
||||||
|
return (10 - size) if negated else size
|
||||||
|
|
||||||
|
|
||||||
|
def _enumerate_pattern_digit_strings(pattern: str) -> tuple[list[str] | None, str | None]:
|
||||||
|
"""Actually enumerate the digit strings a pattern matches.
|
||||||
|
|
||||||
|
Returns (digits, skip_reason):
|
||||||
|
- digits: list of digit strings, or None if skipped
|
||||||
|
- skip_reason: human-readable explanation when skipped
|
||||||
|
|
||||||
|
Only safe for tightly-bounded patterns. Per cucx-docs:
|
||||||
|
no `!`, no `@`, no `.`, total ≤ _ENUMERATE_CAP.
|
||||||
|
"""
|
||||||
|
if not pattern:
|
||||||
|
return [""], None
|
||||||
|
if "!" in pattern:
|
||||||
|
return None, "contains '!' (unbounded)"
|
||||||
|
if "@" in pattern:
|
||||||
|
return None, "contains '@' (NANP/international dial plan; too large)"
|
||||||
|
if "." in pattern:
|
||||||
|
return None, "contains '.' (separator semantics; not enumerable per spec)"
|
||||||
|
|
||||||
|
p = pattern.replace("+", "")
|
||||||
|
count, _, _ = _count_pattern_digit_strings(p)
|
||||||
|
if count is None:
|
||||||
|
return None, "estimated count is unbounded"
|
||||||
|
if count > _ENUMERATE_CAP:
|
||||||
|
return None, f"estimated count {count} exceeds cap {_ENUMERATE_CAP}; use 'count' mode"
|
||||||
|
|
||||||
|
out = [""]
|
||||||
|
i = 0
|
||||||
|
while i < len(p):
|
||||||
|
ch = p[i]
|
||||||
|
if ch == "X":
|
||||||
|
out = [s + d for s in out for d in "0123456789"]
|
||||||
|
i += 1
|
||||||
|
elif ch == "[":
|
||||||
|
close = p.find("]", i)
|
||||||
|
if close == -1:
|
||||||
|
return None, "malformed character class"
|
||||||
|
spec = p[i + 1 : close]
|
||||||
|
chars = _expand_charclass(spec)
|
||||||
|
out = [s + c for s in out for c in chars]
|
||||||
|
i = close + 1
|
||||||
|
else:
|
||||||
|
out = [s + ch for s in out]
|
||||||
|
i += 1
|
||||||
|
return out, None
|
||||||
|
|
||||||
|
|
||||||
|
def _expand_charclass(spec: str) -> list[str]:
|
||||||
|
"""Expand a character class like '0-9' or 'abc' to its member list."""
|
||||||
|
if not spec:
|
||||||
|
return [""]
|
||||||
|
negated = spec.startswith("^")
|
||||||
|
body = spec[1:] if negated else spec
|
||||||
|
chars: set[str] = set()
|
||||||
|
i = 0
|
||||||
|
while i < len(body):
|
||||||
|
if i + 2 < len(body) and body[i + 1] == "-":
|
||||||
|
lo, hi = body[i], body[i + 2]
|
||||||
|
if lo.isdigit() and hi.isdigit() and lo <= hi:
|
||||||
|
chars.update(chr(c) for c in range(ord(lo), ord(hi) + 1))
|
||||||
|
i += 3
|
||||||
|
else:
|
||||||
|
chars.add(body[i])
|
||||||
|
i += 1
|
||||||
|
if negated:
|
||||||
|
chars = set("0123456789") - chars
|
||||||
|
return sorted(chars)
|
||||||
|
|
||||||
|
|
||||||
def list_route_lists_and_groups(client: "AxlClient", name: str | None = None) -> dict:
|
def list_route_lists_and_groups(client: "AxlClient", name: str | None = None) -> dict:
|
||||||
"""Route lists with their ordered route groups and member gateways/trunks.
|
"""Route lists with their ordered route groups and member gateways/trunks.
|
||||||
|
|
||||||
|
|||||||
@ -239,6 +239,63 @@ def route_inspect_pattern(pattern: str, partition: str | None = None) -> dict:
|
|||||||
return route_plan.inspect_pattern(_client(), pattern, partition)
|
return route_plan.inspect_pattern(_client(), pattern, partition)
|
||||||
|
|
||||||
|
|
||||||
|
@mcp.tool
|
||||||
|
def device_grep(
|
||||||
|
pattern: str,
|
||||||
|
classes: list[str] | None = None,
|
||||||
|
) -> dict:
|
||||||
|
"""Fuzzy device discovery — match `pattern` against name OR description,
|
||||||
|
optionally filtered by device class.
|
||||||
|
|
||||||
|
Surfaces "wait, there are TWO of these?" findings — vendor systems
|
||||||
|
deployed twice and forgotten once (parallel fax servers, duplicate
|
||||||
|
CUBEs, vestigial conference bridges, etc.). Grouped by class so the
|
||||||
|
structure of what matched is visible at a glance.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
pattern: substring (case-insensitive) matched against device name
|
||||||
|
OR description. CUCM-style `%` wildcards work — `"FAX"` is
|
||||||
|
substring, `"FAX%"` is prefix-only.
|
||||||
|
classes: optional list of `tkclass.name` values to filter — e.g.
|
||||||
|
`["Phone", "Trunk"]`, `["Route List"]`, `["Gateway", "H323 Gateway"]`.
|
||||||
|
"""
|
||||||
|
return route_plan.device_grep(_client(), pattern, classes)
|
||||||
|
|
||||||
|
|
||||||
|
@mcp.tool
|
||||||
|
def route_patterns_targeting(
|
||||||
|
device_name: str,
|
||||||
|
expand_wildcards: str | bool = False,
|
||||||
|
) -> dict:
|
||||||
|
"""Inverse of `route_lists_and_groups`: given a destination device name,
|
||||||
|
return every numplan whose direct target is that device.
|
||||||
|
|
||||||
|
Answers questions like *"which DIDs route to this fax server?"* in a
|
||||||
|
single call instead of a multi-attempt schema-discovery walk through
|
||||||
|
`numplan` ↔ `devicenumplanmap` ↔ `device`.
|
||||||
|
|
||||||
|
Direct-target only — does NOT walk through translation patterns or
|
||||||
|
route-list intermediaries. For full transitive reachability, compose
|
||||||
|
with `route_lists_and_groups` and `route_translation_chain`.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
device_name: exact device name (case-sensitive). Route Lists, SIP
|
||||||
|
trunks, CTI Route Points, Phones — any device that can be a
|
||||||
|
direct numplan target.
|
||||||
|
expand_wildcards: how to handle wildcard patterns:
|
||||||
|
False (default) — patterns returned as-is.
|
||||||
|
"count" — annotate each pattern with a digit-string count
|
||||||
|
estimate; total surface across all patterns also returned.
|
||||||
|
Patterns containing `!` or `@` are flagged unbounded.
|
||||||
|
"enumerate" — actually list digit strings. Only safe for
|
||||||
|
tightly-bounded patterns; returns a skip reason for any
|
||||||
|
pattern with `!`, `@`, `.`, or estimated >1000 expansions.
|
||||||
|
"""
|
||||||
|
return route_plan.patterns_targeting_device(
|
||||||
|
_client(), device_name, expand_wildcards
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@mcp.tool
|
@mcp.tool
|
||||||
def route_lists_and_groups(name: str | None = None) -> dict:
|
def route_lists_and_groups(name: str | None = None) -> dict:
|
||||||
"""Route lists with their ordered route groups and member devices.
|
"""Route lists with their ordered route groups and member devices.
|
||||||
|
|||||||
126
tests/test_axl_error_hints.py
Normal file
126
tests/test_axl_error_hints.py
Normal file
@ -0,0 +1,126 @@
|
|||||||
|
"""Tests for the AXL anti-pattern error-hint enhancement.
|
||||||
|
|
||||||
|
Source: cucx-docs handoff (msg 003 in
|
||||||
|
axl/agent-threads/cucx-prompt-suggestions/) — three recurring operator
|
||||||
|
mistakes with cluster-side error messages that don't suggest the right
|
||||||
|
path. Hints are surgical: only fire when both the error fragment AND
|
||||||
|
the query trigger phrase match.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from mcaxl.client import _augment_axl_error
|
||||||
|
|
||||||
|
|
||||||
|
class TestNumplanFkdeviceHint:
|
||||||
|
def test_fires_on_matching_error_and_query(self):
|
||||||
|
err = "Column (fkdevice) not found in any table in the query (or SLV is undefined)."
|
||||||
|
query = "SELECT name FROM numplan WHERE fkdevice = 'foo'"
|
||||||
|
out = _augment_axl_error(err, query)
|
||||||
|
assert err in out
|
||||||
|
assert "devicenumplanmap" in out
|
||||||
|
assert "M:N" in out
|
||||||
|
|
||||||
|
def test_no_fire_on_error_alone(self):
|
||||||
|
# Same error fragment but query doesn't mention numplan
|
||||||
|
err = "Column (fkdevice) not found in any table in the query."
|
||||||
|
query = "SELECT name FROM device WHERE fkdevice = 'foo'"
|
||||||
|
assert _augment_axl_error(err, query) == err
|
||||||
|
|
||||||
|
def test_no_fire_on_unrelated_error(self):
|
||||||
|
err = "Some completely different error about a different column."
|
||||||
|
query = "SELECT name FROM numplan"
|
||||||
|
assert _augment_axl_error(err, query) == err
|
||||||
|
|
||||||
|
def test_case_insensitive_query_match(self):
|
||||||
|
# User's query in mixed case should still trigger
|
||||||
|
err = "Column (fkdevice) not found in any table in the query."
|
||||||
|
query = "SELECT name FROM NumPlan np WHERE np.fkdevice IS NOT NULL"
|
||||||
|
out = _augment_axl_error(err, query)
|
||||||
|
assert "devicenumplanmap" in out
|
||||||
|
|
||||||
|
|
||||||
|
class TestSipDestinationHint:
|
||||||
|
def test_fires_on_matching_error_and_query(self):
|
||||||
|
err = "Table (sipdestination) is not in database."
|
||||||
|
query = "SELECT * FROM sipdestination"
|
||||||
|
out = _augment_axl_error(err, query)
|
||||||
|
assert "sipdestinationgroup" in out
|
||||||
|
assert "axl_list_tables" in out
|
||||||
|
|
||||||
|
def test_no_fire_when_query_doesnt_reference_table(self):
|
||||||
|
err = "Table (xyz) is not in database."
|
||||||
|
query = "SELECT * FROM xyz"
|
||||||
|
assert _augment_axl_error(err, query) == err
|
||||||
|
|
||||||
|
|
||||||
|
class TestNoMatch:
|
||||||
|
def test_unrelated_error_returns_unchanged(self):
|
||||||
|
err = "Permission denied for user 'CCMSysUser'."
|
||||||
|
query = "SELECT * FROM device"
|
||||||
|
assert _augment_axl_error(err, query) == err
|
||||||
|
|
||||||
|
def test_empty_error_returns_empty(self):
|
||||||
|
assert _augment_axl_error("", "SELECT 1") == ""
|
||||||
|
|
||||||
|
def test_identity_check_for_no_hint(self):
|
||||||
|
"""Caller compares identity to decide whether to wrap the original
|
||||||
|
exception. Make sure no-hint path returns the literal input."""
|
||||||
|
err = "Random AXL error"
|
||||||
|
query = "SELECT 1"
|
||||||
|
out = _augment_axl_error(err, query)
|
||||||
|
assert out is err # not just equal — same object
|
||||||
|
|
||||||
|
|
||||||
|
class TestEndToEnd:
|
||||||
|
"""Confirm the augmentation propagates through execute_sql_query.
|
||||||
|
|
||||||
|
Uses a mock that raises the AXL exception from inside zeep's call;
|
||||||
|
the augmenter should wrap it as a RuntimeError with the hint
|
||||||
|
appended.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def test_execute_sql_query_wraps_with_hint(self):
|
||||||
|
from unittest.mock import MagicMock
|
||||||
|
from mcaxl.client import AxlClient
|
||||||
|
from mcaxl.cache import AxlCache
|
||||||
|
|
||||||
|
# Real cache, but in a temp location so tests don't pollute
|
||||||
|
import tempfile
|
||||||
|
from pathlib import Path
|
||||||
|
with tempfile.TemporaryDirectory() as td:
|
||||||
|
cache = AxlCache(Path(td) / "test.sqlite", default_ttl=0, cluster_id="test")
|
||||||
|
client = AxlClient(cache)
|
||||||
|
# Bypass _ensure_connected by setting service directly
|
||||||
|
mock_service = MagicMock()
|
||||||
|
mock_service.executeSQLQuery.side_effect = RuntimeError(
|
||||||
|
"Server raised fault: Column (fkdevice) not found in any table"
|
||||||
|
)
|
||||||
|
client._service = mock_service
|
||||||
|
client._connected_at = 0.0
|
||||||
|
|
||||||
|
with pytest.raises(RuntimeError, match="devicenumplanmap"):
|
||||||
|
client.execute_sql_query(
|
||||||
|
"SELECT name FROM numplan WHERE fkdevice IS NOT NULL"
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_execute_sql_query_passes_through_unrelated_error(self):
|
||||||
|
from unittest.mock import MagicMock
|
||||||
|
from mcaxl.client import AxlClient
|
||||||
|
from mcaxl.cache import AxlCache
|
||||||
|
|
||||||
|
import tempfile
|
||||||
|
from pathlib import Path
|
||||||
|
with tempfile.TemporaryDirectory() as td:
|
||||||
|
cache = AxlCache(Path(td) / "test.sqlite", default_ttl=0, cluster_id="test")
|
||||||
|
client = AxlClient(cache)
|
||||||
|
mock_service = MagicMock()
|
||||||
|
mock_service.executeSQLQuery.side_effect = RuntimeError(
|
||||||
|
"Some unrelated cluster error"
|
||||||
|
)
|
||||||
|
client._service = mock_service
|
||||||
|
client._connected_at = 0.0
|
||||||
|
|
||||||
|
# Original exception type + message preserved when no hint applies
|
||||||
|
with pytest.raises(RuntimeError, match="Some unrelated cluster error"):
|
||||||
|
client.execute_sql_query("SELECT * FROM device")
|
||||||
94
tests/test_device_grep.py
Normal file
94
tests/test_device_grep.py
Normal file
@ -0,0 +1,94 @@
|
|||||||
|
"""Tests for device_grep — fuzzy device discovery by name/description.
|
||||||
|
|
||||||
|
The "wait there are TWO of these?" finding shape from cucx-docs's
|
||||||
|
ZetaFax-vs-RightFax discovery (msg 001 in
|
||||||
|
axl/agent-threads/cucx-prompt-suggestions/).
|
||||||
|
"""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from mcaxl.route_plan import device_grep
|
||||||
|
|
||||||
|
|
||||||
|
class FakeAxlClient:
|
||||||
|
def __init__(self, rows):
|
||||||
|
self._rows = rows
|
||||||
|
self.queries = []
|
||||||
|
|
||||||
|
def execute_sql_query(self, sql):
|
||||||
|
self.queries.append(sql)
|
||||||
|
return {"row_count": len(self._rows), "rows": self._rows}
|
||||||
|
|
||||||
|
|
||||||
|
def _make_row(name, description, class_name, device_type="SIP Trunk", pool="DP-1"):
|
||||||
|
return {
|
||||||
|
"name": name,
|
||||||
|
"description": description,
|
||||||
|
"class_name": class_name,
|
||||||
|
"device_type": device_type,
|
||||||
|
"pool_name": pool,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class TestDeviceGrepBasics:
|
||||||
|
def test_groups_by_class(self):
|
||||||
|
rows = [
|
||||||
|
_make_row("RightFax-Trunk", "RightFax inbound", "Trunk"),
|
||||||
|
_make_row("ZetaFax-Trunk", "ZetaFax internal", "Trunk"),
|
||||||
|
_make_row("SEPABCDFAXX01", "RightFax desk test", "Phone", "Cisco 8841"),
|
||||||
|
_make_row("RightFax-RL", "RightFax route list", "Route List"),
|
||||||
|
]
|
||||||
|
client = FakeAxlClient(rows)
|
||||||
|
result = device_grep(client, "FAX")
|
||||||
|
assert result["match_count"] == 4
|
||||||
|
assert set(result["groups"].keys()) == {"Trunk", "Phone", "Route List"}
|
||||||
|
assert len(result["groups"]["Trunk"]) == 2
|
||||||
|
|
||||||
|
def test_class_filter_passed_to_sql(self):
|
||||||
|
client = FakeAxlClient([])
|
||||||
|
device_grep(client, "FAX", classes=["Trunk", "Route List"])
|
||||||
|
# Both class names appear escaped in the SQL IN clause
|
||||||
|
sql = client.queries[0]
|
||||||
|
assert "tc.name IN" in sql
|
||||||
|
assert "'Trunk'" in sql
|
||||||
|
assert "'Route List'" in sql
|
||||||
|
|
||||||
|
def test_no_class_filter_omits_in_clause(self):
|
||||||
|
client = FakeAxlClient([])
|
||||||
|
device_grep(client, "FAX")
|
||||||
|
assert "tc.name IN" not in client.queries[0]
|
||||||
|
|
||||||
|
def test_empty_pattern_raises(self):
|
||||||
|
client = FakeAxlClient([])
|
||||||
|
with pytest.raises(ValueError, match="non-empty"):
|
||||||
|
device_grep(client, "")
|
||||||
|
with pytest.raises(ValueError, match="non-empty"):
|
||||||
|
device_grep(client, " ")
|
||||||
|
|
||||||
|
def test_pattern_quote_escaped(self):
|
||||||
|
client = FakeAxlClient([])
|
||||||
|
device_grep(client, "fake'; DROP TABLE device --")
|
||||||
|
# SQL injection via pattern is escaped (doubled single quotes)
|
||||||
|
assert "fake''" in client.queries[0]
|
||||||
|
|
||||||
|
def test_class_filter_quote_escaped(self):
|
||||||
|
client = FakeAxlClient([])
|
||||||
|
device_grep(client, "FAX", classes=["Phone'; DROP TABLE device --"])
|
||||||
|
assert "Phone''" in client.queries[0]
|
||||||
|
|
||||||
|
def test_unknown_class_grouped_separately(self):
|
||||||
|
# If tkclass enum doesn't resolve (LEFT JOIN miss), class_name is NULL
|
||||||
|
rows = [
|
||||||
|
{"name": "WeirdDevice", "description": "?", "class_name": None,
|
||||||
|
"device_type": None, "pool_name": None},
|
||||||
|
]
|
||||||
|
client = FakeAxlClient(rows)
|
||||||
|
result = device_grep(client, "weird")
|
||||||
|
assert "Unknown" in result["groups"]
|
||||||
|
assert result["groups"]["Unknown"][0]["name"] == "WeirdDevice"
|
||||||
|
|
||||||
|
def test_response_includes_filter_metadata(self):
|
||||||
|
client = FakeAxlClient([])
|
||||||
|
result = device_grep(client, "FAX", classes=["Trunk"])
|
||||||
|
assert result["pattern"] == "FAX"
|
||||||
|
assert result["classes_filter"] == ["Trunk"]
|
||||||
247
tests/test_patterns_targeting.py
Normal file
247
tests/test_patterns_targeting.py
Normal file
@ -0,0 +1,247 @@
|
|||||||
|
"""Tests for patterns_targeting_device + wildcard count/enumerate helpers.
|
||||||
|
|
||||||
|
Single-call tool that answers "what numplan rows directly target this
|
||||||
|
device" — the inverse of `list_route_lists_and_groups`. Direct-target
|
||||||
|
only per cucx-docs Q1 (msg 003 in axl/agent-threads/cucx-prompt-suggestions/).
|
||||||
|
|
||||||
|
Three wildcard-expansion modes per cucx-docs Q2:
|
||||||
|
- False: patterns returned as-is
|
||||||
|
- "count": digit-string estimates per pattern + total
|
||||||
|
- "enumerate": actual digit-string list, only for tightly-bounded patterns
|
||||||
|
"""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from mcaxl.route_plan import (
|
||||||
|
_COUNT_CAP,
|
||||||
|
_ENUMERATE_CAP,
|
||||||
|
_count_pattern_digit_strings,
|
||||||
|
_enumerate_pattern_digit_strings,
|
||||||
|
patterns_targeting_device,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class FakeAxlClient:
|
||||||
|
"""Returns canned SQL results — no network, no zeep."""
|
||||||
|
|
||||||
|
def __init__(self, rows: list[dict]):
|
||||||
|
self._rows = rows
|
||||||
|
self.queries: list[str] = []
|
||||||
|
|
||||||
|
def execute_sql_query(self, sql: str) -> dict:
|
||||||
|
self.queries.append(sql)
|
||||||
|
return {"row_count": len(self._rows), "rows": self._rows}
|
||||||
|
|
||||||
|
|
||||||
|
# ─── Wildcard counting (the math layer) ────────────────────────────────
|
||||||
|
|
||||||
|
class TestCountPatternDigitStrings:
|
||||||
|
"""The estimator for how many distinct digit strings a pattern matches."""
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("pattern,expected", [
|
||||||
|
("1234", 1), # all literals
|
||||||
|
("X", 10), # one wildcard digit
|
||||||
|
("XX", 100),
|
||||||
|
("XXX", 1_000),
|
||||||
|
("20878594XX", 100), # the real BMH RightFax block
|
||||||
|
("9.20878594XX", 100), # `.` is a separator, ignored
|
||||||
|
("+12085243000", 1), # E.164 literal
|
||||||
|
("[2-9]XXXXXX", 7_000_000), # NANP-ish, hits cap → returns _COUNT_CAP
|
||||||
|
("[2-9]", 8), # range
|
||||||
|
("[abc]", 3), # set
|
||||||
|
("[0-9]", 10), # equiv to X
|
||||||
|
("[02468]", 5), # explicit even digits
|
||||||
|
])
|
||||||
|
def test_bounded_patterns(self, pattern, expected):
|
||||||
|
count, unbounded, capped = _count_pattern_digit_strings(pattern)
|
||||||
|
assert unbounded is False
|
||||||
|
if expected > _COUNT_CAP:
|
||||||
|
assert count == _COUNT_CAP
|
||||||
|
assert capped is True
|
||||||
|
else:
|
||||||
|
assert count == expected
|
||||||
|
assert capped is False
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("pattern", ["!", "9.!", "9.@", "@", "1!"])
|
||||||
|
def test_unbounded_patterns_flagged(self, pattern):
|
||||||
|
count, unbounded, capped = _count_pattern_digit_strings(pattern)
|
||||||
|
assert count is None
|
||||||
|
assert unbounded is True
|
||||||
|
assert capped is False
|
||||||
|
|
||||||
|
def test_empty_pattern_is_zero(self):
|
||||||
|
count, unbounded, capped = _count_pattern_digit_strings("")
|
||||||
|
assert count == 0
|
||||||
|
assert unbounded is False
|
||||||
|
assert capped is False
|
||||||
|
|
||||||
|
def test_malformed_charclass_is_unbounded(self):
|
||||||
|
# `[abc` with no closing bracket — be conservative
|
||||||
|
count, unbounded, capped = _count_pattern_digit_strings("12[abc")
|
||||||
|
assert count is None
|
||||||
|
|
||||||
|
|
||||||
|
# ─── Wildcard enumeration (the strict-bounds layer) ────────────────────
|
||||||
|
|
||||||
|
class TestEnumeratePatternDigitStrings:
|
||||||
|
"""The strict enumerator — only safe for tightly-bounded patterns."""
|
||||||
|
|
||||||
|
def test_literals_only_returns_self(self):
|
||||||
|
digits, reason = _enumerate_pattern_digit_strings("1234")
|
||||||
|
assert digits == ["1234"]
|
||||||
|
assert reason is None
|
||||||
|
|
||||||
|
def test_single_X_expands_to_ten(self):
|
||||||
|
digits, reason = _enumerate_pattern_digit_strings("12X")
|
||||||
|
assert digits == [f"12{d}" for d in "0123456789"]
|
||||||
|
assert reason is None
|
||||||
|
|
||||||
|
def test_charclass_expands(self):
|
||||||
|
digits, _ = _enumerate_pattern_digit_strings("[2-4]")
|
||||||
|
assert digits == ["2", "3", "4"]
|
||||||
|
|
||||||
|
def test_block_expansion(self):
|
||||||
|
# 100 DIDs in a /XX block
|
||||||
|
digits, _ = _enumerate_pattern_digit_strings("20878594XX")
|
||||||
|
assert len(digits) == 100
|
||||||
|
assert "2087859400" in digits
|
||||||
|
assert "2087859499" in digits
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("pattern,expected_reason_fragment", [
|
||||||
|
("9.20878594XX", "'.'"), # cucx-docs explicit: no `.`
|
||||||
|
("9!", "'!'"), # unbounded
|
||||||
|
("9.@", "'@'"), # `@` is checked before `.` — either reason is valid
|
||||||
|
("@", "'@'"),
|
||||||
|
])
|
||||||
|
def test_unsafe_patterns_skipped(self, pattern, expected_reason_fragment):
|
||||||
|
digits, reason = _enumerate_pattern_digit_strings(pattern)
|
||||||
|
assert digits is None
|
||||||
|
assert expected_reason_fragment in reason
|
||||||
|
|
||||||
|
def test_oversized_pattern_skipped(self):
|
||||||
|
# 4 X's = 10,000 expansions; cap is 1,000
|
||||||
|
digits, reason = _enumerate_pattern_digit_strings("XXXX")
|
||||||
|
assert digits is None
|
||||||
|
assert "exceeds cap" in reason
|
||||||
|
|
||||||
|
|
||||||
|
# ─── The tool itself (integration with FakeAxlClient) ──────────────────
|
||||||
|
|
||||||
|
class TestPatternsTargetingDevice:
|
||||||
|
"""End-to-end through the FakeAxlClient layer."""
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def rightfax_rows(self):
|
||||||
|
"""Realistic-shape rows: 1 wildcard block + 2 internal carveouts."""
|
||||||
|
base = {
|
||||||
|
"destination_device": "RightFax-RL",
|
||||||
|
"destination_class": "Route List",
|
||||||
|
"xform_mask": None,
|
||||||
|
"prefix": None,
|
||||||
|
"pattern_type": "Route Pattern",
|
||||||
|
}
|
||||||
|
return [
|
||||||
|
{**base, "pattern": "20878594XX", "partition": "External-PT",
|
||||||
|
"description": "RightFax inbound block (100 DIDs)"},
|
||||||
|
{**base, "pattern": "5550100", "partition": "Internal-PT",
|
||||||
|
"description": "Reception → fax internal hop"},
|
||||||
|
{**base, "pattern": "5550101", "partition": "Internal-PT",
|
||||||
|
"description": "Front desk → fax internal hop"},
|
||||||
|
]
|
||||||
|
|
||||||
|
def test_default_returns_patterns_intact(self, rightfax_rows):
|
||||||
|
client = FakeAxlClient(rightfax_rows)
|
||||||
|
result = patterns_targeting_device(client, "RightFax-RL")
|
||||||
|
assert result["device_name"] == "RightFax-RL"
|
||||||
|
assert result["destination_class"] == "Route List"
|
||||||
|
assert result["pattern_count"] == 3
|
||||||
|
# No expansion fields present on default
|
||||||
|
assert "total_digit_count" not in result
|
||||||
|
for p in result["patterns"]:
|
||||||
|
assert "digit_count" not in p
|
||||||
|
assert "enumerated_digits" not in p
|
||||||
|
|
||||||
|
def test_count_mode_per_pattern_and_total(self, rightfax_rows):
|
||||||
|
client = FakeAxlClient(rightfax_rows)
|
||||||
|
result = patterns_targeting_device(
|
||||||
|
client, "RightFax-RL", expand_wildcards="count"
|
||||||
|
)
|
||||||
|
# Per-pattern counts inline (cucx-docs Q2 refinement)
|
||||||
|
counts = {p["pattern"]: p["digit_count"] for p in result["patterns"]}
|
||||||
|
assert counts == {
|
||||||
|
"20878594XX": 100,
|
||||||
|
"5550100": 1,
|
||||||
|
"5550101": 1,
|
||||||
|
}
|
||||||
|
assert result["total_digit_count"] == 102
|
||||||
|
|
||||||
|
def test_count_mode_with_unbounded_pattern(self):
|
||||||
|
rows = [
|
||||||
|
{"pattern": "9.@", "partition": "PSTN-PT", "description": "PSTN catchall",
|
||||||
|
"destination_device": "Gateway-1", "destination_class": "Gateway",
|
||||||
|
"xform_mask": None, "prefix": None, "pattern_type": "Route Pattern"},
|
||||||
|
{"pattern": "5550100", "partition": "Internal-PT", "description": "extension",
|
||||||
|
"destination_device": "Gateway-1", "destination_class": "Gateway",
|
||||||
|
"xform_mask": None, "prefix": None, "pattern_type": "Route Pattern"},
|
||||||
|
]
|
||||||
|
client = FakeAxlClient(rows)
|
||||||
|
result = patterns_targeting_device(client, "Gateway-1", expand_wildcards="count")
|
||||||
|
# Total goes None once any pattern is unbounded — auditor must see this
|
||||||
|
assert result["total_digit_count"] is None
|
||||||
|
unbounded = next(p for p in result["patterns"] if p["pattern"] == "9.@")
|
||||||
|
assert unbounded["unbounded"] is True
|
||||||
|
assert unbounded["digit_count"] is None
|
||||||
|
|
||||||
|
def test_enumerate_mode_for_safe_patterns(self, rightfax_rows):
|
||||||
|
client = FakeAxlClient(rightfax_rows)
|
||||||
|
result = patterns_targeting_device(
|
||||||
|
client, "RightFax-RL", expand_wildcards="enumerate"
|
||||||
|
)
|
||||||
|
block = next(p for p in result["patterns"] if p["pattern"] == "20878594XX")
|
||||||
|
assert len(block["enumerated_digits"]) == 100
|
||||||
|
# The internal-extension carveouts are 1:1 enumerations of themselves
|
||||||
|
carveout = next(p for p in result["patterns"] if p["pattern"] == "5550100")
|
||||||
|
assert carveout["enumerated_digits"] == ["5550100"]
|
||||||
|
|
||||||
|
def test_enumerate_mode_skips_unsafe_with_reason(self):
|
||||||
|
rows = [{
|
||||||
|
"pattern": "9.@", "partition": "PSTN", "description": "PSTN catchall",
|
||||||
|
"destination_device": "GW", "destination_class": "Gateway",
|
||||||
|
"xform_mask": None, "prefix": None, "pattern_type": "Route Pattern",
|
||||||
|
}]
|
||||||
|
client = FakeAxlClient(rows)
|
||||||
|
result = patterns_targeting_device(client, "GW", expand_wildcards="enumerate")
|
||||||
|
skipped = result["patterns"][0]
|
||||||
|
assert skipped["enumerated_digits"] is None
|
||||||
|
# `@` is checked before `.` in the impl; either is a valid reason
|
||||||
|
# for `9.@` since both render the pattern non-enumerable.
|
||||||
|
assert "'@'" in skipped["enumeration_skipped"]
|
||||||
|
|
||||||
|
def test_empty_result_returns_zero_patterns(self):
|
||||||
|
client = FakeAxlClient([])
|
||||||
|
result = patterns_targeting_device(client, "Nonexistent-Device")
|
||||||
|
assert result["pattern_count"] == 0
|
||||||
|
assert result["destination_class"] is None
|
||||||
|
assert result["patterns"] == []
|
||||||
|
|
||||||
|
def test_invalid_expand_wildcards_raises(self):
|
||||||
|
client = FakeAxlClient([])
|
||||||
|
with pytest.raises(ValueError, match="expand_wildcards"):
|
||||||
|
patterns_targeting_device(client, "Foo", expand_wildcards="enumarate") # typo
|
||||||
|
|
||||||
|
def test_device_name_is_quote_escaped(self):
|
||||||
|
# SQL injection attempt via device name; _esc should handle it
|
||||||
|
client = FakeAxlClient([])
|
||||||
|
patterns_targeting_device(client, "fake'; DROP TABLE device --")
|
||||||
|
# The query was issued; double-single-quote indicates proper escaping
|
||||||
|
assert "fake''" in client.queries[0]
|
||||||
|
|
||||||
|
def test_docstring_compose_note_present_in_response(self, rightfax_rows):
|
||||||
|
"""The transitive-reachability composition note (Q1 docstring requirement)
|
||||||
|
must be in the response, so an operator browsing JSON sees it without
|
||||||
|
reading the docstring."""
|
||||||
|
client = FakeAxlClient(rightfax_rows)
|
||||||
|
result = patterns_targeting_device(client, "RightFax-RL")
|
||||||
|
assert "_note" in result
|
||||||
|
assert "translation" in result["_note"].lower()
|
||||||
|
assert "route_translation_chain" in result["_note"]
|
||||||
Loading…
x
Reference in New Issue
Block a user