#!/usr/bin/env python3
"""
Cross-Validation System for SportsTime App

Compares scraped data from multiple sources and flags discrepancies.

Usage:
    python validate_data.py --data-dir ./data
    python validate_data.py --scrape-and-validate --season 2025
"""

import argparse
import json
from datetime import datetime
from pathlib import Path
from dataclasses import dataclass, asdict, field
from typing import Optional
from collections import defaultdict

# Import from core module
from core import Game, Stadium, assign_stable_ids

# Import from sport modules
from nba import scrape_nba_basketball_reference, NBA_TEAMS
from mlb import scrape_mlb_statsapi, scrape_mlb_baseball_reference, MLB_TEAMS
from nhl import scrape_nhl_hockey_reference, NHL_TEAMS
from nfl import scrape_nfl_espn, NFL_TEAMS

# Import secondary sports from scrape_schedules (stubs)
from scrape_schedules import (
    scrape_wnba_espn, scrape_mls_espn, scrape_nwsl_espn, scrape_cbb_espn,
    WNBA_TEAMS, MLS_TEAMS, NWSL_TEAMS,
)


# =============================================================================
# VALIDATION DATA CLASSES
# =============================================================================

@dataclass
class Discrepancy:
    """Represents a single discrepancy between two sources for one game."""
    game_key: str
    field: str  # 'date', 'time', 'venue', 'teams', 'missing'
    source1: str
    source2: str
    value1: str
    value2: str
    severity: str  # 'high', 'medium', 'low'


@dataclass
class ValidationReport:
    """Summary of validation results for one sport/season source comparison."""
    sport: str
    season: str
    sources: list[str]
    total_games_source1: int = 0
    total_games_source2: int = 0
    games_matched: int = 0
    games_missing_source1: int = 0
    games_missing_source2: int = 0
    discrepancies: list[Discrepancy] = field(default_factory=list)

    def to_dict(self) -> dict:
        """Serialize the report (plus a discrepancy summary) to plain dicts
        suitable for json.dump."""
        return {
            'sport': self.sport,
            'season': self.season,
            'sources': self.sources,
            'total_games_source1': self.total_games_source1,
            'total_games_source2': self.total_games_source2,
            'games_matched': self.games_matched,
            'games_missing_source1': self.games_missing_source1,
            'games_missing_source2': self.games_missing_source2,
            'discrepancies': [asdict(d) for d in self.discrepancies],
            'discrepancy_summary': self.get_summary()
        }

    def get_summary(self) -> dict:
        """Count discrepancies grouped by field and by severity."""
        by_field = defaultdict(int)
        by_severity = defaultdict(int)
        for d in self.discrepancies:
            by_field[d.field] += 1
            by_severity[d.severity] += 1
        return {
            'by_field': dict(by_field),
            'by_severity': dict(by_severity)
        }


# =============================================================================
# GAME KEY GENERATION
# =============================================================================

def normalize_abbrev(abbrev: str, sport: str) -> str:
    """Normalize team abbreviations across different sources.

    Each mapping collapses the abbreviation variants emitted by different
    scrapers onto one canonical form; unknown abbreviations pass through
    unchanged (after uppercasing/stripping).
    """
    abbrev = abbrev.upper().strip()

    if sport == 'MLB':
        # MLB abbreviation mappings between sources
        mlb_mappings = {
            'AZ': 'ARI', 'ARI': 'ARI',    # Arizona
            'ATH': 'OAK', 'OAK': 'OAK',   # Oakland/Athletics
            'CWS': 'CHW', 'CHW': 'CHW',   # Chicago White Sox
            'KC': 'KCR', 'KCR': 'KCR',    # Kansas City
            'SD': 'SDP', 'SDP': 'SDP',    # San Diego
            'SF': 'SFG', 'SFG': 'SFG',    # San Francisco
            'TB': 'TBR', 'TBR': 'TBR',    # Tampa Bay
            'WSH': 'WSN', 'WSN': 'WSN',   # Washington
        }
        return mlb_mappings.get(abbrev, abbrev)

    if sport == 'NBA':
        nba_mappings = {
            'PHX': 'PHO', 'PHO': 'PHO',   # Phoenix
            'BKN': 'BRK', 'BRK': 'BRK',   # Brooklyn
            'CHA': 'CHO', 'CHO': 'CHO',   # Charlotte
            'NOP': 'NOP', 'NO': 'NOP',    # New Orleans
        }
        return nba_mappings.get(abbrev, abbrev)

    if sport == 'NHL':
        nhl_mappings = {
            'ARI': 'UTA', 'UTA': 'UTA',   # Arizona moved to Utah
            'VGS': 'VGK', 'VGK': 'VGK',   # Vegas
        }
        return nhl_mappings.get(abbrev, abbrev)

    return abbrev


def generate_game_key(game: Game) -> str:
    """
    Generate a unique key for matching games across sources.

    Uses date + normalized team abbreviations (sorted) to match.
    Sorting makes the key independent of which team is listed as home.
    """
    home = normalize_abbrev(game.home_team_abbrev, game.sport)
    away = normalize_abbrev(game.away_team_abbrev, game.sport)
    teams = sorted([home, away])
    return f"{game.date}_{teams[0]}_{teams[1]}"


def normalize_team_name(name: str, sport: str) -> str:
    """Map a free-form team name to a canonical abbreviation.

    Tries, per team table entry: exact full-name match, exact city match,
    then substring match in either direction. Falls back to the first three
    letters of the input uppercased when nothing matches.
    """
    teams = {
        'NBA': NBA_TEAMS,
        'MLB': MLB_TEAMS,
        'NHL': NHL_TEAMS,
        'WNBA': WNBA_TEAMS,
        'MLS': MLS_TEAMS,
        'NWSL': NWSL_TEAMS,
        'NFL': NFL_TEAMS,
    }.get(sport, {})

    name_lower = name.lower().strip()

    for abbrev, info in teams.items():
        team_name = info['name'].lower()
        # Check against known team names
        if name_lower == team_name:
            return abbrev
        # Check city match (.get: not every table is guaranteed a 'city' key,
        # and a KeyError here would abort the whole normalization)
        if name_lower == info.get('city', '').lower():
            return abbrev
        # Check partial match
        if name_lower in team_name or team_name in name_lower:
            return abbrev

    return name[:3].upper()


def normalize_venue(venue: str) -> str:
    """Normalize a venue name for fuzzy comparison between sources."""
    normalized = venue.lower().strip()

    # Strip leading filler words at the START of the name only. The previous
    # blanket str.replace('at ', '') also mangled mid-string matches
    # (e.g. "great american ball park" -> "greamerican ball park").
    for prefix in ('at ', 'the '):
        normalized = normalized.removeprefix(prefix)

    # Drop generic venue-type suffix words and sponsorship noise.
    replacements = [
        (' stadium', ''),
        (' arena', ''),
        (' center', ''),
        (' field', ''),
        (' park', ''),
        ('.com', ''),
        # Canonicalize "crypto" vs "crypto.com" naming to a single form;
        # this must run AFTER the '.com' strip so both variants converge.
        ('crypto', 'crypto.com'),
    ]
    for old, new in replacements:
        normalized = normalized.replace(old, new)

    return normalized.strip()


def normalize_time(time_str: Optional[str]) -> Optional[str]:
    """Normalize a time string to 24-hour 'HH:MM'.

    Handles 12-hour forms ('7:30pm', '7 pm') and bare 24-hour 'H:MM' input.
    Returns None for empty input; returns the stripped/lowercased input
    unchanged when no format matches.
    """
    if not time_str:
        return None

    time_str = time_str.strip().lower()

    if 'pm' in time_str or 'am' in time_str:
        # 12-hour format. Spaces are removed first, so a single pair of
        # compact formats suffices (the old '%I:%M %p' variant could never
        # match a de-spaced string and was dead code).
        compact = time_str.replace(' ', '')
        for fmt in ('%I:%M%p', '%I%p'):
            try:
                return datetime.strptime(compact, fmt).strftime('%H:%M')
            except ValueError:
                continue

    # Already 24-hour or just numbers (also the fallback for odd 12-hour
    # spellings like '7:30 p.m.' where the hour/minute digits still parse).
    if ':' in time_str:
        parts = time_str.split(':')
        if len(parts) >= 2:
            try:
                hour = int(parts[0])
                minute = int(parts[1][:2])
            except ValueError:
                pass
            else:
                return f"{hour:02d}:{minute:02d}"

    return time_str


# =============================================================================
# CROSS-VALIDATION LOGIC
# =============================================================================


def _missing_game_severity(date_str: str) -> str:
    """Severity for a game present in only one source.

    Spring training (March before the 26th) and postseason (October onward)
    coverage legitimately differs between sources, so those windows are
    downgraded to 'medium'; everything else (or an unparseable/missing date)
    is 'high'.
    """
    try:
        game_date = datetime.strptime(date_str, '%Y-%m-%d')
    except (ValueError, TypeError):
        # TypeError covers a None date, which the old bare except swallowed.
        return 'high'
    if game_date.month == 3 and game_date.day < 26:  # Spring training
        return 'medium'
    if game_date.month >= 10:  # Playoffs/postseason
        return 'medium'
    return 'high'


def validate_games(
    games1: list[Game],
    games2: list[Game],
    source1_name: str,
    source2_name: str,
    sport: str,
    season: str
) -> ValidationReport:
    """
    Compare two lists of games and find discrepancies.

    Games are matched by generate_game_key (date + sorted team abbrevs);
    matched pairs are compared field by field, unmatched games are reported
    as 'missing' discrepancies.

    NOTE(review): games with identical keys (e.g. MLB doubleheaders — same
    date, same teams) silently overwrite each other in the per-source index,
    so only one game of such a pair is compared — confirm this is acceptable.
    """
    report = ValidationReport(
        sport=sport,
        season=season,
        sources=[source1_name, source2_name],
        total_games_source1=len(games1),
        total_games_source2=len(games2)
    )

    # Index games by key
    games1_by_key = {generate_game_key(g): g for g in games1}
    games2_by_key = {generate_game_key(g): g for g in games2}

    # Find matches and discrepancies
    all_keys = set(games1_by_key) | set(games2_by_key)

    for key in all_keys:
        g1 = games1_by_key.get(key)
        g2 = games2_by_key.get(key)

        if g1 and g2:
            # Both sources have this game - compare fields
            report.games_matched += 1

            # Compare dates (should match by key, but double-check)
            if g1.date != g2.date:
                report.discrepancies.append(Discrepancy(
                    game_key=key, field='date',
                    source1=source1_name, source2=source2_name,
                    value1=g1.date, value2=g2.date,
                    severity='high'
                ))

            # Compare times
            time1 = normalize_time(g1.time)
            time2 = normalize_time(g2.time)
            if time1 and time2 and time1 != time2:
                # Check if times are close (within 1 hour - could be timezone)
                try:
                    t1 = datetime.strptime(time1, '%H:%M')
                    t2 = datetime.strptime(time2, '%H:%M')
                    diff_minutes = abs((t1 - t2).total_seconds() / 60)
                    severity = 'low' if diff_minutes <= 60 else 'medium'
                except ValueError:
                    # normalize_time may pass odd strings through unchanged
                    severity = 'medium'
                report.discrepancies.append(Discrepancy(
                    game_key=key, field='time',
                    source1=source1_name, source2=source2_name,
                    value1=time1 or '', value2=time2 or '',
                    severity=severity
                ))

            # Compare venues (normalized, with partial-containment tolerance)
            venue1 = normalize_venue(g1.venue) if g1.venue else ''
            venue2 = normalize_venue(g2.venue) if g2.venue else ''
            if venue1 and venue2 and venue1 != venue2:
                if venue1 not in venue2 and venue2 not in venue1:
                    report.discrepancies.append(Discrepancy(
                        game_key=key, field='venue',
                        source1=source1_name, source2=source2_name,
                        value1=g1.venue, value2=g2.venue,
                        severity='low'
                    ))

        elif g1:
            # Game only in source 1
            report.games_missing_source2 += 1
            report.discrepancies.append(Discrepancy(
                game_key=key, field='missing',
                source1=source1_name, source2=source2_name,
                value1=f"{g1.away_team} @ {g1.home_team}",
                value2='NOT FOUND',
                severity=_missing_game_severity(g1.date)
            ))

        else:
            # Game only in source 2
            report.games_missing_source1 += 1
            report.discrepancies.append(Discrepancy(
                game_key=key, field='missing',
                source1=source1_name, source2=source2_name,
                value1='NOT FOUND',
                value2=f"{g2.away_team} @ {g2.home_team}",
                severity=_missing_game_severity(g2.date)
            ))

    return report


def validate_stadiums(stadiums: list[Stadium]) -> list[dict]:
    """
    Validate stadium data for completeness and accuracy.

    Returns a list of issue dicts with keys 'stadium', 'sport', 'issue',
    'severity'; a single stadium may contribute multiple issues.
    """
    issues = []
    for s in stadiums:
        # Check for missing coordinates (0 is treated as "not set")
        if s.latitude == 0 or s.longitude == 0:
            issues.append({
                'stadium': s.name, 'sport': s.sport,
                'issue': 'Missing coordinates', 'severity': 'high'
            })

        # Check for missing capacity
        if s.capacity == 0:
            issues.append({
                'stadium': s.name, 'sport': s.sport,
                'issue': 'Missing capacity', 'severity': 'low'
            })

        # Check coordinate bounds (roughly North America)
        if s.latitude != 0 and not (24 < s.latitude < 55):
            issues.append({
                'stadium': s.name, 'sport': s.sport,
                'issue': f'Latitude {s.latitude} outside expected range',
                'severity': 'medium'
            })
        if s.longitude != 0 and not (-130 < s.longitude < -60):
            issues.append({
                'stadium': s.name, 'sport': s.sport,
                'issue': f'Longitude {s.longitude} outside expected range',
                'severity': 'medium'
            })
    return issues


# =============================================================================
# MULTI-SOURCE SCRAPING
# =============================================================================

def scrape_nba_all_sources(season: int) -> dict:
    """Scrape NBA from all available sources; maps source name -> games."""
    nba_season = f"{season-1}-{str(season)[2:]}"
    games = scrape_nba_basketball_reference(season)
    games = assign_stable_ids(games, 'NBA', nba_season)
    return {
        'basketball-reference': games,
        # ESPN requires JS rendering, skip for now
    }


def scrape_mlb_all_sources(season: int) -> dict:
    """Scrape MLB from all available sources; maps source name -> games."""
    mlb_season = str(season)
    # MLB API uses official gamePk - already stable
    api_games = scrape_mlb_statsapi(season)
    # Baseball-Reference needs stable IDs
    br_games = scrape_mlb_baseball_reference(season)
    br_games = assign_stable_ids(br_games, 'MLB', mlb_season)
    return {
        'statsapi.mlb.com': api_games,
        'baseball-reference': br_games,
    }


def scrape_nhl_all_sources(season: int) -> dict:
    """Scrape NHL from all available sources; maps source name -> games."""
    nhl_season = f"{season-1}-{str(season)[2:]}"
    games = scrape_nhl_hockey_reference(season)
    games = assign_stable_ids(games, 'NHL', nhl_season)
    return {
        'hockey-reference': games,
        # NHL API requires date iteration, skip for now
    }


# =============================================================================
# MAIN
# =============================================================================

def _validate_fresh_scrapes(args) -> list:
    """Scrape fresh data and cross-validate; returns ValidationReports."""
    reports = []
    print("\n" + "="*60)
    print("CROSS-VALIDATION MODE")
    print("="*60)

    # MLB has two good sources - validate
    if args.sport in ['mlb', 'all']:
        print(f"\n--- MLB {args.season} ---")
        mlb_sources = scrape_mlb_all_sources(args.season)
        source_names = list(mlb_sources.keys())
        if len(source_names) >= 2:
            games1 = mlb_sources[source_names[0]]
            games2 = mlb_sources[source_names[1]]
            if games1 and games2:
                report = validate_games(
                    games1, games2,
                    source_names[0], source_names[1],
                    'MLB', str(args.season)
                )
                reports.append(report)
                print(f" Compared {report.total_games_source1} vs {report.total_games_source2} games")
                print(f" Matched: {report.games_matched}")
                print(f" Discrepancies: {len(report.discrepancies)}")

    # NBA (single source for now, but validate data quality)
    if args.sport in ['nba', 'all']:
        print(f"\n--- NBA {args.season} ---")
        nba_sources = scrape_nba_all_sources(args.season)
        games = nba_sources.get('basketball-reference', [])
        print(f" Got {len(games)} games from Basketball-Reference")

        # Validate internal consistency: count appearances per team
        teams_seen = defaultdict(int)
        for g in games:
            teams_seen[g.home_team_abbrev] += 1
            teams_seen[g.away_team_abbrev] += 1
        # Each team should have ~82 games
        for team, count in teams_seen.items():
            if count < 70 or count > 95:
                print(f" Warning: {team} has {count} games (expected ~82)")

    return reports


def _validate_local_data(data_dir: Path) -> list[dict]:
    """Validate previously-saved games/stadiums JSON; returns stadium issues."""
    stadium_issues = []

    # Load games
    games_file = data_dir / 'games.json'
    if games_file.exists():
        with open(games_file) as f:
            games_data = json.load(f)
        print(f"\nLoaded {len(games_data)} games from {games_file}")

        # Group by sport and validate counts
        by_sport = defaultdict(list)
        for g in games_data:
            by_sport[g['sport']].append(g)
        for sport, sport_games in by_sport.items():
            print(f" {sport}: {len(sport_games)} games")

    # Load and validate stadiums
    stadiums_file = data_dir / 'stadiums.json'
    if stadiums_file.exists():
        with open(stadiums_file) as f:
            stadiums_data = json.load(f)
        stadiums = [Stadium(**s) for s in stadiums_data]
        print(f"\nLoaded {len(stadiums)} stadiums from {stadiums_file}")

        stadium_issues = validate_stadiums(stadiums)
        if stadium_issues:
            print(f"\nStadium validation issues ({len(stadium_issues)}):")
            for issue in stadium_issues[:10]:
                print(f" [{issue['severity'].upper()}] {issue['stadium']}: {issue['issue']}")
            if len(stadium_issues) > 10:
                print(f" ... and {len(stadium_issues) - 10} more")

    return stadium_issues


def _write_report(args, reports: list, stadium_issues: list) -> None:
    """Persist the combined validation report as JSON at args.output."""
    output_path = Path(args.output)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    full_report = {
        'generated_at': datetime.now().isoformat(),
        'season': args.season,
        'game_validations': [r.to_dict() for r in reports],
        'stadium_issues': stadium_issues
    }
    with open(output_path, 'w') as f:
        json.dump(full_report, f, indent=2)
    print(f"\n Validation report saved to {output_path}")


def _print_summary(reports: list, stadium_issues: list) -> None:
    """Print the final validation summary to stdout."""
    print("\n" + "="*60)
    print("VALIDATION SUMMARY")
    print("="*60)
    total_discrepancies = sum(len(r.discrepancies) for r in reports)
    high_severity = sum(
        1 for r in reports
        for d in r.discrepancies
        if d.severity == 'high'
    )
    print(f"Total game validation reports: {len(reports)}")
    print(f"Total discrepancies found: {total_discrepancies}")
    print(f"High severity issues: {high_severity}")
    print(f"Stadium data issues: {len(stadium_issues)}")


def main():
    """CLI entry point: scrape-and-validate or validate data already on disk."""
    parser = argparse.ArgumentParser(description='Validate sports data')
    parser.add_argument('--data-dir', type=str, default='./data', help='Data directory')
    parser.add_argument('--scrape-and-validate', action='store_true', help='Scrape fresh and validate')
    parser.add_argument('--season', type=int, default=2025, help='Season year')
    parser.add_argument('--sport', choices=['nba', 'mlb', 'nhl', 'nfl', 'wnba', 'mls', 'nwsl', 'cbb', 'all'], default='all')
    parser.add_argument('--output', type=str, default='./data/validation_report.json')
    args = parser.parse_args()

    reports = []
    stadium_issues = []

    if args.scrape_and_validate:
        reports = _validate_fresh_scrapes(args)
    else:
        # Load existing data and validate
        stadium_issues = _validate_local_data(Path(args.data_dir))

    # Save validation report, then print the summary
    _write_report(args, reports, stadium_issues)
    _print_summary(reports, stadium_issues)


if __name__ == '__main__':
    main()