#!/usr/bin/env python3
"""
SportsTime Canonicalization Pipeline
====================================

Master script that orchestrates all data canonicalization steps. This is the
NEW pipeline that performs local identity resolution BEFORE any CloudKit
upload.

Pipeline Stages:
    1. SCRAPE: Fetch raw data from web sources
    2. CANONICALIZE STADIUMS: Generate canonical stadium IDs and aliases
    3. CANONICALIZE TEAMS: Match teams to stadiums, generate canonical IDs
    4. CANONICALIZE GAMES: Resolve all references, generate canonical IDs
    5. VALIDATE: Verify all data is internally consistent
    6. (Optional) UPLOAD: CloudKit upload (separate script)

Usage:
    python run_canonicalization_pipeline.py                # Full pipeline
    python run_canonicalization_pipeline.py --season 2026  # Specify season
    python run_canonicalization_pipeline.py --skip-scrape  # Use existing raw data
    python run_canonicalization_pipeline.py --verbose      # Detailed output
"""

import argparse
import json
import sys
from collections import defaultdict
from dataclasses import dataclass, asdict
from datetime import datetime
from pathlib import Path

# Import pipeline components
from scrape_schedules import (
    ScraperSource,
    scrape_with_fallback,
    # NBA sources
    scrape_nba_basketball_reference,
    scrape_nba_espn,
    scrape_nba_cbssports,
    # MLB sources
    scrape_mlb_statsapi,
    scrape_mlb_baseball_reference,
    scrape_mlb_espn,
    # NHL sources
    scrape_nhl_hockey_reference,
    scrape_nhl_espn,
    scrape_nhl_api,
    # NFL sources
    scrape_nfl_espn,
    scrape_nfl_pro_football_reference,
    scrape_nfl_cbssports,
    # WNBA sources
    scrape_wnba_espn,
    scrape_wnba_basketball_reference,
    scrape_wnba_cbssports,
    # MLS sources
    scrape_mls_espn,
    scrape_mls_fbref,
    scrape_mls_mlssoccer,
    # NWSL sources
    scrape_nwsl_espn,
    scrape_nwsl_fbref,
    scrape_nwsl_nwslsoccer,
    # CBB sources
    scrape_cbb_espn,
    scrape_cbb_sports_reference,
    scrape_cbb_cbssports,
    # Utilities
    generate_stadiums_from_teams,
    assign_stable_ids,
    export_to_json,
)
from canonicalize_stadiums import (
    canonicalize_stadiums,
    add_historical_aliases,
    deduplicate_aliases,
)
from canonicalize_teams import canonicalize_all_teams
from canonicalize_games import canonicalize_games
from validate_canonical import validate_canonical_data


@dataclass
class PipelineResult:
    """Result of the full canonicalization pipeline."""
    success: bool                # True unless validation ran and reported errors
    stadiums_count: int
    teams_count: int
    games_count: int
    aliases_count: int
    validation_errors: int       # 0 when validation was skipped
    validation_warnings: int     # 0 when validation was skipped
    duration_seconds: float
    output_dir: str


def print_header(text: str):
    """Print a formatted header."""
    print()
    print("=" * 70)
    print(f" {text}")
    print("=" * 70)


def print_section(text: str):
    """Print a section header."""
    print()
    print(f"--- {text} ---")


def _scrape_all(season: int, output_dir: Path):
    """Stage 1: scrape stadiums and every league's schedule, export raw JSON.

    Args:
        season: Season year (e.g., 2026).
        output_dir: Directory the raw JSON files are exported to.

    Returns:
        Tuple of (raw_games, raw_stadiums) as lists of plain dicts.
    """
    print_header("STAGE 1: SCRAPING RAW DATA")

    all_games = []

    # Scrape stadiums from team mappings
    print_section("Stadiums")
    all_stadiums = generate_stadiums_from_teams()
    print(f" Generated {len(all_stadiums)} stadiums from team data")

    # Cross-year season label, e.g. "2025-26" for season=2026.
    # Used by leagues whose seasons straddle the new year (NBA/NHL/NFL/CBB).
    cross_year_season = f"{season-1}-{str(season)[2:]}"

    # Scrape all sports with multi-source fallback
    print_section(f"NBA {season}")
    nba_sources = [
        ScraperSource('Basketball-Reference', scrape_nba_basketball_reference, priority=1, min_games=500),
        ScraperSource('ESPN', scrape_nba_espn, priority=2, min_games=500),
        ScraperSource('CBS Sports', scrape_nba_cbssports, priority=3, min_games=100),
    ]
    nba_games = scrape_with_fallback('NBA', season, nba_sources)
    nba_games = assign_stable_ids(nba_games, 'NBA', cross_year_season)
    all_games.extend(nba_games)

    print_section(f"MLB {season}")
    mlb_sources = [
        ScraperSource('MLB Stats API', scrape_mlb_statsapi, priority=1, min_games=1000),
        ScraperSource('Baseball-Reference', scrape_mlb_baseball_reference, priority=2, min_games=500),
        ScraperSource('ESPN', scrape_mlb_espn, priority=3, min_games=500),
    ]
    mlb_games = scrape_with_fallback('MLB', season, mlb_sources)
    mlb_games = assign_stable_ids(mlb_games, 'MLB', str(season))
    all_games.extend(mlb_games)

    print_section(f"NHL {season}")
    nhl_sources = [
        ScraperSource('Hockey-Reference', scrape_nhl_hockey_reference, priority=1, min_games=500),
        ScraperSource('ESPN', scrape_nhl_espn, priority=2, min_games=500),
        ScraperSource('NHL API', scrape_nhl_api, priority=3, min_games=100),
    ]
    nhl_games = scrape_with_fallback('NHL', season, nhl_sources)
    nhl_games = assign_stable_ids(nhl_games, 'NHL', cross_year_season)
    all_games.extend(nhl_games)

    print_section(f"NFL {season}")
    nfl_sources = [
        ScraperSource('ESPN', scrape_nfl_espn, priority=1, min_games=200),
        ScraperSource('Pro-Football-Reference', scrape_nfl_pro_football_reference, priority=2, min_games=200),
        ScraperSource('CBS Sports', scrape_nfl_cbssports, priority=3, min_games=100),
    ]
    nfl_games = scrape_with_fallback('NFL', season, nfl_sources)
    nfl_games = assign_stable_ids(nfl_games, 'NFL', cross_year_season)
    all_games.extend(nfl_games)

    print_section(f"WNBA {season}")
    wnba_sources = [
        ScraperSource('ESPN', scrape_wnba_espn, priority=1, min_games=100),
        ScraperSource('Basketball-Reference', scrape_wnba_basketball_reference, priority=2, min_games=100),
        ScraperSource('CBS Sports', scrape_wnba_cbssports, priority=3, min_games=50),
    ]
    wnba_games = scrape_with_fallback('WNBA', season, wnba_sources)
    wnba_games = assign_stable_ids(wnba_games, 'WNBA', str(season))
    all_games.extend(wnba_games)

    print_section(f"MLS {season}")
    mls_sources = [
        ScraperSource('ESPN', scrape_mls_espn, priority=1, min_games=200),
        ScraperSource('FBref', scrape_mls_fbref, priority=2, min_games=100),
        ScraperSource('MLSSoccer.com', scrape_mls_mlssoccer, priority=3, min_games=100),
    ]
    mls_games = scrape_with_fallback('MLS', season, mls_sources)
    mls_games = assign_stable_ids(mls_games, 'MLS', str(season))
    all_games.extend(mls_games)

    print_section(f"NWSL {season}")
    nwsl_sources = [
        ScraperSource('ESPN', scrape_nwsl_espn, priority=1, min_games=100),
        ScraperSource('FBref', scrape_nwsl_fbref, priority=2, min_games=50),
        ScraperSource('NWSL.com', scrape_nwsl_nwslsoccer, priority=3, min_games=50),
    ]
    nwsl_games = scrape_with_fallback('NWSL', season, nwsl_sources)
    nwsl_games = assign_stable_ids(nwsl_games, 'NWSL', str(season))
    all_games.extend(nwsl_games)

    print_section(f"CBB {season}")
    cbb_sources = [
        ScraperSource('ESPN', scrape_cbb_espn, priority=1, min_games=1000),
        ScraperSource('Sports-Reference', scrape_cbb_sports_reference, priority=2, min_games=500),
        ScraperSource('CBS Sports', scrape_cbb_cbssports, priority=3, min_games=300),
    ]
    cbb_games = scrape_with_fallback('CBB', season, cbb_sources)
    cbb_games = assign_stable_ids(cbb_games, 'CBB', cross_year_season)
    all_games.extend(cbb_games)

    # Export raw data
    print_section("Exporting Raw Data")
    export_to_json(all_games, all_stadiums, output_dir)
    print(f" Exported to {output_dir}")

    # Downstream canonicalization works on plain dicts, not scraper objects.
    raw_games = [g.__dict__ for g in all_games]
    raw_stadiums = [s.__dict__ for s in all_stadiums]
    return raw_games, raw_stadiums


def _load_raw_data(output_dir: Path):
    """Load previously scraped raw data from disk (the --skip-scrape path).

    Prefers the new per-file layout (games/*.json, canonical/stadiums.json)
    and falls back to the legacy single-file layout (games.json, stadiums.json).

    Returns:
        Tuple of (raw_games, raw_stadiums) as lists of plain dicts.

    Raises:
        FileNotFoundError: if neither layout is present in output_dir.
    """
    print_header("LOADING EXISTING RAW DATA")

    # Try loading from new structure first (games/*.json)
    games_dir = output_dir / 'games'
    raw_games = []
    if games_dir.exists() and any(games_dir.glob('*.json')):
        print_section("Loading from games/ directory")
        for games_file in sorted(games_dir.glob('*.json')):
            with open(games_file) as f:
                file_games = json.load(f)
            raw_games.extend(file_games)
            print(f" Loaded {len(file_games):,} games from {games_file.name}")
    else:
        # Fallback to legacy games.json
        print_section("Loading from legacy games.json")
        games_file = output_dir / 'games.json'
        with open(games_file) as f:
            raw_games = json.load(f)
    print(f" Total: {len(raw_games):,} raw games")

    # Try loading stadiums from canonical/ first, then legacy
    canonical_dir = output_dir / 'canonical'
    if (canonical_dir / 'stadiums.json').exists():
        with open(canonical_dir / 'stadiums.json') as f:
            raw_stadiums = json.load(f)
        print(f" Loaded {len(raw_stadiums)} raw stadiums from canonical/stadiums.json")
    else:
        with open(output_dir / 'stadiums.json') as f:
            raw_stadiums = json.load(f)
        print(f" Loaded {len(raw_stadiums)} raw stadiums from stadiums.json")
    return raw_games, raw_stadiums


def run_pipeline(
    season: int = 2026,
    output_dir: Path = Path('./data'),
    skip_scrape: bool = False,
    validate: bool = True,
    verbose: bool = False,
) -> PipelineResult:
    """
    Run the complete canonicalization pipeline.

    Args:
        season: Season year (e.g., 2026)
        output_dir: Directory for output files
        skip_scrape: Skip scraping, use existing raw data
        validate: Run validation step
        verbose: Print detailed output

    Returns:
        PipelineResult with statistics
    """
    start_time = datetime.now()
    output_dir.mkdir(parents=True, exist_ok=True)

    # =========================================================================
    # STAGE 1: SCRAPE RAW DATA (or load from disk when skip_scrape is set)
    # =========================================================================
    if not skip_scrape:
        raw_games, raw_stadiums = _scrape_all(season, output_dir)
    else:
        raw_games, raw_stadiums = _load_raw_data(output_dir)

    # =========================================================================
    # STAGE 2: CANONICALIZE STADIUMS
    # =========================================================================
    print_header("STAGE 2: CANONICALIZING STADIUMS")

    canonical_stadiums, stadium_aliases = canonicalize_stadiums(
        raw_stadiums, verbose=verbose
    )
    print(f" Created {len(canonical_stadiums)} canonical stadiums")

    # Add historical aliases (old venue names pointing at current canonical IDs)
    canonical_ids = {s.canonical_id for s in canonical_stadiums}
    stadium_aliases = add_historical_aliases(stadium_aliases, canonical_ids)
    stadium_aliases = deduplicate_aliases(stadium_aliases)
    print(f" Created {len(stadium_aliases)} stadium aliases")

    # Export
    stadiums_canonical_path = output_dir / 'stadiums_canonical.json'
    aliases_path = output_dir / 'stadium_aliases.json'
    with open(stadiums_canonical_path, 'w') as f:
        json.dump([asdict(s) for s in canonical_stadiums], f, indent=2)
    with open(aliases_path, 'w') as f:
        json.dump([asdict(a) for a in stadium_aliases], f, indent=2)
    print(f" Exported to {stadiums_canonical_path}")
    print(f" Exported to {aliases_path}")

    # =========================================================================
    # STAGE 3: CANONICALIZE TEAMS
    # =========================================================================
    print_header("STAGE 3: CANONICALIZING TEAMS")

    # Convert canonical stadiums to dicts for team matching
    stadiums_list = [asdict(s) for s in canonical_stadiums]
    canonical_teams, team_warnings = canonicalize_all_teams(
        stadiums_list, verbose=verbose
    )
    print(f" Created {len(canonical_teams)} canonical teams")
    if team_warnings:
        print(f" Warnings: {len(team_warnings)}")
        if verbose:
            for w in team_warnings:
                print(f" - {w.team_canonical_id}: {w.issue}")

    # Export
    teams_canonical_path = output_dir / 'teams_canonical.json'
    with open(teams_canonical_path, 'w') as f:
        json.dump([asdict(t) for t in canonical_teams], f, indent=2)
    print(f" Exported to {teams_canonical_path}")

    # =========================================================================
    # STAGE 4: CANONICALIZE GAMES
    # =========================================================================
    print_header("STAGE 4: CANONICALIZING GAMES")

    # Convert data to dicts for game canonicalization
    teams_list = [asdict(t) for t in canonical_teams]
    aliases_list = [asdict(a) for a in stadium_aliases]
    canonical_games_list, game_warnings = canonicalize_games(
        raw_games, teams_list, aliases_list, verbose=verbose
    )
    print(f" Created {len(canonical_games_list)} canonical games")
    if game_warnings:
        print(f" Warnings: {len(game_warnings)}")
        if verbose:
            by_issue = defaultdict(int)
            for w in game_warnings:
                by_issue[w.issue] += 1
            for issue, count in by_issue.items():
                print(f" - {issue}: {count}")

    # Export games to new structure: canonical/games/{sport}_{season}.json
    canonical_games_dir = output_dir / 'canonical' / 'games'
    canonical_games_dir.mkdir(parents=True, exist_ok=True)

    # Group games by sport and season. NOTE: a distinct loop binding is used
    # so the `season` parameter is not clobbered (the old code rebound it).
    games_by_sport_season = defaultdict(list)
    for game in canonical_games_list:
        key = f"{game.sport.lower()}_{game.season}"
        games_by_sport_season[key].append(game)

    # Export each sport/season file
    for key, sport_games in sorted(games_by_sport_season.items()):
        filepath = canonical_games_dir / f"{key}.json"
        with open(filepath, 'w') as f:
            json.dump([asdict(g) for g in sport_games], f, indent=2)
        print(f" Exported {len(sport_games):,} games to canonical/games/{key}.json")

    # Also export combined games_canonical.json for backward compatibility
    games_canonical_path = output_dir / 'games_canonical.json'
    with open(games_canonical_path, 'w') as f:
        json.dump([asdict(g) for g in canonical_games_list], f, indent=2)
    print(f" Exported combined to {games_canonical_path}")

    # =========================================================================
    # STAGE 5: VALIDATE
    # =========================================================================
    validation_result = None
    if validate:
        print_header("STAGE 5: VALIDATION")

        # Reload as dicts for validation
        canonical_stadiums_dicts = [asdict(s) for s in canonical_stadiums]
        canonical_teams_dicts = [asdict(t) for t in canonical_teams]
        canonical_games_dicts = [asdict(g) for g in canonical_games_list]
        aliases_dicts = [asdict(a) for a in stadium_aliases]

        validation_result = validate_canonical_data(
            canonical_stadiums_dicts,
            canonical_teams_dicts,
            canonical_games_dicts,
            aliases_dicts,
            verbose=verbose
        )

        if validation_result.is_valid:
            print(" STATUS: PASSED")
        else:
            print(" STATUS: FAILED")
            print(f" Errors: {validation_result.error_count}")
        print(f" Warnings: {validation_result.warning_count}")

        # Export validation report
        validation_path = output_dir / 'canonicalization_validation.json'
        with open(validation_path, 'w') as f:
            json.dump({
                'is_valid': validation_result.is_valid,
                'error_count': validation_result.error_count,
                'warning_count': validation_result.warning_count,
                'summary': validation_result.summary,
                'errors': validation_result.errors[:100],  # Limit to 100 for readability
            }, f, indent=2)
        print(f" Report exported to {validation_path}")

    # =========================================================================
    # SUMMARY
    # =========================================================================
    duration = (datetime.now() - start_time).total_seconds()

    print_header("PIPELINE COMPLETE")
    print()
    print(f" Duration: {duration:.1f} seconds")
    print(f" Stadiums: {len(canonical_stadiums)}")
    print(f" Teams: {len(canonical_teams)}")
    print(f" Games: {len(canonical_games_list)}")
    print(f" Aliases: {len(stadium_aliases)}")
    print()

    # Games by sport
    print(" Games by sport:")
    by_sport = {}
    for g in canonical_games_list:
        by_sport[g.sport] = by_sport.get(g.sport, 0) + 1
    for sport, count in sorted(by_sport.items()):
        print(f" {sport}: {count:,} games")
    print()
    print(" Output files:")
    print(f" - {output_dir / 'stadiums_canonical.json'}")
    print(f" - {output_dir / 'stadium_aliases.json'}")
    print(f" - {output_dir / 'teams_canonical.json'}")
    print(f" - {output_dir / 'games_canonical.json'} (combined)")
    print(f" - {output_dir / 'canonical' / 'games' / '*.json'} (by sport/season)")
    print(f" - {output_dir / 'canonicalization_validation.json'}")
    print()

    # Final status: the pipeline only fails when validation ran and found errors.
    success = True
    if validation_result and not validation_result.is_valid:
        success = False
        print(" PIPELINE FAILED - Validation errors detected")
        print(" CloudKit upload should NOT proceed until errors are fixed")
    else:
        print(" PIPELINE SUCCEEDED - Ready for CloudKit upload")
    print()

    return PipelineResult(
        success=success,
        stadiums_count=len(canonical_stadiums),
        teams_count=len(canonical_teams),
        games_count=len(canonical_games_list),
        aliases_count=len(stadium_aliases),
        validation_errors=validation_result.error_count if validation_result else 0,
        validation_warnings=validation_result.warning_count if validation_result else 0,
        duration_seconds=duration,
        output_dir=str(output_dir),
    )


def main():
    """CLI entry point: parse arguments and run the pipeline."""
    parser = argparse.ArgumentParser(
        description='SportsTime Canonicalization Pipeline',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Pipeline Stages:
  1. SCRAPE: Fetch raw data from web sources
  2. CANONICALIZE STADIUMS: Generate canonical IDs and aliases
  3. CANONICALIZE TEAMS: Match teams to stadiums
  4. CANONICALIZE GAMES: Resolve all references
  5. VALIDATE: Verify internal consistency

Examples:
  python run_canonicalization_pipeline.py                # Full pipeline
  python run_canonicalization_pipeline.py --season 2026  # Different season
  python run_canonicalization_pipeline.py --skip-scrape  # Use existing raw data
  python run_canonicalization_pipeline.py --verbose      # Show all details
"""
    )
    parser.add_argument(
        '--season', type=int, default=2026,
        help='Season year (default: 2026)'
    )
    parser.add_argument(
        '--output', type=str, default='./data',
        help='Output directory (default: ./data)'
    )
    parser.add_argument(
        '--skip-scrape', action='store_true',
        help='Skip scraping, use existing raw data files'
    )
    parser.add_argument(
        '--no-validate', action='store_true',
        help='Skip validation step'
    )
    parser.add_argument(
        '--verbose', '-v', action='store_true',
        help='Verbose output'
    )
    parser.add_argument(
        '--strict', action='store_true',
        help='Exit with error code if validation fails'
    )
    args = parser.parse_args()

    result = run_pipeline(
        season=args.season,
        output_dir=Path(args.output),
        skip_scrape=args.skip_scrape,
        validate=not args.no_validate,
        verbose=args.verbose,
    )

    # Exit with error code if requested and validation failed
    if args.strict and not result.success:
        sys.exit(1)


if __name__ == '__main__':
    main()