mcaxl/tests/test_cache.py
Ryan Malloy dee5fdacda Hamilton review fixes: validator literal preservation, cache cluster id, CSS impact partial-failure reporting
Three findings from a Margaret Hamilton-style review of the MCP server,
fixed with regression tests written first (red → green). One bonus
finding (huntpilotqueue column name) was surfaced by the third fix
itself — exactly the audit-trust failure mode that fix exists to expose.

CRITICAL #1 — sql_validator: comment-strip mutated string literals.

The cleaned query returned by validate_select() is what travels to AXL.
Previously, the comment-strip pass ran before the literal-aware pass,
so `--` or `/* */` markers inside a string literal were silently eaten:

  input:  WHERE description = 'Smith -- old line'
  to AXL: WHERE description = 'Smith    (truncated mid-literal)

The LLM saw rows that looked plausible but were not what its query
asked for. "Confidently wrong" is exactly the failure mode the review
was hunting.

Fix: only strip comments on the analysis-only copy used for keyword
detection. The cleaned output preserves the input verbatim (modulo
trailing semicolon and outer whitespace). 6 new tests covering literal
preservation across `--`, `/* */`, LIKE patterns with embedded comment
markers, and forbidden keywords inside real comments.
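
A minimal sketch of the idea behind this fix, not the project's actual code. The names `strip_comments_outside_literals`, `validate_select_sketch`, and the `FORBIDDEN` list are hypothetical. It assumes single-quoted literals with `''` as the escaped quote, and in this sketch keywords inside real comments are ignored:

```python
import re

# Illustrative keyword list only; the real validator's list is not shown here.
FORBIDDEN = {"insert", "update", "delete", "drop", "alter", "create"}


def strip_comments_outside_literals(sql: str) -> str:
    """Return an analysis-only copy: comments removed, literal contents blanked."""
    out = []
    i, n = 0, len(sql)
    while i < n:
        if sql[i] == "'":
            # Walk to the closing quote; '' inside a literal is an escaped quote.
            i += 1
            while i < n:
                if sql[i] == "'":
                    if i + 1 < n and sql[i + 1] == "'":
                        i += 2
                        continue
                    break
                i += 1
            i += 1  # step past the closing quote
            out.append("''")
        elif sql.startswith("--", i):
            # Real line comment: skip to end of line.
            while i < n and sql[i] != "\n":
                i += 1
            out.append(" ")
        elif sql.startswith("/*", i):
            # Real block comment: skip to the closing marker (or end of input).
            end = sql.find("*/", i + 2)
            i = n if end == -1 else end + 2
            out.append(" ")
        else:
            out.append(sql[i])
            i += 1
    return "".join(out)


def validate_select_sketch(sql: str) -> str:
    """Check keywords on the analysis copy; return the query itself untouched."""
    cleaned = sql.strip().rstrip(";").rstrip()
    analysis = strip_comments_outside_literals(cleaned).lower()
    if not analysis.lstrip().startswith("select"):
        raise ValueError("only SELECT statements are allowed")
    bad = set(re.findall(r"[a-z_]+", analysis)) & FORBIDDEN
    if bad:
        raise ValueError(f"forbidden keyword(s): {sorted(bad)}")
    return cleaned  # literals, including any `--` inside them, are preserved
```

The point of the split is that the stripped copy is only ever inspected, never sent: whatever the keyword scan does to its working copy cannot change the text that travels to AXL.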

CRITICAL #2 — cache key omitted cluster identity.

The on-disk cache key was `method::args_json`. An operator swapping
AXL_URL between test and prod (or between two clusters) would silently
serve stale data from cluster A as if from cluster B. The audit
report would be confidently wrong with no signal anything happened.

Fix: AxlCache now takes cluster_id and prefixes all keys with it.
Server bootstrap derives cluster_id as a 12-char SHA-256 prefix of
AXL_URL. cache_stats() surfaces both the current cluster_id and a
`foreign_cluster_entries` count so an env-swap is visible. Schema
migration handles pre-fix cache files via PRAGMA table_info introspection
plus a one-shot ALTER TABLE ADD COLUMN. 5 new tests covering isolation,
shared-id sharing, stats reporting, legacy DB upgrade, and per-cluster
clear() scoping.
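
The pieces of this fix can be sketched roughly as follows. This is an illustration under assumptions, not the AxlCache implementation: the helper names are hypothetical, and the `cluster_id::method::args_json` key layout is a guess at what "prefixes all keys" looks like. The 12-char SHA-256 prefix and the PRAGMA table_info plus ALTER TABLE approach come from the description above.

```python
import hashlib
import json
import sqlite3


def derive_cluster_id(axl_url: str) -> str:
    """12-character SHA-256 hex prefix of the AXL URL."""
    return hashlib.sha256(axl_url.encode("utf-8")).hexdigest()[:12]


def make_cache_key(cluster_id: str, method: str, args: dict) -> str:
    # sort_keys makes the key independent of keyword-argument order.
    args_json = json.dumps(args, sort_keys=True, default=str)
    # Assumed layout: the old `method::args_json` key with a cluster prefix.
    return f"{cluster_id}::{method}::{args_json}"


def ensure_cluster_id_column(conn: sqlite3.Connection, table: str = "axl_cache") -> bool:
    """Add cluster_id to a pre-fix table; return True if a migration ran."""
    # PRAGMA table_info yields one row per column; index 1 is the column name.
    columns = {row[1] for row in conn.execute(f"PRAGMA table_info({table})")}
    if "cluster_id" in columns:
        return False
    # One-shot upgrade: existing rows get NULL, i.e. "unknown cluster".
    conn.execute(f"ALTER TABLE {table} ADD COLUMN cluster_id TEXT")
    conn.commit()
    return True
```

Because legacy rows end up with a NULL cluster_id, a handle bound to a real cluster never matches them, which is the cautious behavior the migration test below checks for.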

MAJOR #3 — find_devices_using_css summary undercounted partial failures.

The function is per-category resilient (one failed query doesn't kill
the whole impact analysis), but the resilience never propagated up to
the response. total_returned and any_truncated only reflected SUCCESSFUL
categories. An LLM consuming "47 references" had no way to know 5
categories errored and the real number was likely much higher.

Fix: response now includes complete: bool, categories_with_errors: int,
and error_categories: [list]. The LLM/auditor sees the partial-failure
state and can decide whether to act on incomplete data. 5 new tests
using a FakeAxlClient stand-in to simulate per-category failures.
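
A rough sketch of how per-category failures can be carried up into the summary. The function name `summarize_categories`, its `queries` mapping, and the `limit` parameter are hypothetical; only the response field names come from the description above.

```python
from typing import Callable, Dict, List


def summarize_categories(
    queries: Dict[str, Callable[[], List[dict]]],
    limit: int = 100,
) -> dict:
    """Run each category query; one failure must not hide in the totals."""
    categories = {}
    error_categories = []
    total_returned = 0
    any_truncated = False

    for name, run in queries.items():
        try:
            rows = run()
        except Exception as exc:
            # Record the failure instead of silently dropping the category.
            categories[name] = {"error": str(exc)}
            error_categories.append(name)
            continue
        truncated = len(rows) > limit
        rows = rows[:limit]
        categories[name] = {"rows": rows, "truncated": truncated}
        total_returned += len(rows)
        any_truncated = any_truncated or truncated

    return {
        "categories": categories,
        "total_returned": total_returned,  # successful categories only
        "any_truncated": any_truncated,
        "complete": not error_categories,
        "categories_with_errors": len(error_categories),
        "error_categories": error_categories,
    }
```

With this shape, a consumer that sees `complete: False` knows total_returned is a lower bound rather than the real count.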

BONUS finding (uncovered by Major #3 fix): huntpilotqueue join used
the wrong column. Three CSS impact categories (huntpilot_max_wait_css,
huntpilot_no_agent_css, huntpilot_queue_full_css) were silently
erroring with "Column (fknumplan) not found" because huntpilotqueue
joins via fknumplan_pilot, not fknumplan. With the Major #3 fix in
place, this surfaced immediately as `complete: False, error_categories:
[3 huntpilot_*]` against the live cluster. Fixed inline; live re-run
now reports `complete: True, total_returned: 163` for Internal-CSS.

87 unit tests passing (up from 70). Live cluster smoke test
(cucm-pub.binghammemorial.org, CUCM 15.0.1.12900-234) verifies all
three fixes plus the bonus finding work end-to-end.
2026-04-25 23:09:55 -06:00

180 lines
6.6 KiB
Python

"""Tests for the SQLite TTL cache."""

import time
from pathlib import Path

import pytest

from mcp_cucm_axl.cache import AxlCache


@pytest.fixture
def cache(tmp_path: Path) -> AxlCache:
    return AxlCache(tmp_path / "test.sqlite", default_ttl=60)


def test_set_and_get(cache: AxlCache):
    cache.set("getCCMVersion", {}, {"version": "15.0.1"})
    assert cache.get("getCCMVersion", {}) == {"version": "15.0.1"}


def test_miss_returns_none(cache: AxlCache):
    assert cache.get("nonexistent", {"x": 1}) is None


def test_kwargs_order_independent(cache: AxlCache):
    cache.set("listPhone", {"name": "SEP1", "limit": 10}, {"rows": ["p1"]})
    # Different order should still hit
    assert cache.get("listPhone", {"limit": 10, "name": "SEP1"}) == {"rows": ["p1"]}


def test_different_args_different_keys(cache: AxlCache):
    cache.set("listPhone", {"name": "SEP1"}, {"rows": ["a"]})
    cache.set("listPhone", {"name": "SEP2"}, {"rows": ["b"]})
    assert cache.get("listPhone", {"name": "SEP1"}) == {"rows": ["a"]}
    assert cache.get("listPhone", {"name": "SEP2"}) == {"rows": ["b"]}


def test_expired_entries_not_returned(tmp_path: Path):
    c = AxlCache(tmp_path / "ttl.sqlite", default_ttl=60)
    c.set("foo", {}, {"x": 1}, ttl=1)
    time.sleep(1.1)
    assert c.get("foo", {}) is None


def test_ttl_zero_disables_caching(tmp_path: Path):
    c = AxlCache(tmp_path / "off.sqlite", default_ttl=0)
    c.set("foo", {}, {"x": 1})
    # default_ttl=0 means writes are no-ops
    assert c.get("foo", {}) is None


def test_stats_reports_breakdown(cache: AxlCache):
    cache.set("listPhone", {}, {"x": 1})
    cache.set("listPhone", {"a": 1}, {"x": 2})
    cache.set("getCCMVersion", {}, {"v": "15"})
    stats = cache.stats()
    assert stats["live_entries"] == 3
    assert stats["by_method"]["listPhone"] == 2
    assert stats["by_method"]["getCCMVersion"] == 1


def test_clear_all(cache: AxlCache):
    cache.set("a", {}, "x")
    cache.set("b", {}, "y")
    deleted = cache.clear()
    assert deleted == 2
    assert cache.stats()["live_entries"] == 0


def test_clear_by_pattern(cache: AxlCache):
    cache.set("listPhone", {}, "p")
    cache.set("listLine", {}, "l")
    cache.set("getCCMVersion", {}, "v")
    deleted = cache.clear("list*")
    assert deleted == 2
    assert cache.get("getCCMVersion", {}) == "v"


def test_purge_expired(tmp_path: Path):
    c = AxlCache(tmp_path / "p.sqlite", default_ttl=60)
    c.set("a", {}, "x", ttl=1)
    c.set("b", {}, "y", ttl=60)
    time.sleep(1.1)
    purged = c.purge_expired()
    assert purged == 1
    assert c.stats()["live_entries"] == 1


class TestClusterIsolation:
    """Hamilton review CRITICAL #2: cache key omitted cluster identity.

    Prior to the fix, an `AXL_URL` swap (test → prod, or one cluster to
    another) served stale results from cluster A as if from cluster B. The
    cache couldn't tell the data came from a different mission. Now each
    cache handle is bound to a cluster_id, and entries from a different
    cluster must miss.
    """

    def test_different_cluster_ids_isolate_get(self, tmp_path: Path):
        # Both caches point at the same DB file, but bound to different
        # cluster IDs. A's writes must not be visible to B.
        db = tmp_path / "shared.sqlite"
        a = AxlCache(db, default_ttl=60, cluster_id="cluster-A")
        b = AxlCache(db, default_ttl=60, cluster_id="cluster-B")
        a.set("getCCMVersion", {}, {"version": "12.5"})
        assert a.get("getCCMVersion", {}) == {"version": "12.5"}
        assert b.get("getCCMVersion", {}) is None, (
            "cluster-B must not see cluster-A's cached value"
        )

    def test_same_cluster_id_shares_cache(self, tmp_path: Path):
        # Two handles with the SAME cluster_id should share results.
        db = tmp_path / "shared.sqlite"
        a = AxlCache(db, default_ttl=60, cluster_id="cluster-X")
        a.set("listPhone", {"name": "SEP1"}, {"rows": ["one"]})
        b = AxlCache(db, default_ttl=60, cluster_id="cluster-X")
        assert b.get("listPhone", {"name": "SEP1"}) == {"rows": ["one"]}

    def test_cluster_id_in_stats(self, tmp_path: Path):
        c = AxlCache(tmp_path / "s.sqlite", default_ttl=60, cluster_id="cluster-Y")
        c.set("getCCMVersion", {}, {"v": "15"})
        stats = c.stats()
        assert stats.get("cluster_id") == "cluster-Y", (
            "stats must surface cluster_id so operators can verify "
            "which cluster they're caching"
        )

    def test_no_cluster_id_still_works_legacy(self, tmp_path: Path):
        # Backward compat: no cluster_id keeps the old (but now risky) shape.
        # The cache still functions; we just don't get isolation.
        c = AxlCache(tmp_path / "legacy.sqlite", default_ttl=60)
        c.set("x", {}, "y")
        assert c.get("x", {}) == "y"

    def test_clear_only_affects_current_cluster(self, tmp_path: Path):
        db = tmp_path / "shared.sqlite"
        a = AxlCache(db, default_ttl=60, cluster_id="cluster-A")
        b = AxlCache(db, default_ttl=60, cluster_id="cluster-B")
        a.set("x", {}, "from-A")
        b.set("x", {}, "from-B")
        deleted = a.clear()
        assert deleted == 1, "clear() must only affect this cluster's entries"
        assert b.get("x", {}) == "from-B", "cluster-B's entry must survive A's clear"

    def test_migrate_legacy_database(self, tmp_path: Path):
        """A cache database created before the cluster_id fix must
        upgrade transparently: no `no such column` error on next INSERT.
        """
        import sqlite3

        db = tmp_path / "legacy.sqlite"
        # Manually create the OLD schema (no cluster_id column)
        conn = sqlite3.connect(db)
        conn.executescript(
            """
            CREATE TABLE axl_cache (
                cache_key TEXT PRIMARY KEY,
                method TEXT NOT NULL,
                args_json TEXT NOT NULL,
                result_json TEXT NOT NULL,
                created_at REAL NOT NULL,
                expires_at REAL NOT NULL
            );
            INSERT INTO axl_cache VALUES
                ('legacy-key', 'oldMethod', '{}', '"old-value"', 0, 9999999999);
            """
        )
        conn.commit()
        conn.close()
        # Open with the new code: must not raise, must add the column
        c = AxlCache(db, default_ttl=60, cluster_id="new-cluster")
        # The new client should NOT see the legacy entry (it has no cluster_id).
        # This is the cautious behavior: legacy entries are isolated to the
        # "unknown cluster" bucket.
        assert c.get("oldMethod", {}) is None
        # And it must be able to write/read its own entries
        c.set("newMethod", {"a": 1}, "new-value")
        assert c.get("newMethod", {"a": 1}) == "new-value"