""" CloudKit Web Services API client. Adapted from existing sportstime_parser.uploaders.cloudkit """ import base64 import hashlib import json import time from datetime import datetime, timedelta from pathlib import Path from typing import Optional import jwt import requests from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.backends import default_backend class CloudKitClient: """ Client for CloudKit Web Services API. """ BASE_URL = "https://api.apple-cloudkit.com" TOKEN_EXPIRY_SECONDS = 3600 # 1 hour def __init__( self, container_id: str, environment: str = 'development', key_id: str = '', private_key: str = '', private_key_path: str = '', ): self.container_id = container_id self.environment = environment self.key_id = key_id self._private_key_pem = private_key self.private_key_path = private_key_path self._private_key = None self._token = None self._token_expiry = 0 # Load private key if not self._private_key_pem and private_key_path: key_path = Path(private_key_path) if key_path.exists(): self._private_key_pem = key_path.read_text() if self._private_key_pem: self._private_key = serialization.load_pem_private_key( self._private_key_pem.encode(), password=None, backend=default_backend(), ) @property def is_configured(self) -> bool: """Check if the client has valid authentication credentials.""" return bool(self.key_id and self._private_key) def _get_api_path(self, operation: str) -> str: """Build the full API path for an operation.""" return f"/database/1/{self.container_id}/{self.environment}/public/{operation}" def _get_token(self) -> str: """Get a valid JWT token, generating a new one if needed.""" if not self.is_configured: raise ValueError("CloudKit credentials not configured") now = time.time() # Return cached token if still valid (with 5 min buffer) if self._token and (self._token_expiry - now) > 300: return self._token # Generate new token expiry = now + self.TOKEN_EXPIRY_SECONDS payload = { 'iss': self.key_id, 'iat': int(now), 'exp': int(expiry), 'sub': self.container_id, } self._token = jwt.encode( payload, self._private_key, algorithm='ES256', ) self._token_expiry = expiry return self._token def _sign_request(self, method: str, path: str, body: Optional[bytes] = None) -> dict: """Generate request headers with authentication. Args: method: HTTP method path: API path body: Request body bytes Returns: Dictionary of headers to include in the request """ token = self._get_token() # CloudKit uses date in ISO format date_str = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ") # Calculate body hash if body: body_hash = base64.b64encode( hashlib.sha256(body).digest() ).decode() else: body_hash = base64.b64encode( hashlib.sha256(b"").digest() ).decode() # Build the message to sign # Format: date:body_hash:path message = f"{date_str}:{body_hash}:{path}" # Sign the message signature = self._private_key.sign( message.encode(), ec.ECDSA(hashes.SHA256()), ) signature_b64 = base64.b64encode(signature).decode() return { 'Authorization': f'Bearer {token}', 'X-Apple-CloudKit-Request-KeyID': self.key_id, 'X-Apple-CloudKit-Request-ISO8601Date': date_str, 'X-Apple-CloudKit-Request-SignatureV1': signature_b64, 'Content-Type': 'application/json', } def _request(self, method: str, operation: str, body: Optional[dict] = None) -> dict: """Make a request to the CloudKit API.""" path = self._get_api_path(operation) url = f"{self.BASE_URL}{path}" body_bytes = json.dumps(body).encode() if body else None headers = self._sign_request(method, path, body_bytes) response = requests.request( method=method, url=url, headers=headers, data=body_bytes, ) if response.status_code == 200: return response.json() else: response.raise_for_status() def _get_url(self, path: str) -> str: """Build full API URL.""" return f"{self.BASE_URL}/database/1/{self.container_id}/{self.environment}/public{path}" def fetch_records( self, record_type: str, filter_by: Optional[dict] = None, sort_by: Optional[str] = None, limit: int = 200, ) -> list: """ Fetch records from CloudKit. """ query = { 'recordType': record_type, } if filter_by: query['filterBy'] = filter_by if sort_by: query['sortBy'] = [{'fieldName': sort_by}] payload = { 'query': query, 'resultsLimit': limit, } data = self._request('POST', 'records/query', payload) return data.get('records', []) def save_records(self, records: list) -> dict: """ Save records to CloudKit. """ operations = [] for record in records: op = { 'operationType': 'forceReplace', 'record': record, } operations.append(op) payload = { 'operations': operations, } return self._request('POST', 'records/modify', payload) def delete_records(self, record_names: list, record_type: str) -> dict: """ Delete records from CloudKit. """ operations = [] for name in record_names: op = { 'operationType': 'delete', 'record': { 'recordName': name, 'recordType': record_type, }, } operations.append(op) payload = { 'operations': operations, } return self._request('POST', 'records/modify', payload) def to_cloudkit_record(self, record_type: str, data: dict) -> dict: """ Convert local data dict to CloudKit record format. Field names must match existing CloudKit schema. """ record = { 'recordType': record_type, 'recordName': data['id'], 'fields': {}, } if record_type == 'Sport': fields = record['fields'] fields['sportId'] = {'value': data['id'], 'type': 'STRING'} fields['abbreviation'] = {'value': data['abbreviation'].upper(), 'type': 'STRING'} fields['displayName'] = {'value': data['displayName'], 'type': 'STRING'} fields['iconName'] = {'value': data.get('iconName', ''), 'type': 'STRING'} fields['colorHex'] = {'value': data.get('colorHex', ''), 'type': 'STRING'} fields['seasonStartMonth'] = {'value': data.get('seasonStartMonth', 1), 'type': 'INT64'} fields['seasonEndMonth'] = {'value': data.get('seasonEndMonth', 12), 'type': 'INT64'} fields['isActive'] = {'value': 1 if data.get('isActive') else 0, 'type': 'INT64'} elif record_type == 'Game': # Match existing CloudKit Game schema fields = record['fields'] fields['gameId'] = {'value': data['id'], 'type': 'STRING'} fields['canonicalId'] = {'value': data['id'], 'type': 'STRING'} fields['sport'] = {'value': data['sport'].upper(), 'type': 'STRING'} fields['season'] = {'value': str(data['season']), 'type': 'STRING'} fields['homeTeamCanonicalId'] = {'value': data['homeTeamId'], 'type': 'STRING'} fields['awayTeamCanonicalId'] = {'value': data['awayTeamId'], 'type': 'STRING'} if data.get('stadiumId'): fields['stadiumCanonicalId'] = {'value': data['stadiumId'], 'type': 'STRING'} if data.get('gameDate'): dt = datetime.fromisoformat(data['gameDate'].replace('Z', '+00:00')) fields['dateTime'] = {'value': int(dt.timestamp() * 1000), 'type': 'TIMESTAMP'} fields['isPlayoff'] = {'value': 1 if data.get('isPlayoff') else 0, 'type': 'INT64'} elif record_type == 'Team': # Match existing CloudKit Team schema fields = record['fields'] fields['teamId'] = {'value': data['id'], 'type': 'STRING'} fields['canonicalId'] = {'value': data['id'], 'type': 'STRING'} fields['sport'] = {'value': data['sport'].upper(), 'type': 'STRING'} fields['city'] = {'value': data.get('city', ''), 'type': 'STRING'} fields['name'] = {'value': data.get('name', ''), 'type': 'STRING'} fields['abbreviation'] = {'value': data.get('abbreviation', ''), 'type': 'STRING'} if data.get('homeStadiumId'): fields['stadiumCanonicalId'] = {'value': data['homeStadiumId'], 'type': 'STRING'} if data.get('primaryColor'): fields['primaryColor'] = {'value': data['primaryColor'], 'type': 'STRING'} if data.get('secondaryColor'): fields['secondaryColor'] = {'value': data['secondaryColor'], 'type': 'STRING'} if data.get('logoUrl'): fields['logoUrl'] = {'value': data['logoUrl'], 'type': 'STRING'} if data.get('divisionId'): fields['divisionCanonicalId'] = {'value': data['divisionId'], 'type': 'STRING'} if data.get('conferenceId'): fields['conferenceCanonicalId'] = {'value': data['conferenceId'], 'type': 'STRING'} elif record_type == 'Stadium': # Match existing CloudKit Stadium schema fields = record['fields'] fields['stadiumId'] = {'value': data['id'], 'type': 'STRING'} fields['canonicalId'] = {'value': data['id'], 'type': 'STRING'} fields['sport'] = {'value': data['sport'].upper(), 'type': 'STRING'} fields['name'] = {'value': data.get('name', ''), 'type': 'STRING'} fields['city'] = {'value': data.get('city', ''), 'type': 'STRING'} if data.get('state'): fields['state'] = {'value': data['state'], 'type': 'STRING'} # Use LOCATION type for coordinates if data.get('latitude') is not None and data.get('longitude') is not None: fields['location'] = { 'value': { 'latitude': float(data['latitude']), 'longitude': float(data['longitude']), }, 'type': 'LOCATION' } if data.get('capacity'): fields['capacity'] = {'value': data['capacity'], 'type': 'INT64'} if data.get('yearOpened'): fields['yearOpened'] = {'value': data['yearOpened'], 'type': 'INT64'} if data.get('imageUrl'): fields['imageURL'] = {'value': data['imageUrl'], 'type': 'STRING'} if data.get('timezone'): fields['timezoneIdentifier'] = {'value': data['timezone'], 'type': 'STRING'} elif record_type == 'Conference': fields = record['fields'] fields['conferenceId'] = {'value': data['id'], 'type': 'STRING'} fields['canonicalId'] = {'value': data['id'], 'type': 'STRING'} fields['sport'] = {'value': data['sport'].upper(), 'type': 'STRING'} fields['name'] = {'value': data.get('name', ''), 'type': 'STRING'} fields['shortName'] = {'value': data.get('shortName', ''), 'type': 'STRING'} fields['order'] = {'value': data.get('order', 0), 'type': 'INT64'} elif record_type == 'Division': fields = record['fields'] fields['divisionId'] = {'value': data['id'], 'type': 'STRING'} fields['canonicalId'] = {'value': data['id'], 'type': 'STRING'} fields['conferenceCanonicalId'] = {'value': data['conferenceId'], 'type': 'STRING'} fields['sport'] = {'value': data['sport'].upper(), 'type': 'STRING'} fields['name'] = {'value': data.get('name', ''), 'type': 'STRING'} fields['shortName'] = {'value': data.get('shortName', ''), 'type': 'STRING'} fields['order'] = {'value': data.get('order', 0), 'type': 'INT64'} elif record_type == 'TeamAlias': fields = record['fields'] fields['aliasId'] = {'value': data['id'], 'type': 'STRING'} fields['teamCanonicalId'] = {'value': data['teamId'], 'type': 'STRING'} fields['aliasValue'] = {'value': data.get('alias', ''), 'type': 'STRING'} fields['aliasType'] = {'value': data.get('aliasType', ''), 'type': 'STRING'} if data.get('validFrom'): dt = datetime.fromisoformat(data['validFrom']) fields['validFrom'] = {'value': int(dt.timestamp() * 1000), 'type': 'TIMESTAMP'} if data.get('validUntil'): dt = datetime.fromisoformat(data['validUntil']) fields['validUntil'] = {'value': int(dt.timestamp() * 1000), 'type': 'TIMESTAMP'} elif record_type == 'StadiumAlias': fields = record['fields'] fields['stadiumCanonicalId'] = {'value': data['stadiumId'], 'type': 'STRING'} fields['aliasName'] = {'value': data.get('alias', ''), 'type': 'STRING'} if data.get('validFrom'): dt = datetime.fromisoformat(data['validFrom']) fields['validFrom'] = {'value': int(dt.timestamp() * 1000), 'type': 'TIMESTAMP'} if data.get('validUntil'): dt = datetime.fromisoformat(data['validUntil']) fields['validUntil'] = {'value': int(dt.timestamp() * 1000), 'type': 'TIMESTAMP'} elif record_type == 'LeagueStructure': fields = record['fields'] fields['structureId'] = {'value': data['id'], 'type': 'STRING'} fields['sport'] = {'value': data['sport'].upper(), 'type': 'STRING'} fields['type'] = {'value': data['type'], 'type': 'STRING'} fields['name'] = {'value': data.get('name', ''), 'type': 'STRING'} fields['abbreviation'] = {'value': data.get('abbreviation', ''), 'type': 'STRING'} fields['parentId'] = {'value': data.get('parentId', ''), 'type': 'STRING'} fields['displayOrder'] = {'value': data.get('displayOrder', 0), 'type': 'INT64'} return record def test_connection(self) -> bool: """ Test the CloudKit connection. """ try: # Try to fetch a small query self.fetch_records('Team', limit=1) return True except Exception: return False