Initial mcarchive-org MCP server

FastMCP server wrapping archive.org's public read APIs:
- search_items / scrape_items: advanced search + bulk cursor pagination
- get_item_metadata / list_files: progressive disclosure with filtering
- get_file_url / download_file: canonical URLs and streaming downloads
  with HTTP Range resume + optional MD5 verification

Smoke-tested end-to-end via claude -p headless MCP and pytest against
live archive.org endpoints.
This commit is contained in:
Ryan Malloy 2026-04-21 09:41:20 -06:00
commit 5265a6440b
10 changed files with 2295 additions and 0 deletions

14
.gitignore vendored Normal file
View File

@ -0,0 +1,14 @@
__pycache__/
*.py[cod]
*.egg-info/
.venv/
.ruff_cache/
.pytest_cache/
dist/
build/
.mypy_cache/
*.log
# downloads from test runs
downloads/
tmp/

73
README.md Normal file
View File

@ -0,0 +1,73 @@
# mcarchive-org
An MCP (Model Context Protocol) server that lets an LLM search, inspect, and download content from the [Internet Archive](https://archive.org).
Built on [FastMCP](https://gofastmcp.com) + [httpx](https://www.python-httpx.org/). No API key required — archive.org's read endpoints are public.
## Tools
| Tool | Purpose |
|------|---------|
| `search_items` | Small Solr-style search via `advancedsearch.php` (1200 rows, paginated) |
| `scrape_items` | Bulk cursor-paginated search via Scrape API (count ≥ 100) |
| `get_item_metadata` | Metadata for one item; skips the (possibly huge) files list by default |
| `list_files` | Files array with optional format / glob filtering — includes `download_url` per file |
| `get_file_url` | Build a canonical download URL without hitting the network |
| `download_file` | Stream a file to disk with resume support and optional MD5 verification |
Also exposes an MCP resource template: `archive://item/{identifier}`.
## Install & run
```bash
# From a checkout:
uv sync
uv run mcarchive-org
# Or from PyPI (once published):
uvx mcarchive-org
```
Register with Claude Code:
```bash
claude mcp add archive-org -- uvx mcarchive-org
# or, from a local checkout:
claude mcp add archive-org -- uv run --directory /path/to/mcarchive-org mcarchive-org
```
## Environment
| Variable | Default | Purpose |
|----------|---------|---------|
| `MCARCHIVE_DOWNLOAD_ROOT` | `./downloads` | Base directory for `download_file` |
## Example flow
```
search_items(query='mediatype:audio AND creator:"Grateful Dead"', sort=['downloads desc'])
→ identifier 'gd77-05-08.sbd.hicks.4982.sbeok.shnf' (among others)
list_files(identifier='gd77-05-08.sbd.hicks.4982.sbeok.shnf', formats=['VBR MP3'])
→ [{ name: 'gd1977-05-08d1t01.mp3', size: 6342912, md5: '…', download_url: '…' }, …]
download_file(identifier='gd77-…', filename='gd1977-05-08d1t01.mp3', verify_md5='…')
→ { path: './downloads/gd77-…/gd1977-…mp3', bytes: 6342912, md5_ok: True }
```
## Query syntax notes
archive.org uses a Solr/Lucene dialect:
- `mediatype:(audio OR movies)` — restrict to media types
- `collection:etree` — items in a specific collection
- `date:[1977-01-01 TO 1977-12-31]` — date ranges
- `creator:"Grateful Dead"` — phrase match
- `-subject:bootleg` — exclusion
- Sort by `downloads desc`, `date asc`, `addeddate desc`, etc.
See [archive.org's search docs](https://archive.org/advancedsearch.php) for the full grammar.
## License
MIT

54
pyproject.toml Normal file
View File

@ -0,0 +1,54 @@
[project]
name = "mcarchive-org"
version = "2026.04.21"
description = "MCP server for searching and downloading files from the Internet Archive (archive.org)"
readme = "README.md"
requires-python = ">=3.10"
license = { text = "MIT" }
authors = [
{ name = "Ryan Malloy", email = "ryan@supported.systems" },
]
keywords = ["mcp", "archive.org", "internet-archive", "fastmcp", "llm"]
classifiers = [
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Topic :: Internet :: WWW/HTTP",
]
dependencies = [
"fastmcp>=3.2.4",
"httpx>=0.28.1",
]
[project.scripts]
mcarchive-org = "mcarchive_org.server:main"
[project.urls]
Homepage = "https://archive.org/developers/"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src/mcarchive_org"]
[tool.ruff]
line-length = 100
target-version = "py310"
[tool.ruff.lint]
select = ["E", "F", "W", "I", "UP", "B", "SIM", "RUF"]
ignore = ["E501"]
[dependency-groups]
dev = [
"pytest>=8.0",
"pytest-asyncio>=0.23",
"ruff>=0.5",
]

View File

@ -0,0 +1,8 @@
"""MCP server for the Internet Archive (archive.org)."""
from importlib.metadata import PackageNotFoundError, version
try:
__version__ = version("mcarchive-org")
except PackageNotFoundError:
__version__ = "0.0.0"

View File

@ -0,0 +1,4 @@
from mcarchive_org.server import main
if __name__ == "__main__":
main()

196
src/mcarchive_org/client.py Normal file
View File

@ -0,0 +1,196 @@
"""Low-level archive.org HTTP client (pure httpx, no MCP dependencies)."""
from __future__ import annotations
import hashlib
from collections.abc import AsyncIterator
from pathlib import Path
from typing import Any
import httpx
ARCHIVE_BASE = "https://archive.org"
DEFAULT_UA = "mcarchive-org/2026.04.21 (+https://archive.org/developers/)"
DEFAULT_TIMEOUT = httpx.Timeout(30.0, read=60.0)
class ArchiveError(RuntimeError):
"""Raised when archive.org returns an error payload or unexpected status."""
class ArchiveClient:
"""Async client for the three archive.org endpoints we care about.
- advancedsearch.php : small Solr-style queries (<= ~10,000 rows paginated)
- services/search/v1/scrape : bulk cursor-based iteration (count >= 100)
- metadata/{id} : full item manifest including files[]
- download/{id}/{file} : byte stream with Range support
"""
def __init__(
self,
base_url: str = ARCHIVE_BASE,
user_agent: str = DEFAULT_UA,
timeout: httpx.Timeout | float = DEFAULT_TIMEOUT,
) -> None:
self._base = base_url.rstrip("/")
self._client = httpx.AsyncClient(
headers={"User-Agent": user_agent, "Accept": "application/json"},
timeout=timeout,
follow_redirects=True,
)
async def aclose(self) -> None:
await self._client.aclose()
async def __aenter__(self) -> ArchiveClient:
return self
async def __aexit__(self, *exc: object) -> None:
await self.aclose()
# ---------- search ----------
async def search(
self,
query: str,
fields: list[str] | None = None,
sort: list[str] | None = None,
rows: int = 25,
page: int = 1,
) -> dict[str, Any]:
"""Advanced search — best for small result sets (<=10k total)."""
params: list[tuple[str, str]] = [
("q", query),
("output", "json"),
("rows", str(rows)),
("page", str(page)),
]
for f in fields or ["identifier", "title", "mediatype", "creator", "date"]:
params.append(("fl[]", f))
for s in sort or []:
params.append(("sort[]", s))
r = await self._client.get(f"{self._base}/advancedsearch.php", params=params)
r.raise_for_status()
data = r.json()
resp = data.get("response", {})
return {
"num_found": resp.get("numFound", 0),
"start": resp.get("start", 0),
"page": page,
"rows": rows,
"docs": resp.get("docs", []),
}
async def scrape(
self,
query: str,
fields: list[str] | None = None,
sorts: list[str] | None = None,
count: int = 100,
cursor: str | None = None,
) -> dict[str, Any]:
"""Scrape API — cursor-paginated; count must be >= 100."""
if count < 100:
raise ValueError("scrape count must be >= 100; use search() for smaller queries")
params: dict[str, str] = {"q": query, "count": str(count)}
if fields:
params["fields"] = ",".join(fields)
if sorts:
params["sorts"] = ",".join(sorts)
if cursor:
params["cursor"] = cursor
r = await self._client.get(f"{self._base}/services/search/v1/scrape", params=params)
r.raise_for_status()
data = r.json()
if "error" in data:
raise ArchiveError(f"{data.get('errorType', 'ScrapeError')}: {data['error']}")
return data # keys: items, count, total, cursor (if more pages)
# ---------- metadata ----------
async def metadata(self, identifier: str) -> dict[str, Any]:
"""Full metadata blob for an item."""
r = await self._client.get(f"{self._base}/metadata/{identifier}")
r.raise_for_status()
data = r.json()
if not data:
raise ArchiveError(f"item not found: {identifier}")
return data
async def files(self, identifier: str) -> list[dict[str, Any]]:
"""Just the files[] slice — smaller payload when that's all you want."""
r = await self._client.get(f"{self._base}/metadata/{identifier}/files")
r.raise_for_status()
data = r.json()
if isinstance(data, dict) and "result" in data:
return data["result"]
if isinstance(data, list):
return data
raise ArchiveError(f"unexpected files response for {identifier}")
# ---------- download ----------
def download_url(self, identifier: str, filename: str) -> str:
return f"{self._base}/download/{identifier}/{filename}"
async def stream_file(
self,
identifier: str,
filename: str,
resume_from: int = 0,
) -> AsyncIterator[bytes]:
"""Async byte iterator — caller is responsible for writing to disk."""
headers = {}
if resume_from > 0:
headers["Range"] = f"bytes={resume_from}-"
url = self.download_url(identifier, filename)
async with self._client.stream("GET", url, headers=headers) as r:
r.raise_for_status()
async for chunk in r.aiter_bytes(chunk_size=1 << 16):
yield chunk
async def download_to_file(
self,
identifier: str,
filename: str,
dest: Path,
verify_md5: str | None = None,
chunk_cb=None,
) -> dict[str, Any]:
"""Download with resume support. Returns stats + md5 verification result."""
dest.parent.mkdir(parents=True, exist_ok=True)
resume_from = dest.stat().st_size if dest.exists() else 0
hasher = hashlib.md5() if verify_md5 else None
if hasher and resume_from:
# re-hash existing bytes so the final digest is correct
with dest.open("rb") as f:
while chunk := f.read(1 << 16):
hasher.update(chunk)
bytes_written = resume_from
mode = "ab" if resume_from else "wb"
with dest.open(mode) as f:
async for chunk in self.stream_file(identifier, filename, resume_from=resume_from):
f.write(chunk)
bytes_written += len(chunk)
if hasher:
hasher.update(chunk)
if chunk_cb:
chunk_cb(bytes_written)
result = {
"path": str(dest),
"bytes": bytes_written,
"resumed_from": resume_from,
}
if verify_md5 and hasher:
actual = hasher.hexdigest()
result["md5_actual"] = actual
result["md5_expected"] = verify_md5
result["md5_ok"] = actual.lower() == verify_md5.lower()
return result

258
src/mcarchive_org/server.py Normal file
View File

@ -0,0 +1,258 @@
"""FastMCP server exposing archive.org search, metadata, and download."""
from __future__ import annotations
import fnmatch
import os
from pathlib import Path
from typing import Annotated, Any
from fastmcp import FastMCP
from pydantic import Field
from mcarchive_org import __version__
from mcarchive_org.client import ArchiveClient
DEFAULT_DOWNLOAD_ROOT = Path(
os.environ.get("MCARCHIVE_DOWNLOAD_ROOT", Path.cwd() / "downloads")
).expanduser()
mcp = FastMCP(
name="mcarchive-org",
instructions=(
"Search and download files from the Internet Archive (archive.org). "
"Typical flow: search_items -> get_item_metadata -> list_files -> download_file. "
"Use scrape_items (count>=100) only for bulk cursor-paginated iteration."
),
)
# ---------- helpers (not exposed as tools) ----------
def _human_size(n: int | str | None) -> str:
try:
x = float(n) # type: ignore[arg-type]
except (TypeError, ValueError):
return "?"
for unit in ("B", "KB", "MB", "GB", "TB"):
if x < 1024:
return f"{x:.1f} {unit}" if unit != "B" else f"{int(x)} B"
x /= 1024
return f"{x:.1f} PB"
def _enrich_file(identifier: str, f: dict[str, Any]) -> dict[str, Any]:
name = f.get("name", "")
return {
"name": name,
"format": f.get("format"),
"size": int(f["size"]) if f.get("size") and str(f["size"]).isdigit() else None,
"size_human": _human_size(f.get("size")),
"md5": f.get("md5"),
"sha1": f.get("sha1"),
"mtime": f.get("mtime"),
"source": f.get("source"),
"download_url": f"https://archive.org/download/{identifier}/{name}",
}
def _matches(name: str, format_: str | None, name_glob: str | None, formats: list[str] | None) -> bool:
if name_glob and not fnmatch.fnmatchcase(name, name_glob):
return False
return not (formats and (format_ or "").lower() not in {f.lower() for f in formats})
# ---------- tools ----------
@mcp.tool
async def search_items(
query: Annotated[str, Field(description="Lucene/Solr query, e.g. 'mediatype:audio AND creator:\"Grateful Dead\"'")],
fields: Annotated[
list[str] | None,
Field(description="Which metadata fields to return per doc. Defaults to identifier,title,mediatype,creator,date."),
] = None,
sort: Annotated[
list[str] | None,
Field(description="Sort expressions like 'downloads desc' or 'date asc'."),
] = None,
rows: Annotated[int, Field(ge=1, le=200, description="Results per page (1-200).")] = 25,
page: Annotated[int, Field(ge=1, description="1-indexed page number.")] = 1,
) -> dict[str, Any]:
"""Search archive.org items. Good for small/interactive queries.
Returns up to `rows` matching items plus `num_found` (total hits) and `has_more`.
Use scrape_items for bulk iteration over large result sets.
"""
async with ArchiveClient() as c:
result = await c.search(query=query, fields=fields, sort=sort, rows=rows, page=page)
total = result["num_found"]
seen = (page - 1) * rows + len(result["docs"])
return {
"query": query,
"num_found": total,
"page": page,
"rows": rows,
"has_more": seen < total,
"docs": result["docs"],
}
@mcp.tool
async def scrape_items(
query: Annotated[str, Field(description="Lucene/Solr query.")],
fields: Annotated[list[str] | None, Field(description="Metadata fields per item.")] = None,
sorts: Annotated[list[str] | None, Field(description="Sort expressions, e.g. ['date asc'].")] = None,
count: Annotated[int, Field(ge=100, le=10000, description="Items per page (>=100 required by API).")] = 500,
cursor: Annotated[str | None, Field(description="Pass the `cursor` from a prior response to fetch next page.")] = None,
) -> dict[str, Any]:
"""Scrape API — high-throughput cursor-paginated search. count >= 100.
Response includes `cursor` (for next page) when more results exist; missing when done.
"""
async with ArchiveClient() as c:
data = await c.scrape(query=query, fields=fields, sorts=sorts, count=count, cursor=cursor)
return {
"items": data.get("items", []),
"count": data.get("count"),
"total": data.get("total"),
"next_cursor": data.get("cursor"),
}
@mcp.tool
async def get_item_metadata(
identifier: Annotated[str, Field(description="Archive.org item identifier, e.g. 'nasa'.")],
include_files: Annotated[
bool, Field(description="If true, include the full files[] array. Can be large.")
] = False,
) -> dict[str, Any]:
"""Get metadata for a single item.
By default omits the (potentially huge) files[] array call list_files for that.
"""
async with ArchiveClient() as c:
data = await c.metadata(identifier)
md = data.get("metadata", {})
out: dict[str, Any] = {
"identifier": md.get("identifier", identifier),
"title": md.get("title"),
"mediatype": md.get("mediatype"),
"collection": md.get("collection"),
"creator": md.get("creator"),
"date": md.get("date"),
"description": md.get("description"),
"publicdate": md.get("publicdate"),
"uploader": md.get("uploader"),
"subject": md.get("subject"),
"licenseurl": md.get("licenseurl"),
"item_size_bytes": data.get("item_size"),
"item_size_human": _human_size(data.get("item_size")),
"files_count": data.get("files_count"),
"server": data.get("server"),
"dir": data.get("dir"),
"item_url": f"https://archive.org/details/{identifier}",
}
if include_files:
out["files"] = [_enrich_file(identifier, f) for f in data.get("files", [])]
return out
@mcp.tool
async def list_files(
identifier: Annotated[str, Field(description="Archive.org item identifier.")],
formats: Annotated[
list[str] | None,
Field(description="Filter by format, e.g. ['MP3','VBR MP3','JPEG']. Case-insensitive."),
] = None,
name_glob: Annotated[
str | None,
Field(description="fnmatch-style glob on filename, e.g. '*.mp3' or 'cover.*'."),
] = None,
limit: Annotated[int, Field(ge=1, le=1000, description="Max files to return.")] = 100,
) -> dict[str, Any]:
"""List files in an item, with optional format/glob filtering.
Each entry includes a ready-to-use `download_url`.
"""
async with ArchiveClient() as c:
files = await c.files(identifier)
matches = [
_enrich_file(identifier, f)
for f in files
if _matches(f.get("name", ""), f.get("format"), name_glob, formats)
]
return {
"identifier": identifier,
"total_matching": len(matches),
"returned": min(len(matches), limit),
"files": matches[:limit],
}
@mcp.tool
def get_file_url(
identifier: Annotated[str, Field(description="Item identifier.")],
filename: Annotated[str, Field(description="Exact filename as shown in list_files.")],
) -> dict[str, str]:
"""Build the canonical download URL for a file without fetching anything."""
return {
"url": f"https://archive.org/download/{identifier}/{filename}",
"item_url": f"https://archive.org/details/{identifier}",
}
@mcp.tool
async def download_file(
identifier: Annotated[str, Field(description="Item identifier.")],
filename: Annotated[str, Field(description="Exact filename from list_files.")],
dest_dir: Annotated[
str | None,
Field(description="Directory to save into. Defaults to $MCARCHIVE_DOWNLOAD_ROOT/{identifier}."),
] = None,
verify_md5: Annotated[
str | None,
Field(description="Expected MD5 hex digest (from list_files). If provided, checksum is verified."),
] = None,
overwrite: Annotated[
bool,
Field(description="If false and file exists, resume the download (Range request)."),
] = False,
) -> dict[str, Any]:
"""Download a file to disk. Supports resume via HTTP Range when overwrite=false."""
target_dir = Path(dest_dir).expanduser() if dest_dir else (DEFAULT_DOWNLOAD_ROOT / identifier)
dest = target_dir / filename
if overwrite and dest.exists():
dest.unlink()
async with ArchiveClient() as c:
result = await c.download_to_file(identifier, filename, dest, verify_md5=verify_md5)
result["identifier"] = identifier
result["filename"] = filename
result["size_human"] = _human_size(result.get("bytes"))
return result
# ---------- resources ----------
@mcp.resource("archive://item/{identifier}")
async def item_resource(identifier: str) -> dict[str, Any]:
"""Expose item metadata as a readable MCP resource."""
return await get_item_metadata.fn(identifier=identifier, include_files=False) # type: ignore[attr-defined]
# ---------- entry point ----------
def main() -> None:
print(f"mcarchive-org v{__version__} — Internet Archive MCP server")
mcp.run()
if __name__ == "__main__":
main()

14
tests/conftest.py Normal file
View File

@ -0,0 +1,14 @@
import pytest
def pytest_collection_modifyitems(config, items):
for item in items:
if "asyncio" in item.keywords or item.get_closest_marker("asyncio"):
continue
pytest_plugins = ["pytest_asyncio"]
def pytest_configure(config: pytest.Config) -> None:
config.addinivalue_line("markers", "network: test hits live archive.org")

52
tests/test_client.py Normal file
View File

@ -0,0 +1,52 @@
"""End-to-end smoke tests against live archive.org (network required).
Run with: uv run pytest -v
Skip with: uv run pytest -v -m 'not network'
"""
from __future__ import annotations
from pathlib import Path
import pytest
from mcarchive_org.client import ArchiveClient
pytestmark = [pytest.mark.asyncio, pytest.mark.network]
async def test_search_nasa_item():
async with ArchiveClient() as c:
result = await c.search(query="identifier:nasa", rows=5)
assert result["num_found"] >= 1
assert any(d["identifier"] == "nasa" for d in result["docs"])
async def test_metadata_nasa():
async with ArchiveClient() as c:
data = await c.metadata("nasa")
assert data["metadata"]["identifier"] == "nasa"
assert isinstance(data["files"], list) and data["files"]
async def test_download_small_file(tmp_path: Path):
async with ArchiveClient() as c:
files = await c.files("nasa")
# pick the smallest file to keep the test fast
small = min(
(f for f in files if f.get("size") and str(f["size"]).isdigit()),
key=lambda f: int(f["size"]),
)
dest = tmp_path / small["name"]
result = await c.download_to_file(
"nasa", small["name"], dest, verify_md5=small.get("md5")
)
assert result["bytes"] > 0
if small.get("md5"):
assert result["md5_ok"] is True
async def test_scrape_requires_min_count():
async with ArchiveClient() as c:
with pytest.raises(ValueError):
await c.scrape(query="identifier:nasa", count=10)

1622
uv.lock generated Normal file

File diff suppressed because it is too large Load Diff