Replace monolithic scraping scripts with sportstime_parser package: - Multi-source scrapers with automatic fallback for 7 sports - Canonical ID generation for games, teams, and stadiums - Fuzzy matching with configurable thresholds for name resolution - CloudKit Web Services uploader with JWT auth, diff-based updates - Resumable uploads with checkpoint state persistence - Validation reports with manual review items and suggested matches - Comprehensive test suite (249 tests) CLI: sportstime-parser scrape|validate|upload|status|retry|clear Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
566 lines
18 KiB
Python
566 lines
18 KiB
Python
"""CloudKit Web Services client for sportstime-parser.
|
|
|
|
This module provides a client for uploading data to CloudKit using the
|
|
CloudKit Web Services API. It handles JWT authentication, request signing,
|
|
and batch operations.
|
|
|
|
Reference: https://developer.apple.com/documentation/cloudkitwebservices
|
|
"""
|
|
|
|
import base64
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Any, Optional
|
|
from enum import Enum
|
|
|
|
import jwt
|
|
import requests
|
|
from cryptography.hazmat.primitives import hashes, serialization
|
|
from cryptography.hazmat.primitives.asymmetric import ec
|
|
from cryptography.hazmat.backends import default_backend
|
|
|
|
from ..config import (
|
|
CLOUDKIT_CONTAINER_ID,
|
|
CLOUDKIT_ENVIRONMENT,
|
|
CLOUDKIT_BATCH_SIZE,
|
|
)
|
|
from ..utils.logging import get_logger
|
|
|
|
|
|
class RecordType(str, Enum):
|
|
"""CloudKit record types for SportsTime."""
|
|
GAME = "Game"
|
|
TEAM = "Team"
|
|
STADIUM = "Stadium"
|
|
TEAM_ALIAS = "TeamAlias"
|
|
STADIUM_ALIAS = "StadiumAlias"
|
|
|
|
|
|
@dataclass
|
|
class CloudKitRecord:
|
|
"""Represents a CloudKit record for upload.
|
|
|
|
Attributes:
|
|
record_name: Unique record identifier (canonical ID)
|
|
record_type: CloudKit record type
|
|
fields: Dictionary of field name -> field value
|
|
record_change_tag: Version tag for conflict detection (None for new records)
|
|
"""
|
|
record_name: str
|
|
record_type: RecordType
|
|
fields: dict[str, Any]
|
|
record_change_tag: Optional[str] = None
|
|
|
|
def to_cloudkit_dict(self) -> dict:
|
|
"""Convert to CloudKit API format."""
|
|
record = {
|
|
"recordName": self.record_name,
|
|
"recordType": self.record_type.value,
|
|
"fields": self._format_fields(),
|
|
}
|
|
if self.record_change_tag:
|
|
record["recordChangeTag"] = self.record_change_tag
|
|
return record
|
|
|
|
def _format_fields(self) -> dict:
|
|
"""Format fields for CloudKit API."""
|
|
formatted = {}
|
|
for key, value in self.fields.items():
|
|
if value is None:
|
|
continue
|
|
formatted[key] = self._format_field_value(value)
|
|
return formatted
|
|
|
|
def _format_field_value(self, value: Any) -> dict:
|
|
"""Format a single field value for CloudKit API."""
|
|
if isinstance(value, str):
|
|
return {"value": value, "type": "STRING"}
|
|
elif isinstance(value, int):
|
|
return {"value": value, "type": "INT64"}
|
|
elif isinstance(value, float):
|
|
return {"value": value, "type": "DOUBLE"}
|
|
elif isinstance(value, bool):
|
|
return {"value": 1 if value else 0, "type": "INT64"}
|
|
elif isinstance(value, datetime):
|
|
# CloudKit expects milliseconds since epoch
|
|
timestamp_ms = int(value.timestamp() * 1000)
|
|
return {"value": timestamp_ms, "type": "TIMESTAMP"}
|
|
elif isinstance(value, list):
|
|
return {"value": value, "type": "STRING_LIST"}
|
|
elif isinstance(value, dict) and "latitude" in value and "longitude" in value:
|
|
return {
|
|
"value": {
|
|
"latitude": value["latitude"],
|
|
"longitude": value["longitude"],
|
|
},
|
|
"type": "LOCATION",
|
|
}
|
|
else:
|
|
# Default to string
|
|
return {"value": str(value), "type": "STRING"}
|
|
|
|
|
|
@dataclass
|
|
class OperationResult:
|
|
"""Result of a CloudKit operation."""
|
|
record_name: str
|
|
success: bool
|
|
record_change_tag: Optional[str] = None
|
|
error_code: Optional[str] = None
|
|
error_message: Optional[str] = None
|
|
|
|
|
|
@dataclass
|
|
class BatchResult:
|
|
"""Result of a batch CloudKit operation."""
|
|
successful: list[OperationResult] = field(default_factory=list)
|
|
failed: list[OperationResult] = field(default_factory=list)
|
|
|
|
@property
|
|
def all_succeeded(self) -> bool:
|
|
return len(self.failed) == 0
|
|
|
|
@property
|
|
def success_count(self) -> int:
|
|
return len(self.successful)
|
|
|
|
@property
|
|
def failure_count(self) -> int:
|
|
return len(self.failed)
|
|
|
|
|
|
class CloudKitClient:
|
|
"""Client for CloudKit Web Services API.
|
|
|
|
Handles authentication via server-to-server JWT tokens and provides
|
|
methods for CRUD operations on CloudKit records.
|
|
|
|
Authentication requires:
|
|
- Key ID: CloudKit key identifier from Apple Developer Portal
|
|
- Private Key: EC private key in PEM format
|
|
|
|
Environment variables:
|
|
- CLOUDKIT_KEY_ID: The key identifier
|
|
- CLOUDKIT_PRIVATE_KEY_PATH: Path to the private key file
|
|
- CLOUDKIT_PRIVATE_KEY: The private key contents (alternative to path)
|
|
"""
|
|
|
|
BASE_URL = "https://api.apple-cloudkit.com"
|
|
TOKEN_EXPIRY_SECONDS = 3600 # 1 hour
|
|
|
|
def __init__(
|
|
self,
|
|
container_id: str = CLOUDKIT_CONTAINER_ID,
|
|
environment: str = CLOUDKIT_ENVIRONMENT,
|
|
key_id: Optional[str] = None,
|
|
private_key: Optional[str] = None,
|
|
private_key_path: Optional[str] = None,
|
|
):
|
|
"""Initialize the CloudKit client.
|
|
|
|
Args:
|
|
container_id: CloudKit container identifier
|
|
environment: 'development' or 'production'
|
|
key_id: CloudKit server-to-server key ID
|
|
private_key: PEM-encoded EC private key contents
|
|
private_key_path: Path to PEM-encoded EC private key file
|
|
"""
|
|
self.container_id = container_id
|
|
self.environment = environment
|
|
self.logger = get_logger()
|
|
|
|
# Load authentication credentials
|
|
self.key_id = key_id or os.environ.get("CLOUDKIT_KEY_ID")
|
|
|
|
if private_key:
|
|
self._private_key_pem = private_key
|
|
elif private_key_path:
|
|
self._private_key_pem = Path(private_key_path).read_text()
|
|
elif os.environ.get("CLOUDKIT_PRIVATE_KEY"):
|
|
self._private_key_pem = os.environ["CLOUDKIT_PRIVATE_KEY"]
|
|
elif os.environ.get("CLOUDKIT_PRIVATE_KEY_PATH"):
|
|
self._private_key_pem = Path(os.environ["CLOUDKIT_PRIVATE_KEY_PATH"]).read_text()
|
|
else:
|
|
self._private_key_pem = None
|
|
|
|
# Parse the private key if available
|
|
self._private_key = None
|
|
if self._private_key_pem:
|
|
self._private_key = serialization.load_pem_private_key(
|
|
self._private_key_pem.encode(),
|
|
password=None,
|
|
backend=default_backend(),
|
|
)
|
|
|
|
# Token cache
|
|
self._token: Optional[str] = None
|
|
self._token_expiry: float = 0
|
|
|
|
# Session for connection pooling
|
|
self._session = requests.Session()
|
|
|
|
@property
|
|
def is_configured(self) -> bool:
|
|
"""Check if the client has valid authentication credentials."""
|
|
return bool(self.key_id and self._private_key)
|
|
|
|
def _get_api_path(self, operation: str) -> str:
|
|
"""Build the full API path for an operation."""
|
|
return f"/database/1/{self.container_id}/{self.environment}/public/{operation}"
|
|
|
|
def _get_token(self) -> str:
|
|
"""Get a valid JWT token, generating a new one if needed."""
|
|
if not self.is_configured:
|
|
raise ValueError(
|
|
"CloudKit client not configured. Set CLOUDKIT_KEY_ID and "
|
|
"CLOUDKIT_PRIVATE_KEY_PATH environment variables."
|
|
)
|
|
|
|
now = time.time()
|
|
|
|
# Return cached token if still valid (with 5 min buffer)
|
|
if self._token and (self._token_expiry - now) > 300:
|
|
return self._token
|
|
|
|
# Generate new token
|
|
expiry = now + self.TOKEN_EXPIRY_SECONDS
|
|
|
|
payload = {
|
|
"iss": self.key_id,
|
|
"iat": int(now),
|
|
"exp": int(expiry),
|
|
"sub": self.container_id,
|
|
}
|
|
|
|
self._token = jwt.encode(
|
|
payload,
|
|
self._private_key,
|
|
algorithm="ES256",
|
|
)
|
|
self._token_expiry = expiry
|
|
|
|
return self._token
|
|
|
|
def _sign_request(self, method: str, path: str, body: Optional[bytes] = None) -> dict:
|
|
"""Generate request headers with authentication.
|
|
|
|
Args:
|
|
method: HTTP method
|
|
path: API path
|
|
body: Request body bytes
|
|
|
|
Returns:
|
|
Dictionary of headers to include in the request
|
|
"""
|
|
token = self._get_token()
|
|
|
|
# CloudKit uses date in ISO format
|
|
date_str = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
|
|
# Calculate body hash
|
|
if body:
|
|
body_hash = base64.b64encode(
|
|
hashlib.sha256(body).digest()
|
|
).decode()
|
|
else:
|
|
body_hash = base64.b64encode(
|
|
hashlib.sha256(b"").digest()
|
|
).decode()
|
|
|
|
# Build the message to sign
|
|
# Format: date:body_hash:path
|
|
message = f"{date_str}:{body_hash}:{path}"
|
|
|
|
# Sign the message
|
|
signature = self._private_key.sign(
|
|
message.encode(),
|
|
ec.ECDSA(hashes.SHA256()),
|
|
)
|
|
signature_b64 = base64.b64encode(signature).decode()
|
|
|
|
return {
|
|
"Authorization": f"Bearer {token}",
|
|
"X-Apple-CloudKit-Request-KeyID": self.key_id,
|
|
"X-Apple-CloudKit-Request-ISO8601Date": date_str,
|
|
"X-Apple-CloudKit-Request-SignatureV1": signature_b64,
|
|
"Content-Type": "application/json",
|
|
}
|
|
|
|
def _request(
|
|
self,
|
|
method: str,
|
|
operation: str,
|
|
body: Optional[dict] = None,
|
|
) -> dict:
|
|
"""Make a request to the CloudKit API.
|
|
|
|
Args:
|
|
method: HTTP method
|
|
operation: API operation path
|
|
body: Request body as dictionary
|
|
|
|
Returns:
|
|
Response data as dictionary
|
|
|
|
Raises:
|
|
CloudKitError: If the request fails
|
|
"""
|
|
path = self._get_api_path(operation)
|
|
url = f"{self.BASE_URL}{path}"
|
|
|
|
body_bytes = json.dumps(body).encode() if body else None
|
|
headers = self._sign_request(method, path, body_bytes)
|
|
|
|
response = self._session.request(
|
|
method=method,
|
|
url=url,
|
|
headers=headers,
|
|
data=body_bytes,
|
|
)
|
|
|
|
if response.status_code == 200:
|
|
return response.json()
|
|
elif response.status_code == 421:
|
|
# Authentication required - token may be expired
|
|
self._token = None
|
|
raise CloudKitAuthError("Authentication failed - check credentials")
|
|
elif response.status_code == 429:
|
|
raise CloudKitRateLimitError("Rate limit exceeded")
|
|
elif response.status_code >= 500:
|
|
raise CloudKitServerError(f"Server error: {response.status_code}")
|
|
else:
|
|
try:
|
|
error_data = response.json()
|
|
error_msg = error_data.get("serverErrorCode", str(response.status_code))
|
|
except (json.JSONDecodeError, KeyError):
|
|
error_msg = response.text
|
|
raise CloudKitError(f"Request failed: {error_msg}")
|
|
|
|
def fetch_records(
|
|
self,
|
|
record_type: RecordType,
|
|
record_names: Optional[list[str]] = None,
|
|
limit: int = 200,
|
|
) -> list[dict]:
|
|
"""Fetch records from CloudKit.
|
|
|
|
Args:
|
|
record_type: Type of records to fetch
|
|
record_names: Specific record names to fetch (optional)
|
|
limit: Maximum records to return (default 200)
|
|
|
|
Returns:
|
|
List of record dictionaries
|
|
"""
|
|
if record_names:
|
|
# Fetch specific records by name
|
|
body = {
|
|
"records": [{"recordName": name} for name in record_names],
|
|
}
|
|
response = self._request("POST", "records/lookup", body)
|
|
else:
|
|
# Query all records of type
|
|
body = {
|
|
"query": {
|
|
"recordType": record_type.value,
|
|
},
|
|
"resultsLimit": limit,
|
|
}
|
|
response = self._request("POST", "records/query", body)
|
|
|
|
records = response.get("records", [])
|
|
return [r for r in records if "recordName" in r]
|
|
|
|
def fetch_all_records(self, record_type: RecordType) -> list[dict]:
|
|
"""Fetch all records of a type using pagination.
|
|
|
|
Args:
|
|
record_type: Type of records to fetch
|
|
|
|
Returns:
|
|
List of all record dictionaries
|
|
"""
|
|
all_records = []
|
|
continuation_marker = None
|
|
|
|
while True:
|
|
body = {
|
|
"query": {
|
|
"recordType": record_type.value,
|
|
},
|
|
"resultsLimit": 200,
|
|
}
|
|
|
|
if continuation_marker:
|
|
body["continuationMarker"] = continuation_marker
|
|
|
|
response = self._request("POST", "records/query", body)
|
|
|
|
records = response.get("records", [])
|
|
all_records.extend([r for r in records if "recordName" in r])
|
|
|
|
continuation_marker = response.get("continuationMarker")
|
|
if not continuation_marker:
|
|
break
|
|
|
|
return all_records
|
|
|
|
def save_records(self, records: list[CloudKitRecord]) -> BatchResult:
|
|
"""Save records to CloudKit (create or update).
|
|
|
|
Args:
|
|
records: List of records to save
|
|
|
|
Returns:
|
|
BatchResult with success/failure details
|
|
"""
|
|
result = BatchResult()
|
|
|
|
# Process in batches
|
|
for i in range(0, len(records), CLOUDKIT_BATCH_SIZE):
|
|
batch = records[i:i + CLOUDKIT_BATCH_SIZE]
|
|
batch_result = self._save_batch(batch)
|
|
result.successful.extend(batch_result.successful)
|
|
result.failed.extend(batch_result.failed)
|
|
|
|
return result
|
|
|
|
def _save_batch(self, records: list[CloudKitRecord]) -> BatchResult:
|
|
"""Save a single batch of records.
|
|
|
|
Args:
|
|
records: List of records (max CLOUDKIT_BATCH_SIZE)
|
|
|
|
Returns:
|
|
BatchResult with success/failure details
|
|
"""
|
|
result = BatchResult()
|
|
|
|
operations = []
|
|
for record in records:
|
|
op = {
|
|
"operationType": "forceReplace",
|
|
"record": record.to_cloudkit_dict(),
|
|
}
|
|
operations.append(op)
|
|
|
|
body = {"operations": operations}
|
|
|
|
try:
|
|
response = self._request("POST", "records/modify", body)
|
|
except CloudKitError as e:
|
|
# Entire batch failed
|
|
for record in records:
|
|
result.failed.append(OperationResult(
|
|
record_name=record.record_name,
|
|
success=False,
|
|
error_message=str(e),
|
|
))
|
|
return result
|
|
|
|
# Process individual results
|
|
for record_data in response.get("records", []):
|
|
record_name = record_data.get("recordName", "unknown")
|
|
|
|
if "serverErrorCode" in record_data:
|
|
result.failed.append(OperationResult(
|
|
record_name=record_name,
|
|
success=False,
|
|
error_code=record_data.get("serverErrorCode"),
|
|
error_message=record_data.get("reason"),
|
|
))
|
|
else:
|
|
result.successful.append(OperationResult(
|
|
record_name=record_name,
|
|
success=True,
|
|
record_change_tag=record_data.get("recordChangeTag"),
|
|
))
|
|
|
|
return result
|
|
|
|
def delete_records(
|
|
self,
|
|
record_type: RecordType,
|
|
record_names: list[str],
|
|
) -> BatchResult:
|
|
"""Delete records from CloudKit.
|
|
|
|
Args:
|
|
record_type: Type of records to delete
|
|
record_names: List of record names to delete
|
|
|
|
Returns:
|
|
BatchResult with success/failure details
|
|
"""
|
|
result = BatchResult()
|
|
|
|
# Process in batches
|
|
for i in range(0, len(record_names), CLOUDKIT_BATCH_SIZE):
|
|
batch = record_names[i:i + CLOUDKIT_BATCH_SIZE]
|
|
|
|
operations = []
|
|
for name in batch:
|
|
operations.append({
|
|
"operationType": "delete",
|
|
"record": {
|
|
"recordName": name,
|
|
"recordType": record_type.value,
|
|
},
|
|
})
|
|
|
|
body = {"operations": operations}
|
|
|
|
try:
|
|
response = self._request("POST", "records/modify", body)
|
|
except CloudKitError as e:
|
|
for name in batch:
|
|
result.failed.append(OperationResult(
|
|
record_name=name,
|
|
success=False,
|
|
error_message=str(e),
|
|
))
|
|
continue
|
|
|
|
for record_data in response.get("records", []):
|
|
record_name = record_data.get("recordName", "unknown")
|
|
|
|
if "serverErrorCode" in record_data:
|
|
result.failed.append(OperationResult(
|
|
record_name=record_name,
|
|
success=False,
|
|
error_code=record_data.get("serverErrorCode"),
|
|
error_message=record_data.get("reason"),
|
|
))
|
|
else:
|
|
result.successful.append(OperationResult(
|
|
record_name=record_name,
|
|
success=True,
|
|
))
|
|
|
|
return result
|
|
|
|
|
|
class CloudKitError(Exception):
|
|
"""Base exception for CloudKit errors."""
|
|
pass
|
|
|
|
|
|
class CloudKitAuthError(CloudKitError):
|
|
"""Authentication error."""
|
|
pass
|
|
|
|
|
|
class CloudKitRateLimitError(CloudKitError):
|
|
"""Rate limit exceeded."""
|
|
pass
|
|
|
|
|
|
class CloudKitServerError(CloudKitError):
|
|
"""Server-side error."""
|
|
pass
|