"""Upload state manager for resumable uploads. This module tracks upload progress to enable resuming interrupted uploads. State is persisted to JSON files in the .parser_state directory. """ import json from dataclasses import dataclass, field from datetime import datetime from pathlib import Path from typing import Optional from ..config import STATE_DIR @dataclass class RecordState: """State of an individual record upload. Attributes: record_name: Canonical record ID record_type: CloudKit record type uploaded_at: Timestamp when successfully uploaded record_change_tag: CloudKit version tag status: 'pending', 'uploaded', 'failed' error_message: Error message if failed retry_count: Number of retry attempts """ record_name: str record_type: str uploaded_at: Optional[datetime] = None record_change_tag: Optional[str] = None status: str = "pending" error_message: Optional[str] = None retry_count: int = 0 def to_dict(self) -> dict: """Convert to dictionary for JSON serialization.""" return { "record_name": self.record_name, "record_type": self.record_type, "uploaded_at": self.uploaded_at.isoformat() if self.uploaded_at else None, "record_change_tag": self.record_change_tag, "status": self.status, "error_message": self.error_message, "retry_count": self.retry_count, } @classmethod def from_dict(cls, data: dict) -> "RecordState": """Create RecordState from dictionary.""" uploaded_at = data.get("uploaded_at") if uploaded_at: uploaded_at = datetime.fromisoformat(uploaded_at) return cls( record_name=data["record_name"], record_type=data["record_type"], uploaded_at=uploaded_at, record_change_tag=data.get("record_change_tag"), status=data.get("status", "pending"), error_message=data.get("error_message"), retry_count=data.get("retry_count", 0), ) @dataclass class UploadSession: """Tracks the state of an upload session. Attributes: sport: Sport code season: Season start year environment: CloudKit environment started_at: When the upload session started last_updated: When the state was last updated records: Dictionary of record_name -> RecordState total_count: Total number of records to upload """ sport: str season: int environment: str started_at: datetime = field(default_factory=datetime.utcnow) last_updated: datetime = field(default_factory=datetime.utcnow) records: dict[str, RecordState] = field(default_factory=dict) total_count: int = 0 @property def uploaded_count(self) -> int: """Count of successfully uploaded records.""" return sum(1 for r in self.records.values() if r.status == "uploaded") @property def pending_count(self) -> int: """Count of pending records.""" return sum(1 for r in self.records.values() if r.status == "pending") @property def failed_count(self) -> int: """Count of failed records.""" return sum(1 for r in self.records.values() if r.status == "failed") @property def is_complete(self) -> bool: """Check if all records have been processed.""" return self.pending_count == 0 @property def progress_percent(self) -> float: """Calculate upload progress as percentage.""" if self.total_count == 0: return 100.0 return (self.uploaded_count / self.total_count) * 100 def get_pending_records(self) -> list[str]: """Get list of record names that still need to be uploaded.""" return [ name for name, state in self.records.items() if state.status == "pending" ] def get_failed_records(self) -> list[str]: """Get list of record names that failed to upload.""" return [ name for name, state in self.records.items() if state.status == "failed" ] def get_retryable_records(self, max_retries: int = 3) -> list[str]: """Get failed records that can be retried.""" return [ name for name, state in self.records.items() if state.status == "failed" and state.retry_count < max_retries ] def mark_uploaded( self, record_name: str, record_change_tag: Optional[str] = None, ) -> None: """Mark a record as successfully uploaded.""" if record_name in self.records: state = self.records[record_name] state.status = "uploaded" state.uploaded_at = datetime.utcnow() state.record_change_tag = record_change_tag state.error_message = None self.last_updated = datetime.utcnow() def mark_failed(self, record_name: str, error_message: str) -> None: """Mark a record as failed.""" if record_name in self.records: state = self.records[record_name] state.status = "failed" state.error_message = error_message state.retry_count += 1 self.last_updated = datetime.utcnow() def mark_pending(self, record_name: str) -> None: """Mark a record as pending (for retry).""" if record_name in self.records: state = self.records[record_name] state.status = "pending" state.error_message = None self.last_updated = datetime.utcnow() def add_record(self, record_name: str, record_type: str) -> None: """Add a new record to track.""" if record_name not in self.records: self.records[record_name] = RecordState( record_name=record_name, record_type=record_type, ) self.total_count = len(self.records) def to_dict(self) -> dict: """Convert to dictionary for JSON serialization.""" return { "sport": self.sport, "season": self.season, "environment": self.environment, "started_at": self.started_at.isoformat(), "last_updated": self.last_updated.isoformat(), "total_count": self.total_count, "records": { name: state.to_dict() for name, state in self.records.items() }, } @classmethod def from_dict(cls, data: dict) -> "UploadSession": """Create UploadSession from dictionary.""" session = cls( sport=data["sport"], season=data["season"], environment=data["environment"], started_at=datetime.fromisoformat(data["started_at"]), last_updated=datetime.fromisoformat(data["last_updated"]), total_count=data.get("total_count", 0), ) for name, record_data in data.get("records", {}).items(): session.records[name] = RecordState.from_dict(record_data) return session class StateManager: """Manages upload state persistence. State files are stored in .parser_state/ with naming convention: upload_state_{sport}_{season}_{environment}.json """ def __init__(self, state_dir: Optional[Path] = None): """Initialize the state manager. Args: state_dir: Directory for state files (default: .parser_state/) """ self.state_dir = state_dir or STATE_DIR self.state_dir.mkdir(parents=True, exist_ok=True) def _get_state_file(self, sport: str, season: int, environment: str) -> Path: """Get the path to a state file.""" return self.state_dir / f"upload_state_{sport}_{season}_{environment}.json" def load_session( self, sport: str, season: int, environment: str, ) -> Optional[UploadSession]: """Load an existing upload session. Args: sport: Sport code season: Season start year environment: CloudKit environment Returns: UploadSession if exists, None otherwise """ state_file = self._get_state_file(sport, season, environment) if not state_file.exists(): return None try: with open(state_file, "r", encoding="utf-8") as f: data = json.load(f) return UploadSession.from_dict(data) except (json.JSONDecodeError, KeyError) as e: # Corrupted state file return None def save_session(self, session: UploadSession) -> None: """Save an upload session to disk. Args: session: The session to save """ state_file = self._get_state_file( session.sport, session.season, session.environment, ) session.last_updated = datetime.utcnow() with open(state_file, "w", encoding="utf-8") as f: json.dump(session.to_dict(), f, indent=2) def create_session( self, sport: str, season: int, environment: str, record_names: list[tuple[str, str]], # (record_name, record_type) ) -> UploadSession: """Create a new upload session. Args: sport: Sport code season: Season start year environment: CloudKit environment record_names: List of (record_name, record_type) tuples Returns: New UploadSession """ session = UploadSession( sport=sport, season=season, environment=environment, ) for record_name, record_type in record_names: session.add_record(record_name, record_type) self.save_session(session) return session def delete_session(self, sport: str, season: int, environment: str) -> bool: """Delete an upload session state file. Args: sport: Sport code season: Season start year environment: CloudKit environment Returns: True if deleted, False if not found """ state_file = self._get_state_file(sport, season, environment) if state_file.exists(): state_file.unlink() return True return False def list_sessions(self) -> list[dict]: """List all upload sessions. Returns: List of session summaries """ sessions = [] for state_file in self.state_dir.glob("upload_state_*.json"): try: with open(state_file, "r", encoding="utf-8") as f: data = json.load(f) session = UploadSession.from_dict(data) sessions.append({ "sport": session.sport, "season": session.season, "environment": session.environment, "started_at": session.started_at.isoformat(), "last_updated": session.last_updated.isoformat(), "progress": f"{session.uploaded_count}/{session.total_count}", "progress_percent": f"{session.progress_percent:.1f}%", "status": "complete" if session.is_complete else "in_progress", "failed_count": session.failed_count, }) except (json.JSONDecodeError, KeyError): continue return sessions def get_session_or_create( self, sport: str, season: int, environment: str, record_names: list[tuple[str, str]], resume: bool = False, ) -> UploadSession: """Get existing session or create new one. Args: sport: Sport code season: Season start year environment: CloudKit environment record_names: List of (record_name, record_type) tuples resume: Whether to resume existing session Returns: UploadSession (existing or new) """ if resume: existing = self.load_session(sport, season, environment) if existing: # Add any new records not in existing session existing_names = set(existing.records.keys()) for record_name, record_type in record_names: if record_name not in existing_names: existing.add_record(record_name, record_type) return existing # Create new session (overwrites existing) return self.create_session(sport, season, environment, record_names)