Files
Sportstime/Scripts/cloudkit_import.py
Trey t 1ee47df53e Add StadiumAlias CloudKit sync and offline-first data architecture
- Add CKStadiumAlias model for CloudKit record mapping
- Add fetchStadiumAliases/fetchStadiumAliasChanges to CloudKitService
- Add syncStadiumAliases to CanonicalSyncService for delta sync
- Add subscribeToStadiumAliasUpdates for push notifications
- Update cloudkit_import.py with --stadium-aliases-only option

Data Architecture Updates:
- Remove obsolete provider files (CanonicalDataProvider, CloudKitDataProvider, StubDataProvider)
- AppDataProvider now reads exclusively from SwiftData
- Add background CloudKit sync on app startup (non-blocking)
- Document data architecture in CLAUDE.md

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-08 22:20:07 -06:00

573 lines
25 KiB
Python
Executable File

#!/usr/bin/env python3
"""
CloudKit Import Script
======================
Imports JSON data into the CloudKit public database. Run separately from
the pipeline.

Setup:
    1. CloudKit Dashboard > Tokens & Keys > Server-to-Server Keys
    2. Create key with Read/Write access to public database
    3. Download .p8 file and note Key ID

Usage:
    python cloudkit_import.py --dry-run                        # Preview first
    python cloudkit_import.py --key-id XX --key-file key.p8    # No action flag: opens the menu (1 = import all)
    python cloudkit_import.py --stadiums-only ...              # Stadiums first
    python cloudkit_import.py --games-only ...                 # Games after
    python cloudkit_import.py --league-structure-only ...      # League structure only
    python cloudkit_import.py --team-aliases-only ...          # Team aliases only
    python cloudkit_import.py --stadium-aliases-only ...       # Stadium aliases only
    python cloudkit_import.py --canonical-only ...             # League structure + both alias sets
    python cloudkit_import.py --delete-all ...                 # Delete then import
    python cloudkit_import.py --delete-only ...                # Delete only (no import)

When no action flag is given, or when --interactive / -i is passed, an
interactive menu is shown instead. Other options: --container, --env
(development or production; defaults to development), --data-dir
(defaults to ./data) and --verbose / -v.
"""
import argparse, json, time, os, sys, hashlib, base64, requests
from datetime import datetime, timezone
from pathlib import Path
try:
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.backends import default_backend
HAS_CRYPTO = True
except ImportError:
HAS_CRYPTO = False
# CloudKit container identifier; used as the default for --container.
CONTAINER = "iCloud.com.sportstime.app"
# Base URL that every signed request path is appended to.
HOST = "https://api.apple-cloudkit.com"
# Number of records import_data sends in one records/modify request.
BATCH_SIZE = 200
# Defaults for --key-id and --key-file, so the script can run without them.
# NOTE(review): a key ID is committed to source here — confirm that is intended.
DEFAULT_KEY_ID = "152be0715e0276e31aaea5cbfe79dc872f298861a55c70fae14e5fe3e026cff9"
DEFAULT_KEY_FILE = "eckey.pem"
def show_menu():
    """Print the action menu and prompt until a valid choice is entered.

    Returns:
        The chosen action as an int from 1 to 10, or None when the user
        enters 0, sends EOF, or interrupts the prompt.
    """
    rule = "=" * 50
    menu_lines = (
        "\n" + rule,
        "CloudKit Import - Select Action",
        rule,
        "\n 1. Import all (stadiums, teams, games, league structure, team aliases, stadium aliases)",
        " 2. Stadiums only",
        " 3. Games only",
        " 4. League structure only",
        " 5. Team aliases only",
        " 6. Stadium aliases only",
        " 7. Canonical only (league structure + team aliases + stadium aliases)",
        " 8. Delete all then import",
        " 9. Delete only (no import)",
        " 10. Dry run (preview only)",
        " 0. Exit",
    )
    for line in menu_lines:
        print(line)
    print()

    accepted = [str(number) for number in range(1, 11)]
    try:
        # Keep asking until the answer is either the exit code or a known entry.
        while True:
            answer = input("Enter choice [1-10, 0 to exit]: ").strip()
            if answer == '0':
                return None
            if answer in accepted:
                return int(answer)
            print("Invalid choice. Please enter 1-10 or 0.")
    except (EOFError, KeyboardInterrupt):
        print("\nExiting.")
        return None
def deterministic_uuid(string: str) -> str:
    """
    Derive a stable, uppercase UUID-formatted string from *string*.

    The UTF-8 encoding of the input is hashed with SHA256 and the first
    16 digest bytes are kept, with the version nibble forced to 4 and the
    variant bits forced to 10. The same input always yields the same value.
    Per the original note this mirrors a deterministicUUID() helper on the
    Swift side; that cannot be confirmed from this file.
    """
    digest = hashlib.sha256(string.encode('utf-8')).digest()
    raw = bytearray(digest[:16])
    # Stamp the UUID version (high nibble of byte 6) and variant (top bits of byte 8).
    raw[6] = 0x40 | (raw[6] & 0x0F)
    raw[8] = 0x80 | (raw[8] & 0x3F)
    # 32 hex characters, split into the canonical 8-4-4-4-12 groups.
    text = raw.hex().upper()
    groups = (text[:8], text[8:12], text[12:16], text[16:20], text[20:])
    return "-".join(groups)
class CloudKit:
    """Small client for the public database of one CloudKit container.

    Requests are POSTed to HOST with a key ID header, an ISO8601 date header
    and an ECDSA/SHA256 signature header. Every public method reports
    transport and server failures as ``{'error': message}`` instead of raising.
    """

    def __init__(self, key_id, private_key, container, env):
        """Store the credentials and build the request path prefix.

        Args:
            key_id: Key ID sent in the request headers.
            private_key: PEM-encoded private key as bytes.
            container: CloudKit container identifier.
            env: Environment path segment (the CLI passes 'development' or 'production').
        """
        self.key_id = key_id
        self.private_key = private_key
        self.path_base = f"/database/1/{container}/{env}/public"
        # Parsed lazily on first use and then reused for every request.
        self._signing_key = None

    def _sign(self, date, body, path):
        """Return the base64 signature over ``date:base64(sha256(body)):path``."""
        if self._signing_key is None:
            self._signing_key = serialization.load_pem_private_key(
                self.private_key, None, default_backend())
        body_hash = base64.b64encode(hashlib.sha256(body.encode()).digest()).decode()
        sig = self._signing_key.sign(
            f"{date}:{body_hash}:{path}".encode(), ec.ECDSA(hashes.SHA256()))
        return base64.b64encode(sig).decode()

    def _signed_headers(self, body, path):
        """Build the headers for one request, timestamped with the current UTC time."""
        date = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
        return {
            'Content-Type': 'application/json',
            'X-Apple-CloudKit-Request-KeyID': self.key_id,
            'X-Apple-CloudKit-Request-ISO8601Date': date,
            'X-Apple-CloudKit-Request-SignatureV1': self._sign(date, body, path),
        }

    def modify(self, operations):
        """Send a list of record operations to records/modify.

        Returns:
            The decoded JSON response on HTTP 200, otherwise ``{'error': message}``.
            Network failures are reported the same way rather than raised, so
            one bad batch does not abort a long import.
        """
        path = f"{self.path_base}/records/modify"
        body = json.dumps({'operations': operations})
        headers = self._signed_headers(body, path)
        try:
            r = requests.post(f"{HOST}{path}", headers=headers, data=body, timeout=60)
        except requests.exceptions.Timeout:
            return {'error': 'Request timed out after 60s'}
        except requests.exceptions.RequestException as e:
            return {'error': f"Request failed: {e}"}
        if r.status_code == 200:
            return r.json()
        try:
            err = r.json()
            reason = err.get('reason', 'Unknown')
            code = err.get('serverErrorCode', r.status_code)
            return {'error': f"{code}: {reason}"}
        except (ValueError, AttributeError):
            # Body was not JSON, or not a JSON object: fall back to the raw text.
            return {'error': f"{r.status_code}: {r.text[:200]}"}

    def query(self, record_type, limit=200, verbose=False):
        """Query records of a given type.

        Returns:
            The decoded JSON response on HTTP 200, otherwise ``{'error': message}``.
        """
        path = f"{self.path_base}/records/query"
        body = json.dumps({
            'query': {'recordType': record_type},
            'resultsLimit': limit
        })
        headers = self._signed_headers(body, path)
        if verbose:
            print(f" Querying {record_type}...")
        try:
            r = requests.post(f"{HOST}{path}", headers=headers, data=body, timeout=30)
            if verbose:
                print(f" Response: {r.status_code}")
            if r.status_code == 200:
                result = r.json()
                if verbose:
                    print(f" Found {len(result.get('records', []))} records")
                return result
            return {'error': f"{r.status_code}: {r.text[:200]}"}
        except requests.exceptions.Timeout:
            return {'error': 'Request timed out after 30s'}
        except Exception as e:
            return {'error': f"Request failed: {e}"}

    def delete_all(self, record_type, verbose=False):
        """Delete all records of a given type.

        Repeatedly queries a page of records and deletes it until the query
        comes back empty, a request fails, or a whole page is rejected.

        Returns:
            The number of records the server reported as deleted.
        """
        total_deleted = 0
        while True:
            result = self.query(record_type, verbose=verbose)
            if 'error' in result:
                print(f" Query error: {result['error']}")
                break
            records = result.get('records', [])
            if not records:
                break
            # Build delete operations (recordChangeTag required for delete)
            ops = [{
                'operationType': 'delete',
                'record': {
                    'recordName': r['recordName'],
                    'recordType': record_type,
                    'recordChangeTag': r.get('recordChangeTag', '')
                }
            } for r in records]
            if verbose:
                print(f" Sending delete for {len(ops)} records...")
            delete_result = self.modify(ops)
            if verbose:
                print(f" Delete response: {json.dumps(delete_result)[:500]}")
            if 'error' in delete_result:
                print(f" Delete error: {delete_result['error']}")
                break
            # Check for individual record errors
            result_records = delete_result.get('records', [])
            successful = [r for r in result_records if 'serverErrorCode' not in r]
            failed = [r for r in result_records if 'serverErrorCode' in r]
            if failed and verbose:
                print(f" Failed: {failed[0]}")
            total_deleted += len(successful)
            print(f" Deleted {len(successful)} {record_type} records" + (f" ({len(failed)} failed)" if failed else ""))
            if not successful:
                # The next query would return the same undeletable records,
                # so continuing would loop forever.
                print(f" No {record_type} records could be deleted in this batch; stopping")
                break
            time.sleep(0.5)
        return total_deleted
def import_data(ck, records, name, dry_run, verbose, batch_size=None, pause=0.5):
    """Upload *records* in batches using 'forceReplace' operations.

    Args:
        ck: Client exposing ``modify(operations)``; not used when dry_run is true.
        records: Sliceable sequence of record dicts.
        name: Plural label used in progress messages.
        dry_run: When true, only report what would be sent.
        verbose: When true, print per-batch details and failure diagnostics.
        batch_size: Records per request; defaults to the module's BATCH_SIZE.
        pause: Seconds to sleep after each real request.

    Returns:
        The number of records created (or that would be created in a dry
        run). Records the server rejected individually are not counted.

    Raises:
        ValueError: If batch_size is less than 1.
    """
    if batch_size is None:
        batch_size = BATCH_SIZE
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    total = 0
    errors = 0
    for batch_no, start in enumerate(range(0, len(records), batch_size), 1):
        batch = records[start:start + batch_size]
        ops = [{'operationType': 'forceReplace', 'record': r} for r in batch]
        if verbose:
            print(f" Batch {batch_no}: {len(batch)} records, {len(ops)} ops")
        if dry_run:
            print(f" [DRY RUN] Would create {len(batch)} {name}")
            total += len(batch)
            continue

        result = ck.modify(ops)
        if 'error' in result:
            errors += 1
            if errors <= 3:  # Only show first 3 errors
                print(f" Error: {result['error']}")
                if verbose and batch:
                    print(f" Sample record: {json.dumps(batch[0], indent=2)[:500]}")
            if errors == 3:
                print(" (suppressing further errors...)")
        else:
            result_records = result.get('records', [])
            # Count only successful records (no serverErrorCode)
            successful = [r for r in result_records if 'serverErrorCode' not in r]
            failed = [r for r in result_records if 'serverErrorCode' in r]
            n = len(successful)
            total += n
            print(f" Created {n} {name}")
            if failed:
                print(f" Failed {len(failed)} records: {failed[0].get('serverErrorCode')}: {failed[0].get('reason')}")
                if verbose:
                    print(f" Response: {json.dumps(result, indent=2)[:1000]}")
        # Throttle between real requests.
        time.sleep(pause)
    if errors > 0:
        print(f" Total errors: {errors}")
    return total
def _load_json(path, required=False):
    """Return the parsed JSON at *path*, or [] when an optional file is absent."""
    if not path.exists():
        if required:
            sys.exit(f"Error: Data file not found: {path}")
        return []
    with open(path, encoding='utf-8') as fh:
        return json.load(fh)


def _date_to_ms(value):
    """Convert a 'YYYY-MM-DD' string to epoch milliseconds, or None if it cannot be converted."""
    try:
        # NOTE(review): the parsed datetime is naive, so timestamp() interprets
        # it in the local timezone of the machine running the script.
        return int(datetime.strptime(value, '%Y-%m-%d').timestamp() * 1000)
    except (TypeError, ValueError, OverflowError, OSError):
        return None


def _add_validity_window(fields, entry):
    """Copy the optional valid_from / valid_until dates of *entry* into *fields* as TIMESTAMPs."""
    for source_key, field_name in (('valid_from', 'validFrom'), ('valid_until', 'validUntil')):
        if entry.get(source_key):
            ms = _date_to_ms(entry[source_key])
            if ms is not None:
                fields[field_name] = {'value': ms, 'type': 'TIMESTAMP'}


def _game_datetime_ms(game):
    """Return the start of *game* in epoch milliseconds.

    Accepts times like "7:30p", "10:00a", "7:30 PM" or "19:30". A missing
    'time' key means "7:00p"; an empty time means 19:00. Raises on
    unparseable input.
    """
    time_str = game.get('time', '7:00p')
    hour, minute = 19, 0
    if time_str:
        clean_time = time_str.lower().replace(' ', '')
        is_pm = 'p' in clean_time
        # Drop the meridiem letters so both "7:30p" and "7:30pm" are understood.
        time_parts = clean_time.replace('p', '').replace('a', '').replace('m', '').split(':')
        hour = int(time_parts[0])
        if is_pm and hour != 12:
            hour += 12
        elif not is_pm and hour == 12:
            hour = 0
        if len(time_parts) > 1:
            minute = int(time_parts[1])
    # NOTE(review): naive datetime, so the timestamp uses the local timezone
    # of the machine running the script.
    dt = datetime.strptime(f"{game['date']} {hour:02d}:{minute:02d}", '%Y-%m-%d %H:%M')
    return int(dt.timestamp() * 1000)


def main():
    """Parse the command line (or show the menu), then delete and/or import records.

    Reads stadiums.json, games.json, league_structure.json, team_aliases.json
    and stadium_aliases.json from --data-dir. Only stadiums.json is required,
    and not even that for --delete-only. Exits through sys.exit on missing
    prerequisites.
    """
    p = argparse.ArgumentParser(description='Import JSON to CloudKit')
    p.add_argument('--key-id', default=DEFAULT_KEY_ID)
    p.add_argument('--key-file', default=DEFAULT_KEY_FILE)
    p.add_argument('--container', default=CONTAINER)
    p.add_argument('--env', choices=['development', 'production'], default='development')
    p.add_argument('--data-dir', default='./data')
    p.add_argument('--stadiums-only', action='store_true')
    p.add_argument('--games-only', action='store_true')
    p.add_argument('--league-structure-only', action='store_true', help='Import only league structure')
    p.add_argument('--team-aliases-only', action='store_true', help='Import only team aliases')
    p.add_argument('--stadium-aliases-only', action='store_true', help='Import only stadium aliases')
    p.add_argument('--canonical-only', action='store_true', help='Import only canonical data (league structure + team aliases + stadium aliases)')
    p.add_argument('--delete-all', action='store_true', help='Delete all records before importing')
    p.add_argument('--delete-only', action='store_true', help='Only delete records, do not import')
    p.add_argument('--dry-run', action='store_true')
    p.add_argument('--verbose', '-v', action='store_true')
    p.add_argument('--interactive', '-i', action='store_true', help='Show interactive menu')
    args = p.parse_args()

    # Show interactive menu if no action flags provided or --interactive
    has_action_flag = any([
        args.stadiums_only, args.games_only, args.league_structure_only,
        args.team_aliases_only, args.stadium_aliases_only, args.canonical_only,
        args.delete_all, args.delete_only, args.dry_run
    ])
    if args.interactive or not has_action_flag:
        choice = show_menu()
        if choice is None:
            return
        # Map menu choice to flags; choice 1 (import all) is the default behavior.
        menu_flags = {
            2: 'stadiums_only', 3: 'games_only', 4: 'league_structure_only',
            5: 'team_aliases_only', 6: 'stadium_aliases_only', 7: 'canonical_only',
            8: 'delete_all', 9: 'delete_only', 10: 'dry_run',
        }
        if choice in menu_flags:
            setattr(args, menu_flags[choice], True)

    print(f"\n{'='*50}")
    print(f"CloudKit Import {'(DRY RUN)' if args.dry_run else ''}")
    print(f"{'='*50}")
    print(f"Container: {args.container}")
    print(f"Environment: {args.env}\n")

    data_dir = Path(args.data_dir)
    # A pure delete needs no input data, so stadiums.json is optional there.
    stadiums = _load_json(data_dir / 'stadiums.json', required=not args.delete_only)
    games = _load_json(data_dir / 'games.json')
    league_structure = _load_json(data_dir / 'league_structure.json')
    team_aliases = _load_json(data_dir / 'team_aliases.json')
    stadium_aliases = _load_json(data_dir / 'stadium_aliases.json')
    print(f"Loaded {len(stadiums)} stadiums, {len(games)} games, {len(league_structure)} league structures, {len(team_aliases)} team aliases, {len(stadium_aliases)} stadium aliases\n")

    ck = None
    if not args.dry_run:
        if not HAS_CRYPTO:
            sys.exit("Error: pip install cryptography")
        if not os.path.exists(args.key_file):
            sys.exit(f"Error: Key file not found: {args.key_file}")
        ck = CloudKit(args.key_id, Path(args.key_file).read_bytes(), args.container, args.env)

    # Handle deletion
    if args.delete_all or args.delete_only:
        if not ck:
            # ck is only unset in dry-run mode, which never talks to CloudKit.
            sys.exit("Error: deletion is not available with --dry-run")
        print("--- Deleting Existing Records ---")
        # Delete in order: dependent records first, then base records
        for record_type in ['Game', 'TeamAlias', 'StadiumAlias', 'Team', 'LeagueStructure', 'Stadium']:
            print(f" Deleting {record_type} records...")
            deleted = ck.delete_all(record_type, verbose=args.verbose)
            print(f" Deleted {deleted} {record_type} records")
        if args.delete_only:
            print(f"\n{'='*50}")
            print("DELETE COMPLETE")
            print()
            return

    stats = {'stadiums': 0, 'teams': 0, 'games': 0, 'league_structures': 0, 'team_aliases': 0, 'stadium_aliases': 0}

    # Determine what to import based on flags. Each "-only" flag switches off
    # every group it does not name; teams are imported together with stadiums.
    import_stadiums = not (args.games_only or args.league_structure_only or args.team_aliases_only
                           or args.stadium_aliases_only or args.canonical_only)
    import_games = not (args.stadiums_only or args.league_structure_only or args.team_aliases_only
                        or args.stadium_aliases_only or args.canonical_only)
    import_league_structure = (args.league_structure_only or args.canonical_only
                               or not (args.stadiums_only or args.games_only
                                       or args.team_aliases_only or args.stadium_aliases_only))
    import_team_aliases = (args.team_aliases_only or args.canonical_only
                           or not (args.stadiums_only or args.games_only
                                   or args.league_structure_only or args.stadium_aliases_only))
    import_stadium_aliases = (args.stadium_aliases_only or args.canonical_only
                              or not (args.stadiums_only or args.games_only
                                      or args.league_structure_only or args.team_aliases_only))

    # Build stadium UUID lookup (stadium string ID -> UUID)
    stadium_uuid_map = {s['id']: deterministic_uuid(s['id']) for s in stadiums}

    # Import stadiums & teams
    if import_stadiums:
        print("--- Stadiums ---")
        recs = [{
            'recordType': 'Stadium', 'recordName': stadium_uuid_map[s['id']],
            'fields': {
                'stadiumId': {'value': stadium_uuid_map[s['id']]}, 'name': {'value': s['name']},
                'city': {'value': s['city']}, 'state': {'value': s.get('state', '')},
                'sport': {'value': s['sport']}, 'source': {'value': s.get('source', '')},
                'teamAbbrevs': {'value': s.get('team_abbrevs', [])},
                **({'location': {'value': {'latitude': s['latitude'], 'longitude': s['longitude']}}}
                   if s.get('latitude') else {}),
                **({'capacity': {'value': s['capacity']}} if s.get('capacity') else {}),
            }
        } for s in stadiums]
        stats['stadiums'] = import_data(ck, recs, 'stadiums', args.dry_run, args.verbose)

        print("--- Teams ---")
        # One team per (sport, abbreviation); the first stadium listing it supplies the city.
        teams = {}
        for s in stadiums:
            for abbr in s.get('team_abbrevs', []):
                team_key = f"{s['sport']}_{abbr}"  # Match Swift: "{sport.rawValue}_{abbrev}"
                if team_key not in teams:
                    teams[team_key] = {'abbr': abbr, 'city': s['city'], 'sport': s['sport']}
        recs = [{
            'recordType': 'Team', 'recordName': deterministic_uuid(team_key),
            'fields': {
                'teamId': {'value': deterministic_uuid(team_key)},
                'abbreviation': {'value': info['abbr']},
                'name': {'value': info['abbr']},
                'city': {'value': info['city']},
                'sport': {'value': info['sport']},
            }
        } for team_key, info in teams.items()]
        stats['teams'] = import_data(ck, recs, 'teams', args.dry_run, args.verbose)

    # Import games
    if import_games and games:
        # Build team -> stadium map for stadiumRef
        team_stadium_map = {}
        for s in stadiums:
            stadium_uuid = stadium_uuid_map[s['id']]
            for abbr in s.get('team_abbrevs', []):
                team_stadium_map[(s['sport'], abbr)] = stadium_uuid

        print("--- Games ---")
        # Deduplicate games by ID
        seen_ids = set()
        unique_games = []
        for g in games:
            if g['id'] not in seen_ids:
                seen_ids.add(g['id'])
                unique_games.append(g)
        if len(unique_games) < len(games):
            print(f" Removed {len(games) - len(unique_games)} duplicate games")

        recs = []
        for g in unique_games:
            game_uuid = deterministic_uuid(g['id'])
            sport = g['sport']
            fields = {
                'gameId': {'value': game_uuid}, 'sport': {'value': sport},
                'season': {'value': g.get('season', '')}, 'source': {'value': g.get('source', '')},
            }
            if g.get('date'):
                try:
                    # CloudKit expects TIMESTAMP type with milliseconds since epoch
                    fields['dateTime'] = {'value': _game_datetime_ms(g), 'type': 'TIMESTAMP'}
                except (ValueError, TypeError, AttributeError, OverflowError, OSError) as e:
                    # Best effort: the game is still imported, just without dateTime.
                    if args.verbose:
                        print(f" Warning: Failed to parse date/time for {g['id']}: {e}")

            # Team references are derived from "{sport}_{abbrev}" keys
            home_team_key = f"{sport}_{g.get('home_team_abbrev', '')}"
            away_team_key = f"{sport}_{g.get('away_team_abbrev', '')}"
            home_team_uuid = deterministic_uuid(home_team_key)
            away_team_uuid = deterministic_uuid(away_team_key)
            fields['homeTeamRef'] = {'value': {'recordName': home_team_uuid, 'action': 'NONE'}}
            fields['awayTeamRef'] = {'value': {'recordName': away_team_uuid, 'action': 'NONE'}}

            # Stadium reference - look up by home team abbrev
            stadium_uuid = team_stadium_map.get((sport, g.get('home_team_abbrev', '')))
            if stadium_uuid:
                fields['stadiumRef'] = {'value': {'recordName': stadium_uuid, 'action': 'NONE'}}
            recs.append({'recordType': 'Game', 'recordName': game_uuid, 'fields': fields})
        stats['games'] = import_data(ck, recs, 'games', args.dry_run, args.verbose)

    # Import league structure
    if import_league_structure and league_structure:
        print("--- League Structure ---")
        now_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
        recs = [{
            'recordType': 'LeagueStructure',
            'recordName': ls['id'],  # Use the id as recordName
            'fields': {
                'structureId': {'value': ls['id']},
                'sport': {'value': ls['sport']},
                'type': {'value': ls['type']},
                'name': {'value': ls['name']},
                'displayOrder': {'value': ls['display_order']},
                'schemaVersion': {'value': 1},
                'lastModified': {'value': now_ms, 'type': 'TIMESTAMP'},
                **({'abbreviation': {'value': ls['abbreviation']}} if ls.get('abbreviation') else {}),
                **({'parentId': {'value': ls['parent_id']}} if ls.get('parent_id') else {}),
            }
        } for ls in league_structure]
        stats['league_structures'] = import_data(ck, recs, 'league structures', args.dry_run, args.verbose)

    # Import team aliases
    if import_team_aliases and team_aliases:
        print("--- Team Aliases ---")
        now_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
        recs = []
        for ta in team_aliases:
            fields = {
                'aliasId': {'value': ta['id']},
                'teamCanonicalId': {'value': ta['team_canonical_id']},
                'aliasType': {'value': ta['alias_type']},
                'aliasValue': {'value': ta['alias_value']},
                'schemaVersion': {'value': 1},
                'lastModified': {'value': now_ms, 'type': 'TIMESTAMP'},
            }
            # Add optional date fields
            _add_validity_window(fields, ta)
            recs.append({
                'recordType': 'TeamAlias',
                'recordName': ta['id'],  # Use the id as recordName
                'fields': fields
            })
        stats['team_aliases'] = import_data(ck, recs, 'team aliases', args.dry_run, args.verbose)

    # Import stadium aliases
    if import_stadium_aliases and stadium_aliases:
        print("--- Stadium Aliases ---")
        now_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
        recs = []
        for sa in stadium_aliases:
            alias_name = sa['alias_name'].lower()  # Normalize to lowercase
            fields = {
                'aliasName': {'value': alias_name},
                'stadiumCanonicalId': {'value': sa['stadium_canonical_id']},
                'schemaVersion': {'value': 1},
                'lastModified': {'value': now_ms, 'type': 'TIMESTAMP'},
            }
            # Add optional date fields
            _add_validity_window(fields, sa)
            recs.append({
                'recordType': 'StadiumAlias',
                'recordName': alias_name,  # Use alias_name as recordName (unique key)
                'fields': fields
            })
        stats['stadium_aliases'] = import_data(ck, recs, 'stadium aliases', args.dry_run, args.verbose)

    print(f"\n{'='*50}")
    print(f"COMPLETE: {stats['stadiums']} stadiums, {stats['teams']} teams, {stats['games']} games, {stats['league_structures']} league structures, {stats['team_aliases']} team aliases, {stats['stadium_aliases']} stadium aliases")
    if args.dry_run:
        print("[DRY RUN - nothing imported]")
    print()
# Run the importer only when this file is executed as a script, not when it is imported.
if __name__ == '__main__':
    main()