Files
SportstimeAPI/cloudkit/client.py
Trey t 63acf7accb feat: add Django web app, CloudKit sync, dashboard, and game_datetime_utc export
Adds the full Django application layer on top of sportstime_parser:
- core: Sport, Team, Stadium, Game models with aliases and league structure
- scraper: orchestration engine, adapter, job management, Celery tasks
- cloudkit: CloudKit sync client, sync state tracking, sync jobs
- dashboard: staff dashboard for monitoring scrapers, sync, review queue
- notifications: email reports for scrape/sync results
- Docker setup for deployment (Dockerfile, docker-compose, entrypoint)

Game exports now use game_datetime_utc (ISO 8601 UTC) instead of
venue-local date+time strings, matching the canonical format used
by the iOS app.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 14:04:27 -06:00

386 lines
15 KiB
Python

"""
CloudKit Web Services API client.
Adapted from existing sportstime_parser.uploaders.cloudkit
"""
import base64
import hashlib
import json
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional

import jwt
import requests
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec
class CloudKitClient:
    """Client for the CloudKit Web Services API.

    All requests target the ``public`` database of the configured container
    and environment. Requests carry both a bearer JWT (ES256) and the
    ``X-Apple-CloudKit-Request-*`` signature headers, each produced with the
    same EC private key.
    """

    BASE_URL = "https://api.apple-cloudkit.com"
    TOKEN_EXPIRY_SECONDS = 3600  # 1 hour
    # Upper bound (seconds) for a single HTTP call. ``requests`` applies no
    # timeout by default, so without this a stalled connection would block
    # the caller indefinitely.
    REQUEST_TIMEOUT_SECONDS = 30

    # Maps a CloudKit record type to the name of the method that builds its
    # ``fields`` mapping. Record types not listed here get empty fields.
    _FIELD_BUILDERS = {
        'Sport': '_sport_fields',
        'Game': '_game_fields',
        'Team': '_team_fields',
        'Stadium': '_stadium_fields',
        'Conference': '_conference_fields',
        'Division': '_division_fields',
        'TeamAlias': '_team_alias_fields',
        'StadiumAlias': '_stadium_alias_fields',
        'LeagueStructure': '_league_structure_fields',
    }

    def __init__(
        self,
        container_id: str,
        environment: str = 'development',
        key_id: str = '',
        private_key: str = '',
        private_key_path: str = '',
    ):
        """Store connection settings and load the signing key if available.

        Args:
            container_id: CloudKit container identifier used in API paths.
            environment: CloudKit environment segment of the API path.
            key_id: Key identifier sent with every request.
            private_key: PEM-encoded private key text. Takes precedence over
                ``private_key_path`` when both are given.
            private_key_path: Path to a PEM file, read only when
                ``private_key`` is empty and the file exists.
        """
        self.container_id = container_id
        self.environment = environment
        self.key_id = key_id
        self._private_key_pem = private_key
        self.private_key_path = private_key_path
        self._private_key = None
        self._token = None
        self._token_expiry = 0

        # Fall back to the key file only when no inline PEM was supplied.
        # A missing file is not an error here; ``is_configured`` reports it.
        if not self._private_key_pem and private_key_path:
            key_path = Path(private_key_path)
            if key_path.exists():
                self._private_key_pem = key_path.read_text()

        if self._private_key_pem:
            self._private_key = serialization.load_pem_private_key(
                self._private_key_pem.encode(),
                password=None,
                backend=default_backend(),
            )

    @property
    def is_configured(self) -> bool:
        """Return True when both a key ID and a loaded private key exist."""
        return bool(self.key_id and self._private_key)

    def _get_api_path(self, operation: str) -> str:
        """Build the API path (without host) for a public-database operation."""
        return f"/database/1/{self.container_id}/{self.environment}/public/{operation}"

    def _get_token(self) -> str:
        """Return a JWT, reusing the cached one while it is still fresh.

        Raises:
            ValueError: If the client has no key ID or private key.
        """
        if not self.is_configured:
            raise ValueError("CloudKit credentials not configured")

        now = time.time()
        # Reuse the cached token unless it expires within the next 5 minutes.
        if self._token and (self._token_expiry - now) > 300:
            return self._token

        expiry = now + self.TOKEN_EXPIRY_SECONDS
        payload = {
            'iss': self.key_id,
            'iat': int(now),
            'exp': int(expiry),
            'sub': self.container_id,
        }
        self._token = jwt.encode(
            payload,
            self._private_key,
            algorithm='ES256',
        )
        self._token_expiry = expiry
        return self._token

    def _sign_request(self, method: str, path: str, body: Optional[bytes] = None) -> dict:
        """Generate request headers with authentication.

        Args:
            method: HTTP method (accepted for interface symmetry; it is not
                part of the signed message).
            path: API path, exactly as it will appear in the request URL.
            body: Request body bytes, or None for an empty body.

        Returns:
            Dictionary of headers to include in the request.

        Raises:
            ValueError: If the client is not configured.
        """
        token = self._get_token()

        # Timezone-aware replacement for the deprecated datetime.utcnow().
        date_str = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

        # A missing body is hashed as the empty byte string.
        body_digest = hashlib.sha256(body or b"").digest()
        body_hash = base64.b64encode(body_digest).decode()

        # Signed message format: date:body_hash:path
        message = f"{date_str}:{body_hash}:{path}"
        signature = self._private_key.sign(
            message.encode(),
            ec.ECDSA(hashes.SHA256()),
        )
        signature_b64 = base64.b64encode(signature).decode()

        return {
            'Authorization': f'Bearer {token}',
            'X-Apple-CloudKit-Request-KeyID': self.key_id,
            'X-Apple-CloudKit-Request-ISO8601Date': date_str,
            'X-Apple-CloudKit-Request-SignatureV1': signature_b64,
            'Content-Type': 'application/json',
        }

    def _request(self, method: str, operation: str, body: Optional[dict] = None) -> dict:
        """Make a signed request to the CloudKit API and return the JSON body.

        Args:
            method: HTTP method.
            operation: Operation suffix appended to the public-database path.
            body: JSON-serialisable payload; a falsy value sends no body.

        Returns:
            The decoded JSON response, or an empty dict when a successful
            response carries no content.

        Raises:
            ValueError: If the client is not configured.
            requests.HTTPError: If the server answers with a 4xx/5xx status.
            requests.RequestException: On connection errors or timeout.
        """
        path = self._get_api_path(operation)
        url = f"{self.BASE_URL}{path}"
        # The exact bytes sent must be the bytes that were hashed and signed.
        body_bytes = json.dumps(body).encode() if body else None
        headers = self._sign_request(method, path, body_bytes)

        response = requests.request(
            method=method,
            url=url,
            headers=headers,
            data=body_bytes,
            timeout=self.REQUEST_TIMEOUT_SECONDS,
        )
        response.raise_for_status()
        # Always honour the ``-> dict`` contract: a non-error response with
        # no payload yields an empty dict instead of an implicit None.
        if not response.content:
            return {}
        return response.json()

    def _get_url(self, path: str) -> str:
        """Build the full API URL for ``path`` (which must start with '/')."""
        return f"{self.BASE_URL}/database/1/{self.container_id}/{self.environment}/public{path}"

    def fetch_records(
        self,
        record_type: str,
        filter_by: Optional[dict] = None,
        sort_by: Optional[str] = None,
        limit: int = 200,
    ) -> list:
        """Fetch records from CloudKit via ``records/query``.

        Args:
            record_type: CloudKit record type to query.
            filter_by: Value passed through unchanged as the query's
                ``filterBy``; omitted when falsy.
            sort_by: Field name to sort by; omitted when falsy.
            limit: Value sent as ``resultsLimit``.

        Returns:
            The ``records`` list of the response, or an empty list when the
            response has no such key. Only a single page is requested.
        """
        query = {'recordType': record_type}
        if filter_by:
            query['filterBy'] = filter_by
        if sort_by:
            query['sortBy'] = [{'fieldName': sort_by}]

        payload = {
            'query': query,
            'resultsLimit': limit,
        }
        data = self._request('POST', 'records/query', payload)
        return data.get('records', [])

    def save_records(self, records: list) -> dict:
        """Save records to CloudKit via ``records/modify``.

        Every record is sent with operationType ``forceReplace`` in a single
        request.
        NOTE(review): no batching is done here; callers presumably keep the
        list within CloudKit's per-request operation limit - confirm.

        Args:
            records: Records in CloudKit format (see ``to_cloudkit_record``).

        Returns:
            The decoded JSON response.
        """
        operations = [
            {'operationType': 'forceReplace', 'record': record}
            for record in records
        ]
        return self._request('POST', 'records/modify', {'operations': operations})

    def delete_records(self, record_names: list, record_type: str) -> dict:
        """Delete records from CloudKit via ``records/modify``.

        Args:
            record_names: Record names to delete.
            record_type: Record type shared by all the named records.

        Returns:
            The decoded JSON response.
        """
        operations = [
            {
                'operationType': 'delete',
                'record': {
                    'recordName': name,
                    'recordType': record_type,
                },
            }
            for name in record_names
        ]
        return self._request('POST', 'records/modify', {'operations': operations})

    # ------------------------------------------------------------------
    # Typed-field helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _str(value) -> dict:
        """Wrap ``value`` as a CloudKit STRING field."""
        return {'value': value, 'type': 'STRING'}

    @staticmethod
    def _int(value) -> dict:
        """Wrap ``value`` as a CloudKit INT64 field."""
        return {'value': value, 'type': 'INT64'}

    @staticmethod
    def _ts(iso_value: str) -> dict:
        """Wrap an ISO 8601 string as a TIMESTAMP field (epoch milliseconds).

        A trailing ``Z`` is normalised to ``+00:00`` because
        ``datetime.fromisoformat`` rejects it before Python 3.11. Values
        without an offset are interpreted as UTC so the result does not
        depend on the host's local timezone.
        """
        parsed = datetime.fromisoformat(iso_value.replace('Z', '+00:00'))
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return {'value': int(parsed.timestamp() * 1000), 'type': 'TIMESTAMP'}

    def _copy_optional_strings(self, fields: dict, data: dict, pairs) -> None:
        """Add a STRING field for each (source_key, field_name) with a truthy value."""
        for source_key, field_name in pairs:
            if data.get(source_key):
                fields[field_name] = self._str(data[source_key])

    def _add_validity_window(self, fields: dict, data: dict) -> None:
        """Add validFrom/validUntil TIMESTAMP fields when present in ``data``."""
        for key in ('validFrom', 'validUntil'):
            if data.get(key):
                fields[key] = self._ts(data[key])

    # ------------------------------------------------------------------
    # Per-record-type field builders
    # ------------------------------------------------------------------

    def _sport_fields(self, data: dict) -> dict:
        """Build the fields mapping for a Sport record."""
        return {
            'sportId': self._str(data['id']),
            'abbreviation': self._str(data['abbreviation'].upper()),
            'displayName': self._str(data['displayName']),
            'iconName': self._str(data.get('iconName', '')),
            'colorHex': self._str(data.get('colorHex', '')),
            'seasonStartMonth': self._int(data.get('seasonStartMonth', 1)),
            'seasonEndMonth': self._int(data.get('seasonEndMonth', 12)),
            'isActive': self._int(1 if data.get('isActive') else 0),
        }

    def _game_fields(self, data: dict) -> dict:
        """Build the fields mapping for a Game record."""
        fields = {
            'gameId': self._str(data['id']),
            'canonicalId': self._str(data['id']),
            'sport': self._str(data['sport'].upper()),
            'season': self._str(str(data['season'])),
            'homeTeamCanonicalId': self._str(data['homeTeamId']),
            'awayTeamCanonicalId': self._str(data['awayTeamId']),
        }
        self._copy_optional_strings(fields, data, (('stadiumId', 'stadiumCanonicalId'),))
        if data.get('gameDate'):
            fields['dateTime'] = self._ts(data['gameDate'])
        fields['isPlayoff'] = self._int(1 if data.get('isPlayoff') else 0)
        return fields

    def _team_fields(self, data: dict) -> dict:
        """Build the fields mapping for a Team record."""
        fields = {
            'teamId': self._str(data['id']),
            'canonicalId': self._str(data['id']),
            'sport': self._str(data['sport'].upper()),
            'city': self._str(data.get('city', '')),
            'name': self._str(data.get('name', '')),
            'abbreviation': self._str(data.get('abbreviation', '')),
        }
        self._copy_optional_strings(fields, data, (
            ('homeStadiumId', 'stadiumCanonicalId'),
            ('primaryColor', 'primaryColor'),
            ('secondaryColor', 'secondaryColor'),
            ('logoUrl', 'logoUrl'),
            ('divisionId', 'divisionCanonicalId'),
            ('conferenceId', 'conferenceCanonicalId'),
        ))
        return fields

    def _stadium_fields(self, data: dict) -> dict:
        """Build the fields mapping for a Stadium record."""
        fields = {
            'stadiumId': self._str(data['id']),
            'canonicalId': self._str(data['id']),
            'sport': self._str(data['sport'].upper()),
            'name': self._str(data.get('name', '')),
            'city': self._str(data.get('city', '')),
        }
        self._copy_optional_strings(fields, data, (('state', 'state'),))
        # Coordinates use the LOCATION type; 0.0 is a valid coordinate, so
        # test against None rather than truthiness.
        if data.get('latitude') is not None and data.get('longitude') is not None:
            fields['location'] = {
                'value': {
                    'latitude': float(data['latitude']),
                    'longitude': float(data['longitude']),
                },
                'type': 'LOCATION',
            }
        for key in ('capacity', 'yearOpened'):
            if data.get(key):
                fields[key] = self._int(data[key])
        self._copy_optional_strings(fields, data, (
            ('imageUrl', 'imageURL'),
            ('timezone', 'timezoneIdentifier'),
        ))
        return fields

    def _conference_fields(self, data: dict) -> dict:
        """Build the fields mapping for a Conference record."""
        return {
            'conferenceId': self._str(data['id']),
            'canonicalId': self._str(data['id']),
            'sport': self._str(data['sport'].upper()),
            'name': self._str(data.get('name', '')),
            'shortName': self._str(data.get('shortName', '')),
            'order': self._int(data.get('order', 0)),
        }

    def _division_fields(self, data: dict) -> dict:
        """Build the fields mapping for a Division record."""
        return {
            'divisionId': self._str(data['id']),
            'canonicalId': self._str(data['id']),
            'conferenceCanonicalId': self._str(data['conferenceId']),
            'sport': self._str(data['sport'].upper()),
            'name': self._str(data.get('name', '')),
            'shortName': self._str(data.get('shortName', '')),
            'order': self._int(data.get('order', 0)),
        }

    def _team_alias_fields(self, data: dict) -> dict:
        """Build the fields mapping for a TeamAlias record."""
        fields = {
            'aliasId': self._str(data['id']),
            'teamCanonicalId': self._str(data['teamId']),
            'aliasValue': self._str(data.get('alias', '')),
            'aliasType': self._str(data.get('aliasType', '')),
        }
        self._add_validity_window(fields, data)
        return fields

    def _stadium_alias_fields(self, data: dict) -> dict:
        """Build the fields mapping for a StadiumAlias record."""
        fields = {
            'stadiumCanonicalId': self._str(data['stadiumId']),
            'aliasName': self._str(data.get('alias', '')),
        }
        self._add_validity_window(fields, data)
        return fields

    def _league_structure_fields(self, data: dict) -> dict:
        """Build the fields mapping for a LeagueStructure record."""
        return {
            'structureId': self._str(data['id']),
            'sport': self._str(data['sport'].upper()),
            'type': self._str(data['type']),
            'name': self._str(data.get('name', '')),
            'abbreviation': self._str(data.get('abbreviation', '')),
            'parentId': self._str(data.get('parentId', '')),
            'displayOrder': self._int(data.get('displayOrder', 0)),
        }

    def to_cloudkit_record(self, record_type: str, data: dict) -> dict:
        """Convert a local data dict to CloudKit record format.

        Field names must match the existing CloudKit schema.

        Args:
            record_type: CloudKit record type. Unrecognised types produce a
                record with an empty ``fields`` mapping.
            data: Source values; ``data['id']`` becomes the record name.

        Returns:
            A dict with ``recordType``, ``recordName`` and ``fields``, where
            each field is ``{'value': ..., 'type': ...}``.

        Raises:
            KeyError: If ``id`` or another key required by the record type
                is missing from ``data``.
            ValueError: If a date/time value is not valid ISO 8601.
        """
        record = {
            'recordType': record_type,
            'recordName': data['id'],
            'fields': {},
        }
        builder_name = self._FIELD_BUILDERS.get(record_type)
        if builder_name is not None:
            record['fields'] = getattr(self, builder_name)(data)
        return record

    def test_connection(self) -> bool:
        """Test the CloudKit connection.

        Returns:
            True if a one-record ``Team`` query succeeds, False on any
            failure (including missing credentials). This is a deliberate
            best-effort probe, so the error itself is not propagated.
        """
        try:
            self.fetch_records('Team', limit=1)
        except Exception:
            return False
        return True