Implement pg-orrery-catalog: TLE catalog builder for pg_orrery

Core modules:
- tle.py: NORAD decoding (Alpha-5 + Super-5, matching get_el.c),
  3LE/2LE parsing, TLERecord dataclass with epoch-based dedup
- config.py: TOML config + env var overlay (XDG-compliant paths)
- cache.py: File-based cache with staleness checking
- catalog.py: Multi-source merge with MergeStats tracking
- regime.py: LEO/MEO/GEO/HEO classification by mean motion

Source downloaders (httpx):
- celestrak.py: Active catalog + supplemental GP groups
- satnogs.py: JSON API with 3LE conversion
- spacetrack.py: POST auth flow, bulk GP download

Output formatters:
- sql.py: pg_orrery-compatible INSERT generation (E'' strings)
- tle_file.py: Standard 3LE text output
- json_out.py: JSON with orbital metadata and regime

CLI (Click + Rich):
- download: Cache TLEs from all sources
- build: Merge + output SQL/3LE/JSON (pipes to psql)
- load: Direct DB load via psycopg (optional [pg] extra)
- info: Cache stats and configuration display

58 tests covering NORAD decoding (all 4 encoding cases),
parsing, merge/dedup, SQL escaping, regime classification.
This commit is contained in:
Ryan Malloy 2026-02-18 00:31:46 -07:00
parent 23eb205fe8
commit 1d36729bed
18 changed files with 2079 additions and 0 deletions

80
README.md Normal file
View File

@ -0,0 +1,80 @@
# pg-orrery-catalog
TLE catalog builder for [pg_orrery](https://pg-orrery.warehack.ing) — download, merge, and load satellite catalogs from Space-Track, CelesTrak, and SatNOGS into PostgreSQL.
## Install
```bash
uv pip install pg-orrery-catalog
# or
uvx pg-orrery-catalog --help
```
For direct database loading:
```bash
uv pip install "pg-orrery-catalog[pg]"
```
## Quick Start
```bash
# Download TLE data from all sources
pg-orrery-catalog download
# Build SQL and pipe to psql
pg-orrery-catalog build | psql -d mydb
# Or load directly (requires [pg] extra)
pg-orrery-catalog load --database-url postgresql:///mydb --create-index
# Export as 3LE file
pg-orrery-catalog build --format 3le -o merged.tle
# Check cache status
pg-orrery-catalog info --cache
```
## Configuration
Create `~/.config/pg-orrery-catalog/config.toml`:
```toml
[spacetrack]
user = "you@example.com"
password = "secret"
[celestrak]
proxy = "localhost:1080"
supgp_groups = ["starlink", "oneweb", "planet", "orbcomm"]
[output]
table = "satellites"
[database]
url = "postgresql://localhost/mydb"
```
Environment variables (`SPACETRACK_USER`, `SPACETRACK_PASSWORD`, `SOCKS_PROXY`, `DATABASE_URL`) override config file values.
## Sources
| Source | Auth | Coverage |
|--------|------|----------|
| [Space-Track](https://www.space-track.org) | Login required | Full catalog (~30k+ on-orbit) |
| [CelesTrak](https://celestrak.org) | None | Active sats + operator SupGP |
| [SatNOGS](https://db.satnogs.org) | None | Community-tracked objects |
When the same NORAD ID appears in multiple sources, the entry with the newest epoch wins — CelesTrak SupGP data (operator-provided, fresher epochs) overrides stale Space-Track entries.
## NORAD ID Decoding
Handles all four encoding schemes from Bill Gray's sat_code:
- **Traditional**: 5-digit (0000199999)
- **Alpha-5**: letter + 4 digits (A0000Z9999 → 100000339999)
- **Super-5 case 3**: base-64 with uppercase last char (340000906309663)
- **Super-5 case 4**: base-64 with non-digit 4th char (906309664+)
## License
MIT

View File

@ -0,0 +1,90 @@
"""XDG-compliant file cache with staleness checking.
Files are stored in ~/.cache/pg-orrery-catalog/ with source-based subdirectories.
Staleness is determined by file modification time vs a configurable max age.
"""
import time
from pathlib import Path
from .config import DEFAULT_CACHE_DIR
# Default max age: 24 hours
DEFAULT_MAX_AGE_SECONDS = 86400
class Cache:
"""File-based cache under XDG_CACHE_HOME."""
def __init__(self, cache_dir: Path | None = None, max_age: int = DEFAULT_MAX_AGE_SECONDS):
self.cache_dir = cache_dir or DEFAULT_CACHE_DIR
self.max_age = max_age
self.cache_dir.mkdir(parents=True, exist_ok=True)
def path_for(self, source: str, filename: str) -> Path:
"""Return the cache path for a source/filename pair, creating dirs as needed."""
source_dir = self.cache_dir / source
source_dir.mkdir(parents=True, exist_ok=True)
return source_dir / filename
def is_fresh(self, source: str, filename: str) -> bool:
"""Check if a cached file exists and is younger than max_age."""
p = self.path_for(source, filename)
if not p.exists():
return False
age = time.time() - p.stat().st_mtime
return age < self.max_age
def write(self, source: str, filename: str, data: str | bytes) -> Path:
"""Write data to cache, return the path."""
p = self.path_for(source, filename)
if isinstance(data, bytes):
p.write_bytes(data)
else:
p.write_text(data)
return p
def read(self, source: str, filename: str) -> str | None:
"""Read cached text file, or None if missing."""
p = self.path_for(source, filename)
if p.exists():
return p.read_text(errors="replace")
return None
def list_files(self) -> list[tuple[str, Path, float]]:
"""List all cached files as (source, path, age_seconds) tuples."""
results = []
if not self.cache_dir.exists():
return results
for source_dir in sorted(self.cache_dir.iterdir()):
if not source_dir.is_dir():
continue
source = source_dir.name
for f in sorted(source_dir.iterdir()):
if f.is_file():
age = time.time() - f.stat().st_mtime
results.append((source, f, age))
return results
def total_size(self) -> int:
"""Total size of all cached files in bytes."""
total = 0
for _, f, _ in self.list_files():
total += f.stat().st_size
return total
def clear(self, source: str | None = None) -> int:
"""Remove cached files. Returns count of files removed."""
count = 0
if source:
source_dir = self.cache_dir / source
if source_dir.exists():
for f in source_dir.iterdir():
if f.is_file():
f.unlink()
count += 1
else:
for _, f, _ in self.list_files():
f.unlink()
count += 1
return count

View File

@ -0,0 +1,74 @@
"""Catalog merging with epoch-based deduplication.
When the same NORAD ID appears in multiple sources, the entry with the
newest epoch wins. This lets CelesTrak SupGP (fresher epochs) override
stale Space-Track entries.
"""
from dataclasses import dataclass, field
from pathlib import Path
from .tle import TLERecord, parse_3le_file
@dataclass
class SourceStats:
"""Per-source merge statistics."""
source: str
total: int = 0
new: int = 0
updated: int = 0
@dataclass
class MergeStats:
"""Overall merge result."""
sources: list[SourceStats] = field(default_factory=list)
total_unique: int = 0
def merge_sources(
files: list[Path | str],
source_names: list[str] | None = None,
) -> tuple[dict[int, TLERecord], MergeStats]:
"""Merge multiple TLE files with epoch-based deduplication.
Returns (merged_catalog, stats). The catalog is keyed by NORAD ID.
Later files can override earlier entries if they have a newer epoch.
"""
merged: dict[int, TLERecord] = {}
stats = MergeStats()
for idx, filepath in enumerate(files):
filepath = Path(filepath)
source = (source_names[idx] if source_names and idx < len(source_names) else filepath.stem)
objects = parse_3le_file(filepath, source=source)
src_stats = SourceStats(source=source, total=len(objects))
for norad_id, rec in objects.items():
if norad_id not in merged:
src_stats.new += 1
merged[norad_id] = rec
elif rec.epoch > merged[norad_id].epoch:
src_stats.updated += 1
merged[norad_id] = rec
stats.sources.append(src_stats)
stats.total_unique = len(merged)
return merged, stats
def merge_records(
*record_dicts: dict[int, TLERecord],
) -> dict[int, TLERecord]:
"""Merge multiple pre-parsed record dicts, newest epoch wins."""
merged: dict[int, TLERecord] = {}
for records in record_dicts:
for norad_id, rec in records.items():
if norad_id not in merged or rec.epoch > merged[norad_id].epoch:
merged[norad_id] = rec
return merged

View File

@ -0,0 +1,291 @@
"""CLI interface for pg-orrery-catalog.
Commands:
download Download TLE data from sources into local cache
build Merge cached/given TLE files and output SQL/3LE/JSON
load Build + pipe directly into PostgreSQL via psycopg
info Show cache status, config, and version info
"""
import sys
from pathlib import Path
import click
from rich.console import Console
from rich.table import Table
from . import __version__
from .cache import Cache
from .catalog import merge_sources
from .config import Config, load_config
from .output.json_out import generate_json
from .output.sql import generate_sql
from .output.tle_file import generate_3le
from .regime import regime_summary
from .sources import celestrak, satnogs, spacetrack
console = Console(stderr=True)
def _find_cached_files(cache: Cache) -> list[Path]:
"""Find all .tle files in the cache, ordered for merge priority."""
files = []
# Merge order: spacetrack (base), celestrak active, satnogs, then supgp (freshest)
priority = ["spacetrack", "celestrak", "satnogs"]
seen_dirs = set()
for source in priority:
source_dir = cache.cache_dir / source
if source_dir.is_dir():
seen_dirs.add(source)
for f in sorted(source_dir.glob("*.tle")):
files.append(f)
# Any remaining source dirs
if cache.cache_dir.exists():
for source_dir in sorted(cache.cache_dir.iterdir()):
if source_dir.is_dir() and source_dir.name not in seen_dirs:
for f in sorted(source_dir.glob("*.tle")):
files.append(f)
return files
@click.group()
@click.version_option(version=__version__, prog_name="pg-orrery-catalog")
@click.option(
"--config", "config_path",
type=click.Path(exists=True, path_type=Path), default=None,
)
@click.pass_context
def main(ctx, config_path):
"""TLE catalog builder for pg_orrery."""
ctx.ensure_object(dict)
ctx.obj["config"] = load_config(config_path)
@main.command()
@click.option(
"--source", "sources",
multiple=True,
type=click.Choice(["spacetrack", "celestrak", "satnogs"], case_sensitive=False),
help="Download from specific source(s). Default: all configured.",
)
@click.option("--force", is_flag=True, help="Re-download even if cache is fresh.")
@click.pass_context
def download(ctx, sources, force):
"""Download TLE data from remote sources into local cache."""
cfg: Config = ctx.obj["config"]
cache = Cache(cfg.cache_dir)
if not sources:
sources = ("celestrak", "satnogs", "spacetrack")
console.print("[bold]Downloading TLE sources...[/bold]")
for source in sources:
if source == "celestrak":
celestrak.download_all(cfg.celestrak, cache, force=force)
elif source == "satnogs":
satnogs.download(cache, force=force, proxy=cfg.celestrak.proxy)
elif source == "spacetrack":
spacetrack.download(cfg.spacetrack, cache, force=force)
console.print("[bold green]Done.[/bold green]")
@main.command()
@click.argument("files", nargs=-1, type=click.Path(exists=True, path_type=Path))
@click.option(
"--format", "fmt",
type=click.Choice(["sql", "3le", "json"], case_sensitive=False),
default="sql",
help="Output format.",
)
@click.option("--table", default=None, help="SQL table name (default: from config).")
@click.option("-o", "--output", "output_path", type=click.Path(path_type=Path), default=None)
@click.pass_context
def build(ctx, files, fmt, table, output_path):
"""Merge TLE files and output catalog.
If no FILES given, uses cached downloads. Output goes to stdout unless -o is specified.
"""
cfg: Config = ctx.obj["config"]
cache = Cache(cfg.cache_dir)
table = table or cfg.output.table
# Determine input files
if files:
file_list = list(files)
else:
file_list = _find_cached_files(cache)
if not file_list:
console.print("[red]No cached TLE files found. Run 'download' first.[/red]")
raise SystemExit(1)
# Merge
merged, stats = merge_sources(file_list)
for src in stats.sources:
console.print(
f" {src.source}: {src.total} objects "
f"({src.new} new, {src.updated} updated)"
)
console.print(f" [bold]Total: {stats.total_unique} unique objects[/bold]")
# Regime summary
regimes = regime_summary(merged)
regime_parts = [f"{k}: {v}" for k, v in sorted(regimes.items(), key=lambda x: -x[1]) if v > 0]
console.print(f" Regimes: {', '.join(regime_parts)}")
# Generate output
if output_path:
with open(output_path, "w") as f:
if fmt == "sql":
generate_sql(merged, table=table, stream=f)
elif fmt == "3le":
generate_3le(merged, stream=f)
elif fmt == "json":
generate_json(merged, stream=f)
console.print(f" Written to {output_path}")
else:
if fmt == "sql":
sys.stdout.write(generate_sql(merged, table=table))
elif fmt == "3le":
sys.stdout.write(generate_3le(merged))
elif fmt == "json":
sys.stdout.write(generate_json(merged))
@main.command()
@click.argument("files", nargs=-1, type=click.Path(exists=True, path_type=Path))
@click.option("--table", default=None, help="SQL table name.")
@click.option("--database-url", default=None, help="PostgreSQL connection URL.")
@click.option("--create-index", is_flag=True, help="Create GiST and SP-GiST indexes after load.")
@click.pass_context
def load(ctx, files, table, database_url, create_index):
"""Build catalog and load directly into PostgreSQL.
Requires the [pg] extra: pip install pg-orrery-catalog[pg]
"""
try:
import psycopg
except ImportError:
console.print(
"[red]psycopg not installed. Install with:[/red]\n"
" uv pip install pg-orrery-catalog[pg]"
)
raise SystemExit(1) from None
cfg: Config = ctx.obj["config"]
cache = Cache(cfg.cache_dir)
table = table or cfg.output.table
db_url = database_url or cfg.database.url
if not db_url:
console.print("[red]No database URL. Use --database-url or set DATABASE_URL.[/red]")
raise SystemExit(1)
# Determine input files
if files:
file_list = list(files)
else:
file_list = _find_cached_files(cache)
if not file_list:
console.print("[red]No cached TLE files. Run 'download' first.[/red]")
raise SystemExit(1)
# Merge
merged, stats = merge_sources(file_list)
for src in stats.sources:
console.print(
f" {src.source}: {src.total} objects "
f"({src.new} new, {src.updated} updated)"
)
console.print(f" [bold]Total: {stats.total_unique} unique objects[/bold]")
# Generate SQL and execute
sql = generate_sql(merged, table=table)
console.print(f" Loading into {db_url}...")
with psycopg.connect(db_url) as conn:
conn.execute(sql)
if create_index:
console.print(" Creating indexes...")
conn.execute(
f"CREATE INDEX IF NOT EXISTS {table}_spgist_idx"
f" ON {table} USING spgist (tle tle_spgist_ops)"
)
conn.execute(
f"CREATE INDEX IF NOT EXISTS {table}_gist_idx"
f" ON {table} USING gist (tle)"
)
conn.commit()
row = conn.execute(f"SELECT count(*) FROM {table}").fetchone()
console.print(f" [bold green]{row[0]} objects loaded[/bold green]")
@main.command()
@click.option("--cache", "show_cache", is_flag=True, help="Show cache file details.")
@click.pass_context
def info(ctx, show_cache):
"""Show version, configuration, and cache info."""
cfg: Config = ctx.obj["config"]
cache = Cache(cfg.cache_dir)
console.print(f"[bold]pg-orrery-catalog[/bold] v{__version__}")
console.print(f" Config: {cfg.config_dir / 'config.toml'}")
console.print(f" Cache: {cfg.cache_dir}")
if cfg.spacetrack.user:
console.print(f" Space-Track: {cfg.spacetrack.user}")
else:
console.print(" Space-Track: [yellow]not configured[/yellow]")
if cfg.celestrak.proxy:
console.print(f" Proxy: {cfg.celestrak.proxy}")
if cfg.database.url:
console.print(f" Database: {cfg.database.url}")
console.print(f" Default table: {cfg.output.table}")
console.print(f" SupGP groups: {', '.join(cfg.celestrak.supgp_groups)}")
if show_cache:
files = cache.list_files()
if not files:
console.print("\n [dim]Cache is empty[/dim]")
else:
table = Table(title="Cached Files")
table.add_column("Source", style="cyan")
table.add_column("File", style="green")
table.add_column("Size", justify="right")
table.add_column("Age", justify="right")
for source, path, age in files:
size = path.stat().st_size
if size > 1_000_000:
size_str = f"{size / 1_000_000:.1f} MB"
elif size > 1000:
size_str = f"{size / 1000:.1f} KB"
else:
size_str = f"{size} B"
if age > 86400:
age_str = f"{age / 86400:.1f}d"
elif age > 3600:
age_str = f"{age / 3600:.1f}h"
else:
age_str = f"{age / 60:.0f}m"
table.add_row(source, path.name, size_str, age_str)
console.print()
console.print(table)
total = cache.total_size()
if total > 1_000_000:
console.print(f" Total: {total / 1_000_000:.1f} MB across {len(files)} files")
else:
console.print(f" Total: {total / 1000:.1f} KB across {len(files)} files")

View File

@ -0,0 +1,111 @@
"""Configuration: TOML config file + environment variable overlay.
Precedence (highest first): CLI flags > env vars > config file > defaults.
Config file: ~/.config/pg-orrery-catalog/config.toml (XDG-compliant).
"""
import os
import tomllib
from dataclasses import dataclass, field
from pathlib import Path
def _xdg_config_home() -> Path:
return Path(os.environ.get("XDG_CONFIG_HOME", Path.home() / ".config"))
def _xdg_cache_home() -> Path:
return Path(os.environ.get("XDG_CACHE_HOME", Path.home() / ".cache"))
APP_NAME = "pg-orrery-catalog"
DEFAULT_CONFIG_DIR = _xdg_config_home() / APP_NAME
DEFAULT_CACHE_DIR = _xdg_cache_home() / APP_NAME
DEFAULT_CONFIG_FILE = DEFAULT_CONFIG_DIR / "config.toml"
DEFAULT_TABLE = "bench_catalog"
DEFAULT_CELESTRAK_SUPGP_GROUPS = ["starlink", "oneweb", "planet", "orbcomm"]
CELESTRAK_GP_URL = "https://celestrak.org/NORAD/elements/gp.php"
CELESTRAK_SUPGP_URL = "https://celestrak.org/NORAD/elements/supplemental/sup-gp.php"
SATNOGS_API_URL = "https://db.satnogs.org/api/tle/"
SPACETRACK_LOGIN_URL = "https://www.space-track.org/ajaxauth/login"
SPACETRACK_GP_URL = (
"https://www.space-track.org/basicspacedata/query"
"/class/gp/orderby/NORAD_CAT_ID%20asc/format/3le/emptyresult/show"
)
@dataclass
class SpaceTrackConfig:
user: str = ""
password: str = ""
@dataclass
class CelesTrakConfig:
proxy: str = ""
supgp_groups: list[str] = field(default_factory=lambda: list(DEFAULT_CELESTRAK_SUPGP_GROUPS))
@dataclass
class OutputConfig:
table: str = DEFAULT_TABLE
@dataclass
class DatabaseConfig:
url: str = ""
@dataclass
class Config:
spacetrack: SpaceTrackConfig = field(default_factory=SpaceTrackConfig)
celestrak: CelesTrakConfig = field(default_factory=CelesTrakConfig)
output: OutputConfig = field(default_factory=OutputConfig)
database: DatabaseConfig = field(default_factory=DatabaseConfig)
cache_dir: Path = field(default_factory=lambda: DEFAULT_CACHE_DIR)
config_dir: Path = field(default_factory=lambda: DEFAULT_CONFIG_DIR)
def load_config(config_path: Path | None = None) -> Config:
"""Load config from TOML file, then overlay environment variables."""
cfg = Config()
# Load TOML if it exists
path = config_path or DEFAULT_CONFIG_FILE
if path.exists():
with open(path, "rb") as f:
data = tomllib.load(f)
if "spacetrack" in data:
st = data["spacetrack"]
cfg.spacetrack.user = st.get("user", "")
cfg.spacetrack.password = st.get("password", "")
if "celestrak" in data:
ct = data["celestrak"]
cfg.celestrak.proxy = ct.get("proxy", "")
if "supgp_groups" in ct:
cfg.celestrak.supgp_groups = ct["supgp_groups"]
if "output" in data:
cfg.output.table = data["output"].get("table", DEFAULT_TABLE)
if "database" in data:
cfg.database.url = data["database"].get("url", "")
# Environment variable overlay
if env_user := os.environ.get("SPACETRACK_USER"):
cfg.spacetrack.user = env_user
if env_pass := os.environ.get("SPACETRACK_PASSWORD"):
cfg.spacetrack.password = env_pass
if env_proxy := os.environ.get("SOCKS_PROXY"):
cfg.celestrak.proxy = env_proxy
if env_db := os.environ.get("DATABASE_URL"):
cfg.database.url = env_db
if env_table := os.environ.get("PG_ORRERY_TABLE"):
cfg.output.table = env_table
return cfg

View File

@ -0,0 +1,47 @@
"""JSON output for catalog data.
Outputs an array of objects with NORAD ID, name, TLE lines, epoch, source,
and orbital regime classification.
"""
import io
import json
from ..regime import classify_record
from ..tle import TLERecord
def generate_json(
records: dict[int, TLERecord],
stream: io.TextIOBase | None = None,
indent: int = 2,
) -> str | None:
"""Generate JSON array from records, sorted by NORAD ID.
If stream is provided, writes directly to it and returns None.
Otherwise returns the JSON as a string.
"""
sorted_ids = sorted(records.keys())
entries = []
for norad_id in sorted_ids:
rec = records[norad_id]
entries.append({
"norad_id": norad_id,
"name": rec.name or f"NORAD {norad_id}",
"line1": rec.line1,
"line2": rec.line2,
"epoch": rec.epoch,
"mean_motion": rec.mean_motion,
"inclination": rec.inclination,
"eccentricity": rec.eccentricity,
"regime": classify_record(rec),
"source": rec.source,
})
text = json.dumps(entries, indent=indent) + "\n"
if stream is not None:
stream.write(text)
return None
return text

View File

@ -0,0 +1,58 @@
"""SQL INSERT generation for pg_orrery's tle type.
Output format matches the existing convention from bench/build_catalog.py:
INSERT INTO {table} (name, tle) VALUES ('NAME', E'line1\\nline2');
Uses E'' escape strings for the embedded newline between TLE lines.
"""
import io
from ..tle import TLERecord
def escape_sql_string(s: str) -> str:
"""Escape a string for use in a PostgreSQL single-quoted literal."""
return s.replace("\\", "\\\\").replace("'", "''")
def generate_sql(
records: dict[int, TLERecord],
table: str = "bench_catalog",
drop_existing: bool = True,
stream: io.TextIOBase | None = None,
) -> str | None:
"""Generate SQL to create table and INSERT all records.
If stream is provided, writes directly to it and returns None.
Otherwise returns the SQL as a string.
"""
buf = stream or io.StringIO()
sorted_ids = sorted(records.keys())
sources = sorted({rec.source for rec in records.values() if rec.source})
source_str = ", ".join(sources) if sources else "unknown"
buf.write(f"-- pg_orrery catalog ({len(records)} objects)\n")
buf.write(f"-- Sources: {source_str}\n\n")
if drop_existing:
buf.write(f"DROP TABLE IF EXISTS {table};\n")
buf.write(f"CREATE TABLE IF NOT EXISTS {table} (\n")
buf.write(" id serial,\n")
buf.write(" name text,\n")
buf.write(" tle tle\n")
buf.write(");\n\n")
for norad_id in sorted_ids:
rec = records[norad_id]
name = rec.name or f"NORAD {norad_id}"
name_sql = escape_sql_string(name)
tle_str = f"{rec.line1}\\n{rec.line2}"
buf.write(f"INSERT INTO {table} (name, tle) VALUES ('{name_sql}', E'{tle_str}');\n")
buf.write(f"\n-- Loaded {len(records)} objects\n")
if stream is None:
return buf.getvalue()
return None

View File

@ -0,0 +1,38 @@
"""3LE text file output.
Writes standard 3-line element format:
0 OBJECT NAME
1 NNNNN...
2 NNNNN...
"""
import io
from ..tle import TLERecord
def generate_3le(
records: dict[int, TLERecord],
stream: io.TextIOBase | None = None,
) -> str | None:
"""Generate 3LE text from records, sorted by NORAD ID.
If stream is provided, writes directly to it and returns None.
Otherwise returns the text as a string.
"""
buf = stream or io.StringIO()
sorted_ids = sorted(records.keys())
for norad_id in sorted_ids:
rec = records[norad_id]
name = rec.name or f"0 NORAD {norad_id}"
# Ensure name line starts with "0 " for proper 3LE format
if not name.startswith("0 "):
name = f"0 {name}"
buf.write(f"{name}\n")
buf.write(f"{rec.line1}\n")
buf.write(f"{rec.line2}\n")
if stream is None:
return buf.getvalue()
return None

View File

@ -0,0 +1,35 @@
"""Orbital regime classification based on mean motion.
Thresholds match the bench/load_bench.sh SQL query:
LEO: mean_motion > 11.25 rev/day (period < 128 min, alt < ~2000 km)
MEO: mean_motion > 1.8 rev/day (period < 800 min)
GEO: mean_motion > 0.9 rev/day (near-synchronous)
HEO: everything else (Molniya, tundra, GTO, etc.)
"""
from .tle import TLERecord
def classify_regime(mean_motion: float) -> str:
"""Classify orbital regime from mean motion (revs/day)."""
if mean_motion > 11.25:
return "LEO"
if mean_motion > 1.8:
return "MEO"
if mean_motion > 0.9:
return "GEO"
return "HEO"
def classify_record(rec: TLERecord) -> str:
"""Classify a TLERecord's orbital regime."""
return classify_regime(rec.mean_motion)
def regime_summary(records: dict[int, TLERecord]) -> dict[str, int]:
"""Count objects per regime."""
counts: dict[str, int] = {"LEO": 0, "MEO": 0, "GEO": 0, "HEO": 0}
for rec in records.values():
regime = classify_record(rec)
counts[regime] = counts.get(regime, 0) + 1
return counts

View File

@ -0,0 +1,88 @@
"""CelesTrak GP and supplemental GP (SupGP) downloader.
CelesTrak provides free TLE data without authentication:
- Active catalog: gp.php?GROUP=active&FORMAT=3le
- SupGP groups: sup-gp.php?FILE={group}&FORMAT=3le (operator-provided, fresher epochs)
SupGP data is particularly valuable operators like SpaceX (Starlink) and
OneWeb submit their own ephemerides, which are often hours newer than
Space-Track's catalog.
"""
import httpx
from rich.console import Console
from ..cache import Cache
from ..config import CELESTRAK_GP_URL, CELESTRAK_SUPGP_URL, CelesTrakConfig
console = Console(stderr=True)
def _build_client(cfg: CelesTrakConfig, timeout: float = 120.0) -> httpx.Client:
"""Build an httpx client, optionally with SOCKS proxy."""
kwargs: dict = {"timeout": timeout, "follow_redirects": True}
if cfg.proxy:
kwargs["proxy"] = f"socks5://{cfg.proxy}"
return httpx.Client(**kwargs)
def download_active(cfg: CelesTrakConfig, cache: Cache, force: bool = False) -> str | None:
"""Download the CelesTrak active satellite catalog (3LE format)."""
filename = "celestrak_active.tle"
if not force and cache.is_fresh("celestrak", filename):
console.print(" [dim]celestrak/active: cached (fresh)[/dim]")
return cache.read("celestrak", filename)
console.print(" celestrak/active...", end=" ")
try:
with _build_client(cfg) as client:
resp = client.get(CELESTRAK_GP_URL, params={"GROUP": "active", "FORMAT": "3le"})
resp.raise_for_status()
data = resp.text
cache.write("celestrak", filename, data)
n_objects = data.count("\n1 ") + (1 if data.startswith("1 ") else 0)
console.print(f"[green]{n_objects} objects[/green]")
return data
except httpx.HTTPError as e:
console.print(f"[red]FAILED: {e}[/red]")
return cache.read("celestrak", filename)
def download_supgp(
cfg: CelesTrakConfig, cache: Cache, group: str, force: bool = False
) -> str | None:
"""Download a CelesTrak supplemental GP group (3LE format)."""
filename = f"supgp_{group}.tle"
if not force and cache.is_fresh("celestrak", filename):
console.print(f" [dim]celestrak/supgp-{group}: cached (fresh)[/dim]")
return cache.read("celestrak", filename)
console.print(f" celestrak/supgp-{group}...", end=" ")
try:
with _build_client(cfg) as client:
resp = client.get(CELESTRAK_SUPGP_URL, params={"FILE": group, "FORMAT": "3le"})
resp.raise_for_status()
data = resp.text
cache.write("celestrak", filename, data)
n_objects = data.count("\n1 ") + (1 if data.startswith("1 ") else 0)
console.print(f"[green]{n_objects} objects[/green]")
return data
except httpx.HTTPError as e:
console.print(f"[red]FAILED: {e}[/red]")
return cache.read("celestrak", filename)
def download_all(cfg: CelesTrakConfig, cache: Cache, force: bool = False) -> list[str]:
"""Download active catalog + all configured SupGP groups. Returns list of TLE text."""
results = []
data = download_active(cfg, cache, force=force)
if data:
results.append(data)
for group in cfg.supgp_groups:
data = download_supgp(cfg, cache, group, force=force)
if data:
results.append(data)
return results

View File

@ -0,0 +1,61 @@
"""SatNOGS DB REST API downloader.
SatNOGS provides TLE data as JSON via their REST API. The response is
paginated we request a large page_size to minimize requests. The JSON
contains tle0 (name line), tle1, tle2 fields which we convert to 3LE text.
"""
import json
import httpx
from rich.console import Console
from ..cache import Cache
from ..config import SATNOGS_API_URL
console = Console(stderr=True)
# SatNOGS API returns JSON — convert to 3LE text
_PAGE_SIZE = 5000
def download(cache: Cache, force: bool = False, proxy: str = "") -> str | None:
"""Download TLE data from SatNOGS DB API, convert JSON to 3LE text."""
filename = "satnogs_full.tle"
if not force and cache.is_fresh("satnogs", filename):
console.print(" [dim]satnogs: cached (fresh)[/dim]")
return cache.read("satnogs", filename)
console.print(" satnogs...", end=" ")
try:
kwargs: dict = {"timeout": 60.0, "follow_redirects": True}
if proxy:
kwargs["proxy"] = f"socks5://{proxy}"
with httpx.Client(**kwargs) as client:
resp = client.get(SATNOGS_API_URL, params={"format": "json", "page_size": _PAGE_SIZE})
resp.raise_for_status()
raw = resp.json()
# SatNOGS may return {results: [...]} or a bare list
entries = raw.get("results", []) if isinstance(raw, dict) else raw
lines = []
for entry in entries:
tle0 = entry.get("tle0", "0 UNKNOWN").strip()
tle1 = entry.get("tle1", "").strip()
tle2 = entry.get("tle2", "").strip()
if tle1 and tle2:
lines.append(tle0)
lines.append(tle1)
lines.append(tle2)
data = "\n".join(lines) + "\n"
cache.write("satnogs", filename, data)
n_objects = len(lines) // 3
console.print(f"[green]{n_objects} objects[/green]")
return data
except (httpx.HTTPError, json.JSONDecodeError) as e:
console.print(f"[red]FAILED: {e}[/red]")
return cache.read("satnogs", filename)

View File

@ -0,0 +1,68 @@
"""Space-Track.org API downloader.
Space-Track requires authentication via POST to /ajaxauth/login, which sets
a session cookie. Subsequent requests use that cookie. This replaces the bash
script's curl cookie-file approach with httpx session management.
Credentials come from config.toml [spacetrack] section or SPACETRACK_USER /
SPACETRACK_PASSWORD environment variables.
"""
import httpx
from rich.console import Console
from ..cache import Cache
from ..config import SPACETRACK_GP_URL, SPACETRACK_LOGIN_URL, SpaceTrackConfig
console = Console(stderr=True)
def download(cfg: SpaceTrackConfig, cache: Cache, force: bool = False) -> str | None:
"""Download the full Space-Track GP catalog (3LE format).
Authenticates via POST, then fetches the bulk catalog ordered by NORAD ID.
"""
filename = "spacetrack_everything.tle"
if not force and cache.is_fresh("spacetrack", filename):
console.print(" [dim]spacetrack: cached (fresh)[/dim]")
return cache.read("spacetrack", filename)
if not cfg.user or not cfg.password:
console.print(
" [yellow]spacetrack: skipped (no credentials)[/yellow]\n"
" Set SPACETRACK_USER and SPACETRACK_PASSWORD, or configure in config.toml"
)
return cache.read("spacetrack", filename)
console.print(" spacetrack...", end=" ")
try:
with httpx.Client(timeout=300.0, follow_redirects=True) as client:
# Authenticate — Space-Track sets session cookie
login_resp = client.post(
SPACETRACK_LOGIN_URL,
data={"identity": cfg.user, "password": cfg.password},
)
login_resp.raise_for_status()
# Check for failed login (Space-Track returns 200 with error text)
if "failed" in login_resp.text.lower() or "invalid" in login_resp.text.lower():
console.print("[red]FAILED: authentication rejected[/red]")
return cache.read("spacetrack", filename)
# Fetch bulk catalog
resp = client.get(SPACETRACK_GP_URL)
resp.raise_for_status()
data = resp.text
if not data.strip() or data.strip().startswith("{"):
console.print("[red]FAILED: unexpected response format[/red]")
return cache.read("spacetrack", filename)
cache.write("spacetrack", filename, data)
n_objects = data.count("\n1 ") + (1 if data.startswith("1 ") else 0)
console.print(f"[green]{n_objects} objects[/green]")
return data
except httpx.HTTPError as e:
console.print(f"[red]FAILED: {e}[/red]")
return cache.read("spacetrack", filename)

View File

@ -0,0 +1,255 @@
"""TLE parsing, NORAD ID decoding (Alpha-5 + Super-5), and record types.
The NORAD decoding mirrors get_norad_number() in pg_orrery's src/sgp4/get_el.c
(lines 245-275). Four encoding cases:
(1) ddddd traditional 5-digit (0 to 99,999)
(2) Ldddd Alpha-5: letter + 4 digits (100,000 to 339,999), I/O skipped
(3) xxxxX Super-5: last char is uppercase letter (340,000 to 906,309,663)
(4) xxxXd Super-5: 4th char is non-digit (906,309,664 to 1,047,867,423)
The base64 alphabet: 0-9 (0-9), A-Z (10-35), a-z (36-61), + (62), - (63).
"""
import sys
from dataclasses import dataclass
from pathlib import Path
# Alpha-5 letters skip I and O (look like 1 and 0)
_ALPHA5_SKIP = {"I", "O"}
def _base64_to_int(c: str) -> int:
"""Convert a single character to its base-64 value, matching get_el.c."""
if "A" <= c <= "Z":
return ord(c) - ord("A") + 10
if "a" <= c <= "z":
return ord(c) - ord("a") + 36
if "0" <= c <= "9":
return ord(c) - ord("0")
if c == " ":
return 0
if c == "+":
return 62
if c == "-":
return 63
return -1
def decode_norad(s: str) -> int | None:
"""Decode a 5-character NORAD catalog field to an integer.
Handles all four encoding cases from Bill Gray's get_el.c:
traditional 5-digit, Alpha-5, and Super-5 (cases 3 and 4).
"""
if not s or len(s) < 5:
return None
# Use the raw 5-char field (space-padded), matching C's get_norad_number()
digits = [_base64_to_int(c) for c in s[:5]]
if any(d == -1 for d in digits):
return None
# Case (3): last char is uppercase letter (non-digit base64)
if digits[4] > 9:
return (
340000
+ (digits[4] - 10)
+ 54 * (digits[3] + (digits[2] << 6) + (digits[1] << 12) + (digits[0] << 18))
)
# Case (4): 4th char is non-digit base64
if digits[3] > 9:
return (
340000
+ 905969664
+ digits[4]
+ (digits[3] - 10) * 10
+ 540 * (digits[2] + (digits[1] << 6) + (digits[0] << 12))
)
# Cases (1) and (2): last four chars are 0-9 (or space=0)
# Matches C: rval = digits[0] * 10000 + atoi(buff + 1)
first = s[0]
if first.isdigit() or first == " ":
# Case (1): traditional 5-digit (space-padded on left)
# C does: digits[0] * 10000 + atoi(buff+1)
try:
tail = int(s[1:5].lstrip() or "0")
except ValueError:
return None
return digits[0] * 10000 + tail
if first.isalpha() and first.isupper():
# Case (2): Alpha-5 — letter + 4 digits, I and O skipped
# C code: digits[0] with skip decrements, then digits[0]*10000 + atoi(buff+1)
# base64 A=10, so 10*10000=100000 gives the Alpha-5 offset naturally
val = digits[0]
if first > "I":
val -= 1
if first > "O":
val -= 1
try:
return val * 10000 + int(s[1:5])
except ValueError:
return None
return None
@dataclass
class TLERecord:
"""A parsed TLE entry with metadata."""
line1: str
line2: str
name: str
norad_id: int
epoch: float
source: str = ""
@property
def norad_field(self) -> str:
"""The raw 5-char NORAD field from line 1."""
return self.line1[2:7]
@property
def intl_designator(self) -> str:
"""International designator (columns 9-16 of line 1)."""
return self.line1[9:17].strip()
@property
def mean_motion(self) -> float:
"""Mean motion in revs/day from line 2 (columns 52-63)."""
try:
return float(self.line2[52:63].strip())
except (ValueError, IndexError):
return 0.0
@property
def inclination(self) -> float:
"""Inclination in degrees from line 2 (columns 8-16)."""
try:
return float(self.line2[8:16].strip())
except (ValueError, IndexError):
return 0.0
@property
def eccentricity(self) -> float:
"""Eccentricity from line 2 (columns 26-33, implied leading 0.)."""
try:
return float("0." + self.line2[26:33].strip())
except (ValueError, IndexError):
return 0.0
def parse_3le_file(filepath: str | Path, source: str = "") -> dict[int, TLERecord]:
"""Parse a 3LE (or 2LE) file into a dict of norad_id -> TLERecord.
When the same NORAD ID appears multiple times, the entry with the
newest epoch wins.
"""
filepath = Path(filepath)
objects: dict[int, TLERecord] = {}
try:
lines = filepath.read_text(errors="replace").splitlines()
except FileNotFoundError:
print(f"# SKIP {filepath}: not found", file=sys.stderr)
return objects
src = source or filepath.stem
i = 0
while i < len(lines):
line = lines[i].rstrip()
if line.startswith("1 ") and i + 1 < len(lines) and lines[i + 1].rstrip().startswith("2 "):
line1 = line.rstrip()
line2 = lines[i + 1].rstrip()
# Look back for name line (3LE format)
name = ""
if i > 0:
prev = lines[i - 1].rstrip()
if prev and not prev.startswith(("1 ", "2 ")):
name = prev.strip()
norad_field = line1[2:7]
norad_id = decode_norad(norad_field)
if norad_id is None:
i += 2
continue
# Epoch: columns 18-32 of line 1 (YYDDD.DDDDDDDD)
try:
epoch = float(line1[18:32].strip())
except (ValueError, IndexError):
epoch = 0.0
rec = TLERecord(
line1=line1,
line2=line2,
name=name,
norad_id=norad_id,
epoch=epoch,
source=src,
)
# Newest epoch wins
if norad_id not in objects or epoch > objects[norad_id].epoch:
objects[norad_id] = rec
i += 2
else:
i += 1
return objects
def parse_3le_text(text: str, source: str = "") -> dict[int, TLERecord]:
"""Parse 3LE content from a string (same logic as parse_3le_file)."""
objects: dict[int, TLERecord] = {}
lines = text.splitlines()
i = 0
while i < len(lines):
line = lines[i].rstrip()
if line.startswith("1 ") and i + 1 < len(lines) and lines[i + 1].rstrip().startswith("2 "):
line1 = line.rstrip()
line2 = lines[i + 1].rstrip()
name = ""
if i > 0:
prev = lines[i - 1].rstrip()
if prev and not prev.startswith(("1 ", "2 ")):
name = prev.strip()
norad_field = line1[2:7]
norad_id = decode_norad(norad_field)
if norad_id is None:
i += 2
continue
try:
epoch = float(line1[18:32].strip())
except (ValueError, IndexError):
epoch = 0.0
rec = TLERecord(
line1=line1,
line2=line2,
name=name,
norad_id=norad_id,
epoch=epoch,
source=source,
)
if norad_id not in objects or epoch > objects[norad_id].epoch:
objects[norad_id] = rec
i += 2
else:
i += 1
return objects

78
tests/test_catalog.py Normal file
View File

@ -0,0 +1,78 @@
"""Tests for catalog merge and deduplication logic."""
import textwrap
from pg_orrery_catalog.catalog import merge_records, merge_sources
from pg_orrery_catalog.tle import TLERecord
def _make_record(norad_id: int, epoch: float, source: str = "") -> TLERecord:
"""Helper to create a minimal TLERecord for testing."""
line1 = f"1 {norad_id:05d}U 98067A {epoch:014.8f} .00000000 00000-0 00000-0 0 9990"
line2 = f"2 {norad_id:05d} 51.6400 100.0000 0007417 30.0000 330.1234 15.49456789999990"
return TLERecord(
line1=line1, line2=line2, name=f"SAT-{norad_id}",
norad_id=norad_id, epoch=epoch, source=source,
)
class TestMergeRecords:
def test_merge_disjoint(self):
a = {1: _make_record(1, 24001.0, "A")}
b = {2: _make_record(2, 24002.0, "B")}
merged = merge_records(a, b)
assert len(merged) == 2
assert 1 in merged
assert 2 in merged
def test_merge_newer_wins(self):
old = {1: _make_record(1, 24001.0, "old")}
new = {1: _make_record(1, 24005.0, "new")}
merged = merge_records(old, new)
assert len(merged) == 1
assert merged[1].source == "new"
assert merged[1].epoch == 24005.0
def test_merge_older_loses(self):
new = {1: _make_record(1, 24005.0, "new")}
old = {1: _make_record(1, 24001.0, "old")}
merged = merge_records(new, old)
assert merged[1].source == "new"
def test_merge_three_sources(self):
a = {1: _make_record(1, 24001.0, "A"), 2: _make_record(2, 24001.0, "A")}
b = {1: _make_record(1, 24003.0, "B"), 3: _make_record(3, 24002.0, "B")}
c = {1: _make_record(1, 24002.0, "C")} # older than B
merged = merge_records(a, b, c)
assert len(merged) == 3
assert merged[1].source == "B" # newest epoch
class TestMergeSources:
def test_merge_files(self, tmp_path):
tle1 = textwrap.dedent("""\
SAT-A
1 00001U 98001A 24001.50000000 .00000000 00000-0 00000-0 0 9990
2 00001 30.0000 100.0000 0010000 45.0000 315.0000 15.00000000999990
""")
tle2 = textwrap.dedent("""\
SAT-A-NEWER
1 00001U 98001A 24005.50000000 .00000000 00000-0 00000-0 0 9990
2 00001 30.0000 100.0000 0010000 45.0000 315.0000 15.00000000999990
SAT-B
1 00002U 98002A 24003.00000000 .00000000 00000-0 00000-0 0 9990
2 00002 45.0000 200.0000 0020000 90.0000 270.0000 14.50000000999990
""")
f1 = tmp_path / "source1.tle"
f2 = tmp_path / "source2.tle"
f1.write_text(tle1)
f2.write_text(tle2)
merged, stats = merge_sources([f1, f2])
assert stats.total_unique == 2
assert len(stats.sources) == 2
# Source 2 should have 1 new (SAT-B) and 1 updated (SAT-A-NEWER)
assert stats.sources[1].new == 1
assert stats.sources[1].updated == 1
# SAT-A should have the newer name
assert merged[1].name == "SAT-A-NEWER"

124
tests/test_output.py Normal file
View File

@ -0,0 +1,124 @@
"""Tests for output formatters (SQL, 3LE, JSON)."""
import json
from pg_orrery_catalog.output.json_out import generate_json
from pg_orrery_catalog.output.sql import escape_sql_string, generate_sql
from pg_orrery_catalog.output.tle_file import generate_3le
from pg_orrery_catalog.tle import TLERecord
def _make_record(norad_id: int, name: str = "", epoch: float = 24001.0) -> TLERecord:
line1 = f"1 {norad_id:05d}U 98067A {epoch:014.8f} .00000000 00000-0 00000-0 0 9990"
line2 = f"2 {norad_id:05d} 51.6400 100.0000 0007417 30.0000 330.1234 15.49456789999990"
return TLERecord(
line1=line1, line2=line2, name=name or f"SAT-{norad_id}",
norad_id=norad_id, epoch=epoch,
)
class TestSQLEscaping:
def test_single_quote(self):
assert escape_sql_string("it's") == "it''s"
def test_backslash(self):
assert escape_sql_string("a\\b") == "a\\\\b"
def test_both(self):
assert escape_sql_string("it's a\\b") == "it''s a\\\\b"
def test_clean(self):
assert escape_sql_string("HELLO") == "HELLO"
class TestSQLGeneration:
def test_basic_output(self):
records = {25544: _make_record(25544, "ISS (ZARYA)")}
sql = generate_sql(records, table="test_table")
assert "DROP TABLE IF EXISTS test_table;" in sql
assert "CREATE TABLE IF NOT EXISTS test_table" in sql
assert "INSERT INTO test_table" in sql
assert "ISS (ZARYA)" in sql
assert "E'" in sql # escape string syntax
assert "\\n" in sql # newline between TLE lines
def test_no_drop(self):
records = {1: _make_record(1)}
sql = generate_sql(records, table="t", drop_existing=False)
assert "DROP TABLE" not in sql
def test_sorted_by_norad(self):
records = {
99999: _make_record(99999),
1: _make_record(1),
25544: _make_record(25544),
}
sql = generate_sql(records)
lines = [line for line in sql.split("\n") if line.startswith("INSERT")]
assert len(lines) == 3
# First INSERT should be NORAD 1 (lowest)
assert "SAT-1" in lines[0]
def test_name_escaping(self):
records = {1: _make_record(1, "O'BRIEN SAT")}
sql = generate_sql(records)
assert "O''BRIEN SAT" in sql
def test_default_name(self):
"""When TLERecord has no name, SQL output uses 'NORAD {id}'."""
rec = _make_record(1)
rec.name = "" # clear the name to test fallback
records = {1: rec}
sql = generate_sql(records)
assert "NORAD 1" in sql
class TestTLEFileGeneration:
def test_3le_format(self):
records = {25544: _make_record(25544, "ISS (ZARYA)")}
text = generate_3le(records)
lines = text.strip().split("\n")
assert len(lines) == 3
assert lines[0].startswith("0 ")
assert lines[1].startswith("1 ")
assert lines[2].startswith("2 ")
def test_sorted_output(self):
records = {
2: _make_record(2, "B"),
1: _make_record(1, "A"),
}
text = generate_3le(records)
lines = text.strip().split("\n")
assert "A" in lines[0]
def test_name_prefix(self):
records = {1: _make_record(1, "0 ALREADY PREFIXED")}
text = generate_3le(records)
# Should not double-prefix
assert "0 0 ALREADY PREFIXED" not in text
assert "0 ALREADY PREFIXED" in text
class TestJSONGeneration:
def test_valid_json(self):
records = {25544: _make_record(25544, "ISS")}
text = generate_json(records)
data = json.loads(text)
assert isinstance(data, list)
assert len(data) == 1
assert data[0]["norad_id"] == 25544
assert data[0]["name"] == "ISS"
def test_fields_present(self):
records = {1: _make_record(1)}
data = json.loads(generate_json(records))
entry = data[0]
for field in ("norad_id", "name", "line1", "line2", "epoch", "mean_motion",
"inclination", "eccentricity", "regime", "source"):
assert field in entry
def test_regime_classification(self):
records = {1: _make_record(1)} # mean_motion ~15.49 → LEO
data = json.loads(generate_json(records))
assert data[0]["regime"] == "LEO"

53
tests/test_regime.py Normal file
View File

@ -0,0 +1,53 @@
"""Tests for orbital regime classification."""
from pg_orrery_catalog.regime import classify_regime, regime_summary
from pg_orrery_catalog.tle import TLERecord
def _make_record_with_mm(norad_id: int, mean_motion: float) -> TLERecord:
"""Create a TLERecord with a specific mean motion in line2."""
line1 = f"1 {norad_id:05d}U 98067A 24001.50000000 .00000000 00000-0 00000-0 0 9990"
mm_str = f"{mean_motion:011.8f}"
line2 = f"2 {norad_id:05d} 51.6400 100.0000 0007417 30.0000 330.1234 {mm_str}999990"
return TLERecord(
line1=line1, line2=line2, name=f"SAT-{norad_id}",
norad_id=norad_id, epoch=24001.0,
)
class TestClassifyRegime:
def test_leo(self):
assert classify_regime(15.5) == "LEO" # ISS-like
assert classify_regime(11.26) == "LEO" # boundary
def test_meo(self):
assert classify_regime(2.0) == "MEO" # GPS-like
assert classify_regime(11.25) == "MEO" # just below LEO
def test_geo(self):
assert classify_regime(1.0) == "GEO" # near-synchronous
assert classify_regime(0.91) == "GEO"
def test_heo(self):
assert classify_regime(0.5) == "HEO" # Molniya-like
assert classify_regime(0.9) == "HEO" # boundary
assert classify_regime(0.1) == "HEO" # deep space
class TestRegimeSummary:
def test_mixed(self):
records = {
1: _make_record_with_mm(1, 15.5), # LEO
2: _make_record_with_mm(2, 2.0), # MEO
3: _make_record_with_mm(3, 1.0), # GEO
4: _make_record_with_mm(4, 0.5), # HEO
}
summary = regime_summary(records)
assert summary["LEO"] == 1
assert summary["MEO"] == 1
assert summary["GEO"] == 1
assert summary["HEO"] == 1
def test_empty(self):
summary = regime_summary({})
assert summary == {"LEO": 0, "MEO": 0, "GEO": 0, "HEO": 0}

234
tests/test_tle.py Normal file
View File

@ -0,0 +1,234 @@
"""Tests for TLE parsing and NORAD ID decoding.
decode_norad() must match get_norad_number() in pg_orrery's src/sgp4/get_el.c.
Test vectors verified against the C implementation.
"""
import textwrap
import pytest
from pg_orrery_catalog.tle import (
_base64_to_int,
decode_norad,
parse_3le_file,
parse_3le_text,
)
# ── base64_to_int ────────────────────────────────────────────
class TestBase64ToInt:
def test_digits(self):
for i in range(10):
assert _base64_to_int(str(i)) == i
def test_uppercase(self):
assert _base64_to_int("A") == 10
assert _base64_to_int("Z") == 35
def test_lowercase(self):
assert _base64_to_int("a") == 36
assert _base64_to_int("z") == 61
def test_special(self):
assert _base64_to_int(" ") == 0
assert _base64_to_int("+") == 62
assert _base64_to_int("-") == 63
def test_invalid(self):
assert _base64_to_int("!") == -1
assert _base64_to_int("@") == -1
# ── decode_norad: Case (1) traditional 5-digit ──────────────
class TestDecodeNoradTraditional:
def test_simple(self):
assert decode_norad("00001") == 1
assert decode_norad("25544") == 25544
assert decode_norad("99999") == 99999
def test_with_spaces(self):
assert decode_norad(" 5") == 5
assert decode_norad(" 405") == 405
def test_zero(self):
assert decode_norad("00000") == 0
# ── decode_norad: Case (2) Alpha-5 ──────────────────────────
class TestDecodeNoradAlpha5:
"""Alpha-5 encoding: letter + 4 digits, I and O skipped.
A=10, B=11, ..., H=17, J=18 (I skipped), ..., N=22, P=23 (O skipped), ..., Z=33
value = (letter_value - 10) * 10000 + digits + 100000
But the skip logic means: A0, B1, ..., H7, J8, K9, ..., N12, P13, ..., Z23
So: A0000 = 100000, Z9999 = 339999
"""
def test_a0001(self):
# A → val=0, result = 0*10000 + 1 + 100000 = 100001
assert decode_norad("A0001") == 100001
def test_a0000(self):
assert decode_norad("A0000") == 100000
def test_t0002(self):
# T → ord('T')-ord('A') = 19, minus 1 for I, minus 1 for O = 17
# 17 * 10000 + 2 + 100000 = 270002
assert decode_norad("T0002") == 270002
def test_z9999(self):
# Z → ord('Z')-ord('A') = 25, minus 1 for I, minus 1 for O = 23
# 23 * 10000 + 9999 + 100000 = 339999
assert decode_norad("Z9999") == 339999
def test_b0000(self):
assert decode_norad("B0000") == 110000
def test_h9999(self):
# H → 7, skip nothing before I
assert decode_norad("H9999") == 179999
def test_j0000(self):
# J → ord('J')-ord('A')=9, minus 1 for I = 8
assert decode_norad("J0000") == 180000
def test_p0000(self):
# P → ord('P')-ord('A')=15, minus 1 for I, minus 1 for O = 13
assert decode_norad("P0000") == 230000
# ── decode_norad: Case (3) Super-5 (last char uppercase) ────
class TestDecodeNoradSuper5Case3:
"""Case (3): xxxxX — last character is non-digit base64.
rval = 340000 + (digits[4] - 10) + 54 * (d3 + d2*64 + d1*64^2 + d0*64^3)
"""
def test_0000A(self):
# digits = [0, 0, 0, 0, 10]
# rval = 340000 + (10-10) + 54*(0 + 0 + 0 + 0) = 340000
assert decode_norad("0000A") == 340000
def test_0000B(self):
# digits = [0, 0, 0, 0, 11]
# rval = 340000 + 1 = 340001
assert decode_norad("0000B") == 340001
def test_0001A(self):
# digits = [0, 0, 0, 1, 10]
# rval = 340000 + 0 + 54*(1) = 340054
assert decode_norad("0001A") == 340054
# ── decode_norad: Case (4) Super-5 (4th char non-digit) ─────
class TestDecodeNoradSuper5Case4:
"""Case (4): xxxXd — 4th character is non-digit.
rval = 340000 + 905969664 + d4 + (d3-10)*10 + 540*(d2 + d1*64 + d0*64^2)
"""
def test_000A0(self):
# digits = [0, 0, 0, 10, 0]
# rval = 340000 + 905969664 + 0 + 0 + 0 = 906309664
assert decode_norad("000A0") == 906309664
# ── decode_norad: edge cases ────────────────────────────────
class TestDecodeNoradEdgeCases:
def test_empty(self):
assert decode_norad("") is None
assert decode_norad(" ") is None
def test_short(self):
assert decode_norad("123") is None
def test_invalid_chars(self):
assert decode_norad("!@#$%") is None
# ── parse_3le_text ──────────────────────────────────────────
class TestParse3LE:
SAMPLE_3LE = textwrap.dedent("""\
ISS (ZARYA)
1 25544U 98067A 24001.50000000 .00016717 00000-0 10270-3 0 9018
2 25544 51.6400 100.0000 0007417 30.0000 330.1234 15.49456789999990
STARLINK-1234
1 T0002U 20001A 24002.50000000 .00001234 00000-0 12345-4 0 9990
2 T0002 53.0000 200.0000 0001234 45.0000 315.0000 15.12345678123456
""")
def test_parses_standard_norad(self):
records = parse_3le_text(self.SAMPLE_3LE)
assert 25544 in records
assert records[25544].name == "ISS (ZARYA)"
def test_parses_alpha5_norad(self):
records = parse_3le_text(self.SAMPLE_3LE)
# T0002 → 270002
assert 270002 in records
assert records[270002].name == "STARLINK-1234"
def test_epoch_extraction(self):
records = parse_3le_text(self.SAMPLE_3LE)
assert records[25544].epoch == pytest.approx(24001.5, rel=1e-6)
def test_record_properties(self):
records = parse_3le_text(self.SAMPLE_3LE)
iss = records[25544]
assert iss.mean_motion == pytest.approx(15.49456789, rel=1e-6)
assert iss.inclination == pytest.approx(51.64, rel=1e-2)
def test_2le_format(self):
"""2LE format (no name lines) should still parse."""
tle_2le = textwrap.dedent("""\
1 25544U 98067A 24001.50000000 .00016717 00000-0 10270-3 0 9018
2 25544 51.6400 100.0000 0007417 30.0000 330.1234 15.49456789999990
""")
records = parse_3le_text(tle_2le)
assert 25544 in records
assert records[25544].name == ""
def test_epoch_dedup(self):
"""When same NORAD ID appears twice, newest epoch wins."""
tle = textwrap.dedent("""\
OLD
1 25544U 98067A 24001.50000000 .00016717 00000-0 10270-3 0 9018
2 25544 51.6400 100.0000 0007417 30.0000 330.1234 15.49456789999990
NEW
1 25544U 98067A 24005.50000000 .00016717 00000-0 10270-3 0 9018
2 25544 51.6400 100.0000 0007417 30.0000 330.1234 15.49456789999990
""")
records = parse_3le_text(tle)
assert records[25544].name == "NEW"
assert records[25544].epoch == pytest.approx(24005.5, rel=1e-6)
def test_source_propagation(self):
records = parse_3le_text(self.SAMPLE_3LE, source="test-source")
assert records[25544].source == "test-source"
# ── parse_3le_file ──────────────────────────────────────────
class TestParse3LEFile:
def test_missing_file(self, capsys):
records = parse_3le_file("/nonexistent/file.tle")
assert records == {}
def test_file_roundtrip(self, tmp_path):
tle_content = textwrap.dedent("""\
TEST SAT
1 00001U 58001A 24001.50000000 .00000000 00000-0 00000-0 0 9990
2 00001 30.0000 100.0000 0010000 45.0000 315.0000 15.00000000999990
""")
p = tmp_path / "test.tle"
p.write_text(tle_content)
records = parse_3le_file(p)
assert 1 in records
assert records[1].name == "TEST SAT"

294
uv.lock generated Normal file
View File

@ -0,0 +1,294 @@
version = 1
revision = 3
requires-python = ">=3.11"
[[package]]
name = "anyio"
version = "4.12.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "idna" },
{ name = "typing-extensions", marker = "python_full_version < '3.13'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/96/f0/5eb65b2bb0d09ac6776f2eb54adee6abe8228ea05b20a5ad0e4945de8aac/anyio-4.12.1.tar.gz", hash = "sha256:41cfcc3a4c85d3f05c932da7c26d0201ac36f72abd4435ba90d0464a3ffed703", size = 228685, upload-time = "2026-01-06T11:45:21.246Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/38/0e/27be9fdef66e72d64c0cdc3cc2823101b80585f8119b5c112c2e8f5f7dab/anyio-4.12.1-py3-none-any.whl", hash = "sha256:d405828884fc140aa80a3c667b8beed277f1dfedec42ba031bd6ac3db606ab6c", size = 113592, upload-time = "2026-01-06T11:45:19.497Z" },
]
[[package]]
name = "certifi"
version = "2026.1.4"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/e0/2d/a891ca51311197f6ad14a7ef42e2399f36cf2f9bd44752b3dc4eab60fdc5/certifi-2026.1.4.tar.gz", hash = "sha256:ac726dd470482006e014ad384921ed6438c457018f4b3d204aea4281258b2120", size = 154268, upload-time = "2026-01-04T02:42:41.825Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/e6/ad/3cc14f097111b4de0040c83a525973216457bbeeb63739ef1ed275c1c021/certifi-2026.1.4-py3-none-any.whl", hash = "sha256:9943707519e4add1115f44c2bc244f782c0249876bf51b6599fee1ffbedd685c", size = 152900, upload-time = "2026-01-04T02:42:40.15Z" },
]
[[package]]
name = "click"
version = "8.3.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "colorama", marker = "sys_platform == 'win32'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/3d/fa/656b739db8587d7b5dfa22e22ed02566950fbfbcdc20311993483657a5c0/click-8.3.1.tar.gz", hash = "sha256:12ff4785d337a1bb490bb7e9c2b1ee5da3112e94a8622f26a6c77f5d2fc6842a", size = 295065, upload-time = "2025-11-15T20:45:42.706Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/98/78/01c019cdb5d6498122777c1a43056ebb3ebfeef2076d9d026bfe15583b2b/click-8.3.1-py3-none-any.whl", hash = "sha256:981153a64e25f12d547d3426c367a4857371575ee7ad18df2a6183ab0545b2a6", size = 108274, upload-time = "2025-11-15T20:45:41.139Z" },
]
[[package]]
name = "colorama"
version = "0.4.6"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/d8/53/6f443c9a4a8358a93a6792e2acffb9d9d5cb0a5cfd8802644b7b1c9a02e4/colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44", size = 27697, upload-time = "2022-10-25T02:36:22.414Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/d1/d6/3965ed04c63042e047cb6a3e6ed1a63a35087b6a609aa3a15ed8ac56c221/colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6", size = 25335, upload-time = "2022-10-25T02:36:20.889Z" },
]
[[package]]
name = "h11"
version = "0.16.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/01/ee/02a2c011bdab74c6fb3c75474d40b3052059d95df7e73351460c8588d963/h11-0.16.0.tar.gz", hash = "sha256:4e35b956cf45792e4caa5885e69fba00bdbc6ffafbfa020300e549b208ee5ff1", size = 101250, upload-time = "2025-04-24T03:35:25.427Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/04/4b/29cac41a4d98d144bf5f6d33995617b185d14b22401f75ca86f384e87ff1/h11-0.16.0-py3-none-any.whl", hash = "sha256:63cf8bbe7522de3bf65932fda1d9c2772064ffb3dae62d55932da54b31cb6c86", size = 37515, upload-time = "2025-04-24T03:35:24.344Z" },
]
[[package]]
name = "httpcore"
version = "1.0.9"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "certifi" },
{ name = "h11" },
]
sdist = { url = "https://files.pythonhosted.org/packages/06/94/82699a10bca87a5556c9c59b5963f2d039dbd239f25bc2a63907a05a14cb/httpcore-1.0.9.tar.gz", hash = "sha256:6e34463af53fd2ab5d807f399a9b45ea31c3dfa2276f15a2c3f00afff6e176e8", size = 85484, upload-time = "2025-04-24T22:06:22.219Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/7e/f5/f66802a942d491edb555dd61e3a9961140fd64c90bce1eafd741609d334d/httpcore-1.0.9-py3-none-any.whl", hash = "sha256:2d400746a40668fc9dec9810239072b40b4484b640a8c38fd654a024c7a1bf55", size = 78784, upload-time = "2025-04-24T22:06:20.566Z" },
]
[[package]]
name = "httpx"
version = "0.28.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "anyio" },
{ name = "certifi" },
{ name = "httpcore" },
{ name = "idna" },
]
sdist = { url = "https://files.pythonhosted.org/packages/b1/df/48c586a5fe32a0f01324ee087459e112ebb7224f646c0b5023f5e79e9956/httpx-0.28.1.tar.gz", hash = "sha256:75e98c5f16b0f35b567856f597f06ff2270a374470a5c2392242528e3e3e42fc", size = 141406, upload-time = "2024-12-06T15:37:23.222Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/2a/39/e50c7c3a983047577ee07d2a9e53faf5a69493943ec3f6a384bdc792deb2/httpx-0.28.1-py3-none-any.whl", hash = "sha256:d909fcccc110f8c7faf814ca82a9a4d816bc5a6dbfea25d6591d6985b8ba59ad", size = 73517, upload-time = "2024-12-06T15:37:21.509Z" },
]
[package.optional-dependencies]
socks = [
{ name = "socksio" },
]
[[package]]
name = "idna"
version = "3.11"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/6f/6d/0703ccc57f3a7233505399edb88de3cbd678da106337b9fcde432b65ed60/idna-3.11.tar.gz", hash = "sha256:795dafcc9c04ed0c1fb032c2aa73654d8e8c5023a7df64a53f39190ada629902", size = 194582, upload-time = "2025-10-12T14:55:20.501Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/0e/61/66938bbb5fc52dbdf84594873d5b51fb1f7c7794e9c0f5bd885f30bc507b/idna-3.11-py3-none-any.whl", hash = "sha256:771a87f49d9defaf64091e6e6fe9c18d4833f140bd19464795bc32d966ca37ea", size = 71008, upload-time = "2025-10-12T14:55:18.883Z" },
]
[[package]]
name = "iniconfig"
version = "2.3.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/72/34/14ca021ce8e5dfedc35312d08ba8bf51fdd999c576889fc2c24cb97f4f10/iniconfig-2.3.0.tar.gz", hash = "sha256:c76315c77db068650d49c5b56314774a7804df16fee4402c1f19d6d15d8c4730", size = 20503, upload-time = "2025-10-18T21:55:43.219Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/cb/b1/3846dd7f199d53cb17f49cba7e651e9ce294d8497c8c150530ed11865bb8/iniconfig-2.3.0-py3-none-any.whl", hash = "sha256:f631c04d2c48c52b84d0d0549c99ff3859c98df65b3101406327ecc7d53fbf12", size = 7484, upload-time = "2025-10-18T21:55:41.639Z" },
]
[[package]]
name = "markdown-it-py"
version = "4.0.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "mdurl" },
]
sdist = { url = "https://files.pythonhosted.org/packages/5b/f5/4ec618ed16cc4f8fb3b701563655a69816155e79e24a17b651541804721d/markdown_it_py-4.0.0.tar.gz", hash = "sha256:cb0a2b4aa34f932c007117b194e945bd74e0ec24133ceb5bac59009cda1cb9f3", size = 73070, upload-time = "2025-08-11T12:57:52.854Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/94/54/e7d793b573f298e1c9013b8c4dade17d481164aa517d1d7148619c2cedbf/markdown_it_py-4.0.0-py3-none-any.whl", hash = "sha256:87327c59b172c5011896038353a81343b6754500a08cd7a4973bb48c6d578147", size = 87321, upload-time = "2025-08-11T12:57:51.923Z" },
]
[[package]]
name = "mdurl"
version = "0.1.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/d6/54/cfe61301667036ec958cb99bd3efefba235e65cdeb9c84d24a8293ba1d90/mdurl-0.1.2.tar.gz", hash = "sha256:bb413d29f5eea38f31dd4754dd7377d4465116fb207585f97bf925588687c1ba", size = 8729, upload-time = "2022-08-14T12:40:10.846Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/b3/38/89ba8ad64ae25be8de66a6d463314cf1eb366222074cfda9ee839c56a4b4/mdurl-0.1.2-py3-none-any.whl", hash = "sha256:84008a41e51615a49fc9966191ff91509e3c40b939176e643fd50a5c2196b8f8", size = 9979, upload-time = "2022-08-14T12:40:09.779Z" },
]
[[package]]
name = "packaging"
version = "26.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/65/ee/299d360cdc32edc7d2cf530f3accf79c4fca01e96ffc950d8a52213bd8e4/packaging-26.0.tar.gz", hash = "sha256:00243ae351a257117b6a241061796684b084ed1c516a08c48a3f7e147a9d80b4", size = 143416, upload-time = "2026-01-21T20:50:39.064Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/b7/b9/c538f279a4e237a006a2c98387d081e9eb060d203d8ed34467cc0f0b9b53/packaging-26.0-py3-none-any.whl", hash = "sha256:b36f1fef9334a5588b4166f8bcd26a14e521f2b55e6b9de3aaa80d3ff7a37529", size = 74366, upload-time = "2026-01-21T20:50:37.788Z" },
]
[[package]]
name = "pg-orrery-catalog"
version = "2025.2.18"
source = { editable = "." }
dependencies = [
{ name = "click" },
{ name = "httpx", extra = ["socks"] },
{ name = "rich" },
]
[package.optional-dependencies]
dev = [
{ name = "pytest" },
{ name = "pytest-asyncio" },
{ name = "ruff" },
]
pg = [
{ name = "psycopg" },
]
[package.metadata]
requires-dist = [
{ name = "click", specifier = ">=8.1" },
{ name = "httpx", specifier = ">=0.27" },
{ name = "httpx", extras = ["socks"] },
{ name = "psycopg", marker = "extra == 'pg'", specifier = ">=3.1" },
{ name = "pytest", marker = "extra == 'dev'" },
{ name = "pytest-asyncio", marker = "extra == 'dev'" },
{ name = "rich", specifier = ">=13.0" },
{ name = "ruff", marker = "extra == 'dev'" },
]
provides-extras = ["pg", "dev"]
[[package]]
name = "pluggy"
version = "1.6.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/f9/e2/3e91f31a7d2b083fe6ef3fa267035b518369d9511ffab804f839851d2779/pluggy-1.6.0.tar.gz", hash = "sha256:7dcc130b76258d33b90f61b658791dede3486c3e6bfb003ee5c9bfb396dd22f3", size = 69412, upload-time = "2025-05-15T12:30:07.975Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/54/20/4d324d65cc6d9205fabedc306948156824eb9f0ee1633355a8f7ec5c66bf/pluggy-1.6.0-py3-none-any.whl", hash = "sha256:e920276dd6813095e9377c0bc5566d94c932c33b27a3e3945d8389c374dd4746", size = 20538, upload-time = "2025-05-15T12:30:06.134Z" },
]
[[package]]
name = "psycopg"
version = "3.3.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "typing-extensions", marker = "python_full_version < '3.13'" },
{ name = "tzdata", marker = "sys_platform == 'win32'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/e0/1a/7d9ef4fdc13ef7f15b934c393edc97a35c281bb7d3c3329fbfcbe915a7c2/psycopg-3.3.2.tar.gz", hash = "sha256:707a67975ee214d200511177a6a80e56e654754c9afca06a7194ea6bbfde9ca7", size = 165630, upload-time = "2025-12-06T17:34:53.899Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/8c/51/2779ccdf9305981a06b21a6b27e8547c948d85c41c76ff434192784a4c93/psycopg-3.3.2-py3-none-any.whl", hash = "sha256:3e94bc5f4690247d734599af56e51bae8e0db8e4311ea413f801fef82b14a99b", size = 212774, upload-time = "2025-12-06T17:31:41.414Z" },
]
[[package]]
name = "pygments"
version = "2.19.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/b0/77/a5b8c569bf593b0140bde72ea885a803b82086995367bf2037de0159d924/pygments-2.19.2.tar.gz", hash = "sha256:636cb2477cec7f8952536970bc533bc43743542f70392ae026374600add5b887", size = 4968631, upload-time = "2025-06-21T13:39:12.283Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/c7/21/705964c7812476f378728bdf590ca4b771ec72385c533964653c68e86bdc/pygments-2.19.2-py3-none-any.whl", hash = "sha256:86540386c03d588bb81d44bc3928634ff26449851e99741617ecb9037ee5ec0b", size = 1225217, upload-time = "2025-06-21T13:39:07.939Z" },
]
[[package]]
name = "pytest"
version = "9.0.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "colorama", marker = "sys_platform == 'win32'" },
{ name = "iniconfig" },
{ name = "packaging" },
{ name = "pluggy" },
{ name = "pygments" },
]
sdist = { url = "https://files.pythonhosted.org/packages/d1/db/7ef3487e0fb0049ddb5ce41d3a49c235bf9ad299b6a25d5780a89f19230f/pytest-9.0.2.tar.gz", hash = "sha256:75186651a92bd89611d1d9fc20f0b4345fd827c41ccd5c299a868a05d70edf11", size = 1568901, upload-time = "2025-12-06T21:30:51.014Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/3b/ab/b3226f0bd7cdcf710fbede2b3548584366da3b19b5021e74f5bde2a8fa3f/pytest-9.0.2-py3-none-any.whl", hash = "sha256:711ffd45bf766d5264d487b917733b453d917afd2b0ad65223959f59089f875b", size = 374801, upload-time = "2025-12-06T21:30:49.154Z" },
]
[[package]]
name = "pytest-asyncio"
version = "1.3.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "pytest" },
{ name = "typing-extensions", marker = "python_full_version < '3.13'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/90/2c/8af215c0f776415f3590cac4f9086ccefd6fd463befeae41cd4d3f193e5a/pytest_asyncio-1.3.0.tar.gz", hash = "sha256:d7f52f36d231b80ee124cd216ffb19369aa168fc10095013c6b014a34d3ee9e5", size = 50087, upload-time = "2025-11-10T16:07:47.256Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/e5/35/f8b19922b6a25bc0880171a2f1a003eaeb93657475193ab516fd87cac9da/pytest_asyncio-1.3.0-py3-none-any.whl", hash = "sha256:611e26147c7f77640e6d0a92a38ed17c3e9848063698d5c93d5aa7aa11cebff5", size = 15075, upload-time = "2025-11-10T16:07:45.537Z" },
]
[[package]]
name = "rich"
version = "14.3.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "markdown-it-py" },
{ name = "pygments" },
]
sdist = { url = "https://files.pythonhosted.org/packages/74/99/a4cab2acbb884f80e558b0771e97e21e939c5dfb460f488d19df485e8298/rich-14.3.2.tar.gz", hash = "sha256:e712f11c1a562a11843306f5ed999475f09ac31ffb64281f73ab29ffdda8b3b8", size = 230143, upload-time = "2026-02-01T16:20:47.908Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/ef/45/615f5babd880b4bd7d405cc0dc348234c5ffb6ed1ea33e152ede08b2072d/rich-14.3.2-py3-none-any.whl", hash = "sha256:08e67c3e90884651da3239ea668222d19bea7b589149d8014a21c633420dbb69", size = 309963, upload-time = "2026-02-01T16:20:46.078Z" },
]
[[package]]
name = "ruff"
version = "0.15.1"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/04/dc/4e6ac71b511b141cf626357a3946679abeba4cf67bc7cc5a17920f31e10d/ruff-0.15.1.tar.gz", hash = "sha256:c590fe13fb57c97141ae975c03a1aedb3d3156030cabd740d6ff0b0d601e203f", size = 4540855, upload-time = "2026-02-12T23:09:09.998Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/23/bf/e6e4324238c17f9d9120a9d60aa99a7daaa21204c07fcd84e2ef03bb5fd1/ruff-0.15.1-py3-none-linux_armv6l.whl", hash = "sha256:b101ed7cf4615bda6ffe65bdb59f964e9f4a0d3f85cbf0e54f0ab76d7b90228a", size = 10367819, upload-time = "2026-02-12T23:09:03.598Z" },
{ url = "https://files.pythonhosted.org/packages/b3/ea/c8f89d32e7912269d38c58f3649e453ac32c528f93bb7f4219258be2e7ed/ruff-0.15.1-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:939c995e9277e63ea632cc8d3fae17aa758526f49a9a850d2e7e758bfef46602", size = 10798618, upload-time = "2026-02-12T23:09:22.928Z" },
{ url = "https://files.pythonhosted.org/packages/5e/0f/1d0d88bc862624247d82c20c10d4c0f6bb2f346559d8af281674cf327f15/ruff-0.15.1-py3-none-macosx_11_0_arm64.whl", hash = "sha256:1d83466455fdefe60b8d9c8df81d3c1bbb2115cede53549d3b522ce2bc703899", size = 10148518, upload-time = "2026-02-12T23:08:58.339Z" },
{ url = "https://files.pythonhosted.org/packages/f5/c8/291c49cefaa4a9248e986256df2ade7add79388fe179e0691be06fae6f37/ruff-0.15.1-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a9457e3c3291024866222b96108ab2d8265b477e5b1534c7ddb1810904858d16", size = 10518811, upload-time = "2026-02-12T23:09:31.865Z" },
{ url = "https://files.pythonhosted.org/packages/c3/1a/f5707440e5ae43ffa5365cac8bbb91e9665f4a883f560893829cf16a606b/ruff-0.15.1-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:92c92b003e9d4f7fbd33b1867bb15a1b785b1735069108dfc23821ba045b29bc", size = 10196169, upload-time = "2026-02-12T23:09:17.306Z" },
{ url = "https://files.pythonhosted.org/packages/2a/ff/26ddc8c4da04c8fd3ee65a89c9fb99eaa5c30394269d424461467be2271f/ruff-0.15.1-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:1fe5c41ab43e3a06778844c586251eb5a510f67125427625f9eb2b9526535779", size = 10990491, upload-time = "2026-02-12T23:09:25.503Z" },
{ url = "https://files.pythonhosted.org/packages/fc/00/50920cb385b89413f7cdb4bb9bc8fc59c1b0f30028d8bccc294189a54955/ruff-0.15.1-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:66a6dd6df4d80dc382c6484f8ce1bcceb55c32e9f27a8b94c32f6c7331bf14fb", size = 11843280, upload-time = "2026-02-12T23:09:19.88Z" },
{ url = "https://files.pythonhosted.org/packages/5d/6d/2f5cad8380caf5632a15460c323ae326f1e1a2b5b90a6ee7519017a017ca/ruff-0.15.1-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:6a4a42cbb8af0bda9bcd7606b064d7c0bc311a88d141d02f78920be6acb5aa83", size = 11274336, upload-time = "2026-02-12T23:09:14.907Z" },
{ url = "https://files.pythonhosted.org/packages/a3/1d/5f56cae1d6c40b8a318513599b35ea4b075d7dc1cd1d04449578c29d1d75/ruff-0.15.1-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4ab064052c31dddada35079901592dfba2e05f5b1e43af3954aafcbc1096a5b2", size = 11137288, upload-time = "2026-02-12T23:09:07.475Z" },
{ url = "https://files.pythonhosted.org/packages/cd/20/6f8d7d8f768c93b0382b33b9306b3b999918816da46537d5a61635514635/ruff-0.15.1-py3-none-manylinux_2_31_riscv64.whl", hash = "sha256:5631c940fe9fe91f817a4c2ea4e81f47bee3ca4aa646134a24374f3c19ad9454", size = 11070681, upload-time = "2026-02-12T23:08:55.43Z" },
{ url = "https://files.pythonhosted.org/packages/9a/67/d640ac76069f64cdea59dba02af2e00b1fa30e2103c7f8d049c0cff4cafd/ruff-0.15.1-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:68138a4ba184b4691ccdc39f7795c66b3c68160c586519e7e8444cf5a53e1b4c", size = 10486401, upload-time = "2026-02-12T23:09:27.927Z" },
{ url = "https://files.pythonhosted.org/packages/65/3d/e1429f64a3ff89297497916b88c32a5cc88eeca7e9c787072d0e7f1d3e1e/ruff-0.15.1-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:518f9af03bfc33c03bdb4cb63fabc935341bb7f54af500f92ac309ecfbba6330", size = 10197452, upload-time = "2026-02-12T23:09:12.147Z" },
{ url = "https://files.pythonhosted.org/packages/78/83/e2c3bade17dad63bf1e1c2ffaf11490603b760be149e1419b07049b36ef2/ruff-0.15.1-py3-none-musllinux_1_2_i686.whl", hash = "sha256:da79f4d6a826caaea95de0237a67e33b81e6ec2e25fc7e1993a4015dffca7c61", size = 10693900, upload-time = "2026-02-12T23:09:34.418Z" },
{ url = "https://files.pythonhosted.org/packages/a1/27/fdc0e11a813e6338e0706e8b39bb7a1d61ea5b36873b351acee7e524a72a/ruff-0.15.1-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:3dd86dccb83cd7d4dcfac303ffc277e6048600dfc22e38158afa208e8bf94a1f", size = 11227302, upload-time = "2026-02-12T23:09:36.536Z" },
{ url = "https://files.pythonhosted.org/packages/f6/58/ac864a75067dcbd3b95be5ab4eb2b601d7fbc3d3d736a27e391a4f92a5c1/ruff-0.15.1-py3-none-win32.whl", hash = "sha256:660975d9cb49b5d5278b12b03bb9951d554543a90b74ed5d366b20e2c57c2098", size = 10462555, upload-time = "2026-02-12T23:09:29.899Z" },
{ url = "https://files.pythonhosted.org/packages/e0/5e/d4ccc8a27ecdb78116feac4935dfc39d1304536f4296168f91ed3ec00cd2/ruff-0.15.1-py3-none-win_amd64.whl", hash = "sha256:c820fef9dd5d4172a6570e5721704a96c6679b80cf7be41659ed439653f62336", size = 11599956, upload-time = "2026-02-12T23:09:01.157Z" },
{ url = "https://files.pythonhosted.org/packages/2a/07/5bda6a85b220c64c65686bc85bd0bbb23b29c62b3a9f9433fa55f17cda93/ruff-0.15.1-py3-none-win_arm64.whl", hash = "sha256:5ff7d5f0f88567850f45081fac8f4ec212be8d0b963e385c3f7d0d2eb4899416", size = 10874604, upload-time = "2026-02-12T23:09:05.515Z" },
]
[[package]]
name = "socksio"
version = "1.0.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/f8/5c/48a7d9495be3d1c651198fd99dbb6ce190e2274d0f28b9051307bdec6b85/socksio-1.0.0.tar.gz", hash = "sha256:f88beb3da5b5c38b9890469de67d0cb0f9d494b78b106ca1845f96c10b91c4ac", size = 19055, upload-time = "2020-04-17T15:50:34.664Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/37/c3/6eeb6034408dac0fa653d126c9204ade96b819c936e136c5e8a6897eee9c/socksio-1.0.0-py3-none-any.whl", hash = "sha256:95dc1f15f9b34e8d7b16f06d74b8ccf48f609af32ab33c608d08761c5dcbb1f3", size = 12763, upload-time = "2020-04-17T15:50:31.878Z" },
]
[[package]]
name = "typing-extensions"
version = "4.15.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/72/94/1a15dd82efb362ac84269196e94cf00f187f7ed21c242792a923cdb1c61f/typing_extensions-4.15.0.tar.gz", hash = "sha256:0cea48d173cc12fa28ecabc3b837ea3cf6f38c6d1136f85cbaaf598984861466", size = 109391, upload-time = "2025-08-25T13:49:26.313Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/18/67/36e9267722cc04a6b9f15c7f3441c2363321a3ea07da7ae0c0707beb2a9c/typing_extensions-4.15.0-py3-none-any.whl", hash = "sha256:f0fa19c6845758ab08074a0cfa8b7aecb71c999ca73d62883bc25cc018c4e548", size = 44614, upload-time = "2025-08-25T13:49:24.86Z" },
]
[[package]]
name = "tzdata"
version = "2025.3"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/5e/a7/c202b344c5ca7daf398f3b8a477eeb205cf3b6f32e7ec3a6bac0629ca975/tzdata-2025.3.tar.gz", hash = "sha256:de39c2ca5dc7b0344f2eba86f49d614019d29f060fc4ebc8a417896a620b56a7", size = 196772, upload-time = "2025-12-13T17:45:35.667Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/c7/b0/003792df09decd6849a5e39c28b513c06e84436a54440380862b5aeff25d/tzdata-2025.3-py2.py3-none-any.whl", hash = "sha256:06a47e5700f3081aab02b2e513160914ff0694bce9947d6b76ebd6bf57cfc5d1", size = 348521, upload-time = "2025-12-13T17:45:33.889Z" },
]