Files
Sportstime/Scripts/cloudkit_import.py
Trey t 5a08659837 feat(05-02): add individual record management commands
Add commands for managing individual CloudKit records:
- --get TYPE ID: Retrieve and display single record
- --list TYPE [--count]: List all recordNames for a type
- --update-record TYPE ID FIELD=VALUE: Update fields with conflict handling
- --delete-record TYPE ID [--force]: Delete with confirmation

Features:
- Type validation against VALID_RECORD_TYPES
- Triple lookup fallback: direct -> deterministic UUID -> canonicalId query
- Automatic type parsing for numeric field values
- Conflict detection with automatic forceReplace retry
- Deletion confirmation (skip with --force)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-10 10:17:40 -06:00

2225 lines
95 KiB
Python
Executable File

#!/usr/bin/env python3
"""
CloudKit Import Script
======================
Imports canonical JSON data into CloudKit. Run after canonicalization pipeline.
Expected input files (from canonicalization pipeline):
- stadiums_canonical.json
- teams_canonical.json
- games_canonical.json OR canonical/games/*.json (new structure)
- stadium_aliases.json
- league_structure.json
- team_aliases.json
File Structure (Option B - by sport/season):
data/
games/ # Raw scraped games
mlb_2025.json
nba_2025.json
...
canonical/ # Canonicalized data
games/
mlb_2025.json
nba_2025.json
...
stadiums.json
games_canonical.json # Combined (backward compatibility)
stadiums_canonical.json
teams_canonical.json
Setup:
1. CloudKit Dashboard > Tokens & Keys > Server-to-Server Keys
2. Create key with Read/Write access to public database
3. Download .p8 file and note Key ID
Usage:
python cloudkit_import.py # Interactive menu
python cloudkit_import.py --dry-run # Preview first
python cloudkit_import.py --key-id XX --key-file key.p8 # Import all
python cloudkit_import.py --stadiums-only # Stadiums first
python cloudkit_import.py --games-only # All games
python cloudkit_import.py --games-files mlb_2025.json # Specific game file
python cloudkit_import.py --games-files mlb_2025.json,nba_2025.json # Multiple files
python cloudkit_import.py --stadium-aliases-only # Stadium aliases only
python cloudkit_import.py --delete-all # Delete then import
python cloudkit_import.py --delete-only # Delete only (no import)
"""
import argparse, json, time, os, sys, hashlib, base64, requests
from datetime import datetime, timezone
from pathlib import Path
try:
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.backends import default_backend
HAS_CRYPTO = True
except ImportError:
HAS_CRYPTO = False
CONTAINER = "iCloud.com.sportstime.app"  # CloudKit container identifier used in every request path
HOST = "https://api.apple-cloudkit.com"  # CloudKit Web Services base URL
BATCH_SIZE = 200  # records per modify request
# Default server-to-server credentials. The key ID is only sent as a header;
# the signing secret lives in the PEM key file. Both can be overridden via the
# environment so other machines / CI can use their own key without editing
# this file. An empty variable is treated as unset.
DEFAULT_KEY_ID = os.environ.get("CLOUDKIT_KEY_ID") or "152be0715e0276e31aaea5cbfe79dc872f298861a55c70fae14e5fe3e026cff9"
DEFAULT_KEY_FILE = os.environ.get("CLOUDKIT_KEY_FILE") or "eckey.pem"
def show_game_files_menu(data_dir: Path) -> list[str]:
"""Show available game files and let user select which to import."""
canonical_games_dir = data_dir / 'canonical' / 'games'
if not canonical_games_dir.exists():
print("\n No canonical/games/ directory found.")
return []
game_files = sorted(canonical_games_dir.glob('*.json'))
if not game_files:
print("\n No game files found in canonical/games/")
return []
print("\n" + "="*50)
print("Select Game Files to Import")
print("="*50)
print("\n Available files:")
for i, f in enumerate(game_files, 1):
# Count games in file
with open(f) as fp:
games = json.load(fp)
print(f" {i}. {f.name} ({len(games):,} games)")
print(f"\n a. All files")
print(f" 0. Cancel")
print()
while True:
try:
choice = input("Enter file numbers (comma-separated), 'a' for all, or 0 to cancel: ").strip().lower()
if choice == '0':
return []
if choice == 'a':
return [f.name for f in game_files]
# Parse comma-separated numbers
indices = [int(x.strip()) for x in choice.split(',')]
selected = []
for idx in indices:
if 1 <= idx <= len(game_files):
selected.append(game_files[idx-1].name)
else:
print(f"Invalid selection: {idx}")
continue
if selected:
return selected
print("No valid selections. Try again.")
except (ValueError, EOFError, KeyboardInterrupt):
print("\nCancelled.")
return []
def show_menu():
    """Display the top-level action menu and read the user's choice.

    Keeps prompting until the (whitespace-stripped) input is one of 1-15.

    Returns:
        int: the chosen action number (1-15).
        None: if the user enters 0, or input hits EOF / Ctrl-C.
    """
    actions = (
        "Import all (stadiums, teams, games, league structure, team aliases, stadium aliases)",
        "Stadiums only",
        "Games only (all files)",
        "Games - select specific files",
        "League structure only",
        "Team aliases only",
        "Stadium aliases only",
        "Canonical only (league structure + team aliases + stadium aliases)",
        "Delete all then import",
        "Delete only (no import)",
        "Dry run (preview only)",
        "Smart sync (diff-based, only upload changes)",
        "Smart sync + delete orphans",
        "Verify sync (quick)",
        "Verify sync (deep)",
    )
    banner = "=" * 50
    print("\n" + banner)
    print("CloudKit Import - Select Action")
    print(banner)
    for number, label in enumerate(actions, start=1):
        # The first option is preceded by a blank line.
        lead = "\n" if number == 1 else ""
        print(f"{lead} {number}. {label}")
    print(" 0. Exit")
    print()
    accepted = {str(number) for number in range(1, len(actions) + 1)}
    while True:
        try:
            answer = input("Enter choice [1-15, 0 to exit]: ").strip()
        except (EOFError, KeyboardInterrupt):
            print("\nExiting.")
            return None
        if answer == '0':
            return None
        if answer in accepted:
            return int(answer)
        print("Invalid choice. Please enter 1-15 or 0.")
def deterministic_uuid(string: str) -> str:
    """
    Map ``string`` to a stable, uppercase, UUID-formatted identifier.

    Takes the first 16 bytes of SHA-256 over the UTF-8 encoding and stamps the
    version-4 / RFC 4122 variant bits, mirroring
    StubDataProvider.deterministicUUID() in the Swift app so both sides derive
    identical record names.
    """
    raw = bytearray(hashlib.sha256(string.encode('utf-8')).digest()[:16])
    raw[6] = 0x40 | (raw[6] & 0x0F)  # version nibble -> 4
    raw[8] = 0x80 | (raw[8] & 0x3F)  # variant bits -> 10xx
    digits = raw.hex().upper()
    groups = (digits[0:8], digits[8:12], digits[12:16], digits[16:20], digits[20:32])
    return "-".join(groups)
class CloudKit:
    """Thin client for the CloudKit Web Services public database.

    Every request is authenticated with a server-to-server key: an ECDSA/SHA-256
    signature over "<ISO8601 date>:<base64(sha256(body))>:<path>" is sent along
    with the key ID and the date in X-Apple-CloudKit-Request-* headers.
    """

    def __init__(self, key_id, private_key, container, env):
        """Store credentials and build the public-database path prefix.

        Args:
            key_id: Server-to-server key ID, sent as a request header.
            private_key: PEM-encoded EC private key, as accepted by
                ``serialization.load_pem_private_key`` (bytes).
            container: CloudKit container identifier.
            env: Environment path segment (presumably 'development' or
                'production' — confirm against the caller).
        """
        self.key_id = key_id
        self.private_key = private_key
        self.path_base = f"/database/1/{container}/{env}/public"

    def _sign(self, date, body, path):
        """Return the base64 ECDSA signature for one request."""
        key = serialization.load_pem_private_key(self.private_key, None, default_backend())
        body_hash = base64.b64encode(hashlib.sha256(body.encode()).digest()).decode()
        sig = key.sign(f"{date}:{body_hash}:{path}".encode(), ec.ECDSA(hashes.SHA256()))
        return base64.b64encode(sig).decode()

    def _headers(self, body, path):
        """Build signed JSON request headers for ``body`` POSTed to ``path``."""
        date = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
        return {
            'Content-Type': 'application/json',
            'X-Apple-CloudKit-Request-KeyID': self.key_id,
            'X-Apple-CloudKit-Request-ISO8601Date': date,
            'X-Apple-CloudKit-Request-SignatureV1': self._sign(date, body, path),
        }

    def modify(self, operations):
        """Send a records/modify request with the given operation dicts.

        Returns:
            The parsed JSON response on HTTP 200. Otherwise it returns
            ``{'error': '<code>: <reason>'}``, including on timeouts or
            connection failures, so callers never see a network exception.
        """
        path = f"{self.path_base}/records/modify"
        body = json.dumps({'operations': operations})
        headers = self._headers(body, path)
        try:
            r = requests.post(f"{HOST}{path}", headers=headers, data=body, timeout=60)
        except requests.exceptions.Timeout:
            return {'error': 'Request timed out after 60s'}
        except requests.exceptions.RequestException as e:
            return {'error': f"Request failed: {e}"}
        if r.status_code == 200:
            return r.json()
        try:
            err = r.json()
            reason = err.get('reason', 'Unknown')
            code = err.get('serverErrorCode', r.status_code)
            return {'error': f"{code}: {reason}"}
        except (ValueError, AttributeError):
            # Body was not JSON (ValueError) or not a JSON object (AttributeError).
            return {'error': f"{r.status_code}: {r.text[:200]}"}

    def query(self, record_type, limit=200, verbose=False):
        """Query one page (up to ``limit``) of records of ``record_type``.

        Returns the parsed JSON response, or an ``{'error': ...}`` dict on a
        non-200 status, timeout, or any other request failure.
        """
        path = f"{self.path_base}/records/query"
        body = json.dumps({
            'query': {'recordType': record_type},
            'resultsLimit': limit
        })
        headers = self._headers(body, path)
        if verbose:
            print(f" Querying {record_type}...")
        try:
            r = requests.post(f"{HOST}{path}", headers=headers, data=body, timeout=30)
            if verbose:
                print(f" Response: {r.status_code}")
            if r.status_code == 200:
                result = r.json()
                if verbose:
                    print(f" Found {len(result.get('records', []))} records")
                return result
            return {'error': f"{r.status_code}: {r.text[:200]}"}
        except requests.exceptions.Timeout:
            return {'error': 'Request timed out after 30s'}
        except Exception as e:
            return {'error': f"Request failed: {e}"}

    def query_all(self, record_type, verbose=False):
        """Query all records of ``record_type``, following continuation markers.

        Returns:
            Dict mapping recordName -> record. On an HTTP error, timeout, or
            other failure the problem is printed and the records collected so
            far are returned.
        """
        all_records = {}
        continuation_marker = None
        path = f"{self.path_base}/records/query"
        page_num = 0
        while True:
            page_num += 1
            query_body = {
                'query': {'recordType': record_type},
                'resultsLimit': 200
            }
            if continuation_marker:
                query_body['continuationMarker'] = continuation_marker
            body = json.dumps(query_body)
            headers = self._headers(body, path)
            if verbose:
                print(f" Querying {record_type} (page {page_num})...")
            try:
                r = requests.post(f"{HOST}{path}", headers=headers, data=body, timeout=60)
                if r.status_code != 200:
                    print(f" Query error: {r.status_code}: {r.text[:200]}")
                    break
                result = r.json()
                records = result.get('records', [])
                for rec in records:
                    all_records[rec.get('recordName', '')] = rec
                if verbose:
                    print(f" Found {len(records)} records (total: {len(all_records)})")
                # Check for continuation
                continuation_marker = result.get('continuationMarker')
                if not continuation_marker:
                    break
                time.sleep(0.3)  # Rate limiting
            except requests.exceptions.Timeout:
                print(" Query timeout after 60s")
                break
            except Exception as e:
                print(f" Query failed: {e}")
                break
        return all_records

    def lookup(self, record_type, record_names, verbose=False):
        """Look up specific records by recordName.

        ``record_type`` is only used in the verbose message.

        Returns:
            [] when ``record_names`` is empty, the response's 'records' list on
            success, or an ``{'error': ...}`` dict when the request fails.
        """
        if not record_names:
            return []
        path = f"{self.path_base}/records/lookup"
        records_to_lookup = [{'recordName': name} for name in record_names]
        body = json.dumps({'records': records_to_lookup})
        headers = self._headers(body, path)
        if verbose:
            print(f" Looking up {len(record_names)} {record_type} records...")
        try:
            r = requests.post(f"{HOST}{path}", headers=headers, data=body, timeout=30)
            if r.status_code == 200:
                result = r.json()
                return result.get('records', [])
            return {'error': f"{r.status_code}: {r.text[:200]}"}
        except requests.exceptions.Timeout:
            return {'error': 'Request timed out after 30s'}
        except Exception as e:
            return {'error': f"Request failed: {e}"}

    def delete_all(self, record_type, verbose=False):
        """Delete all records of ``record_type``, one query page at a time.

        Stops when a query returns no records, when a query or modify call
        errors, or when a batch deletes nothing. In the last case the next
        query would return the same records and loop forever.

        Returns:
            The number of records CloudKit reported as deleted.
        """
        total_deleted = 0
        while True:
            result = self.query(record_type, verbose=verbose)
            if 'error' in result:
                print(f" Query error: {result['error']}")
                break
            records = result.get('records', [])
            if not records:
                break
            # Build delete operations (recordChangeTag required for delete)
            ops = [{
                'operationType': 'delete',
                'record': {
                    'recordName': r['recordName'],
                    'recordType': record_type,
                    'recordChangeTag': r.get('recordChangeTag', '')
                }
            } for r in records]
            if verbose:
                print(f" Sending delete for {len(ops)} records...")
            delete_result = self.modify(ops)
            if verbose:
                print(f" Delete response: {json.dumps(delete_result)[:500]}")
            if 'error' in delete_result:
                print(f" Delete error: {delete_result['error']}")
                break
            # Check for individual record errors
            result_records = delete_result.get('records', [])
            successful = [r for r in result_records if 'serverErrorCode' not in r]
            failed = [r for r in result_records if 'serverErrorCode' in r]
            if failed and verbose:
                print(f" Failed: {failed[0]}")
            total_deleted += len(successful)
            print(f" Deleted {len(successful)} {record_type} records" + (f" ({len(failed)} failed)" if failed else ""))
            if not successful:
                # No progress: re-querying would return the same records forever.
                print(f" No {record_type} records could be deleted; stopping")
                break
            time.sleep(0.5)
        return total_deleted
def import_data(ck, records, name, dry_run, verbose):
    """Upload ``records`` to CloudKit in BATCH_SIZE chunks using forceReplace.

    Args:
        ck: Client exposing ``modify(operations)``.
        records: CloudKit record dicts to write.
        name: Human-readable label used in progress messages.
        dry_run: If true, only report what would be written.
        verbose: Print per-batch details, sample records and raw responses.

    Returns:
        Records written, i.e. responses without a serverErrorCode. In dry-run
        mode this is the number of records that would be written. Only the
        first three request-level errors are printed.
    """
    imported = 0
    request_errors = 0
    for start in range(0, len(records), BATCH_SIZE):
        chunk = records[start:start + BATCH_SIZE]
        operations = [{'operationType': 'forceReplace', 'record': rec} for rec in chunk]
        if verbose:
            print(f" Batch {start//BATCH_SIZE + 1}: {len(chunk)} records, {len(operations)} ops")
        if not operations:
            print(f" Warning: Empty batch at index {start}, skipping")
            continue
        if dry_run:
            print(f" [DRY RUN] Would create {len(chunk)} {name}")
            imported += len(chunk)
            continue
        response = ck.modify(operations)
        if 'error' in response:
            request_errors += 1
            if request_errors <= 3:  # Only show first 3 errors
                print(f" Error: {response['error']}")
                if verbose and chunk:
                    print(f" Sample record: {json.dumps(chunk[0], indent=2)[:500]}")
                if request_errors == 3:
                    print(" (suppressing further errors...)")
        else:
            returned = response.get('records', [])
            # Only records without a serverErrorCode were actually written.
            written = [rec for rec in returned if 'serverErrorCode' not in rec]
            rejected = [rec for rec in returned if 'serverErrorCode' in rec]
            imported += len(written)
            print(f" Created {len(written)} {name}")
            if rejected:
                first = rejected[0]
                print(f" Failed {len(rejected)} records: {first.get('serverErrorCode')}: {first.get('reason')}")
                if verbose:
                    print(f" Response: {json.dumps(response, indent=2)[:1000]}")
        time.sleep(0.5)
    if request_errors > 0:
        print(f" Total errors: {request_errors}")
    return imported
def fields_equal(local_value, cloud_value, field_type=None):
    """
    Compare a local field value with a cloud field value.

    - Locations (dicts with 'latitude') match when latitude and longitude both
      agree within 1e-4. A missing or non-numeric coordinate compares unequal.
    - References (dicts with 'recordName') match on recordName only, so
      'action'/zone metadata is ignored.
    - Lists match regardless of element order (multiset comparison). This also
      works when the elements cannot be sorted, e.g. dicts.
    - Everything else uses ==.

    ``field_type`` is accepted for API compatibility and is currently unused.
    """
    # Handle location fields (compare with tolerance)
    if isinstance(local_value, dict) and 'latitude' in local_value:
        if not isinstance(cloud_value, dict):
            return False
        try:
            lat_diff = abs(float(local_value['latitude']) - float(cloud_value['latitude']))
            lng_diff = abs(float(local_value['longitude']) - float(cloud_value['longitude']))
        except (KeyError, TypeError, ValueError):
            return False
        return lat_diff < 0.0001 and lng_diff < 0.0001
    # Handle reference fields (compare by recordName only)
    if isinstance(local_value, dict) and 'recordName' in local_value:
        if not isinstance(cloud_value, dict) or 'recordName' not in cloud_value:
            return False
        return local_value['recordName'] == cloud_value['recordName']
    # Handle lists (order-insensitive)
    if isinstance(local_value, list):
        if not isinstance(cloud_value, list) or len(local_value) != len(cloud_value):
            return False
        try:
            return sorted(local_value) == sorted(cloud_value)
        except TypeError:
            # Unorderable elements: fall back to an O(n^2) multiset match on ==.
            remaining = list(cloud_value)
            for item in local_value:
                try:
                    remaining.remove(item)
                except ValueError:
                    return False
            return True
    # Direct comparison for strings, ints, etc.
    return local_value == cloud_value
def compute_diff(local_records, cloud_records, verbose=False):
    """
    Compare local records against cloud records.

    Both arguments map recordName -> record dict with a 'fields' mapping of
    {field_name: {'value': ...}}. Only fields present in the local record are
    compared (via fields_equal); fields that exist only in the cloud copy are
    ignored.

    Returns a dict with these lists:
      - 'new':       {'record': local} for names only in local_records
      - 'deleted':   {'record': cloud, 'recordChangeTag': tag} for names only in cloud_records
      - 'updated':   {'record': local, 'recordChangeTag': cloud tag, 'changed_fields': [...]}
      - 'unchanged': {'record': local}
    Each list is ordered by recordName, so batches and reports are reproducible
    across runs. Plain set iteration order varies with hash randomization.
    """
    diff = {
        'new': [],
        'updated': [],
        'unchanged': [],
        'deleted': []
    }
    local_names = set(local_records)
    cloud_names = set(cloud_records)
    # New records: in local but not in cloud
    for name in sorted(local_names - cloud_names, key=str):
        diff['new'].append({'record': local_records[name]})
    # Deleted records: in cloud but not in local
    for name in sorted(cloud_names - local_names, key=str):
        cloud_rec = cloud_records[name]
        diff['deleted'].append({
            'record': cloud_rec,
            'recordChangeTag': cloud_rec.get('recordChangeTag', '')
        })
    # Check existing records for changes (metadata like recordChangeTag is not compared)
    for name in sorted(local_names & cloud_names, key=str):
        local_rec = local_records[name]
        cloud_rec = cloud_records[name]
        local_fields = local_rec.get('fields', {})
        cloud_fields = cloud_rec.get('fields', {})
        diff_fields = [
            field_name
            for field_name, field_data in local_fields.items()
            if not fields_equal(field_data.get('value'), cloud_fields.get(field_name, {}).get('value'))
        ]
        if diff_fields:
            diff['updated'].append({
                'record': local_rec,
                'recordChangeTag': cloud_rec.get('recordChangeTag', ''),
                'changed_fields': diff_fields
            })
            if verbose:
                print(f" Changed: {name} - fields: {', '.join(diff_fields)}")
        else:
            diff['unchanged'].append({'record': local_rec})
    return diff
def show_diff_report(ck, data_dir, verbose=False):
    """Query CloudKit and show a diff report for all record types.

    Local canonical JSON under ``data_dir`` is converted into CloudKit-shaped
    records. Each record type is then fetched with ``ck.query_all`` and compared
    with ``compute_diff``. Nothing is written to CloudKit.

    Args:
        ck: CloudKit client; only ``query_all`` is used.
        data_dir: Directory (str or Path) containing the canonical JSON files
            and ``canonical/games/*.json``.
        verbose: Forwarded to ``query_all`` / ``compute_diff``.

    Returns:
        Dict mapping record type name to its diff. Types with no local data
        are omitted.
    """
    data_dir = Path(data_dir)
    print("\n" + "="*50)
    print("CloudKit Diff Report")
    print("="*50 + "\n")

    def load_json(filename):
        """Return the parsed contents of data_dir/filename, or [] if it is missing."""
        path = data_dir / filename
        if not path.exists():
            return []
        with open(path, encoding='utf-8') as f:
            return json.load(f)

    # Load local data
    stadiums = load_json('stadiums_canonical.json')
    teams = load_json('teams_canonical.json')
    # Load games from canonical/games/*.json
    canonical_games_dir = data_dir / 'canonical' / 'games'
    games = []
    if canonical_games_dir.exists():
        for games_file in sorted(canonical_games_dir.glob('*.json')):
            with open(games_file, encoding='utf-8') as f:
                games.extend(json.load(f))
    league_structure = load_json('league_structure.json')
    team_aliases = load_json('team_aliases.json')
    stadium_aliases = load_json('stadium_aliases.json')
    print(f"Local data: {len(stadiums)} stadiums, {len(teams)} teams, {len(games)} games")
    print(f" {len(league_structure)} league structures, {len(team_aliases)} team aliases, {len(stadium_aliases)} stadium aliases\n")

    # Build local record maps (same format as CloudKit records)
    def build_stadium_records(stadiums):
        records = {}
        for s in stadiums:
            stadium_id = s.get('canonical_id', s.get('id', ''))
            record_name = deterministic_uuid(stadium_id)
            team_abbrevs = s.get('primary_team_abbrevs', s.get('team_abbrevs', []))
            fields = {
                'stadiumId': {'value': record_name},
                'canonicalId': {'value': stadium_id},
                'name': {'value': s['name']},
                'city': {'value': s['city']},
                'state': {'value': s.get('state', '')},
                'sport': {'value': s['sport']},
                'source': {'value': s.get('source', 'canonical')},
                'teamAbbrevs': {'value': team_abbrevs},
            }
            if s.get('latitude'):
                fields['location'] = {'value': {'latitude': s['latitude'], 'longitude': s['longitude']}}
            if s.get('capacity'):
                fields['capacity'] = {'value': s['capacity']}
            records[record_name] = {'recordType': 'Stadium', 'recordName': record_name, 'fields': fields}
        return records

    def build_team_records(teams):
        records = {}
        for t in teams:
            team_id = t.get('canonical_id', '')
            record_name = deterministic_uuid(team_id)
            fields = {
                'teamId': {'value': record_name},
                'canonicalId': {'value': team_id},
                'abbreviation': {'value': t['abbreviation']},
                'name': {'value': t['name']},
                'city': {'value': t['city']},
                'sport': {'value': t['sport']},
                'stadiumCanonicalId': {'value': t.get('stadium_canonical_id', '')},
            }
            if t.get('conference_id'):
                fields['conferenceId'] = {'value': t['conference_id']}
            if t.get('division_id'):
                fields['divisionId'] = {'value': t['division_id']}
            records[record_name] = {'recordType': 'Team', 'recordName': record_name, 'fields': fields}
        return records

    def game_timestamp_ms(g):
        """Return the game's start time as epoch milliseconds.

        Times look like '7:05p' / '12:00a' (default '7:00p', i.e. 19:00).
        NOTE(review): strptime yields a naive datetime, so .timestamp() uses
        the machine's local timezone. This must match the upload path or every
        Game will show as changed; confirm the intended zone.
        """
        time_str = g.get('time', '7:00p')
        hour, minute = 19, 0
        if time_str:
            clean_time = time_str.lower().replace(' ', '')
            is_pm = 'p' in clean_time
            time_parts = clean_time.replace('p', '').replace('a', '').split(':')
            if time_parts:
                hour = int(time_parts[0])
                if is_pm and hour != 12:
                    hour += 12
                elif not is_pm and hour == 12:
                    hour = 0
                if len(time_parts) > 1:
                    minute = int(time_parts[1])
        dt = datetime.strptime(f"{g['date']} {hour:02d}:{minute:02d}", '%Y-%m-%d %H:%M')
        return int(dt.timestamp() * 1000)

    def build_game_records(games, stadiums):
        records = {}
        stadium_id_map = {s.get('canonical_id', s.get('id', '')): deterministic_uuid(s.get('canonical_id', s.get('id', ''))) for s in stadiums}
        seen_ids = set()
        for g in games:
            game_id = g.get('canonical_id', g.get('id', ''))
            if game_id in seen_ids:
                continue
            seen_ids.add(game_id)
            game_uuid = deterministic_uuid(game_id)
            sport = g['sport']
            fields = {
                'gameId': {'value': game_uuid},
                'canonicalId': {'value': game_id},
                'sport': {'value': sport},
                'season': {'value': g.get('season', '')},
                'source': {'value': g.get('source', 'canonical')},
            }
            # Parse date/time; an unparseable value omits dateTime rather than
            # aborting the report (previously a bare except, which also hid Ctrl-C).
            if g.get('date'):
                try:
                    fields['dateTime'] = {'value': game_timestamp_ms(g), 'type': 'TIMESTAMP'}
                except (ValueError, TypeError, AttributeError, OverflowError, OSError):
                    pass
            # Team references
            home_team_canonical_id = g.get('home_team_canonical_id', '')
            away_team_canonical_id = g.get('away_team_canonical_id', '')
            home_team_uuid = deterministic_uuid(home_team_canonical_id)
            away_team_uuid = deterministic_uuid(away_team_canonical_id)
            fields['homeTeamRef'] = {'value': {'recordName': home_team_uuid, 'action': 'NONE'}}
            fields['awayTeamRef'] = {'value': {'recordName': away_team_uuid, 'action': 'NONE'}}
            # Stadium reference (only when the stadium is known locally)
            if g.get('stadium_canonical_id'):
                stadium_canonical_id = g['stadium_canonical_id']
                stadium_uuid = stadium_id_map.get(stadium_canonical_id)
                if stadium_uuid:
                    fields['stadiumRef'] = {'value': {'recordName': stadium_uuid, 'action': 'NONE'}}
                    fields['stadiumCanonicalId'] = {'value': stadium_canonical_id}
            records[game_uuid] = {'recordType': 'Game', 'recordName': game_uuid, 'fields': fields}
        return records

    def build_league_structure_records(league_structure):
        records = {}
        for ls in league_structure:
            record_name = ls['id']
            fields = {
                'structureId': {'value': ls['id']},
                'sport': {'value': ls['sport']},
                'type': {'value': ls['type']},
                'name': {'value': ls['name']},
                'displayOrder': {'value': ls['display_order']},
                'schemaVersion': {'value': 1},
            }
            if ls.get('abbreviation'):
                fields['abbreviation'] = {'value': ls['abbreviation']}
            if ls.get('parent_id'):
                fields['parentId'] = {'value': ls['parent_id']}
            records[record_name] = {'recordType': 'LeagueStructure', 'recordName': record_name, 'fields': fields}
        return records

    def build_team_alias_records(team_aliases):
        records = {}
        for ta in team_aliases:
            record_name = ta['id']
            fields = {
                'aliasId': {'value': ta['id']},
                'teamCanonicalId': {'value': ta['team_canonical_id']},
                'aliasType': {'value': ta['alias_type']},
                'aliasValue': {'value': ta['alias_value']},
                'schemaVersion': {'value': 1},
            }
            records[record_name] = {'recordType': 'TeamAlias', 'recordName': record_name, 'fields': fields}
        return records

    def build_stadium_alias_records(stadium_aliases):
        records = {}
        for sa in stadium_aliases:
            stadium_id = sa['stadium_canonical_id']
            # Sport is the second '_'-separated token of the stadium canonical id.
            sport = stadium_id.split('_')[1] if '_' in stadium_id else 'unknown'
            record_name = f"{sport}_{sa['alias_name'].lower()}"
            fields = {
                'aliasName': {'value': sa['alias_name'].lower()},
                'stadiumCanonicalId': {'value': sa['stadium_canonical_id']},
                'schemaVersion': {'value': 1},
            }
            records[record_name] = {'recordType': 'StadiumAlias', 'recordName': record_name, 'fields': fields}
        return records

    # Compute and display diffs for each record type
    record_types = [
        ('Stadium', stadiums, build_stadium_records),
        ('Team', teams, build_team_records),
        ('Game', games, lambda g: build_game_records(g, stadiums)),
        ('LeagueStructure', league_structure, build_league_structure_records),
        ('TeamAlias', team_aliases, build_team_alias_records),
        ('StadiumAlias', stadium_aliases, build_stadium_alias_records),
    ]
    all_diffs = {}
    for record_type, data, builder in record_types:
        if not data:
            print(f"{record_type}: No local data")
            continue
        print(f"Checking {record_type}...")
        local_records = builder(data)
        cloud_records = ck.query_all(record_type, verbose=verbose)
        diff = compute_diff(local_records, cloud_records, verbose=verbose)
        all_diffs[record_type] = diff
        print(f" {record_type}: {len(diff['unchanged'])} unchanged, {len(diff['new'])} new, {len(diff['updated'])} updated, {len(diff['deleted'])} deleted")
    print("\n" + "="*50)
    print("Diff Summary")
    print("="*50)
    for record_type, diff in all_diffs.items():
        total_changes = len(diff['new']) + len(diff['updated']) + len(diff['deleted'])
        status = "in sync" if total_changes == 0 else f"{total_changes} changes needed"
        print(f" {record_type}: {status}")
    return all_diffs
def sync_diff(ck, diff, record_type, dry_run=False, verbose=False, delete_orphans=False):
"""
Sync only changed records based on diff.
Returns counts: created, updated, deleted, skipped, errors.
"""
stats = {'created': 0, 'updated': 0, 'deleted': 0, 'skipped': 0, 'errors': 0}
# Handle new records (forceReplace)
new_records = [item['record'] for item in diff['new']]
if new_records:
if dry_run:
print(f" [DRY RUN] Would create {len(new_records)} new {record_type}")
stats['created'] = len(new_records)
else:
for i in range(0, len(new_records), BATCH_SIZE):
batch = new_records[i:i+BATCH_SIZE]
ops = [{'operationType': 'forceReplace', 'record': r} for r in batch]
result = ck.modify(ops)
if 'error' in result:
print(f" Create error: {result['error']}")
stats['errors'] += len(batch)
else:
result_records = result.get('records', [])
successful = [r for r in result_records if 'serverErrorCode' not in r]
failed = [r for r in result_records if 'serverErrorCode' in r]
stats['created'] += len(successful)
stats['errors'] += len(failed)
if failed and verbose:
print(f" Create failed: {failed[0].get('serverErrorCode')}: {failed[0].get('reason')}")
time.sleep(0.3)
# Handle updated records (update with recordChangeTag)
updated_items = diff['updated']
if updated_items:
if dry_run:
print(f" [DRY RUN] Would update {len(updated_items)} {record_type}")
if verbose:
for item in updated_items[:5]:
print(f" - {item['record'].get('recordName')}: {', '.join(item.get('changed_fields', []))}")
if len(updated_items) > 5:
print(f" ... and {len(updated_items) - 5} more")
stats['updated'] = len(updated_items)
else:
for i in range(0, len(updated_items), BATCH_SIZE):
batch = updated_items[i:i+BATCH_SIZE]
ops = []
for item in batch:
record = item['record'].copy()
record['recordChangeTag'] = item['recordChangeTag']
ops.append({'operationType': 'update', 'record': record})
result = ck.modify(ops)
if 'error' in result:
print(f" Update error: {result['error']}")
stats['errors'] += len(batch)
else:
result_records = result.get('records', [])
successful = [r for r in result_records if 'serverErrorCode' not in r]
failed = [r for r in result_records if 'serverErrorCode' in r]
stats['updated'] += len(successful)
# Handle conflicts (409)
conflicts = [r for r in failed if r.get('serverErrorCode') == 'CONFLICT']
other_errors = [r for r in failed if r.get('serverErrorCode') != 'CONFLICT']
if conflicts:
print(f" {len(conflicts)} conflicts detected (records modified since query)")
# Re-try with forceReplace for conflicts (data loss is acceptable as we have source of truth)
conflict_records = [item['record'] for item in batch if any(
c.get('recordName') == item['record'].get('recordName') for c in conflicts
)]
if conflict_records:
retry_ops = [{'operationType': 'forceReplace', 'record': r} for r in conflict_records]
retry_result = ck.modify(retry_ops)
if 'error' not in retry_result:
retry_records = retry_result.get('records', [])
retry_success = [r for r in retry_records if 'serverErrorCode' not in r]
stats['updated'] += len(retry_success)
stats['errors'] -= len(retry_success)
if other_errors:
stats['errors'] += len(other_errors)
if verbose:
print(f" Update failed: {other_errors[0].get('serverErrorCode')}: {other_errors[0].get('reason')}")
time.sleep(0.3)
# Handle deleted records (only if delete_orphans is True)
deleted_items = diff['deleted']
if deleted_items:
if delete_orphans:
if dry_run:
print(f" [DRY RUN] Would delete {len(deleted_items)} orphan {record_type}")
stats['deleted'] = len(deleted_items)
else:
for i in range(0, len(deleted_items), BATCH_SIZE):
batch = deleted_items[i:i+BATCH_SIZE]
ops = [{
'operationType': 'delete',
'record': {
'recordName': item['record'].get('recordName'),
'recordType': record_type,
'recordChangeTag': item['recordChangeTag']
}
} for item in batch]
result = ck.modify(ops)
if 'error' in result:
print(f" Delete error: {result['error']}")
stats['errors'] += len(batch)
else:
result_records = result.get('records', [])
successful = [r for r in result_records if 'serverErrorCode' not in r]
failed = [r for r in result_records if 'serverErrorCode' in r]
stats['deleted'] += len(successful)
stats['errors'] += len(failed)
time.sleep(0.3)
else:
print(f" Warning: {len(deleted_items)} orphan {record_type} in CloudKit (use --delete-orphans to remove)")
# Count unchanged as skipped
stats['skipped'] = len(diff['unchanged'])
return stats
def run_smart_sync(ck, data_dir, dry_run=False, verbose=False, delete_orphans=False):
"""Run differential sync for all record types."""
from pathlib import Path
data_dir = Path(data_dir)
print("\n" + "="*50)
print(f"CloudKit Smart Sync {'(DRY RUN)' if dry_run else ''}")
print("="*50 + "\n")
# Load local data
stadiums = json.load(open(data_dir / 'stadiums_canonical.json')) if (data_dir / 'stadiums_canonical.json').exists() else []
teams = json.load(open(data_dir / 'teams_canonical.json')) if (data_dir / 'teams_canonical.json').exists() else []
# Load games from canonical/games/*.json
canonical_games_dir = data_dir / 'canonical' / 'games'
games = []
if canonical_games_dir.exists():
for games_file in sorted(canonical_games_dir.glob('*.json')):
with open(games_file) as f:
games.extend(json.load(f))
league_structure = json.load(open(data_dir / 'league_structure.json')) if (data_dir / 'league_structure.json').exists() else []
team_aliases = json.load(open(data_dir / 'team_aliases.json')) if (data_dir / 'team_aliases.json').exists() else []
stadium_aliases = json.load(open(data_dir / 'stadium_aliases.json')) if (data_dir / 'stadium_aliases.json').exists() else []
print(f"Local data: {len(stadiums)} stadiums, {len(teams)} teams, {len(games)} games")
print(f" {len(league_structure)} league structures, {len(team_aliases)} team aliases, {len(stadium_aliases)} stadium aliases\n")
# Build local record maps (reuse from show_diff_report)
def build_stadium_records(stadiums):
records = {}
for s in stadiums:
stadium_id = s.get('canonical_id', s.get('id', ''))
record_name = deterministic_uuid(stadium_id)
team_abbrevs = s.get('primary_team_abbrevs', s.get('team_abbrevs', []))
fields = {
'stadiumId': {'value': record_name},
'canonicalId': {'value': stadium_id},
'name': {'value': s['name']},
'city': {'value': s['city']},
'state': {'value': s.get('state', '')},
'sport': {'value': s['sport']},
'source': {'value': s.get('source', 'canonical')},
'teamAbbrevs': {'value': team_abbrevs},
}
if s.get('latitude'):
fields['location'] = {'value': {'latitude': s['latitude'], 'longitude': s['longitude']}}
if s.get('capacity'):
fields['capacity'] = {'value': s['capacity']}
records[record_name] = {'recordType': 'Stadium', 'recordName': record_name, 'fields': fields}
return records
def build_team_records(teams):
records = {}
for t in teams:
team_id = t.get('canonical_id', '')
record_name = deterministic_uuid(team_id)
fields = {
'teamId': {'value': record_name},
'canonicalId': {'value': team_id},
'abbreviation': {'value': t['abbreviation']},
'name': {'value': t['name']},
'city': {'value': t['city']},
'sport': {'value': t['sport']},
'stadiumCanonicalId': {'value': t.get('stadium_canonical_id', '')},
}
if t.get('conference_id'):
fields['conferenceId'] = {'value': t['conference_id']}
if t.get('division_id'):
fields['divisionId'] = {'value': t['division_id']}
records[record_name] = {'recordType': 'Team', 'recordName': record_name, 'fields': fields}
return records
def build_game_records(games, stadiums):
records = {}
stadium_id_map = {s.get('canonical_id', s.get('id', '')): deterministic_uuid(s.get('canonical_id', s.get('id', ''))) for s in stadiums}
seen_ids = set()
for g in games:
game_id = g.get('canonical_id', g.get('id', ''))
if game_id in seen_ids:
continue
seen_ids.add(game_id)
game_uuid = deterministic_uuid(game_id)
sport = g['sport']
fields = {
'gameId': {'value': game_uuid},
'canonicalId': {'value': game_id},
'sport': {'value': sport},
'season': {'value': g.get('season', '')},
'source': {'value': g.get('source', 'canonical')},
}
if g.get('date'):
try:
time_str = g.get('time', '7:00p')
hour, minute = 19, 0
if time_str:
clean_time = time_str.lower().replace(' ', '')
is_pm = 'p' in clean_time
time_parts = clean_time.replace('p', '').replace('a', '').split(':')
if time_parts:
hour = int(time_parts[0])
if is_pm and hour != 12:
hour += 12
elif not is_pm and hour == 12:
hour = 0
if len(time_parts) > 1:
minute = int(time_parts[1])
dt = datetime.strptime(f"{g['date']} {hour:02d}:{minute:02d}", '%Y-%m-%d %H:%M')
fields['dateTime'] = {'value': int(dt.timestamp() * 1000), 'type': 'TIMESTAMP'}
except:
pass
home_team_canonical_id = g.get('home_team_canonical_id', '')
away_team_canonical_id = g.get('away_team_canonical_id', '')
home_team_uuid = deterministic_uuid(home_team_canonical_id)
away_team_uuid = deterministic_uuid(away_team_canonical_id)
fields['homeTeamRef'] = {'value': {'recordName': home_team_uuid, 'action': 'NONE'}}
fields['awayTeamRef'] = {'value': {'recordName': away_team_uuid, 'action': 'NONE'}}
if g.get('stadium_canonical_id'):
stadium_canonical_id = g['stadium_canonical_id']
stadium_uuid = stadium_id_map.get(stadium_canonical_id)
if stadium_uuid:
fields['stadiumRef'] = {'value': {'recordName': stadium_uuid, 'action': 'NONE'}}
fields['stadiumCanonicalId'] = {'value': stadium_canonical_id}
records[game_uuid] = {'recordType': 'Game', 'recordName': game_uuid, 'fields': fields}
return records
def build_league_structure_records(league_structure):
    """Convert league-structure entries into CloudKit ``LeagueStructure`` records.

    Records are keyed by (and named after) each entry's ``id``. The optional
    ``abbreviation`` and ``parent_id`` values are only emitted when truthy.
    A later entry with the same ``id`` overwrites an earlier one.
    """
    def _to_record(entry):
        entry_fields = {
            'structureId': {'value': entry['id']},
            'sport': {'value': entry['sport']},
            'type': {'value': entry['type']},
            'name': {'value': entry['name']},
            'displayOrder': {'value': entry['display_order']},
            'schemaVersion': {'value': 1},
        }
        # Optional attributes: local JSON key -> CloudKit field name.
        for src_key, ck_key in (('abbreviation', 'abbreviation'), ('parent_id', 'parentId')):
            if entry.get(src_key):
                entry_fields[ck_key] = {'value': entry[src_key]}
        return {'recordType': 'LeagueStructure', 'recordName': entry['id'], 'fields': entry_fields}

    return {entry['id']: _to_record(entry) for entry in league_structure}
def build_team_alias_records(team_aliases):
    """Convert team-alias entries into CloudKit ``TeamAlias`` records keyed by alias ``id``."""
    # Local JSON key -> CloudKit field name, in emission order.
    column_map = (
        ('id', 'aliasId'),
        ('team_canonical_id', 'teamCanonicalId'),
        ('alias_type', 'aliasType'),
        ('alias_value', 'aliasValue'),
    )
    out = {}
    for alias in team_aliases:
        alias_fields = {ck_name: {'value': alias[src]} for src, ck_name in column_map}
        alias_fields['schemaVersion'] = {'value': 1}
        out[alias['id']] = {'recordType': 'TeamAlias', 'recordName': alias['id'], 'fields': alias_fields}
    return out
def build_stadium_alias_records(stadium_aliases):
    """Convert stadium-alias entries into CloudKit ``StadiumAlias`` records.

    Each record is named ``<sport>_<lowercased alias_name>``, where ``sport`` is
    the second ``_``-separated segment of ``stadium_canonical_id`` (``'unknown'``
    when the id has no ``_``). A later alias producing the same name overwrites
    an earlier one.
    """
    out = {}
    for alias in stadium_aliases:
        canonical = alias['stadium_canonical_id']
        _, sep, remainder = canonical.partition('_')
        sport = remainder.split('_')[0] if sep else 'unknown'
        lowered = alias['alias_name'].lower()
        name = f"{sport}_{lowered}"
        out[name] = {
            'recordType': 'StadiumAlias',
            'recordName': name,
            'fields': {
                'aliasName': {'value': lowered},
                'stadiumCanonicalId': {'value': canonical},
                'schemaVersion': {'value': 1},
            },
        }
    return out
# Sync each record type
record_types = [
('Stadium', stadiums, build_stadium_records),
('Team', teams, build_team_records),
('Game', games, lambda g: build_game_records(g, stadiums)),
('LeagueStructure', league_structure, build_league_structure_records),
('TeamAlias', team_aliases, build_team_alias_records),
('StadiumAlias', stadium_aliases, build_stadium_alias_records),
]
total_stats = {'created': 0, 'updated': 0, 'deleted': 0, 'skipped': 0, 'errors': 0}
for record_type, data, builder in record_types:
if not data:
print(f"{record_type}: No local data, skipping")
continue
print(f"\n--- {record_type} ---")
local_records = builder(data)
# Query cloud records
print(f" Querying CloudKit...")
cloud_records = ck.query_all(record_type, verbose=verbose)
print(f" Found {len(cloud_records)} cloud records, {len(local_records)} local records")
# Compute diff
diff = compute_diff(local_records, cloud_records, verbose=verbose)
print(f" Diff: {len(diff['new'])} new, {len(diff['updated'])} updated, {len(diff['unchanged'])} unchanged, {len(diff['deleted'])} orphans")
# Sync
stats = sync_diff(ck, diff, record_type, dry_run=dry_run, verbose=verbose, delete_orphans=delete_orphans)
# Accumulate stats
for key in total_stats:
total_stats[key] += stats[key]
print(f" Result: {stats['created']} created, {stats['updated']} updated, {stats['deleted']} deleted, {stats['skipped']} skipped")
if stats['errors']:
print(f" Errors: {stats['errors']}")
# Summary
print("\n" + "="*50)
print("Smart Sync Summary")
print("="*50)
print(f" Created: {total_stats['created']}")
print(f" Updated: {total_stats['updated']}")
print(f" Deleted: {total_stats['deleted']}")
print(f" Skipped (unchanged): {total_stats['skipped']}")
if total_stats['errors']:
print(f" Errors: {total_stats['errors']}")
if dry_run:
print("\n[DRY RUN - no changes made]")
return total_stats
def verify_sync(ck, data_dir, verbose=False, deep=False):
    """
    Verify that CloudKit data matches local canonical data.
    Quick mode: compares counts and spot-checks 5 random records per type.
    Deep mode: full field-by-field comparison of all records.

    Args:
        ck: CloudKit client; only ``query_all(record_type, verbose=...)`` is called.
        data_dir: directory containing ``stadiums_canonical.json``,
            ``teams_canonical.json``, ``canonical/games/*.json``,
            ``league_structure.json``, ``team_aliases.json`` and
            ``stadium_aliases.json`` (missing files count as empty).
        verbose: forwarded to ``ck.query_all``.
        deep: compare every local record instead of a random sample.

    Returns:
        True if no count, field, spot-check or query mismatch was found.
    """
    import random
    from pathlib import Path
    data_dir = Path(data_dir)

    def load_json_if_present(filename):
        """Return the parsed JSON in data_dir/filename, or [] if the file is absent."""
        path = data_dir / filename
        if not path.exists():
            return []
        # Context manager closes the handle (json.load(open(...)) leaked it).
        with open(path) as f:
            return json.load(f)

    def differing_fields(local_fields, cloud_rec):
        """Return (field, expected, actual) for each local field whose CloudKit value differs."""
        cloud_fields = cloud_rec.get('fields', {})
        diffs = []
        for field_name, expected_value in local_fields.items():
            cloud_value = cloud_fields.get(field_name, {}).get('value')
            if cloud_value != expected_value:
                diffs.append((field_name, expected_value, cloud_value))
        return diffs

    print("\n" + "="*50)
    print(f"CloudKit Sync Verification {'(DEEP)' if deep else '(Quick)'}")
    print("="*50)
    if deep:
        print("\n⚠️ Deep verification may take several minutes for large datasets\n")

    # Load local data
    stadiums = load_json_if_present('stadiums_canonical.json')
    teams = load_json_if_present('teams_canonical.json')
    # Load games from canonical/games/*.json
    canonical_games_dir = data_dir / 'canonical' / 'games'
    games = []
    if canonical_games_dir.exists():
        for games_file in sorted(canonical_games_dir.glob('*.json')):
            with open(games_file) as f:
                games.extend(json.load(f))
    league_structure = load_json_if_present('league_structure.json')
    team_aliases = load_json_if_present('team_aliases.json')
    stadium_aliases = load_json_if_present('stadium_aliases.json')

    # Deduplicate games by canonical_id
    seen_ids = set()
    unique_games = []
    for g in games:
        game_id = g.get('canonical_id', g.get('id', ''))
        if game_id not in seen_ids:
            seen_ids.add(game_id)
            unique_games.append(g)
    games = unique_games

    local_counts = {
        'Stadium': len(stadiums),
        'Team': len(teams),
        'Game': len(games),
        'LeagueStructure': len(league_structure),
        'TeamAlias': len(team_aliases),
        'StadiumAlias': len(stadium_aliases),
    }
    print(f"Local data: {local_counts['Stadium']} stadiums, {local_counts['Team']} teams, {local_counts['Game']} games")
    print(f" {local_counts['LeagueStructure']} league structures, {local_counts['TeamAlias']} team aliases, {local_counts['StadiumAlias']} stadium aliases\n")

    # Build local record maps for spot-check comparison
    def build_local_record_map(record_type, data):
        """Build a map of recordName -> fields for comparison."""
        records = {}
        if record_type == 'Stadium':
            for s in data:
                stadium_id = s.get('canonical_id', s.get('id', ''))
                record_name = deterministic_uuid(stadium_id)
                records[record_name] = {
                    'canonicalId': stadium_id,
                    'name': s['name'],
                    'city': s['city'],
                    'sport': s['sport'],
                }
        elif record_type == 'Team':
            for t in data:
                team_id = t.get('canonical_id', '')
                record_name = deterministic_uuid(team_id)
                records[record_name] = {
                    'canonicalId': team_id,
                    'abbreviation': t['abbreviation'],
                    'name': t['name'],
                    'city': t['city'],
                    'sport': t['sport'],
                }
        elif record_type == 'Game':
            for g in data:
                game_id = g.get('canonical_id', g.get('id', ''))
                record_name = deterministic_uuid(game_id)
                records[record_name] = {
                    'canonicalId': game_id,
                    'sport': g['sport'],
                    'season': g.get('season', ''),
                }
        elif record_type == 'LeagueStructure':
            for ls in data:
                record_name = ls['id']
                records[record_name] = {
                    'structureId': ls['id'],
                    'sport': ls['sport'],
                    'type': ls['type'],
                    'name': ls['name'],
                }
        elif record_type == 'TeamAlias':
            for ta in data:
                record_name = ta['id']
                records[record_name] = {
                    'aliasId': ta['id'],
                    'teamCanonicalId': ta['team_canonical_id'],
                    'aliasType': ta['alias_type'],
                    'aliasValue': ta['alias_value'],
                }
        elif record_type == 'StadiumAlias':
            for sa in data:
                stadium_id = sa['stadium_canonical_id']
                sport = stadium_id.split('_')[1] if '_' in stadium_id else 'unknown'
                record_name = f"{sport}_{sa['alias_name'].lower()}"
                records[record_name] = {
                    'aliasName': sa['alias_name'].lower(),
                    'stadiumCanonicalId': sa['stadium_canonical_id'],
                }
        return records

    data_map = {
        'Stadium': stadiums,
        'Team': teams,
        'Game': games,
        'LeagueStructure': league_structure,
        'TeamAlias': team_aliases,
        'StadiumAlias': stadium_aliases,
    }
    results = []
    total_mismatches = 0
    for record_type in ['Stadium', 'Team', 'Game', 'LeagueStructure', 'TeamAlias', 'StadiumAlias']:
        local_count = local_counts[record_type]
        if local_count == 0:
            print(f"{record_type}: No local data, skipping")
            continue
        # Query CloudKit count
        print(f"Checking {record_type}...")
        cloud_records = ck.query_all(record_type, verbose=verbose)
        # query_all may report failure as {'error': ...} (list_records checks for
        # this too); without the guard that dict would be counted as a record.
        if isinstance(cloud_records, dict) and 'error' in cloud_records:
            print(f" Error querying {record_type}: {cloud_records['error']}")
            total_mismatches += 1
            results.append({'type': record_type, 'local': local_count, 'cloud': 'ERROR', 'match': False})
            continue
        cloud_count = len(cloud_records)
        # Count comparison
        if cloud_count == local_count:
            status = "[OK]"
        elif cloud_count < local_count:
            status = f"[MISMATCH: {local_count - cloud_count} missing in CloudKit]"
            total_mismatches += 1
        else:
            status = f"[MISMATCH: {cloud_count - local_count} extra in CloudKit]"
            total_mismatches += 1
        print(f" {record_type}: CloudKit={cloud_count}, Local={local_count} {status}")
        # Spot-check or deep verification
        local_records = build_local_record_map(record_type, data_map[record_type])
        if deep:
            # Full field-by-field comparison
            field_mismatches = []
            for record_name, local_fields in local_records.items():
                cloud_rec = cloud_records.get(record_name)
                if not cloud_rec:
                    field_mismatches.append(f" {record_name}: Missing in CloudKit")
                    continue
                for field_name, expected_value, cloud_value in differing_fields(local_fields, cloud_rec):
                    field_mismatches.append(f" {record_name}.{field_name}: expected '{expected_value}', got '{cloud_value}'")
            if field_mismatches:
                print(f" Field mismatches ({len(field_mismatches)}):")
                for m in field_mismatches[:10]:  # Show first 10
                    print(m)
                if len(field_mismatches) > 10:
                    print(f" ... and {len(field_mismatches) - 10} more")
                total_mismatches += len(field_mismatches)
            else:
                print(f" All fields verified [OK]")
        elif cloud_count == local_count and cloud_count > 0:
            # Spot-check up to 5 random records. Bound the sample by the number of
            # distinct local records: duplicate ids collapse in local_records, so it
            # can be smaller than local_count and random.sample would raise.
            sample_size = min(5, len(local_records))
            sample_names = random.sample(list(local_records.keys()), sample_size)
            spot_check_ok = True
            for record_name in sample_names:
                cloud_rec = cloud_records.get(record_name)
                if not cloud_rec:
                    print(f" Spot-check failed: {record_name} missing in CloudKit")
                    spot_check_ok = False
                    continue
                for field_name, expected_value, cloud_value in differing_fields(local_records[record_name], cloud_rec):
                    print(f" Spot-check mismatch: {record_name}.{field_name}: expected '{expected_value}', got '{cloud_value}'")
                    spot_check_ok = False
            if spot_check_ok:
                print(f" Spot-check ({sample_size} records): [OK]")
            else:
                total_mismatches += 1
        results.append({
            'type': record_type,
            'local': local_count,
            'cloud': cloud_count,
            'match': cloud_count == local_count,
        })

    # Summary
    print("\n" + "="*50)
    print("Verification Summary")
    print("="*50)
    for r in results:
        status = "[OK]" if r['match'] else "[MISMATCH]"
        print(f" {r['type']}: Local={r['local']}, CloudKit={r['cloud']} {status}")
    if total_mismatches == 0:
        print("\n✓ All data verified - CloudKit matches local data")
    else:
        print(f"\n⚠ Found {total_mismatches} mismatch(es)")
    return total_mismatches == 0
# Valid record types for individual record management
VALID_RECORD_TYPES = ['Stadium', 'Team', 'Game', 'LeagueStructure', 'TeamAlias', 'StadiumAlias']
def validate_record_type(record_type):
    """Return the canonical spelling of *record_type* from VALID_RECORD_TYPES.

    Matching ignores case (``'stadium'`` -> ``'Stadium'``); None is returned
    when the name is not a known record type.
    """
    wanted = record_type.lower()
    return next((known for known in VALID_RECORD_TYPES if known.lower() == wanted), None)
def get_record(ck, record_type, record_id, verbose=False):
    """Fetch one record and print its metadata and fields.

    The id is resolved in up to three steps, stopping at the first hit:
    ``record_id`` as the recordName, then ``deterministic_uuid(record_id)`` as
    the recordName, then a query on the ``canonicalId`` field (a failing query
    is treated as "not found").

    Returns:
        True if a record was found and printed, False otherwise.
    """
    normalized_type = validate_record_type(record_type)
    if not normalized_type:
        print(f"Error: Unknown record type '{record_type}'. Valid types: {', '.join(VALID_RECORD_TYPES)}")
        return False

    def is_error(response):
        return isinstance(response, dict) and 'error' in response

    def live(rows):
        # Entries carrying serverErrorCode (e.g. NOT_FOUND) are misses, not records.
        return [row for row in rows if 'serverErrorCode' not in row]

    print(f"\nLooking up {normalized_type} with id '{record_id}'...")
    response = ck.lookup(normalized_type, [record_id], verbose=verbose)
    if is_error(response):
        print(f"Error: {response['error']}")
        return False
    matches = live(response)

    if not matches:
        response = ck.lookup(normalized_type, [deterministic_uuid(record_id)], verbose=verbose)
        if is_error(response):
            print(f"Error: {response['error']}")
            return False
        matches = live(response)

    if not matches:
        if verbose:
            print(f" Trying query by canonicalId field...")
        queried = ck.query(normalized_type, [['canonicalId', 'EQUALS', record_id]])
        if queried and not is_error(queried):
            matches = live(queried.get('records', []))

    if not matches:
        print(f"Error: No {normalized_type} with id '{record_id}' found in CloudKit")
        return False

    record = matches[0]
    rule = '=' * 50
    print(f"\n{rule}")
    print(f"Record: {record.get('recordName', 'N/A')}")
    print(f"Type: {record.get('recordType', 'N/A')}")
    print(f"ChangeTag: {record.get('recordChangeTag', 'N/A')}")
    print(rule)
    print("Fields:")
    for name, payload in sorted(record.get('fields', {}).items()):
        shown = payload.get('value', 'N/A')
        kind = payload.get('type', '')
        suffix = f" ({kind})" if kind else ""
        print(f" {name}: {shown}{suffix}")
    print()
    return True
def list_records(ck, record_type, count_only=False, verbose=False):
    """Print every recordName of a record type in sorted order, or only their count.

    Returns:
        False for an unknown record type or a query error, True otherwise.
    """
    normalized_type = validate_record_type(record_type)
    if not normalized_type:
        print(f"Error: Unknown record type '{record_type}'. Valid types: {', '.join(VALID_RECORD_TYPES)}")
        return False

    print(f"\nQuerying {normalized_type} records...")
    found = ck.query_all(normalized_type, verbose=verbose)
    if isinstance(found, dict) and 'error' in found:
        print(f"Error: {found['error']}")
        return False

    total = len(found)
    if count_only:
        print(f"\n{normalized_type}: {total} records")
        return True

    print(f"\n{normalized_type} ({total} records):")
    print("-" * 40)
    for name in sorted(found.keys()):
        print(name)
    return True
def parse_field_updates(field_updates):
    """Normalize field updates into a ``{field: value}`` dict.

    Accepts either a mapping of already-parsed values (what ``main`` builds for
    ``--update-record``) or an iterable of ``"field=value"`` strings. Values from
    strings are converted to ``float`` if they contain ``.``, otherwise to
    ``int``, and kept as strings when conversion fails.

    Raises:
        ValueError: if a string entry contains no ``=``.
    """
    from collections.abc import Mapping
    if isinstance(field_updates, Mapping):
        return dict(field_updates)
    updates = {}
    for update in field_updates:
        if '=' not in update:
            raise ValueError(f"Invalid update format '{update}'. Use: field=value")
        field_name, value = update.split('=', 1)
        try:
            value = float(value) if '.' in value else int(value)
        except ValueError:
            pass  # Keep as string
        updates[field_name] = value
    return updates

def update_record(ck, record_type, record_id, field_updates, verbose=False):
    """Update selected fields of a single CloudKit record.

    Args:
        ck: CloudKit client (``lookup`` and ``modify`` are called).
        record_type: record type, matched case-insensitively against VALID_RECORD_TYPES.
        record_id: the recordName, or an id whose ``deterministic_uuid`` is the recordName.
        field_updates: ``{field: value}`` mapping, or an iterable of
            ``"field=value"`` strings.
        verbose: forwarded to ``ck.lookup``.

    Returns:
        True when CloudKit reports the update succeeded, False otherwise.

    Only the given fields are sent. On a CONFLICT the write is retried once
    as ``forceUpdate``. ``forceReplace`` is not used because a replace sets
    every field missing from the request to null.
    """
    normalized_type = validate_record_type(record_type)
    if not normalized_type:
        print(f"Error: Unknown record type '{record_type}'. Valid types: {', '.join(VALID_RECORD_TYPES)}")
        return False
    if not field_updates:
        print("Error: No field updates specified. Use format: field=value")
        return False
    # main() passes a dict; iterating it yielded bare field names, so every
    # CLI update was rejected as "Invalid update format". Accept both shapes.
    try:
        updates = parse_field_updates(field_updates)
    except ValueError as exc:
        print(f"Error: {exc}")
        return False

    print(f"\nLooking up {normalized_type} with id '{record_id}'...")
    # Look up record to get recordChangeTag
    records = ck.lookup(normalized_type, [record_id], verbose=verbose)
    if isinstance(records, dict) and 'error' in records:
        print(f"Error: {records['error']}")
        return False
    # Filter out NOT_FOUND records
    found_records = [r for r in records if 'serverErrorCode' not in r]
    if not found_records:
        # Try with deterministic_uuid
        uuid_name = deterministic_uuid(record_id)
        records = ck.lookup(normalized_type, [uuid_name], verbose=verbose)
        if isinstance(records, dict) and 'error' in records:
            print(f"Error: {records['error']}")
            return False
        found_records = [r for r in records if 'serverErrorCode' not in r]
        if not found_records:
            print(f"Error: No {normalized_type} with id '{record_id}' found in CloudKit")
            return False
        record_id = uuid_name

    record = found_records[0]
    record_change_tag = record.get('recordChangeTag', '')
    print(f"Found record: {record_id}")
    print(f"Current recordChangeTag: {record_change_tag}")
    print(f"\nUpdating fields: {updates}")

    # Build update operation
    updated_record = {
        'recordType': normalized_type,
        'recordName': record_id,
        'recordChangeTag': record_change_tag,
        'fields': {field: {'value': value} for field, value in updates.items()}
    }
    result = ck.modify([{'operationType': 'update', 'record': updated_record}])
    if 'error' in result:
        print(f"Error: {result['error']}")
        return False
    result_records = result.get('records', [])
    if not result_records:
        print("Error: No response from CloudKit")
        return False

    result_record = result_records[0]
    if 'serverErrorCode' in result_record:
        error_code = result_record.get('serverErrorCode')
        reason = result_record.get('reason', 'Unknown')
        if error_code != 'CONFLICT':
            print(f"Error: {error_code}: {reason}")
            return False
        print(f"\nConflict detected: Record was modified since lookup.")
        print("Retrying with forceUpdate...")
        # forceUpdate ignores the change tag but, unlike forceReplace, keeps
        # every field not named in the request.
        updated_record.pop('recordChangeTag', None)
        retry_result = ck.modify([{'operationType': 'forceUpdate', 'record': updated_record}])
        if 'error' in retry_result:
            print(f"Error: {retry_result['error']}")
            return False
        retry_records = retry_result.get('records', [])
        if not retry_records:
            # Previously indexed retry_records[0] here and raised IndexError.
            print("Error: No response from CloudKit")
            return False
        if 'serverErrorCode' in retry_records[0]:
            print(f"Error: {retry_records[0].get('reason', 'Unknown error')}")
            return False
        print(f"\n✓ Record updated successfully (forceUpdate)")
        print(f"New recordChangeTag: {retry_records[0].get('recordChangeTag', 'N/A')}")
        return True

    print(f"\n✓ Record updated successfully")
    print(f"New recordChangeTag: {result_record.get('recordChangeTag', 'N/A')}")
    return True
def delete_record(ck, record_type, record_id, force=False, verbose=False):
    """Delete one record after previewing it and, unless ``force``, asking to confirm.

    ``record_id`` is tried as the recordName first, then as
    ``deterministic_uuid(record_id)``. The delete operation includes the
    recordChangeTag read during that lookup.

    Returns:
        True only when CloudKit reports the deletion succeeded; False for an
        unknown type, a lookup/modify error, a missing record, or a declined
        confirmation.
    """
    normalized_type = validate_record_type(record_type)
    if not normalized_type:
        print(f"Error: Unknown record type '{record_type}'. Valid types: {', '.join(VALID_RECORD_TYPES)}")
        return False

    def fetch(name):
        """Look up one recordName; print and return None on error, else the non-missing records."""
        reply = ck.lookup(normalized_type, [name], verbose=verbose)
        if isinstance(reply, dict) and 'error' in reply:
            print(f"Error: {reply['error']}")
            return None
        return [r for r in reply if 'serverErrorCode' not in r]

    print(f"\nLooking up {normalized_type} with id '{record_id}'...")
    hits = fetch(record_id)
    if hits is None:
        return False
    if not hits:
        fallback_name = deterministic_uuid(record_id)
        hits = fetch(fallback_name)
        if hits is None:
            return False
        if not hits:
            print(f"Error: No {normalized_type} with id '{record_id}' found in CloudKit")
            return False
        record_id = fallback_name

    target = hits[0]
    change_tag = target.get('recordChangeTag', '')
    print(f"Found record: {record_id}")

    # Preview at most the first five fields.
    preview = target.get('fields', {})
    print("\nRecord details:")
    for name in list(preview)[:5]:
        print(f" {name}: {preview[name].get('value', 'N/A')}")
    hidden = len(preview) - 5
    if hidden > 0:
        print(f" ... and {hidden} more fields")

    if not force:
        try:
            answer = input("\nAre you sure you want to delete this record? (yes/no): ")
        except (EOFError, KeyboardInterrupt):
            print("\nDeletion cancelled.")
            return False
        if answer.strip().lower() not in ('yes', 'y'):
            print("Deletion cancelled.")
            return False

    print("\nDeleting record...")
    outcome = ck.modify([{
        'operationType': 'delete',
        'record': {
            'recordName': record_id,
            'recordType': normalized_type,
            'recordChangeTag': change_tag,
        },
    }])
    if 'error' in outcome:
        print(f"Error: {outcome['error']}")
        return False
    echoed = outcome.get('records', [])
    if not echoed:
        print("Error: No response from CloudKit")
        return False
    first = echoed[0]
    if 'serverErrorCode' in first:
        print(f"Error: {first.get('serverErrorCode')}: {first.get('reason', 'Unknown')}")
        return False
    print(f"\n✓ Record deleted successfully")
    return True
def main():
p = argparse.ArgumentParser(description='Import JSON to CloudKit')
p.add_argument('--key-id', default=DEFAULT_KEY_ID)
p.add_argument('--key-file', default=DEFAULT_KEY_FILE)
p.add_argument('--container', default=CONTAINER)
p.add_argument('--env', choices=['development', 'production'], default='development')
p.add_argument('--data-dir', default='./data')
p.add_argument('--stadiums-only', action='store_true')
p.add_argument('--games-only', action='store_true')
p.add_argument('--games-files', type=str, help='Comma-separated list of game files to import (e.g., mlb_2025.json,nba_2025.json)')
p.add_argument('--league-structure-only', action='store_true', help='Import only league structure')
p.add_argument('--team-aliases-only', action='store_true', help='Import only team aliases')
p.add_argument('--stadium-aliases-only', action='store_true', help='Import only stadium aliases')
p.add_argument('--canonical-only', action='store_true', help='Import only canonical data (league structure + team aliases + stadium aliases)')
p.add_argument('--delete-all', action='store_true', help='Delete all records before importing')
p.add_argument('--delete-only', action='store_true', help='Only delete records, do not import')
p.add_argument('--diff', action='store_true', help='Show diff between local and CloudKit without importing')
p.add_argument('--smart-sync', action='store_true', help='Differential sync: only upload new/changed records')
p.add_argument('--delete-orphans', action='store_true', help='With --smart-sync, also delete records not in local data')
p.add_argument('--verify', action='store_true', help='Verify CloudKit matches local data (quick: counts + spot-check)')
p.add_argument('--verify-deep', action='store_true', help='Verify CloudKit matches local data (deep: full field comparison)')
# Individual record management
p.add_argument('--get', nargs=2, metavar=('TYPE', 'ID'), help='Get a single record (e.g., --get Stadium stadium_nba_td_garden)')
p.add_argument('--list', metavar='TYPE', help='List all recordNames for a type (e.g., --list Stadium)')
p.add_argument('--count', action='store_true', help='With --list, show only the count')
p.add_argument('--update-record', nargs='+', metavar='ARG', help='Update a record: TYPE ID FIELD=VALUE [FIELD=VALUE ...] (e.g., --update-record Stadium id123 capacity=19156)')
p.add_argument('--delete-record', nargs=2, metavar=('TYPE', 'ID'), help='Delete a single record (e.g., --delete-record Game game_mlb_2025_xxx)')
p.add_argument('--force', action='store_true', help='Skip confirmation for --delete-record')
p.add_argument('--dry-run', action='store_true')
p.add_argument('--verbose', '-v', action='store_true')
p.add_argument('--interactive', '-i', action='store_true', help='Show interactive menu')
args = p.parse_args()
# Show interactive menu if no action flags provided or --interactive
has_action_flag = any([
args.stadiums_only, args.games_only, args.games_files, args.league_structure_only,
args.team_aliases_only, args.stadium_aliases_only, args.canonical_only,
args.delete_all, args.delete_only, args.dry_run, args.diff, args.smart_sync,
args.verify, args.verify_deep, args.get, args.list, args.update_record, args.delete_record
])
# Track selected game files (for option 4 or --games-files)
selected_game_files = None
if args.games_files:
# Parse comma-separated list from command line
selected_game_files = [f.strip() for f in args.games_files.split(',')]
args.games_only = True # Imply --games-only
if args.interactive or not has_action_flag:
choice = show_menu()
if choice is None:
return
# Map menu choice to flags
if choice == 1: # Import all
pass # Default behavior
elif choice == 2: # Stadiums only
args.stadiums_only = True
elif choice == 3: # Games only (all files)
args.games_only = True
elif choice == 4: # Games - select specific files
args.games_only = True
selected_game_files = show_game_files_menu(Path(args.data_dir))
if not selected_game_files:
print("No files selected. Exiting.")
return
elif choice == 5: # League structure only
args.league_structure_only = True
elif choice == 6: # Team aliases only
args.team_aliases_only = True
elif choice == 7: # Stadium aliases only
args.stadium_aliases_only = True
elif choice == 8: # Canonical only
args.canonical_only = True
elif choice == 9: # Delete all then import
args.delete_all = True
elif choice == 10: # Delete only
args.delete_only = True
elif choice == 11: # Dry run
args.dry_run = True
elif choice == 12: # Smart sync
args.smart_sync = True
elif choice == 13: # Smart sync + delete orphans
args.smart_sync = True
args.delete_orphans = True
elif choice == 14: # Verify sync (quick)
args.verify = True
elif choice == 15: # Verify sync (deep)
args.verify_deep = True
print(f"\n{'='*50}")
print(f"CloudKit Import {'(DRY RUN)' if args.dry_run else ''}")
print(f"{'='*50}")
print(f"Container: {args.container}")
print(f"Environment: {args.env}\n")
data_dir = Path(args.data_dir)
# Load canonical format files (from canonicalization pipeline)
# Fall back to legacy format for backward compatibility
if (data_dir / 'stadiums_canonical.json').exists():
stadiums = json.load(open(data_dir / 'stadiums_canonical.json'))
use_canonical = True
else:
stadiums = json.load(open(data_dir / 'stadiums.json'))
use_canonical = False
if (data_dir / 'teams_canonical.json').exists():
teams = json.load(open(data_dir / 'teams_canonical.json'))
else:
teams = [] # Legacy: extracted from stadiums
# Load games: try new structure first (canonical/games/*.json), then fallback
canonical_games_dir = data_dir / 'canonical' / 'games'
games = []
games_source = None
if selected_game_files:
# Load only the selected files
for filename in selected_game_files:
filepath = canonical_games_dir / filename
if filepath.exists():
with open(filepath) as f:
file_games = json.load(f)
games.extend(file_games)
print(f" Loading {filename}: {len(file_games):,} games")
games_source = f"selected files: {', '.join(selected_game_files)}"
elif canonical_games_dir.exists() and any(canonical_games_dir.glob('*.json')):
# New structure: load all sport/season files
for games_file in sorted(canonical_games_dir.glob('*.json')):
with open(games_file) as f:
file_games = json.load(f)
games.extend(file_games)
games_source = "canonical/games/*.json"
elif (data_dir / 'games_canonical.json').exists():
games = json.load(open(data_dir / 'games_canonical.json'))
games_source = "games_canonical.json"
elif (data_dir / 'games.json').exists():
games = json.load(open(data_dir / 'games.json'))
games_source = "games.json (legacy)"
league_structure = json.load(open(data_dir / 'league_structure.json')) if (data_dir / 'league_structure.json').exists() else []
team_aliases = json.load(open(data_dir / 'team_aliases.json')) if (data_dir / 'team_aliases.json').exists() else []
stadium_aliases = json.load(open(data_dir / 'stadium_aliases.json')) if (data_dir / 'stadium_aliases.json').exists() else []
print(f"Using {'canonical' if use_canonical else 'legacy'} format")
print(f"Loaded {len(stadiums)} stadiums, {len(teams)} teams, {len(games)} games")
if games_source:
print(f" Games loaded from: {games_source}")
print(f"Loaded {len(league_structure)} league structures, {len(team_aliases)} team aliases, {len(stadium_aliases)} stadium aliases\n")
ck = None
if not args.dry_run:
if not HAS_CRYPTO:
sys.exit("Error: pip install cryptography")
if not os.path.exists(args.key_file):
sys.exit(f"Error: Key file not found: {args.key_file}")
ck = CloudKit(args.key_id, open(args.key_file, 'rb').read(), args.container, args.env)
# Handle diff mode (show diff without importing)
if args.diff:
if not ck:
# Need CloudKit connection for diff
if not HAS_CRYPTO:
sys.exit("Error: pip install cryptography")
if not os.path.exists(args.key_file):
sys.exit(f"Error: Key file not found: {args.key_file}")
ck = CloudKit(args.key_id, open(args.key_file, 'rb').read(), args.container, args.env)
show_diff_report(ck, args.data_dir, verbose=args.verbose)
return
# Handle verify mode
if args.verify or args.verify_deep:
if not ck:
# Need CloudKit connection for verification
if not HAS_CRYPTO:
sys.exit("Error: pip install cryptography")
if not os.path.exists(args.key_file):
sys.exit(f"Error: Key file not found: {args.key_file}")
ck = CloudKit(args.key_id, open(args.key_file, 'rb').read(), args.container, args.env)
verify_sync(ck, args.data_dir, verbose=args.verbose, deep=args.verify_deep)
return
# Handle individual record operations
if args.get:
record_type, record_id = args.get
if not ck:
if not HAS_CRYPTO:
sys.exit("Error: pip install cryptography")
if not os.path.exists(args.key_file):
sys.exit(f"Error: Key file not found: {args.key_file}")
ck = CloudKit(args.key_id, open(args.key_file, 'rb').read(), args.container, args.env)
get_record(ck, record_type, record_id, verbose=args.verbose)
return
if args.list:
if not ck:
if not HAS_CRYPTO:
sys.exit("Error: pip install cryptography")
if not os.path.exists(args.key_file):
sys.exit(f"Error: Key file not found: {args.key_file}")
ck = CloudKit(args.key_id, open(args.key_file, 'rb').read(), args.container, args.env)
list_records(ck, args.list, count_only=args.count, verbose=args.verbose)
return
if args.update_record:
if len(args.update_record) < 3:
sys.exit("Error: --update-record requires TYPE ID FIELD=VALUE [FIELD=VALUE ...]")
record_type = args.update_record[0]
record_id = args.update_record[1]
field_updates = {}
for update in args.update_record[2:]:
if '=' not in update:
sys.exit(f"Error: Invalid field update '{update}'. Format: FIELD=VALUE")
field, value = update.split('=', 1)
# Try to parse as number
try:
value = int(value)
except ValueError:
try:
value = float(value)
except ValueError:
pass # Keep as string
field_updates[field] = value
if not ck:
if not HAS_CRYPTO:
sys.exit("Error: pip install cryptography")
if not os.path.exists(args.key_file):
sys.exit(f"Error: Key file not found: {args.key_file}")
ck = CloudKit(args.key_id, open(args.key_file, 'rb').read(), args.container, args.env)
update_record(ck, record_type, record_id, field_updates, verbose=args.verbose)
return
if args.delete_record:
record_type, record_id = args.delete_record
if not ck:
if not HAS_CRYPTO:
sys.exit("Error: pip install cryptography")
if not os.path.exists(args.key_file):
sys.exit(f"Error: Key file not found: {args.key_file}")
ck = CloudKit(args.key_id, open(args.key_file, 'rb').read(), args.container, args.env)
delete_record(ck, record_type, record_id, force=args.force, verbose=args.verbose)
return
# Handle smart sync mode (differential upload)
if args.smart_sync:
if not ck:
# Need CloudKit connection for smart sync
if not HAS_CRYPTO:
sys.exit("Error: pip install cryptography")
if not os.path.exists(args.key_file):
sys.exit(f"Error: Key file not found: {args.key_file}")
ck = CloudKit(args.key_id, open(args.key_file, 'rb').read(), args.container, args.env)
run_smart_sync(ck, args.data_dir, dry_run=args.dry_run, verbose=args.verbose, delete_orphans=args.delete_orphans)
return
# Handle deletion
if args.delete_all or args.delete_only:
if not ck:
sys.exit("Error: --key-id and --key-file required for deletion")
print("--- Deleting Existing Records ---")
# Delete in order: dependent records first, then base records
for record_type in ['Game', 'TeamAlias', 'StadiumAlias', 'Team', 'LeagueStructure', 'Stadium']:
print(f" Deleting {record_type} records...")
deleted = ck.delete_all(record_type, verbose=args.verbose)
print(f" Deleted {deleted} {record_type} records")
if args.delete_only:
print(f"\n{'='*50}")
print("DELETE COMPLETE")
print()
return
stats = {'stadiums': 0, 'teams': 0, 'games': 0, 'league_structures': 0, 'team_aliases': 0, 'stadium_aliases': 0}
team_map = {}
# Determine what to import based on flags
import_stadiums = not args.games_only and not args.league_structure_only and not args.team_aliases_only and not args.stadium_aliases_only and not args.canonical_only
import_teams = not args.games_only and not args.league_structure_only and not args.team_aliases_only and not args.stadium_aliases_only and not args.canonical_only
import_games = not args.stadiums_only and not args.league_structure_only and not args.team_aliases_only and not args.stadium_aliases_only and not args.canonical_only
import_league_structure = args.league_structure_only or args.canonical_only or (not args.stadiums_only and not args.games_only and not args.team_aliases_only and not args.stadium_aliases_only)
import_team_aliases = args.team_aliases_only or args.canonical_only or (not args.stadiums_only and not args.games_only and not args.league_structure_only and not args.stadium_aliases_only)
import_stadium_aliases = args.stadium_aliases_only or args.canonical_only or (not args.stadiums_only and not args.games_only and not args.league_structure_only and not args.team_aliases_only)
# Build stadium ID lookup
# Canonical format uses canonical_id, legacy uses id
def get_stadium_id(s):
return s.get('canonical_id', s.get('id', ''))
def get_team_id(t):
return t.get('canonical_id', '')
stadium_id_map = {get_stadium_id(s): deterministic_uuid(get_stadium_id(s)) for s in stadiums}
# Import stadiums
if import_stadiums:
print("--- Stadiums ---")
recs = []
for s in stadiums:
stadium_id = get_stadium_id(s)
record_name = deterministic_uuid(stadium_id)
# Canonical format uses primary_team_abbrevs, legacy uses team_abbrevs
team_abbrevs = s.get('primary_team_abbrevs', s.get('team_abbrevs', []))
fields = {
'stadiumId': {'value': record_name},
'canonicalId': {'value': stadium_id}, # Store canonical_id as string
'name': {'value': s['name']},
'city': {'value': s['city']},
'state': {'value': s.get('state', '')},
'sport': {'value': s['sport']},
'source': {'value': s.get('source', 'canonical')},
'teamAbbrevs': {'value': team_abbrevs},
}
if s.get('latitude'):
fields['location'] = {'value': {'latitude': s['latitude'], 'longitude': s['longitude']}}
if s.get('capacity'):
fields['capacity'] = {'value': s['capacity']}
recs.append({'recordType': 'Stadium', 'recordName': record_name, 'fields': fields})
stats['stadiums'] = import_data(ck, recs, 'stadiums', args.dry_run, args.verbose)
# Import teams (canonical format has dedicated teams file)
if import_teams:
print("--- Teams ---")
if teams:
# Canonical format: use teams_canonical.json
recs = []
for t in teams:
team_id = get_team_id(t)
record_name = deterministic_uuid(team_id)
team_map[(t['sport'], t['abbreviation'])] = record_name
fields = {
'teamId': {'value': record_name},
'canonicalId': {'value': team_id}, # Store canonical_id as string
'abbreviation': {'value': t['abbreviation']},
'name': {'value': t['name']},
'city': {'value': t['city']},
'sport': {'value': t['sport']},
'stadiumCanonicalId': {'value': t.get('stadium_canonical_id', '')},
}
if t.get('conference_id'):
fields['conferenceId'] = {'value': t['conference_id']}
if t.get('division_id'):
fields['divisionId'] = {'value': t['division_id']}
recs.append({'recordType': 'Team', 'recordName': record_name, 'fields': fields})
stats['teams'] = import_data(ck, recs, 'teams', args.dry_run, args.verbose)
else:
# Legacy format: extract teams from stadiums
teams_dict = {}
for s in stadiums:
team_abbrevs = s.get('primary_team_abbrevs', s.get('team_abbrevs', []))
for abbr in team_abbrevs:
team_key = f"{s['sport']}_{abbr}"
if team_key not in teams_dict:
teams_dict[team_key] = {'abbr': abbr, 'city': s['city'], 'sport': s['sport']}
team_uuid = deterministic_uuid(team_key)
team_map[(s['sport'], abbr)] = team_uuid
recs = [{
'recordType': 'Team', 'recordName': deterministic_uuid(team_key),
'fields': {
'teamId': {'value': deterministic_uuid(team_key)},
'canonicalId': {'value': team_key},
'abbreviation': {'value': info['abbr']},
'name': {'value': info['abbr']},
'city': {'value': info['city']},
'sport': {'value': info['sport']},
}
} for team_key, info in teams_dict.items()]
stats['teams'] = import_data(ck, recs, 'teams', args.dry_run, args.verbose)
# Import games
if import_games and games:
# Detect canonical game format (has canonical_id field)
use_canonical_games = games and 'canonical_id' in games[0]
# Rebuild team_map if only importing games (--games-only flag)
if not team_map:
if teams:
# Canonical format: use teams_canonical.json
for t in teams:
team_id = get_team_id(t)
team_map[(t['sport'], t['abbreviation'])] = deterministic_uuid(team_id)
else:
# Legacy format: extract from stadiums
for s in stadiums:
team_abbrevs = s.get('primary_team_abbrevs', s.get('team_abbrevs', []))
for abbr in team_abbrevs:
team_key = f"{s['sport']}_{abbr}"
team_map[(s['sport'], abbr)] = deterministic_uuid(team_key)
# Build team -> stadium map for stadiumRef (legacy format needs this)
team_stadium_map = {}
for s in stadiums:
stadium_id = get_stadium_id(s)
stadium_uuid = stadium_id_map[stadium_id]
team_abbrevs = s.get('primary_team_abbrevs', s.get('team_abbrevs', []))
for abbr in team_abbrevs:
team_stadium_map[(s['sport'], abbr)] = stadium_uuid
print("--- Games ---")
print(f" Using {'canonical' if use_canonical_games else 'legacy'} game format")
# Deduplicate games by ID (canonical_id or id)
seen_ids = set()
unique_games = []
for g in games:
game_id = g.get('canonical_id', g.get('id', ''))
if game_id not in seen_ids:
seen_ids.add(game_id)
unique_games.append(g)
if len(unique_games) < len(games):
print(f" Removed {len(games) - len(unique_games)} duplicate games")
recs = []
for g in unique_games:
# Get game ID (canonical or legacy)
game_id = g.get('canonical_id', g.get('id', ''))
game_uuid = deterministic_uuid(game_id)
sport = g['sport']
fields = {
'gameId': {'value': game_uuid},
'canonicalId': {'value': game_id}, # Store canonical_id as string
'sport': {'value': sport},
'season': {'value': g.get('season', '')},
'source': {'value': g.get('source', 'canonical' if use_canonical_games else '')},
}
# Parse date/time
if g.get('date'):
try:
# Parse time like "7:30p" or "10:00a"
time_str = g.get('time', '7:00p')
hour, minute = 19, 0
if time_str:
clean_time = time_str.lower().replace(' ', '')
is_pm = 'p' in clean_time
time_parts = clean_time.replace('p', '').replace('a', '').split(':')
if time_parts:
hour = int(time_parts[0])
if is_pm and hour != 12:
hour += 12
elif not is_pm and hour == 12:
hour = 0
if len(time_parts) > 1:
minute = int(time_parts[1])
dt = datetime.strptime(f"{g['date']} {hour:02d}:{minute:02d}", '%Y-%m-%d %H:%M')
# CloudKit expects TIMESTAMP type with milliseconds since epoch
fields['dateTime'] = {'value': int(dt.timestamp() * 1000), 'type': 'TIMESTAMP'}
except Exception as e:
if args.verbose:
print(f" Warning: Failed to parse date/time for {game_id}: {e}")
# Team references
if use_canonical_games:
# Canonical format: extract team abbrev from canonical ID (team_nba_atl -> atl)
home_team_canonical_id = g.get('home_team_canonical_id', '')
away_team_canonical_id = g.get('away_team_canonical_id', '')
home_team_uuid = deterministic_uuid(home_team_canonical_id)
away_team_uuid = deterministic_uuid(away_team_canonical_id)
else:
# Legacy format: use abbreviations
home_team_key = f"{sport}_{g.get('home_team_abbrev', '')}"
away_team_key = f"{sport}_{g.get('away_team_abbrev', '')}"
home_team_uuid = deterministic_uuid(home_team_key)
away_team_uuid = deterministic_uuid(away_team_key)
fields['homeTeamRef'] = {'value': {'recordName': home_team_uuid, 'action': 'NONE'}}
fields['awayTeamRef'] = {'value': {'recordName': away_team_uuid, 'action': 'NONE'}}
# Stadium reference
if use_canonical_games and g.get('stadium_canonical_id'):
# Canonical format: use stadium_canonical_id directly
stadium_canonical_id = g['stadium_canonical_id']
stadium_uuid = stadium_id_map.get(stadium_canonical_id)
if stadium_uuid:
fields['stadiumRef'] = {'value': {'recordName': stadium_uuid, 'action': 'NONE'}}
fields['stadiumCanonicalId'] = {'value': stadium_canonical_id}
else:
# Legacy format: look up by home team abbrev
stadium_uuid = team_stadium_map.get((sport, g.get('home_team_abbrev', '')))
if stadium_uuid:
fields['stadiumRef'] = {'value': {'recordName': stadium_uuid, 'action': 'NONE'}}
recs.append({'recordType': 'Game', 'recordName': game_uuid, 'fields': fields})
stats['games'] = import_data(ck, recs, 'games', args.dry_run, args.verbose)
# Import league structure
if import_league_structure and league_structure:
print("--- League Structure ---")
now_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
recs = [{
'recordType': 'LeagueStructure',
'recordName': ls['id'], # Use the id as recordName
'fields': {
'structureId': {'value': ls['id']},
'sport': {'value': ls['sport']},
'type': {'value': ls['type']},
'name': {'value': ls['name']},
'displayOrder': {'value': ls['display_order']},
'schemaVersion': {'value': 1},
'lastModified': {'value': now_ms, 'type': 'TIMESTAMP'},
**({'abbreviation': {'value': ls['abbreviation']}} if ls.get('abbreviation') else {}),
**({'parentId': {'value': ls['parent_id']}} if ls.get('parent_id') else {}),
}
} for ls in league_structure]
stats['league_structures'] = import_data(ck, recs, 'league structures', args.dry_run, args.verbose)
# Import team aliases
if import_team_aliases and team_aliases:
print("--- Team Aliases ---")
now_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
recs = []
for ta in team_aliases:
fields = {
'aliasId': {'value': ta['id']},
'teamCanonicalId': {'value': ta['team_canonical_id']},
'aliasType': {'value': ta['alias_type']},
'aliasValue': {'value': ta['alias_value']},
'schemaVersion': {'value': 1},
'lastModified': {'value': now_ms, 'type': 'TIMESTAMP'},
}
# Add optional date fields
if ta.get('valid_from'):
try:
dt = datetime.strptime(ta['valid_from'], '%Y-%m-%d')
fields['validFrom'] = {'value': int(dt.timestamp() * 1000), 'type': 'TIMESTAMP'}
except:
pass
if ta.get('valid_until'):
try:
dt = datetime.strptime(ta['valid_until'], '%Y-%m-%d')
fields['validUntil'] = {'value': int(dt.timestamp() * 1000), 'type': 'TIMESTAMP'}
except:
pass
recs.append({
'recordType': 'TeamAlias',
'recordName': ta['id'], # Use the id as recordName
'fields': fields
})
stats['team_aliases'] = import_data(ck, recs, 'team aliases', args.dry_run, args.verbose)
# Import stadium aliases
if import_stadium_aliases and stadium_aliases:
print("--- Stadium Aliases ---")
now_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
recs = []
for sa in stadium_aliases:
fields = {
'aliasName': {'value': sa['alias_name'].lower()}, # Normalize to lowercase
'stadiumCanonicalId': {'value': sa['stadium_canonical_id']},
'schemaVersion': {'value': 1},
'lastModified': {'value': now_ms, 'type': 'TIMESTAMP'},
}
# Add optional date fields
if sa.get('valid_from'):
try:
dt = datetime.strptime(sa['valid_from'], '%Y-%m-%d')
fields['validFrom'] = {'value': int(dt.timestamp() * 1000), 'type': 'TIMESTAMP'}
except:
pass
if sa.get('valid_until'):
try:
dt = datetime.strptime(sa['valid_until'], '%Y-%m-%d')
fields['validUntil'] = {'value': int(dt.timestamp() * 1000), 'type': 'TIMESTAMP'}
except:
pass
# Extract sport from stadium_canonical_id (e.g., "stadium_nba_td_garden" -> "nba")
# This makes record names unique for shared venues (TD Garden has NBA and NHL entries)
stadium_id = sa['stadium_canonical_id']
sport = stadium_id.split('_')[1] if '_' in stadium_id else 'unknown'
record_name = f"{sport}_{sa['alias_name'].lower()}"
recs.append({
'recordType': 'StadiumAlias',
'recordName': record_name,
'fields': fields
})
stats['stadium_aliases'] = import_data(ck, recs, 'stadium aliases', args.dry_run, args.verbose)
print(f"\n{'='*50}")
print(f"COMPLETE: {stats['stadiums']} stadiums, {stats['teams']} teams, {stats['games']} games, {stats['league_structures']} league structures, {stats['team_aliases']} team aliases, {stats['stadium_aliases']} stadium aliases")
if args.dry_run:
print("[DRY RUN - nothing imported]")
print()
if __name__ == "__main__":
    # Run the CloudKit import CLI only when this file is executed directly
    # (e.g. `python cloudkit_import.py ...`), not when it is imported as a module.
    main()