Files
SportstimeAPI/cloudkit/tasks.py
Trey t 63acf7accb feat: add Django web app, CloudKit sync, dashboard, and game_datetime_utc export
Adds the full Django application layer on top of sportstime_parser:
- core: Sport, Team, Stadium, Game models with aliases and league structure
- scraper: orchestration engine, adapter, job management, Celery tasks
- cloudkit: CloudKit sync client, sync state tracking, sync jobs
- dashboard: staff dashboard for monitoring scrapers, sync, review queue
- notifications: email reports for scrape/sync results
- Docker setup for deployment (Dockerfile, docker-compose, entrypoint)

Game exports now use game_datetime_utc (ISO 8601 UTC) instead of
venue-local date+time strings, matching the canonical format used
by the iOS app.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 14:04:27 -06:00

702 lines
26 KiB
Python

import logging
import traceback
from celery import shared_task
from django.utils import timezone
logger = logging.getLogger('cloudkit')
@shared_task(bind=True, max_retries=3)
def run_cloudkit_sync(self, config_id: int, triggered_by: str = 'manual',
                      sport_code: str = None, record_type: str = None):
    """
    Run a CloudKit sync job.

    Creates a CloudKitSyncJob row for dashboard progress tracking, runs
    perform_sync, records the outcome on the job, and queues an email
    notification. Unexpected errors are retried up to max_retries with a
    linear backoff (60s, 120s, 180s).

    Args:
        config_id: PK of the CloudKitConfiguration to sync against.
        triggered_by: Label stored on the job ('manual', 'schedule', ...).
        sport_code: Optional sport filter recorded on the job.
        record_type: Optional single record type to sync (e.g. 'Game').

    Returns:
        dict with job_id, status and record counters, or
        {'error': 'Configuration not found'} when config_id is invalid.
    """
    from cloudkit.models import CloudKitConfiguration, CloudKitSyncJob
    from notifications.tasks import send_sync_notification

    # Resolve the configuration; without it there is nothing to sync.
    try:
        config = CloudKitConfiguration.objects.get(id=config_id)
    except CloudKitConfiguration.DoesNotExist:
        logger.error(f"CloudKitConfiguration {config_id} not found")
        return {'error': 'Configuration not found'}

    # Create the job record up front so the dashboard can show progress.
    job = CloudKitSyncJob.objects.create(
        configuration=config,
        status='running',
        triggered_by=triggered_by,
        started_at=timezone.now(),
        celery_task_id=self.request.id,
        sport_filter_id=sport_code,
        record_type_filter=record_type or '',
    )
    try:
        logger.info(f'Starting CloudKit sync to {config.environment}')
        # Run sync
        result = perform_sync(config, job, sport_code, record_type)

        # Update job with results
        job.finished_at = timezone.now()
        job.records_synced = result.get('synced', 0)
        job.records_created = result.get('created', 0)
        job.records_updated = result.get('updated', 0)
        job.records_deleted = result.get('deleted', 0)
        job.records_failed = result.get('failed', 0)

        # Derive job status from the counters.
        if job.records_failed > 0 and job.records_synced == 0:
            job.status = 'failed'
            job.error_message = f'All {job.records_failed} records failed to sync'
            logger.error(f'Sync failed: {job.records_failed} failed, 0 synced')
        elif job.records_failed > 0:
            job.status = 'completed_with_errors'
            logger.warning(f'Sync completed with errors: {job.records_synced} synced, {job.records_failed} failed')
        else:
            job.status = 'completed'
            logger.info(f'Sync completed: {job.records_synced} synced')
        job.save()

        # Send notification if configured
        send_sync_notification.delay(job.id)

        # BUG FIX: previously this always reported 'completed' even when the
        # job finished as 'failed' or 'completed_with_errors'; report the
        # real status so callers are not misled.
        return {
            'job_id': job.id,
            'status': job.status,
            'records_synced': job.records_synced,
            'records_failed': job.records_failed,
        }
    except Exception as e:
        error_msg = str(e)
        # The traceback was previously captured into an unused variable;
        # include it in the log so failures are debuggable from worker logs.
        logger.error(f'Sync failed: {error_msg}\n{traceback.format_exc()}')
        job.status = 'failed'
        job.finished_at = timezone.now()
        job.error_message = error_msg
        job.save()
        # Send failure notification
        send_sync_notification.delay(job.id)
        # Retry transient failures with linear backoff.
        if self.request.retries < self.max_retries:
            raise self.retry(exc=e, countdown=60 * (self.request.retries + 1))
        return {
            'job_id': job.id,
            'status': 'failed',
            'error': error_msg,
        }
def perform_sync(config, job, sport_code=None, record_type=None):
    """
    Perform the actual CloudKit sync.

    Syncs ALL local records to CloudKit (creates new, updates existing), in
    dependency order: Sport -> Conference -> Division -> Team -> Stadium ->
    TeamAlias -> StadiumAlias -> LeagueStructure -> Game.

    Args:
        config: CloudKitConfiguration providing the client and batch size.
        job: CloudKitSyncJob whose per-type totals and progress are updated.
        sport_code: Accepted for interface compatibility.
            NOTE(review): currently unused — none of the querysets below
            filter by sport, so a sport-scoped request still pushes every
            sport. TODO: confirm intent and filter the querysets if scoping
            is wanted.
        record_type: When set, only that single record type is synced.

    Returns:
        dict of aggregate counters: synced/created/updated/deleted/failed.

    Raises:
        ValueError: when CloudKit authentication fails.
    """
    # NOTE: unused imports of CloudKitClient / CloudKitSyncState removed.
    from core.models import Sport, Conference, Division, Game, Team, Stadium, TeamAlias, StadiumAlias

    # Initialize CloudKit client from config
    client = config.get_client()

    # Fail fast if CloudKit auth is broken before touching any records.
    try:
        client._get_token()
    except Exception as e:
        logger.error(f'CloudKit authentication failed: {e}')
        raise ValueError(f'CloudKit authentication failed: {e}')

    results = {
        'synced': 0,
        'created': 0,
        'updated': 0,
        'deleted': 0,
        'failed': 0,
    }
    batch_size = config.batch_size

    def _sync_type(name, total_field, queryset, converter):
        # Shared per-type boilerplate: record the total, mark the job's
        # current phase, run the batched sync, accumulate counters.
        setattr(job, total_field, queryset.count())
        job.current_record_type = name
        job.save(update_fields=[total_field, 'current_record_type'])
        stats = sync_model_records(client, name, queryset, converter, batch_size, job)
        results['synced'] += stats['synced']
        results['failed'] += stats['failed']

    # Dependency-ordered plan for the model-backed record types. Querysets
    # are lazy, so building them here costs nothing for skipped types.
    plan = [
        ('Sport', 'sports_total',
         Sport.objects.filter(is_active=True), sport_to_dict),
        ('Conference', 'conferences_total',
         Conference.objects.select_related('sport').all(), conference_to_dict),
        ('Division', 'divisions_total',
         Division.objects.select_related('conference', 'conference__sport').all(), division_to_dict),
        ('Team', 'teams_total',
         Team.objects.select_related('sport', 'home_stadium', 'division', 'division__conference').all(), team_to_dict),
        ('Stadium', 'stadiums_total',
         Stadium.objects.select_related('sport').all(), stadium_to_dict),
        ('TeamAlias', 'team_aliases_total',
         TeamAlias.objects.select_related('team').all(), team_alias_to_dict),
        ('StadiumAlias', 'stadium_aliases_total',
         StadiumAlias.objects.select_related('stadium').all(), stadium_alias_to_dict),
    ]
    for name, total_field, queryset, converter in plan:
        if not record_type or record_type == name:
            _sync_type(name, total_field, queryset, converter)

    # LeagueStructure is a flattened league/conference/division hierarchy
    # built from dicts, so it has no queryset or per-type total field.
    if not record_type or record_type == 'LeagueStructure':
        job.current_record_type = 'LeagueStructure'
        job.save(update_fields=['current_record_type'])
        ls_stats = sync_dict_records(
            client, 'LeagueStructure', build_league_structure_records(), batch_size, job
        )
        results['synced'] += ls_stats['synced']
        results['failed'] += ls_stats['failed']

    # Games last: they reference the Teams and Stadiums synced above.
    if not record_type or record_type == 'Game':
        _sync_type(
            'Game', 'games_total',
            Game.objects.select_related('home_team', 'away_team', 'stadium', 'sport').all(),
            game_to_dict,
        )

    job.current_record_type = ''
    job.save(update_fields=['current_record_type'])
    return results
def sync_model_records(client, record_type, queryset, to_dict_func, batch_size, job=None):
    """
    Sync all records from a queryset to CloudKit.

    Updates progress frequently for real-time UI feedback.

    Args:
        client: CloudKit client exposing to_cloudkit_record / save_records.
        record_type: CloudKit record type name (e.g. 'Team').
        queryset: Queryset (or iterable) of model instances to push.
        to_dict_func: Callable converting one model instance to a plain dict.
        batch_size: Configured batch size; capped at 50 per API call below.
        job: Optional CloudKitSyncJob whose per-type synced/failed counters
            are persisted as progress is made.

    Returns:
        dict with 'synced' and 'failed' totals for this record type.
    """
    results = {'synced': 0, 'failed': 0}
    # Materialize once: total is needed for batch math and progress logs.
    records = list(queryset)
    total = len(records)
    logger.info(f'[{record_type}] Starting sync: {total} total records')
    # Field names for job updates: record_type -> (synced field, failed field)
    # on CloudKitSyncJob. Unknown types simply skip job progress updates.
    field_map = {
        'Sport': ('sports_synced', 'sports_failed'),
        'Conference': ('conferences_synced', 'conferences_failed'),
        'Division': ('divisions_synced', 'divisions_failed'),
        'Team': ('teams_synced', 'teams_failed'),
        'Stadium': ('stadiums_synced', 'stadiums_failed'),
        'TeamAlias': ('team_aliases_synced', 'team_aliases_failed'),
        'StadiumAlias': ('stadium_aliases_synced', 'stadium_aliases_failed'),
        'Game': ('games_synced', 'games_failed'),
    }
    synced_field, failed_field = field_map.get(record_type, (None, None))
    # Use smaller batches for more frequent progress updates
    # CloudKit API batch size vs progress update frequency
    api_batch_size = min(batch_size, 50)  # Max 50 per API call for frequent updates
    progress_update_interval = 10  # Update DB every 10 records
    records_since_last_update = 0
    for i in range(0, total, api_batch_size):
        batch = records[i:i + api_batch_size]
        batch_num = (i // api_batch_size) + 1
        total_batches = (total + api_batch_size - 1) // api_batch_size
        # Convert to CloudKit format; conversion failures count as failed
        # but do not abort the batch.
        cloudkit_records = []
        for record in batch:
            try:
                data = to_dict_func(record)
                ck_record = client.to_cloudkit_record(record_type, data)
                cloudkit_records.append(ck_record)
            except Exception as e:
                logger.error(f'Failed to convert {record_type}:{record.id}: {e}')
                results['failed'] += 1
                records_since_last_update += 1
        if cloudkit_records:
            try:
                response = client.save_records(cloudkit_records)
                response_records = response.get('records', [])
                batch_synced = 0
                batch_failed = 0
                # CloudKit reports per-record outcomes; a record-level error
                # is marked with 'serverErrorCode' in its response entry.
                for rec in response_records:
                    if 'serverErrorCode' in rec:
                        logger.error(f'CloudKit error for {rec.get("recordName")}: {rec.get("reason")}')
                        results['failed'] += 1
                        batch_failed += 1
                    else:
                        results['synced'] += 1
                        batch_synced += 1
                    records_since_last_update += 1
                    # Update progress frequently for real-time UI
                    if job and synced_field and records_since_last_update >= progress_update_interval:
                        setattr(job, synced_field, results['synced'])
                        setattr(job, failed_field, results['failed'])
                        job.save(update_fields=[synced_field, failed_field])
                        records_since_last_update = 0
                # Always update after each batch completes
                if job and synced_field:
                    setattr(job, synced_field, results['synced'])
                    setattr(job, failed_field, results['failed'])
                    job.save(update_fields=[synced_field, failed_field])
                    records_since_last_update = 0
                # Log progress after each batch
                remaining = total - (results['synced'] + results['failed'])
                logger.info(
                    f'[{record_type}] Batch {batch_num}/{total_batches}: '
                    f'+{batch_synced} synced, +{batch_failed} failed | '
                    f'Progress: {results["synced"]}/{total} synced, {remaining} remaining'
                )
            except Exception as e:
                # Whole-batch failure (network/API error): every record in
                # this batch is counted as failed.
                logger.error(f'Batch save failed: {e}')
                results['failed'] += len(cloudkit_records)
                # Update job progress
                if job and failed_field:
                    setattr(job, failed_field, results['failed'])
                    job.save(update_fields=[failed_field])
                remaining = total - (results['synced'] + results['failed'])
                logger.info(
                    f'[{record_type}] Batch {batch_num}/{total_batches} FAILED | '
                    f'Progress: {results["synced"]}/{total} synced, {remaining} remaining'
                )
    logger.info(f'[{record_type}] Complete: {results["synced"]} synced, {results["failed"]} failed')
    return results
def build_league_structure_records():
    """
    Build flat LeagueStructure dicts from Sport, Conference, Division models.

    Emits one row per league (the sport itself), conference, and division,
    linked by parentId, ordered sport-by-sport.
    """
    from core.models import Sport, Conference, Division

    def _row(node_id, sport_code, node_type, name, abbrev, parent_id, order):
        # Every LeagueStructure record shares one flat shape; the node id
        # doubles as structureId.
        return {
            'id': node_id,
            'structureId': node_id,
            'sport': sport_code,
            'type': node_type,
            'name': name,
            'abbreviation': abbrev,
            'parentId': parent_id,
            'displayOrder': order,
        }

    rows = []
    for sport in Sport.objects.filter(is_active=True).order_by('code'):
        league_id = f'ls_{sport.code}_league'
        rows.append(_row(league_id, sport.code, 'league',
                         sport.name, sport.short_name, '', 0))
        for conf in Conference.objects.filter(sport=sport).order_by('order', 'name'):
            conf_key = conf.canonical_id or f'conf_{conf.id}'
            conf_id = f'ls_{conf_key}'
            rows.append(_row(conf_id, sport.code, 'conference',
                             conf.name, conf.short_name or '', league_id, conf.order))
            for div in Division.objects.filter(conference=conf).order_by('order', 'name'):
                div_key = div.canonical_id or f'div_{div.id}'
                div_id = f'ls_{div_key}'
                rows.append(_row(div_id, sport.code, 'division',
                                 div.name, div.short_name or '', conf_id, div.order))
    return rows
def sync_dict_records(client, record_type, dict_records, batch_size, job=None):
    """
    Sync pre-built dict records to CloudKit (no model/queryset needed).

    Converts each dict via client.to_cloudkit_record, pushes batches of at
    most 50, and tallies per-record outcomes from the CloudKit response.
    """
    totals = {'synced': 0, 'failed': 0}
    count = len(dict_records)
    logger.info(f'[{record_type}] Starting sync: {count} total records')
    chunk = min(batch_size, 50)
    n_batches = (count + chunk - 1) // chunk
    for batch_index, start in enumerate(range(0, count, chunk), start=1):
        # Convert this slice; a bad dict fails individually, not the batch.
        payload = []
        for data in dict_records[start:start + chunk]:
            try:
                payload.append(client.to_cloudkit_record(record_type, data))
            except Exception as e:
                logger.error(f'Failed to convert {record_type}:{data.get("id")}: {e}')
                totals['failed'] += 1
        if not payload:
            continue
        try:
            response = client.save_records(payload)
            ok = bad = 0
            for rec in response.get('records', []):
                # Per-record errors are flagged by 'serverErrorCode'.
                if 'serverErrorCode' in rec:
                    logger.error(f'CloudKit error for {rec.get("recordName")}: {rec.get("reason")}')
                    totals['failed'] += 1
                    bad += 1
                else:
                    totals['synced'] += 1
                    ok += 1
            left = count - (totals['synced'] + totals['failed'])
            logger.info(
                f'[{record_type}] Batch {batch_index}/{n_batches}: '
                f'+{ok} synced, +{bad} failed | '
                f'Progress: {totals["synced"]}/{count} synced, {left} remaining'
            )
        except Exception as e:
            # Whole-batch failure: count every converted record as failed.
            logger.error(f'Batch save failed: {e}')
            totals['failed'] += len(payload)
    logger.info(f'[{record_type}] Complete: {totals["synced"]} synced, {totals["failed"]} failed')
    return totals
def sync_batch(client, states):
    """
    Sync a batch of CloudKitSyncState records to CloudKit.

    Loads each state's local record via get_record_data, pushes the batch
    with client.save_records, and marks each state synced or failed.

    Args:
        client: CloudKit client exposing to_cloudkit_record / save_records.
        states: Iterable of CloudKitSyncState instances to sync.

    Returns:
        dict with synced/created/updated/failed counts.
    """
    # NOTE: removed unused `from core.models import Game, Team, Stadium` —
    # record loading is fully delegated to get_record_data.
    result = {'synced': 0, 'created': 0, 'updated': 0, 'failed': 0}
    records_to_save = []
    for state in states:
        try:
            # Load the local record; None means the row no longer exists
            # (silently skipped, matching prior behavior).
            record_data = get_record_data(state.record_type, state.record_id)
            if record_data:
                records_to_save.append({
                    'state': state,
                    'data': record_data,
                })
        except Exception as e:
            logger.error(f'Failed to get record {state.record_type}:{state.record_id}: {e}')
            state.mark_failed(str(e))
            result['failed'] += 1
    if records_to_save:
        # Convert to CloudKit format and upload
        cloudkit_records = [
            client.to_cloudkit_record(r['state'].record_type, r['data'])
            for r in records_to_save
        ]
        try:
            response = client.save_records(cloudkit_records)
            response_records = response.get('records', [])
            for i, r in enumerate(records_to_save):
                if i < len(response_records):
                    # BUG FIX: snapshot existence BEFORE mark_synced. If
                    # mark_synced stores the CloudKit record name (it
                    # presumably does — TODO confirm against the model),
                    # checking afterwards would count every record as
                    # 'updated' and never 'created'.
                    was_existing = bool(r['state'].cloudkit_record_name)
                    change_tag = response_records[i].get('recordChangeTag', '')
                    r['state'].mark_synced(change_tag)
                    result['synced'] += 1
                    if was_existing:
                        result['updated'] += 1
                    else:
                        result['created'] += 1
                else:
                    # CloudKit returned fewer entries than we sent.
                    r['state'].mark_failed('No response for record')
                    result['failed'] += 1
        except Exception as e:
            logger.error(f'CloudKit save failed: {e}')
            for r in records_to_save:
                r['state'].mark_failed(str(e))
            result['failed'] += len(records_to_save)
    return result
def get_record_data(record_type, record_id):
    """
    Get the local record data for a given type and ID.

    Returns the CloudKit-ready dict for the record, or None when the type
    is unknown or the row does not exist.
    """
    from core.models import Sport, Conference, Division, Game, Team, Stadium, TeamAlias, StadiumAlias

    # Dispatch table: record type -> (model class, fetcher, serializer).
    # Sport is keyed by its code; everything else by primary key.
    registry = {
        'Sport': (
            Sport,
            lambda: Sport.objects.get(code=record_id),
            sport_to_dict,
        ),
        'Conference': (
            Conference,
            lambda: Conference.objects.select_related('sport').get(id=record_id),
            conference_to_dict,
        ),
        'Division': (
            Division,
            lambda: Division.objects.select_related('conference', 'conference__sport').get(id=record_id),
            division_to_dict,
        ),
        'Game': (
            Game,
            lambda: Game.objects.select_related('home_team', 'away_team', 'stadium', 'sport').get(id=record_id),
            game_to_dict,
        ),
        'Team': (
            Team,
            lambda: Team.objects.select_related('sport', 'home_stadium').get(id=record_id),
            team_to_dict,
        ),
        'Stadium': (
            Stadium,
            lambda: Stadium.objects.select_related('sport').get(id=record_id),
            stadium_to_dict,
        ),
        'TeamAlias': (
            TeamAlias,
            lambda: TeamAlias.objects.select_related('team').get(id=record_id),
            team_alias_to_dict,
        ),
        'StadiumAlias': (
            StadiumAlias,
            lambda: StadiumAlias.objects.select_related('stadium').get(id=record_id),
            stadium_alias_to_dict,
        ),
    }
    entry = registry.get(record_type)
    if entry is None:
        return None
    model, fetch, serialize = entry
    try:
        return serialize(fetch())
    except model.DoesNotExist:
        return None
def sport_to_dict(sport):
    """Serialize a Sport model into its CloudKit field dict."""
    # Sport's code is its CloudKit identifier.
    return dict(
        id=sport.code,
        abbreviation=sport.short_name,
        displayName=sport.name,
        iconName=sport.icon_name,
        colorHex=sport.color_hex,
        seasonStartMonth=sport.season_start_month,
        seasonEndMonth=sport.season_end_month,
        isActive=sport.is_active,
    )
def game_to_dict(game):
    """Serialize a Game model into its CloudKit field dict."""
    # NOTE(review): the commit message says game exports use
    # game_datetime_utc (ISO 8601 UTC), but this serializer still emits
    # game.game_date — confirm which field is canonical for CloudKit.
    return dict(
        id=game.id,
        sport=game.sport.code,
        season=game.season,
        homeTeamId=game.home_team_id,
        awayTeamId=game.away_team_id,
        stadiumId=game.stadium_id,
        gameDate=game.game_date.isoformat(),
        gameNumber=game.game_number,
        homeScore=game.home_score,
        awayScore=game.away_score,
        status=game.status,
        isNeutralSite=game.is_neutral_site,
        isPlayoff=game.is_playoff,
        playoffRound=game.playoff_round,
    )
def team_to_dict(team):
    """Serialize a Team model into its CloudKit field dict.

    Division and conference ids fall back to synthetic 'div_<pk>' /
    'conf_<pk>' identifiers when no canonical_id is set; both are None
    for teams without a division.
    """
    if team.division:
        division = team.division
        conference = division.conference
        division_id = division.canonical_id or f'div_{division.id}'
        conference_id = conference.canonical_id or f'conf_{conference.id}'
    else:
        division_id = None
        conference_id = None
    return dict(
        id=team.id,
        sport=team.sport.code,
        city=team.city,
        name=team.name,
        fullName=team.full_name,
        abbreviation=team.abbreviation,
        homeStadiumId=team.home_stadium_id,
        primaryColor=team.primary_color,
        secondaryColor=team.secondary_color,
        logoUrl=team.logo_url,
        divisionId=division_id,
        conferenceId=conference_id,
    )
def stadium_to_dict(stadium):
    """Serialize a Stadium model into its CloudKit field dict.

    Latitude/longitude are converted to float (the model presumably stores
    Decimal); None is preserved for stadiums without coordinates.
    """
    # BUG FIX: the previous truthiness check (`if stadium.latitude`) turned
    # a legitimate coordinate of 0 (equator / prime meridian) into None.
    # Only None means "missing".
    return {
        'id': stadium.id,
        'sport': stadium.sport.code,
        'name': stadium.name,
        'city': stadium.city,
        'state': stadium.state,
        'country': stadium.country,
        'latitude': float(stadium.latitude) if stadium.latitude is not None else None,
        'longitude': float(stadium.longitude) if stadium.longitude is not None else None,
        'capacity': stadium.capacity,
        'yearOpened': stadium.opened_year,
        'imageUrl': stadium.image_url,
        'surface': stadium.surface,
        'roofType': stadium.roof_type,
        'timezone': stadium.timezone,
    }
def conference_to_dict(conf):
    """Serialize a Conference model into its CloudKit field dict."""
    # Fall back to a synthetic 'conf_<pk>' id when no canonical_id is set.
    conference_id = conf.canonical_id or f'conf_{conf.id}'
    return dict(
        id=conference_id,
        sport=conf.sport.code,
        name=conf.name,
        shortName=conf.short_name,
        order=conf.order,
    )
def division_to_dict(div):
    """Serialize a Division model into its CloudKit field dict."""
    # Both ids fall back to synthetic 'div_<pk>' / 'conf_<pk>' identifiers
    # when no canonical_id is set.
    parent = div.conference
    return dict(
        id=div.canonical_id or f'div_{div.id}',
        conferenceId=parent.canonical_id or f'conf_{parent.id}',
        sport=parent.sport.code,
        name=div.name,
        shortName=div.short_name,
        order=div.order,
    )
def team_alias_to_dict(alias):
    """Serialize a TeamAlias model into its CloudKit field dict."""
    def _iso(value):
        # Validity dates are optional; serialize only when present.
        return value.isoformat() if value else None
    return dict(
        id=f'team_alias_{alias.id}',
        teamId=alias.team.id,
        alias=alias.alias,
        aliasType=alias.alias_type,
        validFrom=_iso(alias.valid_from),
        validUntil=_iso(alias.valid_until),
        isPrimary=alias.is_primary,
    )
def stadium_alias_to_dict(alias):
    """Serialize a StadiumAlias model into its CloudKit field dict."""
    def _iso(value):
        # Validity dates are optional; serialize only when present.
        return value.isoformat() if value else None
    return dict(
        id=f'stadium_alias_{alias.id}',
        stadiumId=alias.stadium.id,
        alias=alias.alias,
        aliasType=alias.alias_type,
        validFrom=_iso(alias.valid_from),
        validUntil=_iso(alias.valid_until),
        isPrimary=alias.is_primary,
    )
@shared_task
def mark_records_for_sync(record_type: str, record_ids: list):
    """
    Mark records as needing sync after local changes.

    Ensures a CloudKitSyncState row exists for each (record_type, id) pair
    and flips it to pending; returns how many were marked.
    """
    from cloudkit.models import CloudKitSyncState

    for rid in record_ids:
        # get_or_create guarantees a state row before marking it pending.
        state, _ = CloudKitSyncState.objects.get_or_create(
            record_type=record_type,
            record_id=rid,
        )
        state.mark_pending()
    return {'marked': len(record_ids)}