Comprehensive guide for building MCP-powered chat assistants with SSE streaming, covering both Hamilton Archive (vanilla JS) and SpiceBook (React + Zustand) implementations. Includes Caddy routing patterns, security hardening checklist, and frontend lessons learned.
Reference Architecture: MCP Server + SSE Chat on FastAPI
Pattern for adding an MCP server and a streaming chat assistant to an existing FastAPI application with any frontend framework. First built for the Margaret Hamilton Digital Archive (Starlight + vanilla JS + FastAPI), then adapted for SpiceBook (Astro SSR + React 19 + FastAPI). Both are in production.
Origin Story
The Hamilton Archive needed a chat assistant that could answer questions about Apollo-era documents using RAG (retrieval-augmented generation). The requirements were:
- MCP server — so Claude Code and other MCP clients could query the archive programmatically
- Chat panel — floating widget on all pages, streaming LLM responses via SSE, aware of whatever the user was currently reading (a Starlight page, a PDF in the viewer, etc.)
- RAG pipeline — semantic search → batch SQL fetch → character-budget truncation → LLM completion
This was built as vanilla TypeScript (no framework) because the Hamilton Archive uses Starlight with static output — there's no React, no Zustand, no build-time component hydration. The chat widget is a single 1,125-line .ts file that does manual DOM manipulation, localStorage conversation management, and inline Lucide SVG icon paths.
When the same pattern was needed for SpiceBook, the architecture was adapted:
- Frontend: React 19 with Zustand for state, split across ChatWidget.tsx + chat-store.ts + chat-api.ts
- Context model: PageContext(title, path, description) → NotebookContext(notebook_id, title, engine); the domain changed but the shape is identical
- RAG function: _build_context(query) → _build_notebook_context(req); this is the main customization point between deployments
- Caddy routing: per-route handle blocks → single @api.path matcher; simpler but less precise
What stays identical across both projects:
| Component | Identical? | Notes |
|---|---|---|
| SSE event protocol | Yes | status, token, reasoning, error, done |
| SSE client parser | Yes | parseSSEBlock() with \n\n boundary detection |
| _sse_event() helper | Yes | Compact JSON formatting |
| httpx streaming client | Yes | Same timeouts, limits, connection pooling |
| _chat_completion_stream() | Yes | Same SSE line parser for OpenAI-compatible endpoints |
| MCP mounting pattern | Yes | mcp.http_app() + combine_lifespans() + app.mount("/mcp", ...) |
| FastMCP tool conventions | Yes | Return str (JSON), never raise HTTPException |
| Conversation limits | Yes | MAX_CONVERSATIONS=20, MAX_MESSAGES=50 |
| Title derivation | Yes | First user message truncated to ~50-60 chars |
Two Frontend Variants
Variant A: Vanilla TypeScript (Hamilton Archive)
Single file: chat-widget.ts (1,125 lines) — no framework, no build-time hydration, no npm state library.
Entry point:
// ChatWidget.astro (11 lines)
import { initChatWidget } from './chat-widget.ts'
initChatWidget()
document.addEventListener('astro:after-swap', initChatWidget)
State management: Module-scoped variables + direct localStorage:
const STORAGE_KEY_INDEX = 'hamilton-chat-conversations'
const STORAGE_KEY_ACTIVE = 'hamilton-chat-active'
const STORAGE_KEY_PREFIX = 'hamilton-chat-conv-'
const STORAGE_KEY_LEGACY = 'hamilton-chat-history' // flat format, auto-migrated
Storage uses a split architecture: an index array (conversation metadata) stored separately from individual conversation message arrays (STORAGE_KEY_PREFIX + id). This avoids loading all message content when just rendering the history list.
DOM manipulation: data-open/data-view attributes on the widget root element control CSS visibility. Rendering is imperative — renderMessages(), renderHistoryList(), etc. Lucide icons are pasted as inline SVG path strings (no icon library dependency).
Key advantage: Zero JS framework overhead. The widget works in any static site (Starlight, plain HTML, Hugo) because it only needs a <script> tag.
Key disadvantage: All UI logic (event listeners, DOM updates, scroll management, thinking indicators) lives in one file. A conversation switch requires careful manual capture of streamingText before aborting the SSE stream. React's declarative model handles this more cleanly.
Variant B: React + Zustand (SpiceBook)
Three files: ChatWidget.tsx (component), chat-store.ts (state), chat-api.ts (network).
State management: Zustand with persist middleware:
export const useChatStore = create<ChatStore>()(
persist(
(set, get) => ({ /* actions */ }),
{
name: 'spicebook-chat',
partialize: (state) => ({
conversations: state.conversations,
activeConversationId: state.activeConversationId,
}),
},
),
);
The partialize function excludes transient state (panelOpen, streaming) from persistence — only conversation data survives page reloads.
SSE client: Separate async generator in chat-api.ts:
export async function* streamChat(opts: ChatStreamOptions): AsyncGenerator<SSEEvent> {
const resp = await fetch(`${API_BASE}/api/chat/stream`, { ... });
const reader = resp.body?.getReader();
const decoder = new TextDecoder();
let buffer = '';
// ... parse SSE blocks by \n\n boundary
}
Context awareness: Reads from the notebook store (useNotebookStore) to pass NotebookContext to the API:
interface ChatStreamOptions {
question: string;
notebook?: { notebook_id: string; title: string; engine: string } | null;
signal?: AbortSignal;
}
Key advantage: Declarative state updates. appendToLastAssistant(chunk) immutably updates the conversation array — no manual DOM sync needed.
Key disadvantage: Requires React hydration. Uses client:load in Astro, which means the widget JS downloads and executes on every page load.
Backend Architecture
1. MCP Server Module
Both projects use the same layout:
backend/src/myapp/
mcp/
__init__.py # FastMCP singleton + import side-effects
tools.py # @mcp.tool() functions wrapping domain logic
chat.py # (Hamilton only) RAG tool + LLM client
Hamilton registers tools in two files: mcp/tools.py (search, browse, stats) AND mcp/chat.py (the ask_hamilton RAG tool + LLM client). The RAG tool lives alongside the streaming client because they share _build_context() and _chat_completion().
SpiceBook keeps all tools in mcp/tools.py and the LLM client in a separate chat/llm.py module. The chat backend doesn't register any MCP tools — it's purely an HTTP endpoint.
# Hamilton: mcp/__init__.py
import hamilton_search.mcp.chat # Registers ask_hamilton tool
import hamilton_search.mcp.tools # Registers search_archive, get_document, etc.
# SpiceBook: mcp/__init__.py
import spicebook.mcp.tools # Registers list_all_notebooks, simulate_netlist, etc.
Hamilton MCP tools use Literal types for constrained parameters:
ContentType = Literal[
"page", "paper_summary", "source_note", "essay",
"archive", "agc_source", "apollo_context", "agc_highlight",
]
@mcp.tool()
async def search_archive(
query: str,
mode: Literal["hybrid", "semantic", "text"] = "hybrid",
content_type: ContentType | None = None,
limit: int = 10,
) -> str:
SpiceBook tools use plain str parameters for engine names because there are only two options (ngspice, ltspice) and validation happens in the engine factory.
2. Context Building — The Main Customization Point
The _build_context() function is where each deployment diverges. Everything else (SSE framing, LLM streaming, MCP mounting) is reusable.
Hamilton: _build_context(query) — RAG with Semantic Search
async def _build_context(query: str) -> tuple[str, list[dict]]:
    """Search archive and batch-fetch full document bodies for RAG context."""
    async with async_session() as db:
        # 1. Hybrid search (semantic + text) for top 5 results
        output = await search_documents(q=query, db=db, mode="hybrid", limit=5)
        # 2. Batch fetch all documents in one SQL query
        slugs = [r.slug for r in output.results]
        docs_result = await db.execute(
            select(Document).where(Document.slug.in_(slugs))
        )
        docs_by_slug = {doc.slug: doc for doc in docs_result.scalars()}
        # 3. Build context string with character-budget truncation
        # (construction of `sources` from output.results is elided in this excerpt)
        context_parts = []
        chars_used = 0
        for slug in slugs:  # preserve search result ordering
            doc = docs_by_slug.get(slug)
            if doc is None:  # search hit with no matching row: skip it
                continue
            remaining = MAX_CONTEXT_CHARS - chars_used  # MAX_CONTEXT_CHARS = 2000
            if remaining <= 0:
                break
            body = doc.body[:remaining] + "..." if len(doc.body) > remaining else doc.body
            context_parts.append(f"--- {doc.title} (/{doc.slug}) ---\n{body}")
            chars_used += len(body)
        return "\n\n".join(context_parts), sources
Pattern: Search → batch fetch → budget truncation. The two-phase fetch (search for slugs, then SELECT ... WHERE slug IN (...)) avoids N+1 queries. Character-budget truncation preserves document ordering from search relevance while staying within LLM context limits.
Returns: (context_text, sources_list) — the sources list is forwarded to the frontend as an SSE sources event so the chat widget can render clickable links.
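The budget step can be exercised without a database. The following is a minimal sketch rather than the archive's actual code: Doc is a hypothetical stand-in for the ORM row, and build_context is an illustrative name. It mirrors the truncation loop and guards against slugs that have no matching row. Unlike the excerpt, it charges only document characters against the budget, so the trailing ellipsis is not counted.

```python
from dataclasses import dataclass

MAX_CONTEXT_CHARS = 2000  # same budget as the excerpt


@dataclass
class Doc:
    """Hypothetical stand-in for the ORM Document row."""
    slug: str
    title: str
    body: str


def build_context(slugs: list[str], docs_by_slug: dict[str, Doc],
                  budget: int = MAX_CONTEXT_CHARS) -> str:
    """Concatenate bodies in search order until the character budget is spent."""
    parts: list[str] = []
    used = 0
    for slug in slugs:                    # keep search-relevance order
        doc = docs_by_slug.get(slug)
        if doc is None:                   # search hit without a fetched row
            continue
        remaining = budget - used
        if remaining <= 0:
            break
        body = doc.body
        if len(body) > remaining:
            body = body[:remaining] + "..."
        parts.append(f"--- {doc.title} (/{doc.slug}) ---\n{body}")
        used += min(len(doc.body), remaining)
    return "\n\n".join(parts)


docs = {
    "a": Doc("a", "A", "x" * 1500),
    "b": Doc("b", "B", "y" * 1500),
    "c": Doc("c", "C", "z" * 10),
}
context = build_context(["a", "missing", "b", "c"], docs)
```

With these inputs the first document fits whole, the second is cut to the 500 characters that remain, and the third is dropped because the budget is already spent.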
SpiceBook: _build_notebook_context(req) — Notebook Content Extraction
async def _build_notebook_context(req: ChatStreamRequest) -> str:
"""Extract SPICE cells and markdown notes from the notebook."""
if not req.notebook or not req.notebook.notebook_id:
return ""
nb = await asyncio.to_thread(load_notebook, settings.notebook_dir, req.notebook.notebook_id)
if nb is None:
return ""
parts = [f'Notebook: "{nb.metadata.title}" (engine: {nb.metadata.engine})']
for i, cell in enumerate(nb.cells):
if cell.type.value == "spice" and cell.source.strip():
parts.append(f"\n--- SPICE Cell {i + 1} ---\n{cell.source.strip()}")
# Include latest simulation result summary
for output in cell.outputs:
if output.output_type in ("simulation_result", "error"):
if not output.data.get("success") and output.data.get("error"):
parts.append(f" [Simulation error: {output.data['error']}]")
elif output.data.get("success") and output.data.get("waveform"):
wf = output.data["waveform"]
var_names = [v.get("name", "") for v in wf.get("variables", [])]
parts.append(f" [Simulation OK: {wf.get('points')} points, signals: {', '.join(var_names)}]")
break
elif cell.type.value == "markdown" and cell.source.strip():
parts.append(f"\n--- Notes ---\n{cell.source.strip()[:500]}")
return "\n".join(parts)
Pattern: Load → iterate cells → extract domain content. No search step — the user is already viewing a specific notebook, so we load it directly and extract the SPICE netlists plus their latest simulation results. Markdown notes are truncated to 500 chars.
Returns: Just a str — no sources list because there's nothing to cite. The context is the notebook itself.
Async I/O: Note the asyncio.to_thread(load_notebook, ...) call. load_notebook() uses synchronous path.read_text(), which blocks the asyncio event loop. Without to_thread(), every chat request briefly freezes all other async handlers (health checks, notebook API, other chat streams). This was caught during live debugging when concurrent requests stalled.
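The effect is easy to reproduce in isolation. In this sketch, load_notebook_blocking is a hypothetical stand-in for a loader that reads from disk synchronously (the delay is simulated with time.sleep), and a heartbeat task counts how often the event loop gets to run while the load is in progress.

```python
import asyncio
import time


def load_notebook_blocking(delay: float = 0.2) -> str:
    """Hypothetical sync loader; time.sleep simulates slow disk I/O."""
    time.sleep(delay)
    return "notebook"


async def heartbeat(ticks: list[float], stop: asyncio.Event) -> None:
    """Record a timestamp roughly every 10 ms while the loop is free."""
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def measure(use_thread: bool) -> int:
    """Return how many heartbeats ran while the notebook was loading."""
    ticks: list[float] = []
    stop = asyncio.Event()
    task = asyncio.create_task(heartbeat(ticks, stop))
    await asyncio.sleep(0)                 # let the heartbeat record its first tick
    before = len(ticks)
    if use_thread:
        await asyncio.to_thread(load_notebook_blocking)
    else:
        load_notebook_blocking()           # runs on the loop thread and blocks it
    during = len(ticks) - before
    stop.set()
    await task
    return during


blocked = asyncio.run(measure(use_thread=False))
threaded = asyncio.run(measure(use_thread=True))
```

The direct call leaves no room for the heartbeat to run, while the to_thread() version lets it keep ticking throughout the load. asyncio.to_thread() requires Python 3.9 or later.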
3. Page Context — What the User Is Looking At
Both projects prepend context about what the user is currently viewing to the question string. The shapes differ but the pattern is identical.
Hamilton: PageContext(title, path, description)
class PageContext(BaseModel):
title: str = Field("", max_length=200)
path: str = Field("", max_length=500)
description: str = Field("", max_length=500)
@field_validator("path")
@classmethod
def path_must_be_relative(cls, v: str) -> str:
if v and not v.startswith("/"):
raise ValueError("Path must start with /")
return v
Frontend detection is viewer-aware — Hamilton has a PDF viewer page that exposes window.__hamiltonSourceMap and window.__pdfMetadata. The getPageContext() function reads the current PDF page number from the viewer DOM, appends PDF metadata (author, creation date), and falls back to standard Starlight <main h1> detection:
function getPageContext(): { title: string; path: string; description: string } | null {
if (location.pathname === '/viewer' || location.pathname === '/viewer/') {
// Read from source map + viewer state
const titleWithPage = `${docTitle} (page ${currentPage} of ${total})`;
// ... append PDF metadata (Author, Created, Subject, Pages)
return { title: titleWithPage, path, description };
}
// Standard Starlight page
const title = document.querySelector('main h1')?.textContent?.trim();
return { title, path: location.pathname, description: meta.content };
}
Backend prepends the context as a bracketed string:
if req.page and req.page.title:
page_context = f'[The user is currently reading: "{req.page.title}" ({req.page.path})'
if req.page.description:
page_context += f"\nDocument description: {req.page.description}"
page_context += "]\n\n"
question = page_context + req.question
SpiceBook: NotebookContext(notebook_id, title, engine)
class NotebookContext(BaseModel):
notebook_id: str = Field("", max_length=200)
title: str = Field("", max_length=200)
engine: str = Field("ngspice", max_length=50)
No path validation needed — notebook IDs are used for load_notebook(), which already validates against the filesystem. The backend prepends:
if req.notebook and req.notebook.title:
question = f'[User is viewing notebook: "{nb_title}" ({nb_engine})]\n\n' + req.question
4. System Prompt
Both use /no_think\n as the first line to suppress reasoning on models that support it (e.g., Qwen3). This saves tokens since the chat assistant doesn't need to show its reasoning process.
# Hamilton
SYSTEM_PROMPT = (
"/no_think\n"
"You are a knowledgeable research assistant for the Margaret Hamilton Digital Archive. "
"Answer questions using ONLY the provided context from the archive. "
"If the context doesn't contain enough information, say so clearly. "
"Cite specific documents by title when referencing information. "
"Be precise and factual — never fabricate quotes or claims."
)
# SpiceBook
SYSTEM_PROMPT = (
"/no_think\n"
"You are a circuit simulation assistant integrated into SpiceBook, "
"a notebook environment for SPICE circuit design and simulation. "
"Help users understand circuits, debug netlists, interpret simulation "
"results, and design new circuits. ..."
)
5. SSE Streaming Endpoint
The endpoint structure is identical. Hamilton adds a sources event and has a pre-search status flow; SpiceBook skips straight to streaming.
# Hamilton: yields status → sources → tokens → done
async def generate():
yield _sse_event("status", {"text": "Searching the archive…"})
context, sources = await _build_context(question)
yield _sse_event("status", {"text": f"Found {n} relevant documents…"})
yield _sse_event("sources", sources) # ← Hamilton-specific
async for kind, text in _chat_completion_stream(context, question):
yield _sse_event("reasoning" if kind == "reasoning" else "token", {"text": text})
yield _sse_event("done", {})
# SpiceBook: yields status → tokens → done
async def generate():
context = await _build_notebook_context(req)  # async function: must be awaited
yield _sse_event("status", {"text": "Analyzing circuit context…"})
async for kind, text in chat_completion_stream(context, question):
yield _sse_event("reasoning" if kind == "reasoning" else "token", {"text": text})
yield _sse_event("done", {})
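The (kind, text) pairs consumed by both generators come from a line parser over the upstream OpenAI-compatible stream. That parser is not shown in this guide, so the following is only a sketch of how such a parser might look. The choices[0].delta.content path follows the OpenAI streaming format; the reasoning_content key is an assumption, since gateways differ in how they expose reasoning tokens. The function name parse_completion_lines is hypothetical, and it takes a plain iterable of lines so it can run without a network; a real client would feed it lines from the httpx response instead.

```python
import json
from typing import Iterable, Iterator


def parse_completion_lines(lines: Iterable[str]) -> Iterator[tuple[str, str]]:
    """Yield (kind, text) pairs from OpenAI-style `data:` lines."""
    for line in lines:
        line = line.strip()
        if not line.startswith("data:"):
            continue                       # blank lines, comments, keep-alives
        payload = line[len("data:"):].strip()
        if payload == "[DONE]":
            break                          # upstream end-of-stream sentinel
        try:
            chunk = json.loads(payload)
        except json.JSONDecodeError:
            continue                       # tolerate one malformed chunk
        choices = chunk.get("choices") or []
        if not choices:
            continue
        delta = choices[0].get("delta") or {}
        # Assumption: reasoning tokens arrive under `reasoning_content`.
        if delta.get("reasoning_content"):
            yield "reasoning", delta["reasoning_content"]
        if delta.get("content"):
            yield "token", delta["content"]


sample = [
    'data: {"choices":[{"delta":{"reasoning_content":"hm"}}]}',
    "",
    ": keep-alive",
    'data: {"choices":[{"delta":{"content":"Hi"}}]}',
    "data: not-json",
    'data: {"choices":[]}',
    "data: [DONE]",
    'data: {"choices":[{"delta":{"content":"late"}}]}',
]
events = list(parse_completion_lines(sample))
```

Anything after the [DONE] sentinel is ignored, and a malformed or empty chunk is skipped rather than aborting the stream.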
6. Mount MCP on FastAPI
from contextlib import asynccontextmanager
from fastmcp.utilities.lifespan import combine_lifespans
@asynccontextmanager
async def lifespan(app: FastAPI):
yield
await close_client() # Clean up httpx client
mcp_app = mcp.http_app(path="/", stateless_http=True)
app = FastAPI(
title="MyApp",
lifespan=combine_lifespans(lifespan, mcp_app.lifespan),
)
# Register routers FIRST
app.include_router(chat_router)
app.include_router(other_router)
# Mount MCP LAST (catch-all)
app.mount("/mcp", mcp_app)
Critical ordering: include_router() before app.mount("/mcp", ...). FastAPI mounts are catch-all: if MCP is mounted first, it swallows routes that share a prefix.
7. Domain Error Decoupling
Domain logic must not raise HTTPException — it breaks MCP tools which don't run through FastAPI's exception handling. Use domain-specific exceptions:
# Domain layer: raises a domain-specific exception, never HTTPException
def get_engine(name: str) -> Engine:
    if name not in ENGINES:
        raise UnsupportedEngineError(f"Unsupported: '{name}'")

# HTTP router: converts at boundary
try:
    engine = get_engine(req.engine)
except UnsupportedEngineError as exc:
    raise HTTPException(status_code=400, detail=str(exc))

# MCP tool: lets the domain error propagate; FastMCP converts it to an MCP error
@mcp.tool()
async def simulate(netlist: str, engine: str = "ngspice") -> str:
    eng = get_engine(engine)  # UnsupportedEngineError propagates naturally
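The same split can be run without FastAPI or FastMCP installed. This is a sketch under stated assumptions: UnsupportedEngineError is assumed to subclass ValueError, and http_boundary and tool_boundary are hypothetical adapters that stand in for the router and the MCP tool. ENGINES holds the two engine names mentioned earlier.

```python
import json


class UnsupportedEngineError(ValueError):
    """Domain error; subclassing ValueError is an assumption of this sketch."""


ENGINES = {"ngspice", "ltspice"}


def get_engine(name: str) -> str:
    """Domain logic: knows nothing about HTTP or MCP."""
    if name not in ENGINES:
        raise UnsupportedEngineError(f"Unsupported: '{name}'")
    return name


def http_boundary(engine: str) -> tuple[int, str]:
    """Hypothetical HTTP adapter: maps the domain error to a 400 response."""
    try:
        return 200, get_engine(engine)
    except UnsupportedEngineError as exc:
        return 400, str(exc)


def tool_boundary(engine: str) -> str:
    """Hypothetical tool adapter: returns a JSON string, lets errors propagate."""
    return json.dumps({"engine": get_engine(engine)})


ok_status = http_boundary("ngspice")
bad_status = http_boundary("spectre")
tool_result = tool_boundary("ltspice")
```

Only the HTTP adapter knows about status codes, so the domain function stays usable from either entry point.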
Conversation Management Patterns
Hamilton: Manual localStorage with Legacy Migration
Hamilton stores conversations in a split format: an index array of metadata and individual conversation arrays keyed by ID. It also migrates from an older flat format:
function migrateFromLegacy(): void {
const raw = localStorage.getItem(STORAGE_KEY_LEGACY);
if (!raw) return;
// Parse flat message array → create new conversation → save to indexed format
localStorage.removeItem(STORAGE_KEY_LEGACY);
}
Title derivation uses deriveTitle() — takes the first user message, truncates to TITLE_MAX_LENGTH (60 chars), and updates the index entry on save. The title stays "New conversation" until the first saveHistory() call.
Two-click delete with auto-revert:
function handleDelete(id: string): void {
if (pendingDeleteId === id) {
// Second click within 3s — confirm deletion
clearPendingDelete();
performDelete(id);
} else {
// First click — show "Delete?" label, start 3s timer
clearPendingDelete();
pendingDeleteId = id;
pendingDeleteTimer = window.setTimeout(() => {
pendingDeleteId = null;
pendingDeleteTimer = null;
if (viewMode === 'history') renderHistoryList(); // revert UI
}, 3000);
renderHistoryList();
}
}
This prevents accidental deletions without a modal dialog. The 3-second auto-revert means the user doesn't have to click "cancel" if they change their mind.
SpiceBook: Zustand persist Middleware
SpiceBook's useChatStore handles the same patterns declaratively:
createConversation: () => {
const id = generateId();
set((s) => ({
conversations: [conv, ...s.conversations].slice(0, MAX_CONVERSATIONS),
activeConversationId: id,
}));
return id;
},
addUserMessage: (text: string) => {
set((s) => ({
conversations: s.conversations.map((c) => {
if (c.id !== convId) return c;
const title = c.messages.length === 0 ? titleFromQuestion(text) : c.title;
return { ...c, messages: [...c.messages, msg].slice(-MAX_MESSAGES), title, updatedAt: now };
}),
}));
},
Zustand's persist middleware handles serialization and localStorage automatically. The partialize function excludes transient state. Deletion is a simple filter().
SSE Stream State Capture (Hamilton-Specific Edge Case)
Hamilton captures streamingText at widget scope so conversation switches can save partial text before aborting:
// Module-level state (not per-conversation)
let streamingText = '';
let streamingSources: ChatSource[] = [];
let conversationSwitchInProgress = false;
function startNewConversation(): void {
// Capture partial streaming text BEFORE aborting
if (abortController && streamingText) {
messages.push({
role: 'assistant',
text: streamingText,
sources: streamingSources.length ? streamingSources : undefined,
});
streamingText = '';
streamingSources = [];
}
if (abortController) {
conversationSwitchInProgress = true; // suppress error handler
abortController.abort();
abortController = null;
}
saveHistory();
createNewConversation();
messages = [];
}
The conversationSwitchInProgress flag is needed because aborting the SSE stream fires the error handler. Without the flag, the error handler would try to save a partial message to the wrong conversation (the one we just switched away from).
SpiceBook handles this more simply — React's useRef for the AbortController plus Zustand's immutable updates mean the conversation ID is captured in the closure, so there's no risk of writing to the wrong conversation.
Caddy Configuration
Hamilton: Per-Route handle Blocks
Hamilton uses numbered caddy.handle_N labels with independent reverse proxy configs per route:
labels:
caddy: ${PUBLIC_DOMAIN:-hamilton.l.warehack.ing}
# Search API — standard proxy, no SSE
caddy.handle: /api/search*
caddy.handle.0_reverse_proxy: "{{upstreams 8000}}"
# Health endpoint
caddy.handle_1: /health
caddy.handle_1.0_reverse_proxy: "{{upstreams 8000}}"
# MCP endpoint — standard proxy
caddy.handle_2: /mcp*
caddy.handle_2.0_reverse_proxy: "{{upstreams 8000}}"
# Chat streaming — SSE-optimized proxy
caddy.handle_3: /api/chat*
caddy.handle_3.0_reverse_proxy: "{{upstreams 8000}}"
caddy.handle_3.0_reverse_proxy.flush_interval: "-1"
caddy.handle_3.0_reverse_proxy.transport: "http"
caddy.handle_3.0_reverse_proxy.transport.read_timeout: "0"
caddy.handle_3.0_reverse_proxy.transport.write_timeout: "0"
Advantage: SSE streaming labels (flush_interval, read_timeout, write_timeout) only apply to /api/chat*. Non-streaming routes like /api/search* and /mcp* use Caddy's default buffering and timeouts, which is more efficient for short-lived requests.
SpiceBook: Single @api.path Matcher
SpiceBook groups all backend routes under one path matcher:
labels:
caddy: "${SPICEBOOK_DOMAIN:-spicebook.localhost}"
# All backend routes go through one matcher with SSE settings
caddy.@api.path: "/api/* /health /docs /openapi.json /redoc /mcp/*"
caddy.reverse_proxy_0: "@api {{upstreams 8000}}"
caddy.reverse_proxy_0.flush_interval: "-1"
caddy.reverse_proxy_0.transport: "http"
caddy.reverse_proxy_0.transport.read_timeout: "0"
caddy.reverse_proxy_0.transport.write_timeout: "0"
caddy.reverse_proxy_0.stream_timeout: "24h"
caddy.reverse_proxy_0.stream_close_delay: "5s"
Advantage: Simpler — one block instead of four. Adding new API routes just means adding to the path list.
Disadvantage: SSE streaming settings apply to all backend routes, including /health and /docs. The read_timeout: 0 means Caddy will never close idle connections to the health endpoint, which is wasteful (though harmless in practice).
Comparison Table
| Aspect | Hamilton (handle blocks) | SpiceBook (@api.path matcher) |
|---|---|---|
| Label count | ~12 labels across 4 handles | ~8 labels in 1 matcher |
| SSE scope | Only /api/chat* | All backend routes |
| Adding routes | New handle_N block | Append to path list |
| Timeout precision | Per-route control | Blanket settings |
| Complexity | Higher | Lower |
| Recommended for | Multi-protocol backends (SSE + gRPC + REST) | Simple REST + SSE backends |
SSE-Required Labels (Both Approaches)
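These labels keep SSE streaming intact through Caddy. Without them, Caddy buffers responses and/or times out long-lived connections. Both deployments set flush_interval and the two transport timeouts; stream_timeout and stream_close_delay appear only in the SpiceBook configuration above: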
caddy.reverse_proxy.flush_interval: "-1" # Disable response buffering
caddy.reverse_proxy.transport: "http"
caddy.reverse_proxy.transport.read_timeout: "0" # No read timeout
caddy.reverse_proxy.transport.write_timeout: "0" # No write timeout
caddy.reverse_proxy.stream_timeout: "24h" # WebSocket/SSE lifetime
caddy.reverse_proxy.stream_close_delay: "5s" # Graceful close on reload
Also set these headers on the StreamingResponse:
return StreamingResponse(
generate(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no", # Disable nginx/proxy buffering
},
)
SSE Event Protocol
Both projects use this consistent event protocol:
| Event | Payload | Meaning |
|---|---|---|
| status | {"text": "..."} | Status message (e.g., "Thinking...", "Searching the archive...") |
| sources | [{title, slug, url, score}] | Hamilton only: search results for citation links |
| token | {"text": "..."} | Content token from the LLM |
| reasoning | {"text": "..."} | Reasoning/thinking token (if model supports it) |
| error | {"text": "..."} | Error message to display to user |
| done | {} | Stream complete |
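SSE formatting: Use json.dumps(data, separators=(",",":")) (compact, no spaces) for the data: payload. Without indent, json.dumps emits a single line and escapes any newline inside a string as \n, so the payload cannot split the SSE block; a raw newline in the data line would. The compact separators simply trim bytes from every token event.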
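The _sse_event() helper itself is not reproduced in this guide. A minimal sketch of an equivalent, assuming the standard event:/data: wire format with a blank line closing each block, might look like this (the name sse_event is hypothetical):

```python
import json


def sse_event(event: str, data: object) -> str:
    """Frame one SSE block: event name, single-line JSON payload, blank line."""
    payload = json.dumps(data, separators=(",", ":"))
    return f"event: {event}\ndata: {payload}\n\n"


token_block = sse_event("token", {"text": "line1\nline2"})
done_block = sse_event("done", {})
```

The newline inside the token text is escaped by json.dumps, so the only raw newlines in the block are the ones that belong to the SSE framing.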
Security Hardening
These were identified during Apollo code review of the Hamilton Archive and applied retroactively. Each fix below references the original vulnerability.
1. HTTP Client Race Condition
Hamilton vulnerability: _get_chat_client() was a sync function with no lock:
# VULNERABLE (Hamilton original)
def _get_chat_client() -> httpx.AsyncClient:
global _chat_client
if _chat_client is None or _chat_client.is_closed:
_chat_client = httpx.AsyncClient(...) # race condition here
return _chat_client
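Two concurrent callers could both see _chat_client is None, both create a new client, and the first client would be leaked (never closed). Under load this causes connection pool exhaustion. Within a single event-loop thread, a sync function with no await cannot be interleaved, so this window opens only when the getter is called from more than one thread or when client setup later gains an await between the check and the assignment.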
Fix: Use asyncio.Lock() with double-checked locking:
# FIXED (SpiceBook, applied back to reference)
_client_lock = asyncio.Lock()

async def _get_chat_client() -> httpx.AsyncClient:
    global _chat_client
    if _chat_client is not None and not _chat_client.is_closed:
        return _chat_client
    async with _client_lock:
        if _chat_client is not None and not _chat_client.is_closed:
            return _chat_client
        _chat_client = httpx.AsyncClient(...)
        return _chat_client
The double-check avoids acquiring the lock on every call — only the first caller (or after client closure) takes the lock.
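The pattern can be checked without httpx. In this sketch, FakeClient is a hypothetical stand-in that counts constructions, and the asyncio.sleep() inside the lock is an assumed async setup step that gives other tasks a chance to run between the check and the assignment. The lock is created inside the running loop so the example also works on Python versions that bind a lock to the loop at construction time.

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in for an HTTP client; counts how many were built."""
    created = 0

    def __init__(self) -> None:
        FakeClient.created += 1
        self.is_closed = False


class ClientHolder:
    """Lazy singleton guarded by double-checked locking."""

    def __init__(self) -> None:
        self._client = None
        self._lock = asyncio.Lock()

    async def get(self) -> FakeClient:
        if self._client is not None and not self._client.is_closed:
            return self._client                # fast path: no lock taken
        async with self._lock:
            if self._client is not None and not self._client.is_closed:
                return self._client            # another task built it first
            await asyncio.sleep(0.01)          # assumed async setup step
            self._client = FakeClient()
            return self._client


async def main() -> int:
    holder = ClientHolder()                    # created inside the running loop
    clients = await asyncio.gather(*(holder.get() for _ in range(20)))
    assert all(c is clients[0] for c in clients)
    return FakeClient.created


created = asyncio.run(main())
```

All twenty callers pass the first check while the client is still missing, but only the first one through the lock constructs it; the rest hit the second check and reuse the same instance.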
2. Streaming Error Bodies
Hamilton vulnerability: Used resp.raise_for_status() inside client.stream() context:
# VULNERABLE (Hamilton original)
async with client.stream("POST", url, ...) as resp:
resp.raise_for_status() # ← error body is empty in streaming context
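Inside a streaming context, the response body hasn't been read yet. raise_for_status() still raises HTTPStatusError, but the attached response has no body loaded: accessing exc.response.text at that point raises httpx.ResponseNotRead, so the upstream error message is lost unless it is read first.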
Fix: Read the error body explicitly before raising:
# FIXED (SpiceBook)
async with client.stream("POST", url, ...) as resp:
if resp.status_code >= 400:
body = await resp.aread()
error_text = body[:500].decode("utf-8", errors="replace")
logger.error("LLM gateway returned %d: %s", resp.status_code, error_text)
raise httpx.HTTPStatusError(
f"LLM gateway error {resp.status_code}",
request=resp.request,
response=resp,
)
3. Bare Exception Handling in Stream
Hamilton vulnerability: Used bare except Exception in the streaming endpoint:
# VULNERABLE (Hamilton original)
try:
async for kind, text in _chat_completion_stream(context, question):
yield _sse_event(...)
except Exception:
logger.exception("Chat stream completion failed")
yield _sse_event("error", {"text": "Chat completion unavailable"})
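except Exception is broader than this handler needs. It reports every failure, including programming errors such as a KeyError in the parser, to the user as "Chat completion unavailable", so real bugs look like upstream outages. It does not catch KeyboardInterrupt, and since Python 3.8 it does not catch asyncio.CancelledError either, because both derive from BaseException. On Python 3.7 and earlier, CancelledError was an Exception subclass, so the same handler also swallowed client-disconnect cancellations.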
Fix: Catch specific httpx exceptions:
# FIXED (SpiceBook)
try:
    async for kind, text in chat_completion_stream(context, question):
        yield _sse_event(...)
except (
    httpx.HTTPStatusError,
    httpx.ConnectError,
    httpx.ReadTimeout,
    httpx.PoolTimeout,
    httpx.ConnectTimeout,
) as exc:
    logger.warning("Chat stream failed: %s", exc)
    yield _sse_event("error", {"text": "Chat service unavailable"})
    return
except asyncio.CancelledError:
    logger.debug("Chat stream cancelled by client disconnect")
    return
4. Path Traversal Protection
Hamilton context: Not directly vulnerable because it uses SQL (slugs are database lookups, not file paths). But filesystem-based apps like SpiceBook that construct paths from user IDs need validation:
import re

_SAFE_ID_RE = re.compile(r"[a-z0-9][a-z0-9\-]{0,198}[a-z0-9]")

def validate_item_id(item_id: str) -> str:
    # fullmatch() rejects a trailing newline, which "$" with match() would accept
    if not _SAFE_ID_RE.fullmatch(item_id):
        raise ValueError(f"Invalid item ID: {item_id!r}")
    return item_id
5. Compact SSE JSON
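Use json.dumps(data, separators=(",",":")) in _sse_event() to keep each data: line compact. Hamilton's original used the default separators (", ", ": "), which are valid but add bytes to every token event and make raw stream dumps harder to scan. Either way, json.dumps without indent emits a single line and escapes newlines inside strings, so the payload cannot break SSE framing; avoid passing indent= here.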
Security Checklist
- HTTP client init: asyncio.Lock() with double-checked locking for lazy singleton
- Streaming errors: resp.aread() before raising on HTTP errors inside client.stream()
- Specific exceptions: Catch named httpx.* exceptions, not bare Exception
- CancelledError handling: Explicit except asyncio.CancelledError in SSE generators
- Path traversal: Validate user-provided IDs with regex before constructing file paths
- Error decoupling: Domain logic raises ValueError, not HTTPException
- Compact SSE JSON: json.dumps(data, separators=(",",":")) to prevent newline fragility
- SSE response headers: Cache-Control: no-cache + X-Accel-Buffering: no
- Input validation: max_length on all string fields in Pydantic models
- Shutdown cleanup: close_client() in lifespan shutdown to drain connection pool
Frontend Lessons Learned
1. Markdown Rendering: Use a Library, Not Regex
LLM responses contain unpredictable markdown: headers, nested lists, tables, blockquotes, fenced code blocks with language tags, horizontal rules. Hand-rolled regex cannot handle all of this.
Hamilton's approach — lightweight regex in renderMarkdown():
// Hamilton: handles bold, italic, inline code, links, line breaks
function renderMarkdown(text: string): string {
let html = escapeHtml(text);
html = html.replace(/\*\*(.+?)\*\*/g, '<strong>$1</strong>');
html = html.replace(/(?<!\*)\*([^*]+?)\*(?!\*)/g, '<em>$1</em>');
html = html.replace(/`([^`]+?)`/g, '<code>$1</code>');
// ... links, newlines
return html;
}
This works for the Hamilton Archive because its RAG context produces shorter, less complex responses. It fails badly for SpiceBook where the LLM generates full tutorials with headers, numbered lists, tables, and code blocks.
SpiceBook's approach — marked + DOMPurify:
import { marked } from 'marked';
import DOMPurify from 'dompurify';
marked.setOptions({ breaks: true, gfm: true });
function renderMarkdown(text: string): string {
const raw = marked.parse(text, { async: false }) as string;
return DOMPurify.sanitize(raw, { ADD_ATTR: ['target'] });
}
Why both layers: marked handles all GFM syntax (tables, task lists, fenced code). DOMPurify strips XSS vectors from the HTML output — critical because the result goes into dangerouslySetInnerHTML. The ADD_ATTR: ['target'] preserves target="_blank" on links.
Cost: marked is ~13 KB gzipped with full GFM support. Worth it for any chat widget where LLM output is unpredictable.
Recommendation: Start with marked + DOMPurify in new projects. Drop to regex only if you control the LLM output format (e.g., RAG with structured templates).
2. React 19 Automatic Batching Breaks SSE Streaming
React's automatic batching (introduced in React 18 and still the default in React 19) coalesces state updates made in the same tick into a single render. When the for await...of SSE loop processes many tokens from one reader.read() chunk, React defers rendering until the loop yields to the event loop. In SpiceBook this meant the user saw almost nothing until the stream finished.
The symptom: Chat shows the first partial render (from the initial SSE chunk boundary), then freezes, then shows everything at once when the stream ends.
Root cause in React 19:
// This produces ONE render at stream end, not N renders per token:
for await (const evt of streamChat({ question })) {
if (evt.event === 'token') {
appendToLastAssistant(evt.data.text); // setState call
// React batches this ↑ — no re-render happens here
}
}
// React renders ONCE here, after the loop exits
Fix — requestAnimationFrame token batching:
Accumulate tokens in a useRef (no re-render per token), then flush to Zustand state once per animation frame (~60fps). This gives smooth incremental streaming without overwhelming React:

    // Refs (no re-render on write)
    const pendingTokensRef = useRef('');
    const flushRafRef = useRef(0);

    // Inside the SSE event loop:
    case 'token':
      pendingTokensRef.current += evt.data.text;
      if (!flushRafRef.current) {
        flushRafRef.current = requestAnimationFrame(() => {
          if (pendingTokensRef.current) {
            appendToLastAssistant(pendingTokensRef.current);
            pendingTokensRef.current = '';
          }
          flushRafRef.current = 0;
        });
      }
      break;

    // In the finally block, flush remaining buffered tokens:
    finally {
      if (flushRafRef.current) {
        cancelAnimationFrame(flushRafRef.current);
        flushRafRef.current = 0;
      }
      if (pendingTokensRef.current) {
        appendToLastAssistant(pendingTokensRef.current);
        pendingTokensRef.current = '';
      }
      setStreaming(false);
    }

Why this works: requestAnimationFrame fires once per display frame (~16ms at 60fps). Multiple tokens arriving within one frame get concatenated in the ref and flushed as a single appendToLastAssistant call — which triggers exactly one React render per frame. The browser gets to paint between frames, so the user sees smooth incremental text.
Why useRef instead of useState: Writing to a ref doesn't trigger a render. If we used useState for the accumulator, we'd be back to the same batching problem.
Hamilton doesn't need this because its vanilla JS sendQuestion() function writes directly to bubble.innerHTML — no framework batching layer.
3. Streaming Verification Checklist
When SSE streaming appears broken, check each layer in order. The issue is usually at exactly one layer:
| Layer | Check | Symptom if broken |
|---|---|---|
| GPU gateway | curl -N $GPU_BASE_URL/chat/completions with "stream": true | No tokens arrive at all |
| Backend httpx | print() inside aiter_lines() loop | Tokens arrive at backend but not forwarded |
| Backend sync I/O | Check for path.read_text(), open().read() in async functions | First token delayed by seconds; concurrent requests stall |
| FastAPI StreamingResponse | curl -N http://localhost:8099/api/chat/stream | Tokens stream from backend but not through proxy |
| Caddy proxy | Check flush_interval: "-1" label | All tokens arrive at once when stream ends |
| Frontend fetch/reader | console.log() in reader.read() loop | Tokens arrive in browser but UI doesn't update |
| React rendering | Check for RAF batching pattern | UI updates once at stream end (React 19 batching) |
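"All at once" versus "incrementally" can be hard to judge by eye in a terminal, so it can help to timestamp each line as it arrives. The following is a minimal sketch, not code from either project: timestamp_chunks, looks_buffered, and probe are hypothetical helper names, the 0.25 s threshold is an arbitrary assumption, and the request body assumes the endpoint accepts a JSON object with a question field. It uses only the standard library, whose own buffering may blur the timings slightly.

```python
import json
import time
import urllib.request
from typing import Iterable, Iterator


def timestamp_chunks(chunks: Iterable[bytes]) -> Iterator[tuple[float, bytes]]:
    """Pair each chunk with its arrival time, in seconds since iteration began."""
    start = time.monotonic()
    for chunk in chunks:
        yield time.monotonic() - start, chunk


def looks_buffered(arrivals: list[float], min_spread: float = 0.25) -> bool:
    """Heuristic: if every chunk lands within `min_spread` seconds of the first,
    some layer upstream probably held the response back and released it at once.
    The threshold is an assumption; tune it to your model's generation speed."""
    if len(arrivals) < 2:
        return True  # too few chunks to tell; treat as suspicious
    return (arrivals[-1] - arrivals[0]) < min_spread


def probe(url: str, question: str) -> list[float]:
    """POST a question and return the arrival time of each non-empty line."""
    body = json.dumps({"question": question}).encode()
    req = urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=120) as resp:
        # Iterating an HTTP response yields it line by line as bytes.
        return [t for t, line in timestamp_chunks(resp) if line.strip()]
```

Running probe() once directly against the backend port and once through the proxy hostname, then comparing looks_buffered() on the two results, narrows the fault to one side of Caddy.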
The asyncio.to_thread() gotcha: Any synchronous I/O in an async def function blocks the entire event loop. This doesn't just delay the current request — it freezes all concurrent async handlers (health checks, other chat streams, notebook API). Wrap sync I/O with asyncio.to_thread():

    # BROKEN: blocks event loop during file read
    async def _build_notebook_context(req):
        nb = load_notebook(settings.notebook_dir, req.notebook.notebook_id)  # sync!

    # FIXED: runs sync I/O in a thread pool worker
    async def _build_notebook_context(req):
        nb = await asyncio.to_thread(load_notebook, settings.notebook_dir, req.notebook.notebook_id)

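The effect is easy to reproduce without FastAPI. The sketch below is illustrative only: slow_sync_read is a hypothetical stand-in for load_notebook (it just sleeps), and heartbeat plays the role of any other concurrent handler, such as a health check. It counts how often the heartbeat gets to run while the "read" is in progress, first with a direct call and then through asyncio.to_thread(). It assumes Python 3.9 or newer.

```python
import asyncio
import time


def slow_sync_read(delay: float) -> str:
    """Stand-in for a blocking call such as path.read_text() (hypothetical)."""
    time.sleep(delay)
    return "notebook contents"


async def heartbeat(ticks: list[float], interval: float, stop: asyncio.Event) -> None:
    """Record a timestamp every `interval` seconds while the loop is free to run."""
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(interval)


async def measure(blocking: bool, delay: float = 0.3, interval: float = 0.02) -> int:
    """Return how many heartbeat ticks were recorded around one 'file read'."""
    ticks: list[float] = []
    stop = asyncio.Event()
    task = asyncio.create_task(heartbeat(ticks, interval, stop))
    await asyncio.sleep(0)  # let the heartbeat record its first tick
    if blocking:
        slow_sync_read(delay)  # BROKEN: nothing else on the loop can run
    else:
        await asyncio.to_thread(slow_sync_read, delay)  # FIXED: loop stays free
    stop.set()
    await task
    return len(ticks)


def run_demo() -> tuple[int, int]:
    """Run both variants and return (ticks_when_blocking, ticks_with_to_thread)."""
    blocked = asyncio.run(measure(blocking=True))
    threaded = asyncio.run(measure(blocking=False))
    return blocked, threaded


if __name__ == "__main__":
    blocked, threaded = run_demo()
    print(f"ticks while blocking: {blocked}, ticks with to_thread: {threaded}")
```

In the blocking run, the only tick recorded is the one taken before the read started. In the to_thread run, the heartbeat keeps firing throughout, which is what the other chat streams and health checks need.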
Configuration
Environment Variables

    # GPU LLM gateway
    GPU_API_KEY=your-api-key
    GPU_BASE_URL=https://your-app.gpu.supported.systems/v1
    GPU_CHAT_MODEL=qwen3
    CHAT_MAX_TOKENS=8192

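One way the backend might read these values at startup is sketched below. This is an assumption about structure, not the projects' actual settings module: ChatSettings and load_chat_settings are hypothetical names, and the fallback defaults simply mirror the sample values above. Failing fast on a missing key or base URL tends to be easier to debug than a 401 from the gateway on the first chat request.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class ChatSettings:
    """Hypothetical container for the four variables listed above."""
    gpu_api_key: str
    gpu_base_url: str
    gpu_chat_model: str
    chat_max_tokens: int


def load_chat_settings(env: Mapping[str, str] = os.environ) -> ChatSettings:
    """Build settings from an environment mapping, raising if required keys are absent."""
    missing = [key for key in ("GPU_API_KEY", "GPU_BASE_URL") if not env.get(key)]
    if missing:
        raise RuntimeError(f"missing required environment variables: {', '.join(missing)}")
    return ChatSettings(
        gpu_api_key=env["GPU_API_KEY"],
        # Strip a trailing slash so later path joins don't produce "//"
        gpu_base_url=env["GPU_BASE_URL"].rstrip("/"),
        # Defaults below are assumptions copied from the sample .env
        gpu_chat_model=env.get("GPU_CHAT_MODEL", "qwen3"),
        chat_max_tokens=int(env.get("CHAT_MAX_TOKENS", "8192")),
    )
```

Passing a plain dict instead of os.environ keeps the loader easy to exercise in tests.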
Docker Compose: environment vs env_file
env_file passes values literally from the .env file — no variable interpolation. The environment section supports ${VAR} interpolation from the host environment and the .env file.

    services:
      frontend:
        env_file: .env  # Passes GPU_API_KEY=abc123 literally
        environment:
          # Interpolates SPICEBOOK_DOMAIN from .env, with fallback
          - PUBLIC_API_URL=https://${SPICEBOOK_DOMAIN:-localhost:4321}

This matters when a build-time variable (like PUBLIC_API_URL) needs to be constructed from a runtime variable (like SPICEBOOK_DOMAIN). You can't do string interpolation inside env_file — you need the environment section.
Dependencies
Add to pyproject.toml:

    dependencies = [
        "fastmcp>=3.0.0",
        "httpx>=0.28.0",
    ]

File Summary Template
| File | Purpose |
|---|---|
| backend/src/myapp/mcp/__init__.py | FastMCP singleton + tool import side-effects |
| backend/src/myapp/mcp/tools.py | MCP tool definitions wrapping domain logic |
| backend/src/myapp/chat/llm.py | httpx LLM client with connection pooling + streaming |
| backend/src/myapp/models/chat.py | Pydantic request models (ViewContext, ChatStreamRequest) |
| backend/src/myapp/routers/chat.py | SSE streaming endpoint + context builder |
| backend/src/myapp/main.py | Lifespan + MCP mount + router registration |
| frontend/src/lib/chat-api.ts | SSE async generator client |
| frontend/src/lib/chat-store.ts | Zustand store with localStorage persistence |
| frontend/src/components/chat/ChatWidget.tsx | React floating chat panel (SpiceBook) |
| frontend/src/components/chat-widget.ts | Vanilla TS chat widget (Hamilton) |
| docker-compose.yml | Caddy labels for MCP + SSE routing |
| .env | GPU gateway config |
Verification Commands
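
    # 1. MCP protocol test
    # Assumed minimal JSON-RPC initialize request; adjust for your server.
    curl -X POST http://localhost:8099/mcp \
      -H 'Content-Type: application/json' \
      -H 'Accept: application/json, text/event-stream' \
      -d '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2025-03-26","capabilities":{},"clientInfo":{"name":"curl","version":"0.0.0"}}}'
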

    # 2. Chat SSE test
    curl -N -X POST http://localhost:8099/api/chat/stream \
      -H 'Content-Type: application/json' \
      -d '{"question":"Hello, what can you help with?"}'

    # 3. Register MCP server with Claude Code
    claude mcp add myapp-local -- \
      uv run --directory /path/to/backend myapp

    # 4. Test MCP tools via Claude Code
    claude -p "List all items" \
      --mcp-config .mcp.json \
      --allowedTools "mcp__myapp-local__*"
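
When the raw output of the chat SSE test scrolls past too quickly to inspect, a small parser can reassemble it into events. This is a sketch under assumptions: it assumes the stream uses standard event:/data: framing and that token events carry a JSON object with a text field (as evt.data.text in the frontend loop suggests). parse_sse and collect_tokens are hypothetical names, not functions from either project.

```python
import json
from typing import Iterable, Iterator


def parse_sse(lines: Iterable[str]) -> Iterator[tuple[str, str]]:
    """Yield (event, data) pairs from raw SSE lines.

    Lenient by design: an event still pending when the input ends is
    emitted rather than discarded, which is convenient for debugging.
    """
    event, data = "message", []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line terminates the current event
            if data:
                yield event, "\n".join(data)
            event, data = "message", []
        elif line.startswith(":"):
            continue  # comment line, often used as a keepalive
        elif line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # Drop the single optional space after the colon
            data.append(value[1:] if value.startswith(" ") else value)
    if data:
        yield event, "\n".join(data)


def collect_tokens(events: Iterable[tuple[str, str]]) -> str:
    """Concatenate the text of every 'token' event (payload shape is assumed)."""
    return "".join(
        json.loads(data)["text"] for event, data in events if event == "token"
    )
```

Feeding it the saved lines from the chat SSE test and printing collect_tokens(parse_sse(lines)) shows the full assistant reply as the frontend would assemble it, which helps separate backend formatting bugs from rendering bugs.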