#!/usr/bin/env python3
"""
CloudKit Import Script
======================

Imports JSON data into CloudKit. Run separately from pipeline.

Setup:
1. CloudKit Dashboard > Tokens & Keys > Server-to-Server Keys
2. Create key with Read/Write access to public database
3. Download .p8 file and note Key ID

Usage:
    python cloudkit_import.py --dry-run                      # Preview first
    python cloudkit_import.py --key-id XX --key-file key.p8  # Import all
    python cloudkit_import.py --stadiums-only ...            # Stadiums first
    python cloudkit_import.py --games-only ...               # Games after
    python cloudkit_import.py --delete-all ...               # Delete then import
    python cloudkit_import.py --delete-only ...              # Delete only (no import)
"""

import argparse
import base64
import hashlib
import json
import os
import sys
import time
from datetime import datetime, timezone
from pathlib import Path

import requests

try:
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives import hashes, serialization
    from cryptography.hazmat.primitives.asymmetric import ec
    HAS_CRYPTO = True
except ImportError:
    # cryptography is only needed for real (non --dry-run) imports.
    HAS_CRYPTO = False

CONTAINER = "iCloud.com.sportstime.app"
HOST = "https://api.apple-cloudkit.com"
BATCH_SIZE = 200  # CloudKit caps a records/modify request at 200 operations


class CloudKit:
    """Minimal CloudKit Web Services client using server-to-server key auth.

    Each request is signed per Apple's spec: an ECDSA-SHA256 signature over
    "<ISO8601 date>:<base64 SHA-256 of body>:<request subpath>".
    """

    def __init__(self, key_id, private_key, container, env):
        """
        Args:
            key_id: Key ID from CloudKit Dashboard (Server-to-Server Keys).
            private_key: PEM-encoded bytes of the downloaded .p8 private key.
            container: CloudKit container identifier.
            env: 'development' or 'production'.
        """
        self.key_id = key_id
        self.private_key = private_key
        self.path_base = f"/database/1/{container}/{env}/public"

    def _sign(self, date, body, path):
        """Return the base64 ECDSA signature for one request."""
        key = serialization.load_pem_private_key(self.private_key, None, default_backend())
        body_hash = base64.b64encode(hashlib.sha256(body.encode()).digest()).decode()
        sig = key.sign(f"{date}:{body_hash}:{path}".encode(), ec.ECDSA(hashes.SHA256()))
        return base64.b64encode(sig).decode()

    def _headers(self, body, path):
        """Build the signed CloudKit auth headers for a JSON request body."""
        date = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
        return {
            'Content-Type': 'application/json',
            'X-Apple-CloudKit-Request-KeyID': self.key_id,
            'X-Apple-CloudKit-Request-ISO8601Date': date,
            'X-Apple-CloudKit-Request-SignatureV1': self._sign(date, body, path),
        }

    def modify(self, operations):
        """POST a records/modify request.

        Returns the parsed JSON response on HTTP 200, otherwise a dict of the
        form {'error': 'code: reason'}.
        """
        path = f"{self.path_base}/records/modify"
        body = json.dumps({'operations': operations})
        r = requests.post(f"{HOST}{path}", headers=self._headers(body, path),
                          data=body, timeout=60)
        if r.status_code == 200:
            return r.json()
        try:
            err = r.json()
            reason = err.get('reason', 'Unknown')
            code = err.get('serverErrorCode', r.status_code)
            return {'error': f"{code}: {reason}"}
        except ValueError:
            # Error body was not JSON; fall back to the raw text.
            return {'error': f"{r.status_code}: {r.text[:200]}"}

    def query(self, record_type, limit=200):
        """Query records of a given type."""
        path = f"{self.path_base}/records/query"
        body = json.dumps({
            'query': {'recordType': record_type},
            'resultsLimit': limit
        })
        r = requests.post(f"{HOST}{path}", headers=self._headers(body, path),
                          data=body, timeout=60)
        if r.status_code == 200:
            return r.json()
        return {'error': f"{r.status_code}: {r.text[:200]}"}

    def delete_all(self, record_type, verbose=False):
        """Delete all records of a given type. Returns the number deleted."""
        total_deleted = 0
        while True:
            result = self.query(record_type)
            if 'error' in result:
                print(f" Query error: {result['error']}")
                break
            records = result.get('records', [])
            if not records:
                break
            # Build delete operations
            ops = [{
                'operationType': 'delete',
                'record': {'recordName': r['recordName'], 'recordType': record_type}
            } for r in records]
            delete_result = self.modify(ops)
            if 'error' in delete_result:
                print(f" Delete error: {delete_result['error']}")
                break
            deleted = len(delete_result.get('records', []))
            total_deleted += deleted
            if verbose:
                print(f" Deleted {deleted} {record_type} records...")
            time.sleep(0.5)  # brief pause between batches to avoid throttling
        return total_deleted


def import_data(ck, records, name, dry_run, verbose):
    """Upsert (forceReplace) records in batches of BATCH_SIZE.

    Args:
        ck: CloudKit client, or None when dry_run is True.
        records: list of CloudKit record payload dicts.
        name: human-readable plural noun for log lines (e.g. 'stadiums').
        dry_run: if True, only print what would be created.
        verbose: print per-batch detail and response samples.

    Returns the count of successfully created records.
    """
    total = 0
    errors = 0
    for i in range(0, len(records), BATCH_SIZE):
        batch = records[i:i + BATCH_SIZE]
        ops = [{'operationType': 'forceReplace', 'record': r} for r in batch]
        if verbose:
            print(f" Batch {i//BATCH_SIZE + 1}: {len(batch)} records, {len(ops)} ops")
        if not ops:
            print(f" Warning: Empty batch at index {i}, skipping")
            continue
        if dry_run:
            print(f" [DRY RUN] Would create {len(batch)} {name}")
            total += len(batch)
        else:
            result = ck.modify(ops)
            if 'error' in result:
                errors += 1
                if errors <= 3:  # Only show first 3 errors
                    print(f" Error: {result['error']}")
                    if verbose and batch:
                        print(f" Sample record: {json.dumps(batch[0], indent=2)[:500]}")
                    if errors == 3:
                        print(" (suppressing further errors...)")
            else:
                result_records = result.get('records', [])
                # Count only successful records (no serverErrorCode)
                successful = [r for r in result_records if 'serverErrorCode' not in r]
                failed = [r for r in result_records if 'serverErrorCode' in r]
                n = len(successful)
                total += n
                print(f" Created {n} {name}")
                if failed:
                    print(f" Failed {len(failed)} records: {failed[0].get('serverErrorCode')}: {failed[0].get('reason')}")
                if verbose:
                    print(f" Response: {json.dumps(result, indent=2)[:1000]}")
            time.sleep(0.5)  # pace real requests between batches
    if errors > 0:
        print(f" Total errors: {errors}")
    return total


def _build_stadium_records(stadiums):
    """Map stadium dicts to CloudKit Stadium record payloads."""
    return [{
        'recordType': 'Stadium',
        'recordName': s['id'],
        'fields': {
            'stadiumId': {'value': s['id']},
            'name': {'value': s['name']},
            'city': {'value': s['city']},
            'state': {'value': s.get('state', '')},
            'sport': {'value': s['sport']},
            'source': {'value': s.get('source', '')},
            'teamAbbrevs': {'value': s.get('team_abbrevs', [])},
            # Optional fields are only included when present in the source dict.
            **({'location': {'value': {'latitude': s['latitude'], 'longitude': s['longitude']}}}
               if s.get('latitude') else {}),
            **({'capacity': {'value': s['capacity']}} if s.get('capacity') else {}),
        }
    } for s in stadiums]


def _build_team_records(stadiums, team_map):
    """Derive Team record payloads from stadium team abbreviations.

    Fills `team_map` (abbr -> recordName) in place as a side effect so game
    records can reference teams later.
    """
    teams = {}
    for s in stadiums:
        for abbr in s.get('team_abbrevs', []):
            if abbr not in teams:
                # First stadium seen for this abbreviation wins city/sport.
                teams[abbr] = {'city': s['city'], 'sport': s['sport']}
            team_map[abbr] = f"team_{abbr.lower()}"
    return [{
        'recordType': 'Team',
        'recordName': f"team_{abbr.lower()}",
        'fields': {
            'teamId': {'value': f"team_{abbr.lower()}"},
            'abbreviation': {'value': abbr},
            'name': {'value': abbr},
            'city': {'value': info['city']},
            'sport': {'value': info['sport']},
        }
    } for abbr, info in teams.items()]


def _build_game_records(games, team_map):
    """Map deduplicated game dicts to CloudKit Game record payloads."""
    # Deduplicate games by ID
    seen_ids = set()
    unique_games = []
    for g in games:
        if g['id'] not in seen_ids:
            seen_ids.add(g['id'])
            unique_games.append(g)
    if len(unique_games) < len(games):
        print(f" Removed {len(games) - len(unique_games)} duplicate games")
    recs = []
    for g in unique_games:
        fields = {
            'gameId': {'value': g['id']},
            'sport': {'value': g['sport']},
            'season': {'value': g.get('season', '')},
            'source': {'value': g.get('source', '')},
        }
        if g.get('date'):
            try:
                # NOTE(review): strptime yields a naive datetime, so
                # .timestamp() interprets it in the machine's local timezone
                # — confirm whether game times are meant to be local or UTC.
                dt = datetime.strptime(f"{g['date']} {g.get('time', '19:00')}",
                                       '%Y-%m-%d %H:%M')
                fields['dateTime'] = {'value': int(dt.timestamp() * 1000)}
            except ValueError:
                pass  # unparseable date/time: omit the dateTime field
        if g.get('home_team_abbrev') in team_map:
            fields['homeTeamRef'] = {'value': {'recordName': team_map[g['home_team_abbrev']], 'action': 'NONE'}}
        if g.get('away_team_abbrev') in team_map:
            fields['awayTeamRef'] = {'value': {'recordName': team_map[g['away_team_abbrev']], 'action': 'NONE'}}
        recs.append({'recordType': 'Game', 'recordName': g['id'], 'fields': fields})
    return recs


def main():
    """CLI entry point: parse args, optionally delete, then import records."""
    p = argparse.ArgumentParser(description='Import JSON to CloudKit')
    p.add_argument('--key-id', default=os.environ.get('CLOUDKIT_KEY_ID'))
    p.add_argument('--key-file', default=os.environ.get('CLOUDKIT_KEY_FILE'))
    p.add_argument('--container', default=CONTAINER)
    p.add_argument('--env', choices=['development', 'production'], default='development')
    p.add_argument('--data-dir', default='./data')
    p.add_argument('--stadiums-only', action='store_true')
    p.add_argument('--games-only', action='store_true')
    p.add_argument('--delete-all', action='store_true',
                   help='Delete all records before importing')
    p.add_argument('--delete-only', action='store_true',
                   help='Only delete records, do not import')
    p.add_argument('--dry-run', action='store_true')
    p.add_argument('--verbose', '-v', action='store_true')
    args = p.parse_args()

    print(f"\n{'='*50}")
    print(f"CloudKit Import {'(DRY RUN)' if args.dry_run else ''}")
    print(f"{'='*50}")
    print(f"Container: {args.container}")
    print(f"Environment: {args.env}\n")

    data_dir = Path(args.data_dir)
    stadiums_path = data_dir / 'stadiums.json'
    if not stadiums_path.exists():
        sys.exit(f"Error: {stadiums_path} not found")
    with open(stadiums_path) as f:
        stadiums = json.load(f)
    games_path = data_dir / 'games.json'
    if games_path.exists():
        with open(games_path) as f:
            games = json.load(f)
    else:
        games = []
    print(f"Loaded {len(stadiums)} stadiums, {len(games)} games\n")

    ck = None
    if not args.dry_run:
        if not HAS_CRYPTO:
            sys.exit("Error: pip install cryptography")
        if not args.key_id or not args.key_file:
            sys.exit("Error: --key-id and --key-file required (or use --dry-run)")
        with open(args.key_file, 'rb') as f:
            private_key = f.read()
        ck = CloudKit(args.key_id, private_key, args.container, args.env)

    # Handle deletion
    if args.delete_all or args.delete_only:
        if not ck:
            sys.exit("Error: --key-id and --key-file required for deletion")
        print("--- Deleting Existing Records ---")
        # Delete in order: Games first (has references), then Teams, then Stadiums
        for record_type in ['Game', 'Team', 'Stadium']:
            print(f" Deleting {record_type} records...")
            deleted = ck.delete_all(record_type, verbose=args.verbose)
            print(f" Deleted {deleted} {record_type} records")
        if args.delete_only:
            print(f"\n{'='*50}")
            print("DELETE COMPLETE")
            print()
            return

    stats = {'stadiums': 0, 'teams': 0, 'games': 0}
    team_map = {}

    # Import stadiums & teams
    if not args.games_only:
        print("--- Stadiums ---")
        recs = _build_stadium_records(stadiums)
        stats['stadiums'] = import_data(ck, recs, 'stadiums', args.dry_run, args.verbose)

        print("--- Teams ---")
        recs = _build_team_records(stadiums, team_map)
        stats['teams'] = import_data(ck, recs, 'teams', args.dry_run, args.verbose)

    # Import games
    if not args.stadiums_only and games:
        if not team_map:
            # --games-only run: rebuild the abbr -> recordName map from stadiums.
            for s in stadiums:
                for abbr in s.get('team_abbrevs', []):
                    team_map[abbr] = f"team_{abbr.lower()}"
        print("--- Games ---")
        recs = _build_game_records(games, team_map)
        stats['games'] = import_data(ck, recs, 'games', args.dry_run, args.verbose)

    print(f"\n{'='*50}")
    print(f"COMPLETE: {stats['stadiums']} stadiums, {stats['teams']} teams, {stats['games']} games")
    if args.dry_run:
        print("[DRY RUN - nothing imported]")
    print()


if __name__ == '__main__':
    main()