#!/usr/bin/env python3
"""
SportsTime Data Pipeline
========================

Master script that orchestrates all data fetching, validation, and reporting.

Usage:
    python run_pipeline.py                  # Full pipeline with defaults
    python run_pipeline.py --season 2026    # Specify season
    python run_pipeline.py --sport nba      # Single sport only
    python run_pipeline.py --skip-scrape   # Validate existing data only
    python run_pipeline.py --verbose        # Detailed output
"""

import argparse
import json
import sys
from datetime import datetime
from pathlib import Path
from dataclasses import dataclass
from typing import Optional
from enum import Enum

# Import our modules
from scrape_schedules import (
    Game,
    Stadium,
    scrape_nba_basketball_reference,
    scrape_mlb_statsapi,
    scrape_mlb_baseball_reference,
    scrape_nhl_hockey_reference,
    generate_stadiums_from_teams,
    export_to_json,
    assign_stable_ids,
)
from validate_data import (
    validate_games,
    validate_stadiums,
    scrape_mlb_all_sources,
    scrape_nba_all_sources,
    scrape_nhl_all_sources,
    ValidationReport,
)


class Severity(Enum):
    # NOTE(review): severities produced by validate_data travel through this
    # script as plain strings ('high'/'medium'/'low'); this enum mirrors those
    # values but is not currently referenced below. Kept for API stability.
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"


@dataclass
class PipelineResult:
    """Summary of one pipeline run, returned by run_pipeline()."""
    success: bool                # True when no high-severity issues were found
    games_scraped: int
    stadiums_scraped: int
    games_by_sport: dict         # sport name -> game count
    validation_reports: list     # ValidationReport objects from cross-validation
    stadium_issues: list         # dicts with 'stadium', 'issue', 'severity' keys
    high_severity_count: int
    medium_severity_count: int
    low_severity_count: int
    output_dir: Path
    duration_seconds: float


def print_header(text: str):
    """Print a formatted header."""
    print()
    print("=" * 70)
    print(f" {text}")
    print("=" * 70)


def print_section(text: str):
    """Print a section header."""
    print()
    print(f"--- {text} ---")


def print_severity(severity: str, message: str):
    """Print a message with severity indicator."""
    icons = {
        'high': '🔴 HIGH',
        'medium': '🟡 MEDIUM',
        'low': '🟢 LOW',
    }
    # Unknown severities fall back to a neutral marker rather than raising.
    print(f" {icons.get(severity, '⚪')} {message}")


def run_pipeline(
    season: int = 2025,
    sport: str = 'all',
    output_dir: Path = Path('./data'),
    skip_scrape: bool = False,
    validate: bool = True,
    verbose: bool = False,
) -> PipelineResult:
    """
    Run the complete data pipeline.

    Args:
        season: Season year (e.g. 2025).
        sport: One of 'nba', 'mlb', 'nhl', or 'all'.
        output_dir: Directory for JSON exports and the pipeline report
            (created if missing).
        skip_scrape: Load previously exported JSON instead of scraping.
        validate: Run cross-source validation and data quality checks.
        verbose: Show more high-severity detail plus medium-severity issues.

    Returns:
        PipelineResult summarizing the run; ``success`` is True when no
        high-severity issues were found.
    """
    start_time = datetime.now()

    all_games = []
    all_stadiums = []
    games_by_sport = {}
    validation_reports = []
    stadium_issues = []

    output_dir.mkdir(parents=True, exist_ok=True)

    # =========================================================================
    # PHASE 1: SCRAPE DATA
    # =========================================================================
    if not skip_scrape:
        print_header("PHASE 1: SCRAPING DATA")

        # Scrape stadiums
        print_section("Stadiums")
        all_stadiums = generate_stadiums_from_teams()
        print(f" Generated {len(all_stadiums)} stadiums from team data")

        # Scrape by sport
        if sport in ['nba', 'all']:
            print_section(f"NBA {season}")
            nba_games = scrape_nba_basketball_reference(season)
            # Season label like "2024-25" for season year 2025.
            nba_season = f"{season-1}-{str(season)[2:]}"
            nba_games = assign_stable_ids(nba_games, 'NBA', nba_season)
            all_games.extend(nba_games)
            games_by_sport['NBA'] = len(nba_games)

        if sport in ['mlb', 'all']:
            print_section(f"MLB {season}")
            mlb_games = scrape_mlb_statsapi(season)
            # MLB API uses official gamePk - already stable
            all_games.extend(mlb_games)
            games_by_sport['MLB'] = len(mlb_games)

        if sport in ['nhl', 'all']:
            print_section(f"NHL {season}")
            nhl_games = scrape_nhl_hockey_reference(season)
            nhl_season = f"{season-1}-{str(season)[2:]}"
            nhl_games = assign_stable_ids(nhl_games, 'NHL', nhl_season)
            all_games.extend(nhl_games)
            games_by_sport['NHL'] = len(nhl_games)

        # Export data
        print_section("Exporting Data")
        export_to_json(all_games, all_stadiums, output_dir)
        print(f" Exported to {output_dir}")
    else:
        # Load existing data previously written by export_to_json(). Missing
        # files are tolerated (the pipeline then validates an empty dataset).
        print_header("LOADING EXISTING DATA")
        games_file = output_dir / 'games.json'
        stadiums_file = output_dir / 'stadiums.json'

        if games_file.exists():
            with open(games_file) as f:
                games_data = json.load(f)
            all_games = [Game(**g) for g in games_data]
            for g in all_games:
                games_by_sport[g.sport] = games_by_sport.get(g.sport, 0) + 1
            print(f" Loaded {len(all_games)} games")

        if stadiums_file.exists():
            with open(stadiums_file) as f:
                stadiums_data = json.load(f)
            all_stadiums = [Stadium(**s) for s in stadiums_data]
            print(f" Loaded {len(all_stadiums)} stadiums")

    # =========================================================================
    # PHASE 2: VALIDATE DATA
    # =========================================================================
    if validate:
        print_header("PHASE 2: CROSS-VALIDATION")

        # MLB validation (has two good sources)
        if sport in ['mlb', 'all']:
            print_section("MLB Cross-Validation")
            try:
                mlb_sources = scrape_mlb_all_sources(season)
                source_names = list(mlb_sources.keys())
                if len(source_names) >= 2:
                    games1 = mlb_sources[source_names[0]]
                    games2 = mlb_sources[source_names[1]]
                    if games1 and games2:
                        report = validate_games(
                            games1, games2,
                            source_names[0], source_names[1],
                            'MLB', str(season)
                        )
                        validation_reports.append(report)
                        print(f" Sources: {source_names[0]} vs {source_names[1]}")
                        print(f" Games compared: {report.total_games_source1} vs {report.total_games_source2}")
                        print(f" Matched: {report.games_matched}")
                        print(f" Discrepancies: {len(report.discrepancies)}")
            except Exception as e:
                # Best-effort: a failed validation source must not abort the
                # pipeline; the error is surfaced in the console output.
                print(f" Error during MLB validation: {e}")

        # Stadium validation
        print_section("Stadium Validation")
        stadium_issues = validate_stadiums(all_stadiums)
        print(f" Issues found: {len(stadium_issues)}")

        # Data quality checks: each team should appear in roughly 82 games
        # (home + away) in a full NBA/NHL season.
        print_section("Data Quality Checks")

        if sport in ['nba', 'all']:
            nba_games = [g for g in all_games if g.sport == 'NBA']
            team_counts = {}
            for g in nba_games:
                team_counts[g.home_team_abbrev] = team_counts.get(g.home_team_abbrev, 0) + 1
                team_counts[g.away_team_abbrev] = team_counts.get(g.away_team_abbrev, 0) + 1
            for team, count in sorted(team_counts.items()):
                if count < 75 or count > 90:
                    print(f" NBA: {team} has {count} games (expected ~82)")

        if sport in ['nhl', 'all']:
            nhl_games = [g for g in all_games if g.sport == 'NHL']
            team_counts = {}
            for g in nhl_games:
                team_counts[g.home_team_abbrev] = team_counts.get(g.home_team_abbrev, 0) + 1
                team_counts[g.away_team_abbrev] = team_counts.get(g.away_team_abbrev, 0) + 1
            for team, count in sorted(team_counts.items()):
                if count < 75 or count > 90:
                    print(f" NHL: {team} has {count} games (expected ~82)")

    # =========================================================================
    # PHASE 3: GENERATE REPORT
    # =========================================================================
    print_header("PHASE 3: DISCREPANCY REPORT")

    # Count by severity (game discrepancies + stadium issues combined)
    high_count = 0
    medium_count = 0
    low_count = 0

    # Game discrepancies
    for report in validation_reports:
        for d in report.discrepancies:
            if d.severity == 'high':
                high_count += 1
            elif d.severity == 'medium':
                medium_count += 1
            else:
                low_count += 1

    # Stadium issues
    for issue in stadium_issues:
        if issue['severity'] == 'high':
            high_count += 1
        elif issue['severity'] == 'medium':
            medium_count += 1
        else:
            low_count += 1

    # Print summary
    print()
    print(f" 🔴 HIGH severity: {high_count}")
    print(f" 🟡 MEDIUM severity: {medium_count}")
    print(f" 🟢 LOW severity: {low_count}")
    print()

    # Print high severity issues (always)
    if high_count > 0:
        print_section("HIGH Severity Issues (Requires Attention)")
        shown = 0
        max_show = 10 if not verbose else 50
        for report in validation_reports:
            for d in report.discrepancies:
                if shown >= max_show:
                    break  # cap reached - no point scanning further
                if d.severity == 'high':
                    print_severity('high', f"[{report.sport}] {d.field}: {d.game_key}")
                    if verbose:
                        print(f" {d.source1}: {d.value1}")
                        print(f" {d.source2}: {d.value2}")
                    shown += 1
        for issue in stadium_issues:
            if shown >= max_show:
                break
            if issue['severity'] == 'high':
                print_severity('high', f"[Stadium] {issue['stadium']}: {issue['issue']}")
                shown += 1
        if high_count > max_show:
            # FIX: previously the "--verbose" hint was printed even when
            # verbose was already on; only suggest it when it would help.
            if verbose:
                print(f" ... and {high_count - max_show} more")
            else:
                print(f" ... and {high_count - max_show} more (use --verbose to see all)")

    # Print medium severity if verbose
    if medium_count > 0 and verbose:
        print_section("MEDIUM Severity Issues")
        for report in validation_reports:
            for d in report.discrepancies:
                if d.severity == 'medium':
                    print_severity('medium', f"[{report.sport}] {d.field}: {d.game_key}")
        for issue in stadium_issues:
            if issue['severity'] == 'medium':
                print_severity('medium', f"[Stadium] {issue['stadium']}: {issue['issue']}")

    # Save full report
    report_path = output_dir / 'pipeline_report.json'
    full_report = {
        'generated_at': datetime.now().isoformat(),
        'season': season,
        'sport': sport,
        'summary': {
            'games_scraped': len(all_games),
            'stadiums_scraped': len(all_stadiums),
            'games_by_sport': games_by_sport,
            'high_severity': high_count,
            'medium_severity': medium_count,
            'low_severity': low_count,
        },
        'game_validations': [r.to_dict() for r in validation_reports],
        'stadium_issues': stadium_issues,
    }
    with open(report_path, 'w') as f:
        json.dump(full_report, f, indent=2)

    # =========================================================================
    # FINAL SUMMARY
    # =========================================================================
    duration = (datetime.now() - start_time).total_seconds()

    print_header("PIPELINE COMPLETE")
    print()
    print(f" Duration: {duration:.1f} seconds")
    print(f" Games: {len(all_games):,}")
    print(f" Stadiums: {len(all_stadiums)}")
    print(f" Output: {output_dir.absolute()}")
    print()
    for sport_name, count in sorted(games_by_sport.items()):
        print(f" {sport_name}: {count:,} games")
    print()
    print(" Reports saved to:")
    print(f" - {output_dir / 'games.json'}")
    print(f" - {output_dir / 'stadiums.json'}")
    print(f" - {output_dir / 'pipeline_report.json'}")
    print()

    # Status indicator
    if high_count > 0:
        print(" ⚠️ STATUS: Review required - high severity issues found")
    elif medium_count > 0:
        print(" ✓ STATUS: Complete with warnings")
    else:
        print(" ✅ STATUS: All checks passed")
    print()

    return PipelineResult(
        success=high_count == 0,
        games_scraped=len(all_games),
        stadiums_scraped=len(all_stadiums),
        games_by_sport=games_by_sport,
        validation_reports=validation_reports,
        stadium_issues=stadium_issues,
        high_severity_count=high_count,
        medium_severity_count=medium_count,
        low_severity_count=low_count,
        output_dir=output_dir,
        duration_seconds=duration,
    )


def main():
    """CLI entry point: parse arguments, run the pipeline, set exit code."""
    parser = argparse.ArgumentParser(
        description='SportsTime Data Pipeline - Fetch, validate, and report on sports data',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
    python run_pipeline.py                  # Full pipeline
    python run_pipeline.py --season 2026    # Different season
    python run_pipeline.py --sport mlb      # MLB only
    python run_pipeline.py --skip-scrape   # Validate existing data
    python run_pipeline.py --verbose        # Show all issues
"""
    )
    parser.add_argument(
        '--season', type=int, default=2025,
        help='Season year (default: 2025)'
    )
    parser.add_argument(
        '--sport', choices=['nba', 'mlb', 'nhl', 'all'], default='all',
        help='Sport to process (default: all)'
    )
    parser.add_argument(
        '--output', type=str, default='./data',
        help='Output directory (default: ./data)'
    )
    parser.add_argument(
        '--skip-scrape', action='store_true',
        help='Skip scraping, validate existing data only'
    )
    parser.add_argument(
        '--no-validate', action='store_true',
        help='Skip validation step'
    )
    parser.add_argument(
        '--verbose', '-v', action='store_true',
        help='Verbose output with all issues'
    )

    args = parser.parse_args()

    result = run_pipeline(
        season=args.season,
        sport=args.sport,
        output_dir=Path(args.output),
        skip_scrape=args.skip_scrape,
        validate=not args.no_validate,
        verbose=args.verbose,
    )

    # Exit with error code if high severity issues
    sys.exit(0 if result.success else 1)


if __name__ == '__main__':
    main()