"""Craft API client — stdlib-only HTTP client for space.warehack.ing. Provides satellite search, pass predictions, and real-time sky positions from the Craft orbital mechanics API. All methods are blocking — designed to be called from @work(thread=True) workers in the TUI. Uses urllib.request + json only (no requests/httpx dependency). """ import json import logging import time import urllib.error import urllib.parse import urllib.request from dataclasses import dataclass, field log = logging.getLogger(__name__) @dataclass class SearchResult: """A single result from the Craft search API.""" name: str target_type: str target_id: str # NORAD int for satellites, string for planets/stars/comets score: float = 0.0 groups: list[str] = field(default_factory=list) altitude_deg: float | None = None @dataclass class PassPrediction: """A single satellite pass prediction.""" satellite_name: str norad_id: int aos_time: str aos_az: float tca_time: str tca_alt: float tca_az: float los_time: str los_az: float max_elevation: float duration_seconds: int is_visible: bool = False magnitude: float | None = None @dataclass class TargetPosition: """Real-time position of an above-horizon object.""" name: str target_type: str target_id: str # NORAD int for satellites, string for planets/stars/comets azimuth: float altitude: float distance_km: float = 0.0 range_rate: float = 0.0 is_above_horizon: bool = True @dataclass class CraftTrackingState: """Tracking loop state mirroring TrackingState from rotctld_server.""" status: str = "IDLE" target_name: str = "" azimuth: float = 0.0 elevation: float = 0.0 distance_km: float = 0.0 range_rate: float = 0.0 moves: int = 0 rate: float = 0.0 error: str = "" _move_timestamps: list[float] = field(default_factory=list, repr=False) def record_move(self) -> None: self.moves += 1 now = time.monotonic() self._move_timestamps.append(now) cutoff = now - 30.0 self._move_timestamps = [t for t in self._move_timestamps if t > cutoff] elapsed = now - self._move_timestamps[0] if elapsed > 0: self.rate = len(self._move_timestamps) / elapsed else: self.rate = 0.0 class CraftClient: """HTTP client for the Craft orbital mechanics API. Args: base_url: Craft API base URL. timeout: HTTP request timeout in seconds. """ def __init__( self, base_url: str = "https://space.warehack.ing", timeout: float = 10.0, ) -> None: self._base_url = base_url.rstrip("/") self._timeout = timeout def _get(self, path: str, params: dict | None = None) -> dict: """Issue a GET request and return parsed JSON.""" url = f"{self._base_url}{path}" if params: url += "?" + urllib.parse.urlencode(params) req = urllib.request.Request(url, headers={"Accept": "application/json"}) with urllib.request.urlopen(req, timeout=self._timeout) as resp: return json.loads(resp.read().decode()) def health(self) -> bool: """Check if the Craft API is reachable.""" try: self._get("/api/search", {"q": "ISS", "limit": "1"}) return True except Exception: log.debug("Craft API health check failed", exc_info=True) return False def search(self, query: str, limit: int = 20) -> list[SearchResult]: """Search the Craft object catalog. Returns satellites, planets, stars, comets matching the query. """ try: data = self._get("/api/search", {"q": query, "limit": str(limit)}) except Exception: log.warning("Craft search failed for %r", query, exc_info=True) return [] results = [] for item in data.get("results", []): results.append( SearchResult( name=item.get("name", ""), target_type=item.get("target_type", ""), target_id=str(item.get("target_id", "")), score=float(item.get("score", 0)), groups=item.get("groups", []), altitude_deg=item.get("altitude_deg"), ) ) return results def get_passes(self, norad_id: int, hours: int = 24) -> list[PassPrediction]: """Get pass predictions for a satellite.""" try: data = self._get("/api/passes", {"sat": str(norad_id), "hours": str(hours)}) except Exception: log.warning("Craft passes failed for NORAD %d", norad_id, exc_info=True) return [] passes = [] for p in data.get("passes", []): passes.append( PassPrediction( satellite_name=p.get("satellite_name", ""), norad_id=int(p.get("norad_id", norad_id)), aos_time=p.get("aos_time", ""), aos_az=float(p.get("aos_az", 0)), tca_time=p.get("tca_time", ""), tca_alt=float(p.get("tca_alt", 0)), tca_az=float(p.get("tca_az", 0)), los_time=p.get("los_time", ""), los_az=float(p.get("los_az", 0)), max_elevation=float(p.get("max_elevation", 0)), duration_seconds=int(p.get("duration_seconds", 0)), is_visible=bool(p.get("is_visible", False)), magnitude=p.get("magnitude"), ) ) return passes def get_next_pass(self, norad_id: int) -> PassPrediction | None: """Get the next upcoming pass for a satellite.""" try: data = self._get("/api/passes/next", {"sat": str(norad_id)}) except Exception: log.warning("Craft next pass failed for NORAD %d", norad_id, exc_info=True) return None passes = data.get("passes", []) if not passes: return None p = passes[0] return PassPrediction( satellite_name=p.get("satellite_name", ""), norad_id=int(p.get("norad_id", norad_id)), aos_time=p.get("aos_time", ""), aos_az=float(p.get("aos_az", 0)), tca_time=p.get("tca_time", ""), tca_alt=float(p.get("tca_alt", 0)), tca_az=float(p.get("tca_az", 0)), los_time=p.get("los_time", ""), los_az=float(p.get("los_az", 0)), max_elevation=float(p.get("max_elevation", 0)), duration_seconds=int(p.get("duration_seconds", 0)), is_visible=bool(p.get("is_visible", False)), magnitude=p.get("magnitude"), ) def get_visible_targets(self, min_alt: float = 0.0) -> list[TargetPosition]: """Get all above-horizon objects with real-time AZ/EL positions. This is the primary tracking endpoint — returns 1000+ objects (satellites, planets, stars, comets) with positions computed server-side via Postgres SGP4. """ try: data = self._get("/api/sky/up", {"min_alt": str(min_alt)}) except Exception: log.warning("Craft sky/up failed", exc_info=True) return [] targets = [] for obj in data.get("objects", []): targets.append( TargetPosition( name=obj.get("name", ""), target_type=obj.get("target_type", ""), target_id=str(obj.get("target_id", "")), azimuth=float(obj.get("azimuth_deg", 0)), altitude=float(obj.get("altitude_deg", 0)), distance_km=float(obj.get("distance_km", 0)), range_rate=float(obj.get("range_rate_km_s", 0)), is_above_horizon=bool(obj.get("is_above_horizon", True)), ) ) return targets