"""Validation report generator for scraped data.""" from dataclasses import dataclass, field from datetime import datetime from pathlib import Path from typing import Optional from ..config import EXPECTED_GAME_COUNTS, OUTPUT_DIR from ..models.game import Game from ..models.team import Team from ..models.stadium import Stadium from ..models.aliases import ManualReviewItem, ReviewReason @dataclass class ValidationSummary: """Summary statistics for validation report. Attributes: total_games: Total number of games scraped valid_games: Number of games with all data resolved review_count: Number of items needing manual review unresolved_teams: Count of unresolved team names unresolved_stadiums: Count of unresolved stadium names duplicate_games: Count of potential duplicate games missing_data: Count of games with missing required data expected_games: Expected number of games for this sport """ total_games: int = 0 valid_games: int = 0 review_count: int = 0 unresolved_teams: int = 0 unresolved_stadiums: int = 0 duplicate_games: int = 0 missing_data: int = 0 expected_games: int = 0 @property def game_coverage(self) -> float: """Percentage of expected games scraped.""" if self.expected_games == 0: return 100.0 return (self.total_games / self.expected_games) * 100 @property def validity_rate(self) -> float: """Percentage of games that are valid.""" if self.total_games == 0: return 100.0 return (self.valid_games / self.total_games) * 100 @property def needs_review(self) -> bool: """Check if report requires manual review.""" return self.review_count > 0 @property def status_emoji(self) -> str: """Get status emoji for report header.""" if self.review_count == 0 and self.game_coverage >= 95: return "✅" elif self.review_count <= 10 and self.game_coverage >= 80: return "⚠️" else: return "❌" @dataclass class ValidationReport: """Complete validation report for a sport/season. Attributes: sport: Sport code season: Season start year source: Name of the data source used summary: Summary statistics review_items: Items requiring manual review games: All scraped games teams: All teams stadiums: All stadiums generated_at: Timestamp of report generation """ sport: str season: int source: str summary: ValidationSummary review_items: list[ManualReviewItem] = field(default_factory=list) games: list[Game] = field(default_factory=list) teams: list[Team] = field(default_factory=list) stadiums: list[Stadium] = field(default_factory=list) generated_at: datetime = field(default_factory=datetime.now) def to_markdown(self) -> str: """Generate markdown report. Returns: Complete markdown report as string """ lines = [] # Header season_str = f"{self.season}-{str(self.season + 1)[-2:]}" lines.append(f"# Validation Report: {self.sport.upper()} {season_str}") lines.append("") lines.append(f"**Generated**: {self.generated_at.strftime('%Y-%m-%d %H:%M:%S')} UTC") lines.append(f"**Source**: {self.source}") lines.append(f"**Status**: {self.summary.status_emoji} {'Needs Review' if self.summary.needs_review else 'Ready'}") lines.append("") # Summary table lines.append("## Summary") lines.append("") lines.append("| Metric | Count |") lines.append("|--------|-------|") lines.append(f"| Total Games | {self.summary.total_games:,} |") lines.append(f"| Valid Games | {self.summary.valid_games:,} |") lines.append(f"| Expected Games | {self.summary.expected_games:,} |") lines.append(f"| Coverage | {self.summary.game_coverage:.1f}% |") lines.append(f"| Manual Review | {self.summary.review_count} |") lines.append(f"| Unresolved Teams | {self.summary.unresolved_teams} |") lines.append(f"| Unresolved Stadiums | {self.summary.unresolved_stadiums} |") lines.append(f"| Duplicate Games | {self.summary.duplicate_games} |") lines.append(f"| Missing Data | {self.summary.missing_data} |") lines.append("") # Manual review section if self.review_items: lines.append("## Manual Review Required") lines.append("") # Group by reason by_reason: dict[ReviewReason, list[ManualReviewItem]] = {} for item in self.review_items: if item.reason not in by_reason: by_reason[item.reason] = [] by_reason[item.reason].append(item) for reason, items in sorted(by_reason.items(), key=lambda x: x[0].value): reason_title = reason.value.replace("_", " ").title() lines.append(f"### {reason_title} ({len(items)})") lines.append("") for item in items[:10]: # Limit to first 10 per category lines.append(item.to_markdown()) if len(items) > 10: lines.append(f"*... and {len(items) - 10} more items*") lines.append("") # Teams section lines.append("## Teams") lines.append("") lines.append(f"Total teams: {len(self.teams)}") lines.append("") if self.teams: lines.append("| ID | Full Name | City | Conference | Division |") lines.append("|-----|-----------|------|------------|----------|") for team in sorted(self.teams, key=lambda t: t.full_name)[:20]: lines.append( f"| `{team.id}` | {team.full_name} | {team.city} | " f"{team.conference or '-'} | {team.division or '-'} |" ) if len(self.teams) > 20: lines.append(f"*... and {len(self.teams) - 20} more teams*") lines.append("") # Stadiums section lines.append("## Stadiums") lines.append("") lines.append(f"Total stadiums: {len(self.stadiums)}") lines.append("") if self.stadiums: lines.append("| ID | Name | City | State |") lines.append("|-----|------|------|-------|") for stadium in sorted(self.stadiums, key=lambda s: s.name)[:20]: lines.append( f"| `{stadium.id}` | {stadium.name} | " f"{stadium.city} | {stadium.state} |" ) if len(self.stadiums) > 20: lines.append(f"*... and {len(self.stadiums) - 20} more stadiums*") lines.append("") # Game samples section lines.append("## Game Samples") lines.append("") if self.games: # Show first 10 games lines.append("### First 10 Games") lines.append("") lines.append("| ID | Date | Away | Home | Status |") lines.append("|----|------|------|------|--------|") for game in self.games[:10]: date_str = game.game_date.strftime("%Y-%m-%d") lines.append( f"| `{game.id}` | {date_str} | {game.away_team_id} | " f"{game.home_team_id} | {game.status} |" ) lines.append("") # Show games with issues problem_games = [g for g in self.games if not g.stadium_id] if problem_games: lines.append("### Games Missing Stadium") lines.append("") lines.append("| ID | Date | Away | Home | Raw Stadium |") lines.append("|----|------|------|------|-------------|") for game in problem_games[:10]: date_str = game.game_date.strftime("%Y-%m-%d") lines.append( f"| `{game.id}` | {date_str} | {game.away_team_id} | " f"{game.home_team_id} | {game.raw_stadium or '-'} |" ) if len(problem_games) > 10: lines.append(f"*... and {len(problem_games) - 10} more*") lines.append("") lines.append("---") lines.append("") lines.append("*Generated by sportstime-parser*") return "\n".join(lines) def save(self, output_dir: Optional[Path] = None) -> Path: """Save report to markdown file. Args: output_dir: Directory to save to (default: OUTPUT_DIR) Returns: Path to saved file """ if output_dir is None: output_dir = OUTPUT_DIR output_dir.mkdir(parents=True, exist_ok=True) filename = f"validation_{self.sport}_{self.season}.md" filepath = output_dir / filename with open(filepath, "w", encoding="utf-8") as f: f.write(self.to_markdown()) return filepath def generate_report( sport: str, season: int, source: str, games: list[Game], teams: list[Team], stadiums: list[Stadium], review_items: list[ManualReviewItem], ) -> ValidationReport: """Generate a validation report from scraped data. Args: sport: Sport code season: Season start year source: Data source name games: List of scraped games teams: List of teams stadiums: List of stadiums review_items: Items requiring review Returns: Complete ValidationReport """ # Calculate summary summary = ValidationSummary( total_games=len(games), expected_games=EXPECTED_GAME_COUNTS.get(sport, 0), review_count=len(review_items), ) # Count review item types for item in review_items: if item.reason == ReviewReason.UNRESOLVED_TEAM: summary.unresolved_teams += 1 elif item.reason == ReviewReason.UNRESOLVED_STADIUM: summary.unresolved_stadiums += 1 elif item.reason == ReviewReason.DUPLICATE_GAME: summary.duplicate_games += 1 elif item.reason == ReviewReason.MISSING_DATA: summary.missing_data += 1 # Count valid games (games with all required data) valid_count = 0 for game in games: if game.home_team_id and game.away_team_id: valid_count += 1 summary.valid_games = valid_count return ValidationReport( sport=sport, season=season, source=source, summary=summary, review_items=review_items, games=games, teams=teams, stadiums=stadiums, ) def detect_duplicate_games(games: list[Game]) -> list[ManualReviewItem]: """Detect potential duplicate games. Duplicates are identified by having the same: - Home team - Away team - Date (ignoring time) Args: games: List of games to check Returns: List of ManualReviewItems for duplicates """ from uuid import uuid4 seen: dict[str, Game] = {} duplicates: list[ManualReviewItem] = [] for game in games: # Create a key for the game key = ( f"{game.home_team_id}_{game.away_team_id}_" f"{game.game_date.strftime('%Y%m%d')}" ) if key in seen: # Skip if it's a doubleheader (has game_number) if game.game_number: continue existing = seen[key] duplicates.append( ManualReviewItem( id=f"dup_{uuid4().hex[:8]}", reason=ReviewReason.DUPLICATE_GAME, sport=game.sport, raw_value=f"{game.id} vs {existing.id}", context={ "game1_id": existing.id, "game2_id": game.id, "date": game.game_date.strftime("%Y-%m-%d"), "home": game.home_team_id, "away": game.away_team_id, }, game_date=game.game_date.date(), ) ) else: seen[key] = game return duplicates def validate_games(games: list[Game]) -> list[ManualReviewItem]: """Validate games and return issues found. Checks: - Missing stadium IDs - Missing team IDs - Invalid dates - Duplicate games Args: games: List of games to validate Returns: List of ManualReviewItems for issues """ from uuid import uuid4 issues: list[ManualReviewItem] = [] for game in games: # Check for missing stadium if not game.stadium_id: issues.append( ManualReviewItem( id=f"missing_{uuid4().hex[:8]}", reason=ReviewReason.MISSING_DATA, sport=game.sport, raw_value=f"Game {game.id} missing stadium", context={ "game_id": game.id, "field": "stadium_id", "raw_stadium": game.raw_stadium, }, source_url=game.source_url, game_date=game.game_date.date(), ) ) # Check for duplicates dup_issues = detect_duplicate_games(games) issues.extend(dup_issues) return issues