- Add local canonicalization pipeline (stadiums, teams, games) that generates deterministic canonical IDs before CloudKit upload
- Fix CanonicalSyncService to use deterministic UUIDs from canonical IDs instead of random UUIDs from CloudKit records
- Add SyncStadium/SyncTeam/SyncGame types to CloudKitService that preserve canonical ID relationships during sync
- Add canonical ID field keys to CKModels for reading from CloudKit records
- Bundle canonical JSON files (stadiums_canonical, teams_canonical, games_canonical, stadium_aliases) for consistent bootstrap data
- Update BootstrapService to prefer canonical format files over legacy format

This ensures all entities use consistent deterministic UUIDs derived from their canonical IDs, preventing duplicate records when syncing CloudKit data with bootstrapped local data.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
413 lines
14 KiB
Python
413 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
SportsTime Canonicalization Pipeline
|
|
====================================
|
|
Master script that orchestrates all data canonicalization steps.
|
|
|
|
This is the NEW pipeline that performs local identity resolution
|
|
BEFORE any CloudKit upload.
|
|
|
|
Pipeline Stages:
|
|
1. SCRAPE: Fetch raw data from web sources
|
|
2. CANONICALIZE STADIUMS: Generate canonical stadium IDs and aliases
|
|
3. CANONICALIZE TEAMS: Match teams to stadiums, generate canonical IDs
|
|
4. CANONICALIZE GAMES: Resolve all references, generate canonical IDs
|
|
5. VALIDATE: Verify all data is internally consistent
|
|
6. (Optional) UPLOAD: CloudKit upload (separate script)
|
|
|
|
Usage:
|
|
python run_canonicalization_pipeline.py # Full pipeline
|
|
python run_canonicalization_pipeline.py --season 2026 # Specify season
|
|
python run_canonicalization_pipeline.py --skip-scrape # Use existing raw data
|
|
python run_canonicalization_pipeline.py --verbose # Detailed output
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import sys
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from dataclasses import dataclass, asdict
|
|
|
|
# Import pipeline components
|
|
from scrape_schedules import (
|
|
scrape_nba_basketball_reference,
|
|
scrape_mlb_statsapi,
|
|
scrape_nhl_hockey_reference,
|
|
generate_stadiums_from_teams,
|
|
assign_stable_ids,
|
|
export_to_json,
|
|
)
|
|
from canonicalize_stadiums import (
|
|
canonicalize_stadiums,
|
|
add_historical_aliases,
|
|
deduplicate_aliases,
|
|
)
|
|
from canonicalize_teams import canonicalize_all_teams
|
|
from canonicalize_games import canonicalize_games
|
|
from validate_canonical import validate_canonical_data
|
|
|
|
|
|
@dataclass
class PipelineResult:
    """Result of the full canonicalization pipeline.

    Returned by run_pipeline() so callers (and --strict mode in main())
    can inspect counts and the validation outcome programmatically.
    """

    success: bool  # False when validation ran and reported errors
    stadiums_count: int  # canonical stadiums produced in stage 2
    teams_count: int  # canonical teams produced in stage 3
    games_count: int  # canonical games produced in stage 4
    aliases_count: int  # stadium aliases after dedup in stage 2
    validation_errors: int  # 0 when validation was skipped
    validation_warnings: int  # 0 when validation was skipped
    duration_seconds: float  # wall-clock time for the whole pipeline
    output_dir: str  # directory the JSON artifacts were written to
|
|
|
|
|
|
def print_header(text: str):
    """Print a banner-style header: a blank line, then the text framed by '=' rules."""
    rule = "=" * 70
    print(f"\n{rule}\n {text}\n{rule}")
|
|
|
|
|
|
def print_section(text: str):
    """Print a section header: a blank line followed by the text framed in dashes."""
    print(f"\n--- {text} ---")
|
|
|
|
|
|
def run_pipeline(
    season: int = 2026,
    output_dir: Path = Path('./data'),
    skip_scrape: bool = False,
    validate: bool = True,
    verbose: bool = False,
) -> PipelineResult:
    """
    Run the complete canonicalization pipeline.

    Stages: (1) scrape or load raw games/stadiums, (2) canonicalize
    stadiums + aliases, (3) canonicalize teams, (4) canonicalize games,
    (5) optionally validate, then print a summary. Each stage's output
    is written to ``output_dir`` as JSON.

    Args:
        season: Season year (e.g., 2026)
        output_dir: Directory for output files (created if missing)
        skip_scrape: Skip scraping, use existing raw data
            (reads games.json / stadiums.json from output_dir;
            raises FileNotFoundError if they are absent)
        validate: Run validation step
        verbose: Print detailed output

    Returns:
        PipelineResult with statistics
    """
    start_time = datetime.now()
    output_dir.mkdir(parents=True, exist_ok=True)

    # =========================================================================
    # STAGE 1: SCRAPE RAW DATA
    # =========================================================================

    if not skip_scrape:
        print_header("STAGE 1: SCRAPING RAW DATA")

        all_games = []
        all_stadiums = []

        # Scrape stadiums from team mappings (derived locally, not fetched)
        print_section("Stadiums")
        all_stadiums = generate_stadiums_from_teams()
        print(f" Generated {len(all_stadiums)} stadiums from team data")

        # Scrape NBA; NBA seasons span two calendar years, hence "2025-26" style label
        print_section(f"NBA {season}")
        nba_games = scrape_nba_basketball_reference(season)
        nba_season = f"{season-1}-{str(season)[2:]}"
        nba_games = assign_stable_ids(nba_games, 'NBA', nba_season)
        all_games.extend(nba_games)
        print(f" Scraped {len(nba_games)} NBA games")

        # Scrape MLB; MLB seasons fit one calendar year, so the label is just the year
        print_section(f"MLB {season}")
        mlb_games = scrape_mlb_statsapi(season)
        mlb_games = assign_stable_ids(mlb_games, 'MLB', str(season))
        all_games.extend(mlb_games)
        print(f" Scraped {len(mlb_games)} MLB games")

        # Scrape NHL; same two-year season label format as the NBA
        print_section(f"NHL {season}")
        nhl_games = scrape_nhl_hockey_reference(season)
        nhl_season = f"{season-1}-{str(season)[2:]}"
        nhl_games = assign_stable_ids(nhl_games, 'NHL', nhl_season)
        all_games.extend(nhl_games)
        print(f" Scraped {len(nhl_games)} NHL games")

        # Export raw data so later --skip-scrape runs can reuse it
        print_section("Exporting Raw Data")
        export_to_json(all_games, all_stadiums, output_dir)
        print(f" Exported to {output_dir}")

        # Downstream canonicalizers consume plain dicts, not the scraper objects.
        # NOTE(review): assumes the scraped objects have no __slots__ so that
        # __dict__ mirrors what export_to_json writes — confirm in scrape_schedules.
        raw_games = [g.__dict__ for g in all_games]
        raw_stadiums = [s.__dict__ for s in all_stadiums]

    else:
        print_header("LOADING EXISTING RAW DATA")

        # These filenames must match what export_to_json produces above.
        games_file = output_dir / 'games.json'
        stadiums_file = output_dir / 'stadiums.json'

        with open(games_file) as f:
            raw_games = json.load(f)
        print(f" Loaded {len(raw_games)} raw games")

        with open(stadiums_file) as f:
            raw_stadiums = json.load(f)
        print(f" Loaded {len(raw_stadiums)} raw stadiums")

    # =========================================================================
    # STAGE 2: CANONICALIZE STADIUMS
    # =========================================================================

    print_header("STAGE 2: CANONICALIZING STADIUMS")

    canonical_stadiums, stadium_aliases = canonicalize_stadiums(
        raw_stadiums, verbose=verbose
    )
    print(f" Created {len(canonical_stadiums)} canonical stadiums")

    # Add historical aliases (old stadium names) and drop duplicate entries
    canonical_ids = {s.canonical_id for s in canonical_stadiums}
    stadium_aliases = add_historical_aliases(stadium_aliases, canonical_ids)
    stadium_aliases = deduplicate_aliases(stadium_aliases)
    print(f" Created {len(stadium_aliases)} stadium aliases")

    # Export
    stadiums_canonical_path = output_dir / 'stadiums_canonical.json'
    aliases_path = output_dir / 'stadium_aliases.json'

    with open(stadiums_canonical_path, 'w') as f:
        json.dump([asdict(s) for s in canonical_stadiums], f, indent=2)

    with open(aliases_path, 'w') as f:
        json.dump([asdict(a) for a in stadium_aliases], f, indent=2)

    print(f" Exported to {stadiums_canonical_path}")
    print(f" Exported to {aliases_path}")

    # =========================================================================
    # STAGE 3: CANONICALIZE TEAMS
    # =========================================================================

    print_header("STAGE 3: CANONICALIZING TEAMS")

    # Convert canonical stadiums to dicts for team matching
    stadiums_list = [asdict(s) for s in canonical_stadiums]

    canonical_teams, team_warnings = canonicalize_all_teams(
        stadiums_list, verbose=verbose
    )
    print(f" Created {len(canonical_teams)} canonical teams")

    # Warnings are non-fatal here; hard failures surface in stage 5 validation
    if team_warnings:
        print(f" Warnings: {len(team_warnings)}")
        if verbose:
            for w in team_warnings:
                print(f" - {w.team_canonical_id}: {w.issue}")

    # Export
    teams_canonical_path = output_dir / 'teams_canonical.json'

    with open(teams_canonical_path, 'w') as f:
        json.dump([asdict(t) for t in canonical_teams], f, indent=2)

    print(f" Exported to {teams_canonical_path}")

    # =========================================================================
    # STAGE 4: CANONICALIZE GAMES
    # =========================================================================

    print_header("STAGE 4: CANONICALIZING GAMES")

    # Convert data to dicts for game canonicalization
    teams_list = [asdict(t) for t in canonical_teams]
    aliases_list = [asdict(a) for a in stadium_aliases]

    canonical_games_list, game_warnings = canonicalize_games(
        raw_games, teams_list, aliases_list, verbose=verbose
    )
    print(f" Created {len(canonical_games_list)} canonical games")

    if game_warnings:
        print(f" Warnings: {len(game_warnings)}")
        if verbose:
            # Group the (potentially thousands of) warnings by issue type
            from collections import defaultdict
            by_issue = defaultdict(int)
            for w in game_warnings:
                by_issue[w.issue] += 1
            for issue, count in by_issue.items():
                print(f" - {issue}: {count}")

    # Export
    games_canonical_path = output_dir / 'games_canonical.json'

    with open(games_canonical_path, 'w') as f:
        json.dump([asdict(g) for g in canonical_games_list], f, indent=2)

    print(f" Exported to {games_canonical_path}")

    # =========================================================================
    # STAGE 5: VALIDATE
    # =========================================================================

    validation_result = None
    if validate:
        print_header("STAGE 5: VALIDATION")

        # Reload as dicts for validation
        canonical_stadiums_dicts = [asdict(s) for s in canonical_stadiums]
        canonical_teams_dicts = [asdict(t) for t in canonical_teams]
        canonical_games_dicts = [asdict(g) for g in canonical_games_list]
        aliases_dicts = [asdict(a) for a in stadium_aliases]

        validation_result = validate_canonical_data(
            canonical_stadiums_dicts,
            canonical_teams_dicts,
            canonical_games_dicts,
            aliases_dicts,
            verbose=verbose
        )

        if validation_result.is_valid:
            print(f" STATUS: PASSED")
        else:
            print(f" STATUS: FAILED")

        print(f" Errors: {validation_result.error_count}")
        print(f" Warnings: {validation_result.warning_count}")

        # Export validation report
        validation_path = output_dir / 'canonicalization_validation.json'
        with open(validation_path, 'w') as f:
            json.dump({
                'is_valid': validation_result.is_valid,
                'error_count': validation_result.error_count,
                'warning_count': validation_result.warning_count,
                'summary': validation_result.summary,
                'errors': validation_result.errors[:100],  # Limit to 100 for readability
            }, f, indent=2)
        print(f" Report exported to {validation_path}")

    # =========================================================================
    # SUMMARY
    # =========================================================================

    duration = (datetime.now() - start_time).total_seconds()

    print_header("PIPELINE COMPLETE")
    print()
    print(f" Duration: {duration:.1f} seconds")
    print(f" Stadiums: {len(canonical_stadiums)}")
    print(f" Teams: {len(canonical_teams)}")
    print(f" Games: {len(canonical_games_list)}")
    print(f" Aliases: {len(stadium_aliases)}")
    print()

    # Games by sport
    print(" Games by sport:")
    by_sport = {}
    for g in canonical_games_list:
        by_sport[g.sport] = by_sport.get(g.sport, 0) + 1
    for sport, count in sorted(by_sport.items()):
        print(f" {sport}: {count:,} games")

    print()
    print(" Output files:")
    print(f" - {output_dir / 'stadiums_canonical.json'}")
    print(f" - {output_dir / 'stadium_aliases.json'}")
    print(f" - {output_dir / 'teams_canonical.json'}")
    print(f" - {output_dir / 'games_canonical.json'}")
    # NOTE(review): this file is only written when validate=True, but it is
    # always listed here — consider gating the line on `validate`.
    print(f" - {output_dir / 'canonicalization_validation.json'}")
    print()

    # Final status: only a failed validation marks the run unsuccessful
    success = True
    if validation_result and not validation_result.is_valid:
        success = False
        print(" PIPELINE FAILED - Validation errors detected")
        print(" CloudKit upload should NOT proceed until errors are fixed")
    else:
        print(" PIPELINE SUCCEEDED - Ready for CloudKit upload")

    print()

    return PipelineResult(
        success=success,
        stadiums_count=len(canonical_stadiums),
        teams_count=len(canonical_teams),
        games_count=len(canonical_games_list),
        aliases_count=len(stadium_aliases),
        validation_errors=validation_result.error_count if validation_result else 0,
        validation_warnings=validation_result.warning_count if validation_result else 0,
        duration_seconds=duration,
        output_dir=str(output_dir),
    )
|
|
|
|
|
|
def main():
    """CLI entry point: parse arguments, run the pipeline, and optionally
    exit non-zero (--strict) when validation fails."""
    parser = argparse.ArgumentParser(
        description='SportsTime Canonicalization Pipeline',
        # RawDescriptionHelpFormatter keeps the epilog's manual line breaks intact
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Pipeline Stages:
  1. SCRAPE: Fetch raw data from web sources
  2. CANONICALIZE STADIUMS: Generate canonical IDs and aliases
  3. CANONICALIZE TEAMS: Match teams to stadiums
  4. CANONICALIZE GAMES: Resolve all references
  5. VALIDATE: Verify internal consistency

Examples:
  python run_canonicalization_pipeline.py                  # Full pipeline
  python run_canonicalization_pipeline.py --season 2026    # Different season
  python run_canonicalization_pipeline.py --skip-scrape    # Use existing raw data
  python run_canonicalization_pipeline.py --verbose        # Show all details
"""
    )

    parser.add_argument(
        '--season', type=int, default=2026,
        help='Season year (default: 2026)'
    )
    parser.add_argument(
        '--output', type=str, default='./data',
        help='Output directory (default: ./data)'
    )
    parser.add_argument(
        '--skip-scrape', action='store_true',
        help='Skip scraping, use existing raw data files'
    )
    parser.add_argument(
        '--no-validate', action='store_true',
        help='Skip validation step'
    )
    parser.add_argument(
        '--verbose', '-v', action='store_true',
        help='Verbose output'
    )
    parser.add_argument(
        '--strict', action='store_true',
        help='Exit with error code if validation fails'
    )

    args = parser.parse_args()

    result = run_pipeline(
        season=args.season,
        output_dir=Path(args.output),
        skip_scrape=args.skip_scrape,
        # CLI flag is negative (--no-validate); the pipeline arg is positive
        validate=not args.no_validate,
        verbose=args.verbose,
    )

    # Exit with error code if requested and validation failed.
    # Without --strict the process exits 0 even on validation failure.
    if args.strict and not result.success:
        sys.exit(1)
|
|
|
|
|
|
# Run the CLI only when executed as a script, so the module can be
# imported (e.g. by tests or other pipelines) without side effects.
if __name__ == '__main__':
    main()
|