"""Celery tasks and helpers that push local sports data to CloudKit."""
import logging
import traceback

from celery import shared_task
from django.utils import timezone

logger = logging.getLogger('cloudkit')

# Upper bound on records sent per CloudKit save call. Kept small so that job
# progress can be refreshed frequently for the UI.
_MAX_API_BATCH_SIZE = 50

# Persist job progress after this many processed records within a batch.
_PROGRESS_UPDATE_INTERVAL = 10

# Job attribute names (synced counter, failed counter) per record type.
# Record types missing from this map (e.g. 'LeagueStructure') get no
# per-type progress counters.
_JOB_PROGRESS_FIELDS = {
    'Sport': ('sports_synced', 'sports_failed'),
    'Conference': ('conferences_synced', 'conferences_failed'),
    'Division': ('divisions_synced', 'divisions_failed'),
    'Team': ('teams_synced', 'teams_failed'),
    'Stadium': ('stadiums_synced', 'stadiums_failed'),
    'TeamAlias': ('team_aliases_synced', 'team_aliases_failed'),
    'StadiumAlias': ('stadium_aliases_synced', 'stadium_aliases_failed'),
    'Game': ('games_synced', 'games_failed'),
}


def _queue_notification(notify_task, job_id):
    """Queue the sync notification without letting a dispatch error escape.

    A failure to enqueue the notification must not change the outcome of a
    sync whose job record has already been saved, so it is only logged.
    """
    try:
        notify_task.delay(job_id)
    except Exception:
        logger.exception(f'Failed to queue sync notification for job {job_id}')


@shared_task(bind=True, max_retries=3)
def run_cloudkit_sync(self, config_id: int, triggered_by: str = 'manual',
                      sport_code: str = None, record_type: str = None):
    """
    Run a CloudKit sync job.

    Creates a CloudKitSyncJob row, runs ``perform_sync`` and stores the
    resulting counters and final status on the job.

    Args:
        config_id: Primary key of the CloudKitConfiguration to use.
        triggered_by: Stored on the job record as-is.
        sport_code: Stored on the job as ``sport_filter_id`` and forwarded
            to ``perform_sync``.
        record_type: When given, only that record type is synced.

    Returns:
        ``{'error': ...}`` when the configuration does not exist; otherwise a
        dict with ``job_id`` and ``status`` plus either ``records_synced``
        (sync ran to completion) or ``error`` (sync raised and retries are
        exhausted).

    Raises:
        celery.exceptions.Retry: via ``self.retry`` while retries remain
            after a failed attempt.
    """
    from cloudkit.models import CloudKitConfiguration, CloudKitSyncJob
    from notifications.tasks import send_sync_notification

    # Get configuration
    try:
        config = CloudKitConfiguration.objects.get(id=config_id)
    except CloudKitConfiguration.DoesNotExist:
        logger.error(f"CloudKitConfiguration {config_id} not found")
        return {'error': 'Configuration not found'}

    # Create job record.
    # NOTE: this runs on every invocation, so each retry attempt gets its own
    # job row.
    job = CloudKitSyncJob.objects.create(
        configuration=config,
        status='running',
        triggered_by=triggered_by,
        started_at=timezone.now(),
        celery_task_id=self.request.id,
        sport_filter_id=sport_code,
        record_type_filter=record_type or '',
    )

    try:
        logger.info(f'Starting CloudKit sync to {config.environment}')

        # Run sync
        result = perform_sync(config, job, sport_code, record_type)

        # Update job with results
        job.finished_at = timezone.now()
        job.records_synced = result.get('synced', 0)
        job.records_created = result.get('created', 0)
        job.records_updated = result.get('updated', 0)
        job.records_deleted = result.get('deleted', 0)
        job.records_failed = result.get('failed', 0)

        # Set status based on results
        if job.records_failed > 0 and job.records_synced == 0:
            job.status = 'failed'
            job.error_message = f'All {job.records_failed} records failed to sync'
            logger.error(f'Sync failed: {job.records_failed} failed, 0 synced')
        elif job.records_failed > 0:
            job.status = 'completed_with_errors'
            logger.warning(
                f'Sync completed with errors: {job.records_synced} synced, '
                f'{job.records_failed} failed'
            )
        else:
            job.status = 'completed'
            logger.info(f'Sync completed: {job.records_synced} synced')

        job.save()
    except Exception as e:
        error_msg = str(e)
        error_tb = traceback.format_exc()

        job.status = 'failed'
        job.finished_at = timezone.now()
        job.error_message = error_msg
        job.save()

        # Include the traceback so the failure can be diagnosed from the log.
        logger.error(f'Sync failed: {error_msg}\n{error_tb}')

        # Send failure notification
        _queue_notification(send_sync_notification, job.id)

        # Retry if applicable, waiting one minute longer per attempt.
        if self.request.retries < self.max_retries:
            raise self.retry(exc=e, countdown=60 * (self.request.retries + 1))

        return {
            'job_id': job.id,
            'status': 'failed',
            'error': error_msg,
        }

    # Send notification if configured. Done outside the try block so that a
    # dispatch problem cannot turn a finished job into a failed/retried one.
    _queue_notification(send_sync_notification, job.id)

    return {
        'job_id': job.id,
        # Report the status actually stored on the job, which may be
        # 'failed' or 'completed_with_errors' rather than 'completed'.
        'status': job.status,
        'records_synced': job.records_synced,
    }


def _sync_model_stage(client, job, record_type, queryset, to_dict_func,
                      total_field, batch_size):
    """Record the stage total on the job, then sync the queryset."""
    setattr(job, total_field, queryset.count())
    job.current_record_type = record_type
    job.save(update_fields=[total_field, 'current_record_type'])
    return sync_model_records(
        client, record_type, queryset, to_dict_func, batch_size, job
    )


def perform_sync(config, job, sport_code=None, record_type=None):
    """
    Perform the actual CloudKit sync.

    Pushes the local records of every record type (or only ``record_type``
    when given) to CloudKit through the client returned by
    ``config.get_client()``. Stages run in dependency order: Sport,
    Conference, Division, Team, Stadium, TeamAlias, StadiumAlias,
    LeagueStructure, Game.

    Args:
        config: Configuration object providing ``get_client()`` and
            ``batch_size``.
        job: Sync job whose progress fields are updated as stages run.
        sport_code: Accepted for interface compatibility.
            NOTE(review): currently not applied to any queryset, so records
            of all sports are synced — confirm whether filtering is intended.
        record_type: Optional single record type to sync.

    Returns:
        Dict with ``synced``, ``created``, ``updated``, ``deleted`` and
        ``failed`` counters. Only ``synced`` and ``failed`` are ever
        incremented here.

    Raises:
        ValueError: when the client's token check fails.
    """
    from core.models import (
        Sport, Conference, Division, Game, Team, Stadium,
        TeamAlias, StadiumAlias,
    )

    # Initialize CloudKit client from config
    client = config.get_client()

    # Test connection first
    try:
        client._get_token()
    except Exception as e:
        logger.error(f'CloudKit authentication failed: {e}')
        raise ValueError(f'CloudKit authentication failed: {e}') from e

    results = {
        'synced': 0,
        'created': 0,
        'updated': 0,
        'deleted': 0,
        'failed': 0,
    }
    batch_size = config.batch_size

    def wanted(name):
        return not record_type or record_type == name

    def accumulate(stage_results):
        results['synced'] += stage_results['synced']
        results['failed'] += stage_results['failed']

    # (record type, queryset, converter, job field holding the stage total).
    # Querysets are lazy, so listing them costs nothing for skipped stages.
    stages_before_league_structure = [
        # Sports first (no dependencies)
        ('Sport', Sport.objects.filter(is_active=True),
         sport_to_dict, 'sports_total'),
        # Conferences (FK to Sport)
        ('Conference', Conference.objects.select_related('sport').all(),
         conference_to_dict, 'conferences_total'),
        # Divisions (FK to Conference)
        ('Division',
         Division.objects.select_related('conference', 'conference__sport').all(),
         division_to_dict, 'divisions_total'),
        # Teams (dependencies for Games, TeamAliases)
        ('Team',
         Team.objects.select_related(
             'sport', 'home_stadium', 'division', 'division__conference'
         ).all(),
         team_to_dict, 'teams_total'),
        # Stadiums (dependencies for Games, StadiumAliases)
        ('Stadium', Stadium.objects.select_related('sport').all(),
         stadium_to_dict, 'stadiums_total'),
        # TeamAliases (FK to Team)
        ('TeamAlias', TeamAlias.objects.select_related('team').all(),
         team_alias_to_dict, 'team_aliases_total'),
        # StadiumAliases (FK to Stadium)
        ('StadiumAlias', StadiumAlias.objects.select_related('stadium').all(),
         stadium_alias_to_dict, 'stadium_aliases_total'),
    ]

    for name, queryset, to_dict_func, total_field in stages_before_league_structure:
        if wanted(name):
            accumulate(_sync_model_stage(
                client, job, name, queryset, to_dict_func, total_field, batch_size
            ))

    # Sync LeagueStructure (flattened hierarchy: league + conference + division)
    if wanted('LeagueStructure'):
        ls_records = build_league_structure_records()
        job.current_record_type = 'LeagueStructure'
        job.save(update_fields=['current_record_type'])
        accumulate(sync_dict_records(
            client, 'LeagueStructure', ls_records, batch_size, job
        ))

    # Sync Games last (depends on Teams, Stadiums)
    if wanted('Game'):
        games = Game.objects.select_related(
            'home_team', 'away_team', 'stadium', 'sport'
        ).all()
        accumulate(_sync_model_stage(
            client, job, 'Game', games, game_to_dict, 'games_total', batch_size
        ))

    job.current_record_type = ''
    job.save(update_fields=['current_record_type'])

    return results


def _effective_batch_size(batch_size):
    """Clamp the configured batch size into 1.._MAX_API_BATCH_SIZE.

    A missing, zero or negative value falls back to the maximum instead of
    producing an invalid ``range()`` step.
    """
    if not batch_size or batch_size < 1:
        return _MAX_API_BATCH_SIZE
    return min(batch_size, _MAX_API_BATCH_SIZE)


def _sync_in_batches(client, record_type, items, build_data, identify,
                     batch_size, job=None, progress_fields=(None, None)):
    """Convert ``items`` and save them to CloudKit batch by batch.

    Args:
        client: Object providing ``to_cloudkit_record(record_type, data)``
            and ``save_records(records)``.
        record_type: CloudKit record type name.
        items: Sliceable sequence of source items.
        build_data: Callable turning one item into the dict handed to
            ``client.to_cloudkit_record``.
        identify: Callable returning an identifier for log messages.
        batch_size: Requested batch size (clamped, see
            ``_effective_batch_size``).
        job: Optional job whose counters are updated.
        progress_fields: ``(synced_field, failed_field)`` attribute names on
            ``job``; ``(None, None)`` disables progress updates.

    Returns:
        ``{'synced': int, 'failed': int}``.
    """
    results = {'synced': 0, 'failed': 0}
    total = len(items)
    logger.info(f'[{record_type}] Starting sync: {total} total records')

    synced_field, failed_field = progress_fields
    track_progress = bool(job and synced_field)

    def save_progress():
        setattr(job, synced_field, results['synced'])
        setattr(job, failed_field, results['failed'])
        job.save(update_fields=[synced_field, failed_field])

    api_batch_size = _effective_batch_size(batch_size)
    total_batches = (total + api_batch_size - 1) // api_batch_size
    records_since_last_update = 0

    for batch_num, start in enumerate(range(0, total, api_batch_size), start=1):
        batch = items[start:start + api_batch_size]

        # Convert to CloudKit format; a bad record only fails itself.
        cloudkit_records = []
        for item in batch:
            try:
                cloudkit_records.append(
                    client.to_cloudkit_record(record_type, build_data(item))
                )
            except Exception as e:
                logger.error(f'Failed to convert {record_type}:{identify(item)}: {e}')
                results['failed'] += 1
                records_since_last_update += 1

        if not cloudkit_records:
            continue

        # Only the API call is guarded: a failure here fails the whole batch
        # exactly once, without re-counting records already tallied.
        try:
            response = client.save_records(cloudkit_records)
            response_records = response.get('records', [])
        except Exception as e:
            logger.error(f'Batch save failed: {e}')
            results['failed'] += len(cloudkit_records)

            # Update job progress
            if job and failed_field:
                setattr(job, failed_field, results['failed'])
                job.save(update_fields=[failed_field])

            remaining = total - (results['synced'] + results['failed'])
            logger.info(
                f'[{record_type}] Batch {batch_num}/{total_batches} FAILED | '
                f'Progress: {results["synced"]}/{total} synced, {remaining} remaining'
            )
            continue

        batch_synced = 0
        batch_failed = 0
        for rec in response_records:
            if 'serverErrorCode' in rec:
                logger.error(
                    f'CloudKit error for {rec.get("recordName")}: {rec.get("reason")}'
                )
                results['failed'] += 1
                batch_failed += 1
            else:
                results['synced'] += 1
                batch_synced += 1

            records_since_last_update += 1

            # Update progress frequently for real-time UI
            if track_progress and records_since_last_update >= _PROGRESS_UPDATE_INTERVAL:
                save_progress()
                records_since_last_update = 0

        # Records sent but absent from the response are treated as failed so
        # that synced + failed always adds up to the number processed.
        unacknowledged = len(cloudkit_records) - len(response_records)
        if unacknowledged > 0:
            logger.error(
                f'[{record_type}] CloudKit returned no result for '
                f'{unacknowledged} records'
            )
            results['failed'] += unacknowledged
            batch_failed += unacknowledged

        # Always update after each batch completes
        if track_progress:
            save_progress()
            records_since_last_update = 0

        # Log progress after each batch
        remaining = total - (results['synced'] + results['failed'])
        logger.info(
            f'[{record_type}] Batch {batch_num}/{total_batches}: '
            f'+{batch_synced} synced, +{batch_failed} failed | '
            f'Progress: {results["synced"]}/{total} synced, {remaining} remaining'
        )

    logger.info(
        f'[{record_type}] Complete: {results["synced"]} synced, '
        f'{results["failed"]} failed'
    )
    return results


def sync_model_records(client, record_type, queryset, to_dict_func, batch_size, job=None):
    """
    Sync all records from a queryset to CloudKit.

    The queryset is fully evaluated up front, each record is converted with
    ``to_dict_func`` and saved in batches of at most 50. When ``job`` is
    given and ``record_type`` has progress counters, they are updated at
    least once per batch for real-time UI feedback.

    Returns:
        ``{'synced': int, 'failed': int}``.
    """
    return _sync_in_batches(
        client,
        record_type,
        list(queryset),
        build_data=to_dict_func,
        identify=lambda record: record.id,
        batch_size=batch_size,
        job=job,
        progress_fields=_JOB_PROGRESS_FIELDS.get(record_type, (None, None)),
    )


def _league_structure_entry(structure_id, sport_code, kind, name,
                            abbreviation, parent_id, display_order):
    """Build one flat LeagueStructure dict."""
    return {
        'id': structure_id,
        'structureId': structure_id,
        'sport': sport_code,
        'type': kind,
        'name': name,
        'abbreviation': abbreviation,
        'parentId': parent_id,
        'displayOrder': display_order,
    }


def build_league_structure_records():
    """Build flat LeagueStructure dicts from Sport, Conference, Division models.

    For every active sport (ordered by code) this emits one 'league' entry,
    followed by each of its conferences and, after each conference, that
    conference's divisions. Every entry points at its parent via
    ``parentId``; league entries use an empty parent.
    """
    from core.models import Sport, Conference, Division

    records = []

    for sport in Sport.objects.filter(is_active=True).order_by('code'):
        league_id = f'ls_{sport.code}_league'
        records.append(_league_structure_entry(
            league_id, sport.code, 'league', sport.name, sport.short_name, '', 0
        ))

        for conf in Conference.objects.filter(sport=sport).order_by('order', 'name'):
            raw_conf_id = conf.canonical_id or f'conf_{conf.id}'
            conf_id = f'ls_{raw_conf_id}'
            records.append(_league_structure_entry(
                conf_id, sport.code, 'conference', conf.name,
                conf.short_name or '', league_id, conf.order
            ))

            for div in Division.objects.filter(conference=conf).order_by('order', 'name'):
                raw_div_id = div.canonical_id or f'div_{div.id}'
                div_id = f'ls_{raw_div_id}'
                records.append(_league_structure_entry(
                    div_id, sport.code, 'division', div.name,
                    div.short_name or '', conf_id, div.order
                ))

    return records


def sync_dict_records(client, record_type, dict_records, batch_size, job=None):
    """Sync pre-built dict records to CloudKit (no model/queryset needed).

    ``job`` is accepted for signature parity with ``sync_model_records``;
    no per-type progress counters are written for dict records.

    Returns:
        ``{'synced': int, 'failed': int}``.
    """
    return _sync_in_batches(
        client,
        record_type,
        dict_records,
        build_data=lambda data: data,
        identify=lambda data: data.get("id"),
        batch_size=batch_size,
        job=job,
    )


def sync_batch(client, states):
    """
    Sync a batch of records to CloudKit.

    Args:
        client: Object providing ``to_cloudkit_record`` and ``save_records``.
        states: Iterable of sync-state objects exposing ``record_type``,
            ``record_id``, ``cloudkit_record_name``, ``mark_synced(tag)``
            and ``mark_failed(message)``.

    Returns:
        Dict with ``synced``, ``created``, ``updated`` and ``failed`` counts.
    """
    result = {'synced': 0, 'created': 0, 'updated': 0, 'failed': 0}
    pending = []  # (state, cloudkit_record) pairs ready for upload

    for state in states:
        # Get the local record
        try:
            record_data = get_record_data(state.record_type, state.record_id)
        except Exception as e:
            logger.error(f'Failed to get record {state.record_type}:{state.record_id}: {e}')
            state.mark_failed(str(e))
            result['failed'] += 1
            continue

        if not record_data:
            # No local data for this state: nothing to upload, state untouched.
            continue

        # Convert per record so one bad record cannot abort the whole batch.
        try:
            ck_record = client.to_cloudkit_record(state.record_type, record_data)
        except Exception as e:
            logger.error(f'Failed to convert {state.record_type}:{state.record_id}: {e}')
            state.mark_failed(str(e))
            result['failed'] += 1
            continue

        pending.append((state, ck_record))

    if not pending:
        return result

    try:
        response = client.save_records([ck_record for _, ck_record in pending])
        response_records = response.get('records', [])
    except Exception as e:
        logger.error(f'CloudKit save failed: {e}')
        for state, _ in pending:
            state.mark_failed(str(e))
        result['failed'] += len(pending)
        return result

    # NOTE(review): results are matched to requests by position, as before —
    # confirm the client preserves request order in its response.
    for index, (state, _) in enumerate(pending):
        if index >= len(response_records):
            state.mark_failed('No response for record')
            result['failed'] += 1
            continue

        rec = response_records[index]
        if 'serverErrorCode' in rec:
            # Per-record server errors are failures, as in sync_model_records.
            logger.error(f'CloudKit error for {rec.get("recordName")}: {rec.get("reason")}')
            state.mark_failed(str(rec.get('reason') or rec['serverErrorCode']))
            result['failed'] += 1
            continue

        # Read this before mark_synced(), which may itself populate the
        # record name and would then make every record look "updated".
        already_uploaded = bool(state.cloudkit_record_name)
        state.mark_synced(rec.get('recordChangeTag', ''))
        result['synced'] += 1
        if already_uploaded:
            result['updated'] += 1
        else:
            result['created'] += 1

    return result


def get_record_data(record_type, record_id):
    """
    Get the local record data for a given type and ID.

    Sports are looked up by ``code``; every other type by ``id``.

    Returns:
        The CloudKit-ready dict for the record, or ``None`` when the record
        type is unknown or no matching row exists.
    """
    from core.models import (
        Sport, Conference, Division, Game, Team, Stadium,
        TeamAlias, StadiumAlias,
    )

    # record type -> (lazy queryset, lookup field, converter)
    sources = {
        'Sport': (Sport.objects.all(), 'code', sport_to_dict),
        'Conference': (
            Conference.objects.select_related('sport'), 'id', conference_to_dict,
        ),
        'Division': (
            Division.objects.select_related('conference', 'conference__sport'),
            'id', division_to_dict,
        ),
        'Game': (
            Game.objects.select_related('home_team', 'away_team', 'stadium', 'sport'),
            'id', game_to_dict,
        ),
        'Team': (
            # team_to_dict reads division and division.conference, so join
            # them here as perform_sync does.
            Team.objects.select_related(
                'sport', 'home_stadium', 'division', 'division__conference'
            ),
            'id', team_to_dict,
        ),
        'Stadium': (Stadium.objects.select_related('sport'), 'id', stadium_to_dict),
        'TeamAlias': (
            TeamAlias.objects.select_related('team'), 'id', team_alias_to_dict,
        ),
        'StadiumAlias': (
            StadiumAlias.objects.select_related('stadium'), 'id', stadium_alias_to_dict,
        ),
    }

    source = sources.get(record_type)
    if source is None:
        return None

    queryset, lookup_field, to_dict_func = source
    try:
        instance = queryset.get(**{lookup_field: record_id})
    except queryset.model.DoesNotExist:
        return None
    return to_dict_func(instance)


def _iso_or_none(value):
    """Return ``value.isoformat()``, or ``None`` for a falsy value."""
    return value.isoformat() if value else None


def sport_to_dict(sport):
    """Convert Sport model to dict for CloudKit."""
    return {
        'id': sport.code,
        'abbreviation': sport.short_name,
        'displayName': sport.name,
        'iconName': sport.icon_name,
        'colorHex': sport.color_hex,
        'seasonStartMonth': sport.season_start_month,
        'seasonEndMonth': sport.season_end_month,
        'isActive': sport.is_active,
    }


def game_to_dict(game):
    """Convert Game model to dict for CloudKit."""
    return {
        'id': game.id,
        'sport': game.sport.code,
        'season': game.season,
        'homeTeamId': game.home_team_id,
        'awayTeamId': game.away_team_id,
        'stadiumId': game.stadium_id,
        'gameDate': game.game_date.isoformat(),
        'gameNumber': game.game_number,
        'homeScore': game.home_score,
        'awayScore': game.away_score,
        'status': game.status,
        'isNeutralSite': game.is_neutral_site,
        'isPlayoff': game.is_playoff,
        'playoffRound': game.playoff_round,
    }


def team_to_dict(team):
    """Convert Team model to dict for CloudKit.

    ``divisionId`` and ``conferenceId`` are ``None`` for a team without a
    division; otherwise they use the canonical id when set and fall back to
    ``div_<pk>`` / ``conf_<pk>``.
    """
    division_id = None
    conference_id = None
    if team.division:
        division = team.division
        conference = division.conference
        division_id = division.canonical_id or f'div_{division.id}'
        conference_id = conference.canonical_id or f'conf_{conference.id}'

    return {
        'id': team.id,
        'sport': team.sport.code,
        'city': team.city,
        'name': team.name,
        'fullName': team.full_name,
        'abbreviation': team.abbreviation,
        'homeStadiumId': team.home_stadium_id,
        'primaryColor': team.primary_color,
        'secondaryColor': team.secondary_color,
        'logoUrl': team.logo_url,
        'divisionId': division_id,
        'conferenceId': conference_id,
    }


def stadium_to_dict(stadium):
    """Convert Stadium model to dict for CloudKit.

    Coordinates are converted to ``float``; only a missing (``None``) value
    maps to ``None``, so a coordinate of exactly 0 is preserved.
    """
    latitude = stadium.latitude
    longitude = stadium.longitude
    return {
        'id': stadium.id,
        'sport': stadium.sport.code,
        'name': stadium.name,
        'city': stadium.city,
        'state': stadium.state,
        'country': stadium.country,
        'latitude': float(latitude) if latitude is not None else None,
        'longitude': float(longitude) if longitude is not None else None,
        'capacity': stadium.capacity,
        'yearOpened': stadium.opened_year,
        'imageUrl': stadium.image_url,
        'surface': stadium.surface,
        'roofType': stadium.roof_type,
        'timezone': stadium.timezone,
    }


def conference_to_dict(conf):
    """Convert Conference model to dict for CloudKit."""
    return {
        'id': conf.canonical_id or f'conf_{conf.id}',
        'sport': conf.sport.code,
        'name': conf.name,
        'shortName': conf.short_name,
        'order': conf.order,
    }


def division_to_dict(div):
    """Convert Division model to dict for CloudKit."""
    conference = div.conference
    return {
        'id': div.canonical_id or f'div_{div.id}',
        'conferenceId': conference.canonical_id or f'conf_{conference.id}',
        'sport': conference.sport.code,
        'name': div.name,
        'shortName': div.short_name,
        'order': div.order,
    }


def team_alias_to_dict(alias):
    """Convert TeamAlias model to dict for CloudKit."""
    return {
        'id': f'team_alias_{alias.id}',
        'teamId': alias.team.id,
        'alias': alias.alias,
        'aliasType': alias.alias_type,
        'validFrom': _iso_or_none(alias.valid_from),
        'validUntil': _iso_or_none(alias.valid_until),
        'isPrimary': alias.is_primary,
    }


def stadium_alias_to_dict(alias):
    """Convert StadiumAlias model to dict for CloudKit."""
    return {
        'id': f'stadium_alias_{alias.id}',
        'stadiumId': alias.stadium.id,
        'alias': alias.alias,
        'aliasType': alias.alias_type,
        'validFrom': _iso_or_none(alias.valid_from),
        'validUntil': _iso_or_none(alias.valid_until),
        'isPrimary': alias.is_primary,
    }


@shared_task
def mark_records_for_sync(record_type: str, record_ids: list):
    """
    Mark records as needing sync after local changes.

    Ensures a CloudKitSyncState row exists for every id and calls
    ``mark_pending()`` on it.

    Returns:
        ``{'marked': <number of ids received>}``.
    """
    from cloudkit.models import CloudKitSyncState

    for record_id in record_ids:
        state, _created = CloudKitSyncState.objects.get_or_create(
            record_type=record_type,
            record_id=record_id,
        )
        state.mark_pending()

    return {'marked': len(record_ids)}