- Add local canonicalization pipeline (stadiums, teams, games) that generates deterministic canonical IDs before CloudKit upload - Fix CanonicalSyncService to use deterministic UUIDs from canonical IDs instead of random UUIDs from CloudKit records - Add SyncStadium/SyncTeam/SyncGame types to CloudKitService that preserve canonical ID relationships during sync - Add canonical ID field keys to CKModels for reading from CloudKit records - Bundle canonical JSON files (stadiums_canonical, teams_canonical, games_canonical, stadium_aliases) for consistent bootstrap data - Update BootstrapService to prefer canonical format files over legacy format This ensures all entities use consistent deterministic UUIDs derived from their canonical IDs, preventing duplicate records when syncing CloudKit data with bootstrapped local data. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
697 lines
31 KiB
Python
Executable File
697 lines
31 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
CloudKit Import Script
|
|
======================
|
|
Imports canonical JSON data into CloudKit. Run after canonicalization pipeline.
|
|
|
|
Expected input files (from canonicalization pipeline):
|
|
- stadiums_canonical.json
|
|
- teams_canonical.json
|
|
- games_canonical.json
|
|
- stadium_aliases.json
|
|
- league_structure.json
|
|
- team_aliases.json
|
|
|
|
Setup:
|
|
1. CloudKit Dashboard > Tokens & Keys > Server-to-Server Keys
|
|
2. Create key with Read/Write access to public database
|
|
3. Download .p8 file and note Key ID
|
|
|
|
Usage:
|
|
python cloudkit_import.py --dry-run # Preview first
|
|
python cloudkit_import.py --key-id XX --key-file key.p8 # Import all
|
|
python cloudkit_import.py --stadiums-only ... # Stadiums first
|
|
python cloudkit_import.py --games-only ... # Games after
|
|
python cloudkit_import.py --stadium-aliases-only ... # Stadium aliases only
|
|
python cloudkit_import.py --delete-all ... # Delete then import
|
|
python cloudkit_import.py --delete-only ... # Delete only (no import)
|
|
"""
|
|
|
|
import argparse, json, time, os, sys, hashlib, base64, requests
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
try:
|
|
from cryptography.hazmat.primitives import hashes, serialization
|
|
from cryptography.hazmat.primitives.asymmetric import ec
|
|
from cryptography.hazmat.backends import default_backend
|
|
HAS_CRYPTO = True
|
|
except ImportError:
|
|
HAS_CRYPTO = False
|
|
|
|
# CloudKit container identifier and Web Services API host.
CONTAINER = "iCloud.com.sportstime.app"
HOST = "https://api.apple-cloudkit.com"
# Records per modify request — assumed CloudKit per-request operation limit; confirm against Apple docs.
BATCH_SIZE = 200

# Hardcoded credentials
# NOTE(review): this looks like the server-to-server key *identifier* (not the
# private key itself, which lives in DEFAULT_KEY_FILE), but confirm it is safe
# to commit to source control.
DEFAULT_KEY_ID = "152be0715e0276e31aaea5cbfe79dc872f298861a55c70fae14e5fe3e026cff9"
DEFAULT_KEY_FILE = "eckey.pem"
|
|
|
|
|
|
def show_menu():
    """Display the interactive action menu and return the user's choice.

    Returns:
        The selected action as an int in 1..10, or None when the user enters
        0 or aborts input (Ctrl-C / EOF).
    """
    divider = "=" * 50
    print("\n" + divider)
    print("CloudKit Import - Select Action")
    print(divider)
    print("\n 1. Import all (stadiums, teams, games, league structure, team aliases, stadium aliases)")
    print(" 2. Stadiums only")
    print(" 3. Games only")
    print(" 4. League structure only")
    print(" 5. Team aliases only")
    print(" 6. Stadium aliases only")
    print(" 7. Canonical only (league structure + team aliases + stadium aliases)")
    print(" 8. Delete all then import")
    print(" 9. Delete only (no import)")
    print(" 10. Dry run (preview only)")
    print(" 0. Exit")
    print()

    valid_choices = {str(n) for n in range(1, 11)}
    while True:
        try:
            selection = input("Enter choice [1-10, 0 to exit]: ").strip()
        except (EOFError, KeyboardInterrupt):
            # Treat an aborted prompt the same as an explicit exit.
            print("\nExiting.")
            return None
        if selection == '0':
            return None
        if selection in valid_choices:
            return int(selection)
        print("Invalid choice. Please enter 1-10 or 0.")
|
|
|
|
|
|
def deterministic_uuid(string: str) -> str:
    """Derive a stable, RFC-4122-shaped UUID string from *string*.

    Hashes the input with SHA-256, keeps the first 16 bytes, then forces the
    version nibble to 4 and the variant bits to the RFC 4122 pattern so the
    result parses as a valid UUID. Mirrors StubDataProvider.deterministicUUID()
    in the Swift codebase, so the same canonical ID yields the same UUID on
    both sides.
    """
    digest = bytearray(hashlib.sha256(string.encode('utf-8')).digest()[:16])
    digest[6] = (digest[6] & 0x0F) | 0x40  # force version nibble to 4
    digest[8] = (digest[8] & 0x3F) | 0x80  # force variant bits to 10xxxxxx
    groups = (digest[0:4], digest[4:6], digest[6:8], digest[8:10], digest[10:16])
    return "-".join(part.hex() for part in groups).upper()
|
|
|
|
|
|
class CloudKit:
    """Thin CloudKit Web Services client using server-to-server key auth.

    Every request is signed per Apple's scheme:
    base64(ECDSA-SHA256 signature over "{ISO8601 date}:{base64(sha256(body))}:{path}").
    Errors are returned as {'error': "..."} dicts rather than raised, matching
    how callers in this script inspect results.
    """

    def __init__(self, key_id, private_key, container, env):
        self.key_id = key_id
        self.private_key = private_key
        # Parse the PEM once up front instead of re-loading it on every request.
        self._key = serialization.load_pem_private_key(private_key, None, default_backend())
        self.path_base = f"/database/1/{container}/{env}/public"

    def _sign(self, date, body, path):
        """Return the base64 CloudKit request signature for (date, body, path)."""
        body_hash = base64.b64encode(hashlib.sha256(body.encode()).digest()).decode()
        sig = self._key.sign(f"{date}:{body_hash}:{path}".encode(), ec.ECDSA(hashes.SHA256()))
        return base64.b64encode(sig).decode()

    def _headers(self, body, path):
        """Build the signed auth headers shared by modify() and query()."""
        date = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
        return {
            'Content-Type': 'application/json',
            'X-Apple-CloudKit-Request-KeyID': self.key_id,
            'X-Apple-CloudKit-Request-ISO8601Date': date,
            'X-Apple-CloudKit-Request-SignatureV1': self._sign(date, body, path),
        }

    def modify(self, operations):
        """POST a records/modify request; return parsed JSON or {'error': ...}."""
        path = f"{self.path_base}/records/modify"
        body = json.dumps({'operations': operations})
        r = requests.post(f"{HOST}{path}", headers=self._headers(body, path), data=body, timeout=60)
        if r.status_code == 200:
            return r.json()
        try:
            err = r.json()
            reason = err.get('reason', 'Unknown')
            code = err.get('serverErrorCode', r.status_code)
            return {'error': f"{code}: {reason}"}
        except Exception:  # error body was not JSON; fall back to raw text
            return {'error': f"{r.status_code}: {r.text[:200]}"}

    def query(self, record_type, limit=200, verbose=False):
        """Query up to *limit* records of a given type (single page, no cursor)."""
        path = f"{self.path_base}/records/query"
        body = json.dumps({
            'query': {'recordType': record_type},
            'resultsLimit': limit
        })
        if verbose:
            print(f" Querying {record_type}...")
        try:
            r = requests.post(f"{HOST}{path}", headers=self._headers(body, path), data=body, timeout=30)
            if verbose:
                print(f" Response: {r.status_code}")
            if r.status_code == 200:
                result = r.json()
                if verbose:
                    print(f" Found {len(result.get('records', []))} records")
                return result
            return {'error': f"{r.status_code}: {r.text[:200]}"}
        except requests.exceptions.Timeout:
            return {'error': 'Request timed out after 30s'}
        except Exception as e:
            return {'error': f"Request failed: {e}"}

    def delete_all(self, record_type, verbose=False):
        """Delete all records of *record_type*, one query page at a time.

        Returns the number of records successfully deleted. Stops on query or
        modify errors, and also when a pass deletes nothing — otherwise the
        next query would return the same undeletable records and the loop
        would spin forever.
        """
        total_deleted = 0
        while True:
            result = self.query(record_type, verbose=verbose)
            if 'error' in result:
                print(f" Query error: {result['error']}")
                break

            records = result.get('records', [])
            if not records:
                break

            # Build delete operations (recordChangeTag required for delete)
            ops = [{
                'operationType': 'delete',
                'record': {
                    'recordName': r['recordName'],
                    'recordType': record_type,
                    'recordChangeTag': r.get('recordChangeTag', '')
                }
            } for r in records]

            if verbose:
                print(f" Sending delete for {len(ops)} records...")

            delete_result = self.modify(ops)

            if verbose:
                print(f" Delete response: {json.dumps(delete_result)[:500]}")

            if 'error' in delete_result:
                print(f" Delete error: {delete_result['error']}")
                break

            # Check for individual record errors
            result_records = delete_result.get('records', [])
            successful = [r for r in result_records if 'serverErrorCode' not in r]
            failed = [r for r in result_records if 'serverErrorCode' in r]

            if failed and verbose:
                print(f" Failed: {failed[0]}")

            total_deleted += len(successful)
            print(f" Deleted {len(successful)} {record_type} records" + (f" ({len(failed)} failed)" if failed else ""))

            if not successful:
                # Every record in this pass failed to delete; re-querying would
                # return the same records, so bail out instead of looping forever.
                break

            time.sleep(0.5)

        return total_deleted
|
|
|
|
|
|
def import_data(ck, records, name, dry_run, verbose, batch_size=None):
    """Upload *records* to CloudKit in batches using forceReplace operations.

    Args:
        ck: CloudKit client. Unused (may be None) when dry_run is True.
        records: list of CloudKit record dicts to upsert.
        name: human-readable label for progress messages (e.g. 'stadiums').
        dry_run: when True, only report what would be created; no requests.
        verbose: print per-batch details and truncated raw responses.
        batch_size: records per modify request; defaults to BATCH_SIZE.

    Returns:
        The number of records created (or, in dry-run mode, that would be).
    """
    if batch_size is None:
        batch_size = BATCH_SIZE
    total = 0
    errors = 0
    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]
        ops = [{'operationType': 'forceReplace', 'record': r} for r in batch]

        if verbose:
            print(f" Batch {i//batch_size + 1}: {len(batch)} records, {len(ops)} ops")

        if not ops:
            print(f" Warning: Empty batch at index {i}, skipping")
            continue

        if dry_run:
            print(f" [DRY RUN] Would create {len(batch)} {name}")
            total += len(batch)
            continue  # no request was made, so skip the rate-limit sleep too

        result = ck.modify(ops)
        if 'error' in result:
            errors += 1
            if errors <= 3:  # Only show first 3 errors
                print(f" Error: {result['error']}")
                if verbose and batch:
                    print(f" Sample record: {json.dumps(batch[0], indent=2)[:500]}")
                if errors == 3:
                    print(" (suppressing further errors...)")
        else:
            result_records = result.get('records', [])
            # Count only successful records (no serverErrorCode)
            successful = [r for r in result_records if 'serverErrorCode' not in r]
            failed = [r for r in result_records if 'serverErrorCode' in r]
            n = len(successful)
            total += n
            print(f" Created {n} {name}")
            if failed:
                print(f" Failed {len(failed)} records: {failed[0].get('serverErrorCode')}: {failed[0].get('reason')}")
            if verbose:
                print(f" Response: {json.dumps(result, indent=2)[:1000]}")
        time.sleep(0.5)  # crude rate limiting between live requests
    if errors > 0:
        print(f" Total errors: {errors}")
    return total
|
|
|
|
|
|
def main():
    """CLI entry point: parse flags, optionally show the menu, then delete
    and/or import canonical data into CloudKit.

    Overall flow: parse args -> optional interactive menu -> load JSON inputs
    (canonical files preferred, legacy as fallback) -> optional delete pass ->
    build CloudKit record dicts with deterministic UUIDs -> batch upload.
    """
    p = argparse.ArgumentParser(description='Import JSON to CloudKit')
    p.add_argument('--key-id', default=DEFAULT_KEY_ID)
    p.add_argument('--key-file', default=DEFAULT_KEY_FILE)
    p.add_argument('--container', default=CONTAINER)
    p.add_argument('--env', choices=['development', 'production'], default='development')
    p.add_argument('--data-dir', default='./data')
    p.add_argument('--stadiums-only', action='store_true')
    p.add_argument('--games-only', action='store_true')
    p.add_argument('--league-structure-only', action='store_true', help='Import only league structure')
    p.add_argument('--team-aliases-only', action='store_true', help='Import only team aliases')
    p.add_argument('--stadium-aliases-only', action='store_true', help='Import only stadium aliases')
    p.add_argument('--canonical-only', action='store_true', help='Import only canonical data (league structure + team aliases + stadium aliases)')
    p.add_argument('--delete-all', action='store_true', help='Delete all records before importing')
    p.add_argument('--delete-only', action='store_true', help='Only delete records, do not import')
    p.add_argument('--dry-run', action='store_true')
    p.add_argument('--verbose', '-v', action='store_true')
    p.add_argument('--interactive', '-i', action='store_true', help='Show interactive menu')
    args = p.parse_args()

    # Show interactive menu if no action flags provided or --interactive
    has_action_flag = any([
        args.stadiums_only, args.games_only, args.league_structure_only,
        args.team_aliases_only, args.stadium_aliases_only, args.canonical_only,
        args.delete_all, args.delete_only, args.dry_run
    ])

    if args.interactive or not has_action_flag:
        choice = show_menu()
        if choice is None:
            return

        # Map menu choice to the equivalent CLI flags so the rest of main()
        # only has to deal with args.*.
        if choice == 1:  # Import all
            pass  # Default behavior
        elif choice == 2:  # Stadiums only
            args.stadiums_only = True
        elif choice == 3:  # Games only
            args.games_only = True
        elif choice == 4:  # League structure only
            args.league_structure_only = True
        elif choice == 5:  # Team aliases only
            args.team_aliases_only = True
        elif choice == 6:  # Stadium aliases only
            args.stadium_aliases_only = True
        elif choice == 7:  # Canonical only
            args.canonical_only = True
        elif choice == 8:  # Delete all then import
            args.delete_all = True
        elif choice == 9:  # Delete only
            args.delete_only = True
        elif choice == 10:  # Dry run
            args.dry_run = True

    print(f"\n{'='*50}")
    print(f"CloudKit Import {'(DRY RUN)' if args.dry_run else ''}")
    print(f"{'='*50}")
    print(f"Container: {args.container}")
    print(f"Environment: {args.env}\n")

    data_dir = Path(args.data_dir)

    # Load canonical format files (from canonicalization pipeline)
    # Fall back to legacy format for backward compatibility
    if (data_dir / 'stadiums_canonical.json').exists():
        stadiums = json.load(open(data_dir / 'stadiums_canonical.json'))
        use_canonical = True
    else:
        stadiums = json.load(open(data_dir / 'stadiums.json'))
        use_canonical = False

    if (data_dir / 'teams_canonical.json').exists():
        teams = json.load(open(data_dir / 'teams_canonical.json'))
    else:
        teams = []  # Legacy: extracted from stadiums

    if (data_dir / 'games_canonical.json').exists():
        games = json.load(open(data_dir / 'games_canonical.json'))
    elif (data_dir / 'games.json').exists():
        games = json.load(open(data_dir / 'games.json'))
    else:
        games = []

    # Optional inputs: missing files simply mean "nothing of that kind to import".
    league_structure = json.load(open(data_dir / 'league_structure.json')) if (data_dir / 'league_structure.json').exists() else []
    team_aliases = json.load(open(data_dir / 'team_aliases.json')) if (data_dir / 'team_aliases.json').exists() else []
    stadium_aliases = json.load(open(data_dir / 'stadium_aliases.json')) if (data_dir / 'stadium_aliases.json').exists() else []

    print(f"Using {'canonical' if use_canonical else 'legacy'} format")
    print(f"Loaded {len(stadiums)} stadiums, {len(teams)} teams, {len(games)} games")
    print(f"Loaded {len(league_structure)} league structures, {len(team_aliases)} team aliases, {len(stadium_aliases)} stadium aliases\n")

    # A real client is only needed when we will actually hit the network.
    ck = None
    if not args.dry_run:
        if not HAS_CRYPTO:
            sys.exit("Error: pip install cryptography")
        if not os.path.exists(args.key_file):
            sys.exit(f"Error: Key file not found: {args.key_file}")
        ck = CloudKit(args.key_id, open(args.key_file, 'rb').read(), args.container, args.env)

    # Handle deletion
    if args.delete_all or args.delete_only:
        if not ck:
            sys.exit("Error: --key-id and --key-file required for deletion")

        print("--- Deleting Existing Records ---")
        # Delete in order: dependent records first, then base records
        for record_type in ['Game', 'TeamAlias', 'StadiumAlias', 'Team', 'LeagueStructure', 'Stadium']:
            print(f" Deleting {record_type} records...")
            deleted = ck.delete_all(record_type, verbose=args.verbose)
            print(f" Deleted {deleted} {record_type} records")

        if args.delete_only:
            print(f"\n{'='*50}")
            print("DELETE COMPLETE")
            print()
            return

    stats = {'stadiums': 0, 'teams': 0, 'games': 0, 'league_structures': 0, 'team_aliases': 0, 'stadium_aliases': 0}
    team_map = {}

    # Determine what to import based on flags. Each "-only" flag suppresses
    # the others; with no such flag every category is imported.
    import_stadiums = not args.games_only and not args.league_structure_only and not args.team_aliases_only and not args.stadium_aliases_only and not args.canonical_only
    import_teams = not args.games_only and not args.league_structure_only and not args.team_aliases_only and not args.stadium_aliases_only and not args.canonical_only
    import_games = not args.stadiums_only and not args.league_structure_only and not args.team_aliases_only and not args.stadium_aliases_only and not args.canonical_only
    import_league_structure = args.league_structure_only or args.canonical_only or (not args.stadiums_only and not args.games_only and not args.team_aliases_only and not args.stadium_aliases_only)
    import_team_aliases = args.team_aliases_only or args.canonical_only or (not args.stadiums_only and not args.games_only and not args.league_structure_only and not args.stadium_aliases_only)
    import_stadium_aliases = args.stadium_aliases_only or args.canonical_only or (not args.stadiums_only and not args.games_only and not args.league_structure_only and not args.team_aliases_only)

    # Build stadium ID lookup
    # Canonical format uses canonical_id, legacy uses id
    def get_stadium_id(s):
        return s.get('canonical_id', s.get('id', ''))

    def get_team_id(t):
        return t.get('canonical_id', '')

    # canonical id -> deterministic record UUID, shared by stadium and game imports
    stadium_id_map = {get_stadium_id(s): deterministic_uuid(get_stadium_id(s)) for s in stadiums}

    # Import stadiums
    if import_stadiums:
        print("--- Stadiums ---")
        recs = []
        for s in stadiums:
            stadium_id = get_stadium_id(s)
            record_name = deterministic_uuid(stadium_id)
            # Canonical format uses primary_team_abbrevs, legacy uses team_abbrevs
            team_abbrevs = s.get('primary_team_abbrevs', s.get('team_abbrevs', []))

            fields = {
                'stadiumId': {'value': record_name},
                'canonicalId': {'value': stadium_id},  # Store canonical_id as string
                'name': {'value': s['name']},
                'city': {'value': s['city']},
                'state': {'value': s.get('state', '')},
                'sport': {'value': s['sport']},
                'source': {'value': s.get('source', 'canonical')},
                'teamAbbrevs': {'value': team_abbrevs},
            }
            # Optional fields: only included when present in the input.
            if s.get('latitude'):
                fields['location'] = {'value': {'latitude': s['latitude'], 'longitude': s['longitude']}}
            if s.get('capacity'):
                fields['capacity'] = {'value': s['capacity']}

            recs.append({'recordType': 'Stadium', 'recordName': record_name, 'fields': fields})
        stats['stadiums'] = import_data(ck, recs, 'stadiums', args.dry_run, args.verbose)

    # Import teams (canonical format has dedicated teams file)
    if import_teams:
        print("--- Teams ---")
        if teams:
            # Canonical format: use teams_canonical.json
            recs = []
            for t in teams:
                team_id = get_team_id(t)
                record_name = deterministic_uuid(team_id)
                # Remember the UUID so game records can reference this team.
                team_map[(t['sport'], t['abbreviation'])] = record_name

                fields = {
                    'teamId': {'value': record_name},
                    'canonicalId': {'value': team_id},  # Store canonical_id as string
                    'abbreviation': {'value': t['abbreviation']},
                    'name': {'value': t['name']},
                    'city': {'value': t['city']},
                    'sport': {'value': t['sport']},
                    'stadiumCanonicalId': {'value': t.get('stadium_canonical_id', '')},
                }
                if t.get('conference_id'):
                    fields['conferenceId'] = {'value': t['conference_id']}
                if t.get('division_id'):
                    fields['divisionId'] = {'value': t['division_id']}

                recs.append({'recordType': 'Team', 'recordName': record_name, 'fields': fields})
            stats['teams'] = import_data(ck, recs, 'teams', args.dry_run, args.verbose)
        else:
            # Legacy format: extract teams from stadiums
            teams_dict = {}
            for s in stadiums:
                team_abbrevs = s.get('primary_team_abbrevs', s.get('team_abbrevs', []))
                for abbr in team_abbrevs:
                    team_key = f"{s['sport']}_{abbr}"
                    if team_key not in teams_dict:
                        teams_dict[team_key] = {'abbr': abbr, 'city': s['city'], 'sport': s['sport']}
                    team_uuid = deterministic_uuid(team_key)
                    team_map[(s['sport'], abbr)] = team_uuid

            recs = [{
                'recordType': 'Team', 'recordName': deterministic_uuid(team_key),
                'fields': {
                    'teamId': {'value': deterministic_uuid(team_key)},
                    'canonicalId': {'value': team_key},
                    'abbreviation': {'value': info['abbr']},
                    'name': {'value': info['abbr']},
                    'city': {'value': info['city']},
                    'sport': {'value': info['sport']},
                }
            } for team_key, info in teams_dict.items()]
            stats['teams'] = import_data(ck, recs, 'teams', args.dry_run, args.verbose)

    # Import games
    if import_games and games:
        # Detect canonical game format (has canonical_id field)
        use_canonical_games = games and 'canonical_id' in games[0]

        # Rebuild team_map if only importing games (--games-only flag)
        if not team_map:
            if teams:
                # Canonical format: use teams_canonical.json
                for t in teams:
                    team_id = get_team_id(t)
                    team_map[(t['sport'], t['abbreviation'])] = deterministic_uuid(team_id)
            else:
                # Legacy format: extract from stadiums
                for s in stadiums:
                    team_abbrevs = s.get('primary_team_abbrevs', s.get('team_abbrevs', []))
                    for abbr in team_abbrevs:
                        team_key = f"{s['sport']}_{abbr}"
                        team_map[(s['sport'], abbr)] = deterministic_uuid(team_key)

        # Build team -> stadium map for stadiumRef (legacy format needs this)
        team_stadium_map = {}
        for s in stadiums:
            stadium_id = get_stadium_id(s)
            stadium_uuid = stadium_id_map[stadium_id]
            team_abbrevs = s.get('primary_team_abbrevs', s.get('team_abbrevs', []))
            for abbr in team_abbrevs:
                team_stadium_map[(s['sport'], abbr)] = stadium_uuid

        print("--- Games ---")
        print(f" Using {'canonical' if use_canonical_games else 'legacy'} game format")

        # Deduplicate games by ID (canonical_id or id)
        seen_ids = set()
        unique_games = []
        for g in games:
            game_id = g.get('canonical_id', g.get('id', ''))
            if game_id not in seen_ids:
                seen_ids.add(game_id)
                unique_games.append(g)

        if len(unique_games) < len(games):
            print(f" Removed {len(games) - len(unique_games)} duplicate games")

        recs = []
        for g in unique_games:
            # Get game ID (canonical or legacy)
            game_id = g.get('canonical_id', g.get('id', ''))
            game_uuid = deterministic_uuid(game_id)
            sport = g['sport']

            fields = {
                'gameId': {'value': game_uuid},
                'canonicalId': {'value': game_id},  # Store canonical_id as string
                'sport': {'value': sport},
                'season': {'value': g.get('season', '')},
                'source': {'value': g.get('source', 'canonical' if use_canonical_games else '')},
            }

            # Parse date/time — best-effort: a bad value just leaves dateTime unset.
            if g.get('date'):
                try:
                    # Parse time like "7:30p" or "10:00a"
                    time_str = g.get('time', '7:00p')
                    hour, minute = 19, 0
                    if time_str:
                        clean_time = time_str.lower().replace(' ', '')
                        is_pm = 'p' in clean_time
                        time_parts = clean_time.replace('p', '').replace('a', '').split(':')
                        if time_parts:
                            hour = int(time_parts[0])
                            if is_pm and hour != 12:
                                hour += 12
                            elif not is_pm and hour == 12:
                                hour = 0
                            if len(time_parts) > 1:
                                minute = int(time_parts[1])
                    # NOTE(review): strptime produces a naive datetime; timestamp()
                    # interprets it in the importer's local timezone — confirm that
                    # matches what the app expects.
                    dt = datetime.strptime(f"{g['date']} {hour:02d}:{minute:02d}", '%Y-%m-%d %H:%M')
                    # CloudKit expects TIMESTAMP type with milliseconds since epoch
                    fields['dateTime'] = {'value': int(dt.timestamp() * 1000), 'type': 'TIMESTAMP'}
                except Exception as e:
                    if args.verbose:
                        print(f" Warning: Failed to parse date/time for {game_id}: {e}")

            # Team references
            if use_canonical_games:
                # Canonical format: extract team abbrev from canonical ID (team_nba_atl -> atl)
                home_team_canonical_id = g.get('home_team_canonical_id', '')
                away_team_canonical_id = g.get('away_team_canonical_id', '')
                home_team_uuid = deterministic_uuid(home_team_canonical_id)
                away_team_uuid = deterministic_uuid(away_team_canonical_id)
            else:
                # Legacy format: use abbreviations
                home_team_key = f"{sport}_{g.get('home_team_abbrev', '')}"
                away_team_key = f"{sport}_{g.get('away_team_abbrev', '')}"
                home_team_uuid = deterministic_uuid(home_team_key)
                away_team_uuid = deterministic_uuid(away_team_key)

            fields['homeTeamRef'] = {'value': {'recordName': home_team_uuid, 'action': 'NONE'}}
            fields['awayTeamRef'] = {'value': {'recordName': away_team_uuid, 'action': 'NONE'}}

            # Stadium reference
            if use_canonical_games and g.get('stadium_canonical_id'):
                # Canonical format: use stadium_canonical_id directly
                stadium_canonical_id = g['stadium_canonical_id']
                stadium_uuid = stadium_id_map.get(stadium_canonical_id)
                if stadium_uuid:
                    fields['stadiumRef'] = {'value': {'recordName': stadium_uuid, 'action': 'NONE'}}
                    fields['stadiumCanonicalId'] = {'value': stadium_canonical_id}
            else:
                # Legacy format: look up by home team abbrev
                stadium_uuid = team_stadium_map.get((sport, g.get('home_team_abbrev', '')))
                if stadium_uuid:
                    fields['stadiumRef'] = {'value': {'recordName': stadium_uuid, 'action': 'NONE'}}

            recs.append({'recordType': 'Game', 'recordName': game_uuid, 'fields': fields})

        stats['games'] = import_data(ck, recs, 'games', args.dry_run, args.verbose)

    # Import league structure
    if import_league_structure and league_structure:
        print("--- League Structure ---")
        now_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
        recs = [{
            'recordType': 'LeagueStructure',
            'recordName': ls['id'],  # Use the id as recordName
            'fields': {
                'structureId': {'value': ls['id']},
                'sport': {'value': ls['sport']},
                'type': {'value': ls['type']},
                'name': {'value': ls['name']},
                'displayOrder': {'value': ls['display_order']},
                'schemaVersion': {'value': 1},
                'lastModified': {'value': now_ms, 'type': 'TIMESTAMP'},
                **({'abbreviation': {'value': ls['abbreviation']}} if ls.get('abbreviation') else {}),
                **({'parentId': {'value': ls['parent_id']}} if ls.get('parent_id') else {}),
            }
        } for ls in league_structure]
        stats['league_structures'] = import_data(ck, recs, 'league structures', args.dry_run, args.verbose)

    # Import team aliases
    if import_team_aliases and team_aliases:
        print("--- Team Aliases ---")
        now_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
        recs = []
        for ta in team_aliases:
            fields = {
                'aliasId': {'value': ta['id']},
                'teamCanonicalId': {'value': ta['team_canonical_id']},
                'aliasType': {'value': ta['alias_type']},
                'aliasValue': {'value': ta['alias_value']},
                'schemaVersion': {'value': 1},
                'lastModified': {'value': now_ms, 'type': 'TIMESTAMP'},
            }
            # Add optional date fields (best-effort: unparsable dates are skipped)
            if ta.get('valid_from'):
                try:
                    dt = datetime.strptime(ta['valid_from'], '%Y-%m-%d')
                    fields['validFrom'] = {'value': int(dt.timestamp() * 1000), 'type': 'TIMESTAMP'}
                except:
                    pass
            if ta.get('valid_until'):
                try:
                    dt = datetime.strptime(ta['valid_until'], '%Y-%m-%d')
                    fields['validUntil'] = {'value': int(dt.timestamp() * 1000), 'type': 'TIMESTAMP'}
                except:
                    pass
            recs.append({
                'recordType': 'TeamAlias',
                'recordName': ta['id'],  # Use the id as recordName
                'fields': fields
            })
        stats['team_aliases'] = import_data(ck, recs, 'team aliases', args.dry_run, args.verbose)

    # Import stadium aliases
    if import_stadium_aliases and stadium_aliases:
        print("--- Stadium Aliases ---")
        now_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
        recs = []
        for sa in stadium_aliases:
            fields = {
                'aliasName': {'value': sa['alias_name'].lower()},  # Normalize to lowercase
                'stadiumCanonicalId': {'value': sa['stadium_canonical_id']},
                'schemaVersion': {'value': 1},
                'lastModified': {'value': now_ms, 'type': 'TIMESTAMP'},
            }
            # Add optional date fields (best-effort: unparsable dates are skipped)
            if sa.get('valid_from'):
                try:
                    dt = datetime.strptime(sa['valid_from'], '%Y-%m-%d')
                    fields['validFrom'] = {'value': int(dt.timestamp() * 1000), 'type': 'TIMESTAMP'}
                except:
                    pass
            if sa.get('valid_until'):
                try:
                    dt = datetime.strptime(sa['valid_until'], '%Y-%m-%d')
                    fields['validUntil'] = {'value': int(dt.timestamp() * 1000), 'type': 'TIMESTAMP'}
                except:
                    pass
            # Extract sport from stadium_canonical_id (e.g., "stadium_nba_td_garden" -> "nba")
            # This makes record names unique for shared venues (TD Garden has NBA and NHL entries)
            stadium_id = sa['stadium_canonical_id']
            sport = stadium_id.split('_')[1] if '_' in stadium_id else 'unknown'
            record_name = f"{sport}_{sa['alias_name'].lower()}"
            recs.append({
                'recordType': 'StadiumAlias',
                'recordName': record_name,
                'fields': fields
            })
        stats['stadium_aliases'] = import_data(ck, recs, 'stadium aliases', args.dry_run, args.verbose)

    print(f"\n{'='*50}")
    print(f"COMPLETE: {stats['stadiums']} stadiums, {stats['teams']} teams, {stats['games']} games, {stats['league_structures']} league structures, {stats['team_aliases']} team aliases, {stats['stadium_aliases']} stadium aliases")
    if args.dry_run:
        print("[DRY RUN - nothing imported]")
    print()
|
|
|
|
|
|
# Script entry point — only run when executed directly, not on import.
if __name__ == '__main__':
    main()
|