Files
Sportstime/Scripts/run_canonicalization_pipeline.py
Trey t 7efcea7bd4 Add canonical ID pipeline and fix UUID consistency for CloudKit sync
- Add local canonicalization pipeline (stadiums, teams, games) that generates
  deterministic canonical IDs before CloudKit upload
- Fix CanonicalSyncService to use deterministic UUIDs from canonical IDs
  instead of random UUIDs from CloudKit records
- Add SyncStadium/SyncTeam/SyncGame types to CloudKitService that preserve
  canonical ID relationships during sync
- Add canonical ID field keys to CKModels for reading from CloudKit records
- Bundle canonical JSON files (stadiums_canonical, teams_canonical,
  games_canonical, stadium_aliases) for consistent bootstrap data
- Update BootstrapService to prefer canonical format files over legacy format

This ensures all entities use consistent deterministic UUIDs derived from
their canonical IDs, preventing duplicate records when syncing CloudKit
data with bootstrapped local data.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-09 10:30:09 -06:00

413 lines
14 KiB
Python

#!/usr/bin/env python3
"""
SportsTime Canonicalization Pipeline
====================================
Master script that orchestrates all data canonicalization steps.
This is the NEW pipeline that performs local identity resolution
BEFORE any CloudKit upload.
Pipeline Stages:
1. SCRAPE: Fetch raw data from web sources
2. CANONICALIZE STADIUMS: Generate canonical stadium IDs and aliases
3. CANONICALIZE TEAMS: Match teams to stadiums, generate canonical IDs
4. CANONICALIZE GAMES: Resolve all references, generate canonical IDs
5. VALIDATE: Verify all data is internally consistent
6. (Optional) UPLOAD: CloudKit upload (separate script)
Usage:
python run_canonicalization_pipeline.py # Full pipeline
python run_canonicalization_pipeline.py --season 2026 # Specify season
python run_canonicalization_pipeline.py --skip-scrape # Use existing raw data
python run_canonicalization_pipeline.py --verbose # Detailed output
"""
import argparse
import json
import sys
from datetime import datetime
from pathlib import Path
from dataclasses import dataclass, asdict
# Import pipeline components
from scrape_schedules import (
scrape_nba_basketball_reference,
scrape_mlb_statsapi,
scrape_nhl_hockey_reference,
generate_stadiums_from_teams,
assign_stable_ids,
export_to_json,
)
from canonicalize_stadiums import (
canonicalize_stadiums,
add_historical_aliases,
deduplicate_aliases,
)
from canonicalize_teams import canonicalize_all_teams
from canonicalize_games import canonicalize_games
from validate_canonical import validate_canonical_data
@dataclass
class PipelineResult:
    """Result of the full canonicalization pipeline.

    Returned by run_pipeline() so callers (e.g. main()) can inspect
    output counts and decide the process exit status.
    """
    success: bool  # False when validation ran and reported errors
    stadiums_count: int  # canonical stadiums written to stadiums_canonical.json
    teams_count: int  # canonical teams written to teams_canonical.json
    games_count: int  # canonical games written to games_canonical.json
    aliases_count: int  # stadium aliases written to stadium_aliases.json
    validation_errors: int  # 0 when the validation stage was skipped
    validation_warnings: int  # 0 when the validation stage was skipped
    duration_seconds: float  # wall-clock duration of the whole pipeline
    output_dir: str  # directory the JSON artifacts were written to
def print_header(text: str):
    """Print *text* between two 70-character '=' rules, preceded by a blank line."""
    rule = "=" * 70
    print(f"\n{rule}\n {text}\n{rule}")
def print_section(text: str):
    """Print a blank line followed by a '--- text ---' section marker."""
    print(f"\n--- {text} ---")
def run_pipeline(
    season: int = 2026,
    output_dir: Path = Path('./data'),
    skip_scrape: bool = False,
    validate: bool = True,
    verbose: bool = False,
) -> PipelineResult:
    """
    Run the complete canonicalization pipeline.

    Stages: (1) scrape (or load) raw games/stadiums, (2) canonicalize
    stadiums + aliases, (3) canonicalize teams, (4) canonicalize games,
    (5) validate, then print a summary.  Each stage exports JSON files
    into ``output_dir``.

    Args:
        season: Season year (e.g., 2026)
        output_dir: Directory for output files
        skip_scrape: Skip scraping, use existing raw data
        validate: Run validation step
        verbose: Print detailed output

    Returns:
        PipelineResult with statistics
    """
    start_time = datetime.now()
    output_dir.mkdir(parents=True, exist_ok=True)

    # =========================================================================
    # STAGE 1: SCRAPE RAW DATA
    # =========================================================================
    if not skip_scrape:
        print_header("STAGE 1: SCRAPING RAW DATA")
        all_games = []
        all_stadiums = []

        # Scrape stadiums from team mappings
        print_section("Stadiums")
        all_stadiums = generate_stadiums_from_teams()
        print(f" Generated {len(all_stadiums)} stadiums from team data")

        # Scrape NBA
        print_section(f"NBA {season}")
        nba_games = scrape_nba_basketball_reference(season)
        # NBA seasons span two calendar years: season=2026 -> "2025-26".
        nba_season = f"{season-1}-{str(season)[2:]}"
        nba_games = assign_stable_ids(nba_games, 'NBA', nba_season)
        all_games.extend(nba_games)
        print(f" Scraped {len(nba_games)} NBA games")

        # Scrape MLB
        print_section(f"MLB {season}")
        mlb_games = scrape_mlb_statsapi(season)
        # MLB seasons are a single calendar year, so the plain year string is used.
        mlb_games = assign_stable_ids(mlb_games, 'MLB', str(season))
        all_games.extend(mlb_games)
        print(f" Scraped {len(mlb_games)} MLB games")

        # Scrape NHL
        print_section(f"NHL {season}")
        nhl_games = scrape_nhl_hockey_reference(season)
        # NHL seasons also span two calendar years, same format as NBA.
        nhl_season = f"{season-1}-{str(season)[2:]}"
        nhl_games = assign_stable_ids(nhl_games, 'NHL', nhl_season)
        all_games.extend(nhl_games)
        print(f" Scraped {len(nhl_games)} NHL games")

        # Export raw data
        print_section("Exporting Raw Data")
        export_to_json(all_games, all_stadiums, output_dir)
        print(f" Exported to {output_dir}")
        # Convert scraped objects to plain dicts so the rest of the pipeline
        # sees the same shape as the skip_scrape JSON-loading branch below.
        raw_games = [g.__dict__ for g in all_games]
        raw_stadiums = [s.__dict__ for s in all_stadiums]
    else:
        print_header("LOADING EXISTING RAW DATA")
        # Reuse the raw export from a previous run (produced by export_to_json).
        games_file = output_dir / 'games.json'
        stadiums_file = output_dir / 'stadiums.json'
        with open(games_file) as f:
            raw_games = json.load(f)
        print(f" Loaded {len(raw_games)} raw games")
        with open(stadiums_file) as f:
            raw_stadiums = json.load(f)
        print(f" Loaded {len(raw_stadiums)} raw stadiums")

    # =========================================================================
    # STAGE 2: CANONICALIZE STADIUMS
    # =========================================================================
    print_header("STAGE 2: CANONICALIZING STADIUMS")
    canonical_stadiums, stadium_aliases = canonicalize_stadiums(
        raw_stadiums, verbose=verbose
    )
    print(f" Created {len(canonical_stadiums)} canonical stadiums")

    # Add historical aliases (old stadium names mapping to current canonical IDs),
    # then drop duplicates so each alias appears once.
    canonical_ids = {s.canonical_id for s in canonical_stadiums}
    stadium_aliases = add_historical_aliases(stadium_aliases, canonical_ids)
    stadium_aliases = deduplicate_aliases(stadium_aliases)
    print(f" Created {len(stadium_aliases)} stadium aliases")

    # Export
    stadiums_canonical_path = output_dir / 'stadiums_canonical.json'
    aliases_path = output_dir / 'stadium_aliases.json'
    with open(stadiums_canonical_path, 'w') as f:
        json.dump([asdict(s) for s in canonical_stadiums], f, indent=2)
    with open(aliases_path, 'w') as f:
        json.dump([asdict(a) for a in stadium_aliases], f, indent=2)
    print(f" Exported to {stadiums_canonical_path}")
    print(f" Exported to {aliases_path}")

    # =========================================================================
    # STAGE 3: CANONICALIZE TEAMS
    # =========================================================================
    print_header("STAGE 3: CANONICALIZING TEAMS")
    # Convert canonical stadiums to dicts for team matching
    stadiums_list = [asdict(s) for s in canonical_stadiums]
    canonical_teams, team_warnings = canonicalize_all_teams(
        stadiums_list, verbose=verbose
    )
    print(f" Created {len(canonical_teams)} canonical teams")
    if team_warnings:
        print(f" Warnings: {len(team_warnings)}")
        if verbose:
            # One line per warning; warnings carry the team ID and an issue string.
            for w in team_warnings:
                print(f" - {w.team_canonical_id}: {w.issue}")

    # Export
    teams_canonical_path = output_dir / 'teams_canonical.json'
    with open(teams_canonical_path, 'w') as f:
        json.dump([asdict(t) for t in canonical_teams], f, indent=2)
    print(f" Exported to {teams_canonical_path}")

    # =========================================================================
    # STAGE 4: CANONICALIZE GAMES
    # =========================================================================
    print_header("STAGE 4: CANONICALIZING GAMES")
    # Convert data to dicts for game canonicalization
    teams_list = [asdict(t) for t in canonical_teams]
    aliases_list = [asdict(a) for a in stadium_aliases]
    canonical_games_list, game_warnings = canonicalize_games(
        raw_games, teams_list, aliases_list, verbose=verbose
    )
    print(f" Created {len(canonical_games_list)} canonical games")
    if game_warnings:
        print(f" Warnings: {len(game_warnings)}")
        if verbose:
            # Game warnings are numerous, so aggregate counts per issue type
            # instead of printing each warning individually.
            from collections import defaultdict
            by_issue = defaultdict(int)
            for w in game_warnings:
                by_issue[w.issue] += 1
            for issue, count in by_issue.items():
                print(f" - {issue}: {count}")

    # Export
    games_canonical_path = output_dir / 'games_canonical.json'
    with open(games_canonical_path, 'w') as f:
        json.dump([asdict(g) for g in canonical_games_list], f, indent=2)
    print(f" Exported to {games_canonical_path}")

    # =========================================================================
    # STAGE 5: VALIDATE
    # =========================================================================
    validation_result = None
    if validate:
        print_header("STAGE 5: VALIDATION")
        # Reload as dicts for validation
        canonical_stadiums_dicts = [asdict(s) for s in canonical_stadiums]
        canonical_teams_dicts = [asdict(t) for t in canonical_teams]
        canonical_games_dicts = [asdict(g) for g in canonical_games_list]
        aliases_dicts = [asdict(a) for a in stadium_aliases]
        validation_result = validate_canonical_data(
            canonical_stadiums_dicts,
            canonical_teams_dicts,
            canonical_games_dicts,
            aliases_dicts,
            verbose=verbose
        )
        if validation_result.is_valid:
            print(f" STATUS: PASSED")
        else:
            print(f" STATUS: FAILED")
            print(f" Errors: {validation_result.error_count}")
        print(f" Warnings: {validation_result.warning_count}")

        # Export validation report
        validation_path = output_dir / 'canonicalization_validation.json'
        with open(validation_path, 'w') as f:
            json.dump({
                'is_valid': validation_result.is_valid,
                'error_count': validation_result.error_count,
                'warning_count': validation_result.warning_count,
                'summary': validation_result.summary,
                'errors': validation_result.errors[:100],  # Limit to 100 for readability
            }, f, indent=2)
        print(f" Report exported to {validation_path}")

    # =========================================================================
    # SUMMARY
    # =========================================================================
    duration = (datetime.now() - start_time).total_seconds()
    print_header("PIPELINE COMPLETE")
    print()
    print(f" Duration: {duration:.1f} seconds")
    print(f" Stadiums: {len(canonical_stadiums)}")
    print(f" Teams: {len(canonical_teams)}")
    print(f" Games: {len(canonical_games_list)}")
    print(f" Aliases: {len(stadium_aliases)}")
    print()

    # Games by sport
    print(" Games by sport:")
    by_sport = {}
    for g in canonical_games_list:
        by_sport[g.sport] = by_sport.get(g.sport, 0) + 1
    for sport, count in sorted(by_sport.items()):
        print(f" {sport}: {count:,} games")
    print()
    print(" Output files:")
    print(f" - {output_dir / 'stadiums_canonical.json'}")
    print(f" - {output_dir / 'stadium_aliases.json'}")
    print(f" - {output_dir / 'teams_canonical.json'}")
    print(f" - {output_dir / 'games_canonical.json'}")
    print(f" - {output_dir / 'canonicalization_validation.json'}")
    print()

    # Final status: the pipeline only fails when validation ran AND found
    # errors; with --no-validate the run is reported as succeeded.
    success = True
    if validation_result and not validation_result.is_valid:
        success = False
        print(" PIPELINE FAILED - Validation errors detected")
        print(" CloudKit upload should NOT proceed until errors are fixed")
    else:
        print(" PIPELINE SUCCEEDED - Ready for CloudKit upload")
    print()

    return PipelineResult(
        success=success,
        stadiums_count=len(canonical_stadiums),
        teams_count=len(canonical_teams),
        games_count=len(canonical_games_list),
        aliases_count=len(stadium_aliases),
        validation_errors=validation_result.error_count if validation_result else 0,
        validation_warnings=validation_result.warning_count if validation_result else 0,
        duration_seconds=duration,
        output_dir=str(output_dir),
    )
def main():
    """Command-line entry point: parse arguments, run the pipeline, set exit code."""
    parser = argparse.ArgumentParser(
        description='SportsTime Canonicalization Pipeline',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Pipeline Stages:
1. SCRAPE: Fetch raw data from web sources
2. CANONICALIZE STADIUMS: Generate canonical IDs and aliases
3. CANONICALIZE TEAMS: Match teams to stadiums
4. CANONICALIZE GAMES: Resolve all references
5. VALIDATE: Verify internal consistency
Examples:
python run_canonicalization_pipeline.py # Full pipeline
python run_canonicalization_pipeline.py --season 2026 # Different season
python run_canonicalization_pipeline.py --skip-scrape # Use existing raw data
python run_canonicalization_pipeline.py --verbose # Show all details
"""
    )
    add = parser.add_argument
    add('--season', type=int, default=2026, help='Season year (default: 2026)')
    add('--output', type=str, default='./data', help='Output directory (default: ./data)')
    add('--skip-scrape', action='store_true', help='Skip scraping, use existing raw data files')
    add('--no-validate', action='store_true', help='Skip validation step')
    add('--verbose', '-v', action='store_true', help='Verbose output')
    add('--strict', action='store_true', help='Exit with error code if validation fails')
    args = parser.parse_args()

    result = run_pipeline(
        season=args.season,
        output_dir=Path(args.output),
        skip_scrape=args.skip_scrape,
        validate=not args.no_validate,
        verbose=args.verbose,
    )

    # --strict turns a failed validation into a nonzero exit status;
    # without it the script always exits 0.
    if args.strict and not result.success:
        sys.exit(1)


if __name__ == '__main__':
    main()