"""Tests for catalog merge and deduplication logic.""" import textwrap from pg_orrery_catalog.catalog import merge_records, merge_sources from pg_orrery_catalog.tle import TLERecord def _make_record(norad_id: int, epoch: float, source: str = "") -> TLERecord: """Helper to create a minimal TLERecord for testing.""" line1 = f"1 {norad_id:05d}U 98067A {epoch:014.8f} .00000000 00000-0 00000-0 0 9990" line2 = f"2 {norad_id:05d} 51.6400 100.0000 0007417 30.0000 330.1234 15.49456789999990" return TLERecord( line1=line1, line2=line2, name=f"SAT-{norad_id}", norad_id=norad_id, epoch=epoch, source=source, ) class TestMergeRecords: def test_merge_disjoint(self): a = {1: _make_record(1, 24001.0, "A")} b = {2: _make_record(2, 24002.0, "B")} merged = merge_records(a, b) assert len(merged) == 2 assert 1 in merged assert 2 in merged def test_merge_newer_wins(self): old = {1: _make_record(1, 24001.0, "old")} new = {1: _make_record(1, 24005.0, "new")} merged = merge_records(old, new) assert len(merged) == 1 assert merged[1].source == "new" assert merged[1].epoch == 24005.0 def test_merge_older_loses(self): new = {1: _make_record(1, 24005.0, "new")} old = {1: _make_record(1, 24001.0, "old")} merged = merge_records(new, old) assert merged[1].source == "new" def test_merge_three_sources(self): a = {1: _make_record(1, 24001.0, "A"), 2: _make_record(2, 24001.0, "A")} b = {1: _make_record(1, 24003.0, "B"), 3: _make_record(3, 24002.0, "B")} c = {1: _make_record(1, 24002.0, "C")} # older than B merged = merge_records(a, b, c) assert len(merged) == 3 assert merged[1].source == "B" # newest epoch class TestMergeSources: def test_merge_files(self, tmp_path): tle1 = textwrap.dedent("""\ SAT-A 1 00001U 98001A 24001.50000000 .00000000 00000-0 00000-0 0 9990 2 00001 30.0000 100.0000 0010000 45.0000 315.0000 15.00000000999990 """) tle2 = textwrap.dedent("""\ SAT-A-NEWER 1 00001U 98001A 24005.50000000 .00000000 00000-0 00000-0 0 9990 2 00001 30.0000 100.0000 0010000 45.0000 315.0000 15.00000000999990 SAT-B 1 00002U 98002A 24003.00000000 .00000000 00000-0 00000-0 0 9990 2 00002 45.0000 200.0000 0020000 90.0000 270.0000 14.50000000999990 """) f1 = tmp_path / "source1.tle" f2 = tmp_path / "source2.tle" f1.write_text(tle1) f2.write_text(tle2) merged, stats = merge_sources([f1, f2]) assert stats.total_unique == 2 assert len(stats.sources) == 2 # Source 2 should have 1 new (SAT-B) and 1 updated (SAT-A-NEWER) assert stats.sources[1].new == 1 assert stats.sources[1].updated == 1 # SAT-A should have the newer name assert merged[1].name == "SAT-A-NEWER"