feat(scripts): add sportstime-parser data pipeline

Complete Python package for scraping, normalizing, and uploading
sports schedule data to CloudKit. Includes:

- Multi-source scrapers for NBA, MLB, NFL, NHL, MLS, WNBA, NWSL
- Canonical ID system for teams, stadiums, and games
- Fuzzy matching with manual alias support
- CloudKit uploader with batch operations and deduplication
- Comprehensive test suite with fixtures
- WNBA abbreviation aliases for improved team resolution
- Alias validation script to detect orphan references

All 5 phases of data remediation plan completed:
- Phase 1: Alias fixes (team/stadium alias additions)
- Phase 2: NHL stadium coordinate fixes
- Phase 3: Re-scrape validation
- Phase 4: iOS bundle update
- Phase 5: Code quality improvements (WNBA aliases)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Trey t
2026-01-20 18:56:25 -06:00
parent ac78042a7e
commit 52d445bca4
76 changed files with 25065 additions and 0 deletions

View File

@@ -0,0 +1,32 @@
"""Validators for scraped data."""
from .report import (
ValidationReport,
ValidationSummary,
generate_report,
detect_duplicate_games,
validate_games,
)
from .schema import (
SchemaValidationError,
validate_canonical_stadium,
validate_canonical_team,
validate_canonical_game,
validate_and_raise,
validate_batch,
)
__all__ = [
"ValidationReport",
"ValidationSummary",
"generate_report",
"detect_duplicate_games",
"validate_games",
"SchemaValidationError",
"validate_canonical_stadium",
"validate_canonical_team",
"validate_canonical_game",
"validate_and_raise",
"validate_batch",
]

View File

@@ -0,0 +1,409 @@
"""Validation report generator for scraped data."""
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Optional
from ..config import EXPECTED_GAME_COUNTS, OUTPUT_DIR
from ..models.game import Game
from ..models.team import Team
from ..models.stadium import Stadium
from ..models.aliases import ManualReviewItem, ReviewReason
@dataclass
class ValidationSummary:
    """Aggregate statistics for one validation run.

    Attributes:
        total_games: Total number of games scraped
        valid_games: Number of games with all data resolved
        review_count: Number of items needing manual review
        unresolved_teams: Count of unresolved team names
        unresolved_stadiums: Count of unresolved stadium names
        duplicate_games: Count of potential duplicate games
        missing_data: Count of games with missing required data
        expected_games: Expected number of games for this sport
    """
    total_games: int = 0
    valid_games: int = 0
    review_count: int = 0
    unresolved_teams: int = 0
    unresolved_stadiums: int = 0
    duplicate_games: int = 0
    missing_data: int = 0
    expected_games: int = 0

    @property
    def game_coverage(self) -> float:
        """Percentage of expected games scraped (100.0 when no expectation set)."""
        expected = self.expected_games
        return (self.total_games / expected) * 100 if expected else 100.0

    @property
    def validity_rate(self) -> float:
        """Percentage of scraped games that are valid (100.0 for an empty run)."""
        total = self.total_games
        return (self.valid_games / total) * 100 if total else 100.0

    @property
    def needs_review(self) -> bool:
        """True when at least one item requires manual review."""
        return self.review_count > 0

    @property
    def status_emoji(self) -> str:
        """Status marker for the report header (warning sign for borderline runs)."""
        if self.review_count == 0 and self.game_coverage >= 95:
            return ""
        if self.review_count <= 10 and self.game_coverage >= 80:
            return "⚠️"
        return ""
@dataclass
class ValidationReport:
    """Complete validation report for a sport/season.

    Renders scraped data plus review findings as a markdown document and
    can persist that document to disk via save().

    Attributes:
        sport: Sport code
        season: Season start year
        source: Name of the data source used
        summary: Summary statistics
        review_items: Items requiring manual review
        games: All scraped games
        teams: All teams
        stadiums: All stadiums
        generated_at: Timestamp of report generation (naive local time)
    """
    sport: str
    season: int
    source: str
    summary: ValidationSummary
    review_items: list[ManualReviewItem] = field(default_factory=list)
    games: list[Game] = field(default_factory=list)
    teams: list[Team] = field(default_factory=list)
    stadiums: list[Stadium] = field(default_factory=list)
    # NOTE: datetime.now() is naive *local* time, not UTC.
    generated_at: datetime = field(default_factory=datetime.now)

    def to_markdown(self) -> str:
        """Generate markdown report.

        Returns:
            Complete markdown report as string
        """
        lines = []
        # Header -- e.g. "NBA 2025-26" for a season starting in 2025.
        season_str = f"{self.season}-{str(self.season + 1)[-2:]}"
        lines.append(f"# Validation Report: {self.sport.upper()} {season_str}")
        lines.append("")
        # BUGFIX: generated_at defaults to datetime.now(), which is naive
        # local time -- the previous " UTC" suffix on this line was wrong.
        lines.append(f"**Generated**: {self.generated_at.strftime('%Y-%m-%d %H:%M:%S')}")
        lines.append(f"**Source**: {self.source}")
        lines.append(f"**Status**: {self.summary.status_emoji} {'Needs Review' if self.summary.needs_review else 'Ready'}")
        lines.append("")
        # Summary table
        lines.append("## Summary")
        lines.append("")
        lines.append("| Metric | Count |")
        lines.append("|--------|-------|")
        lines.append(f"| Total Games | {self.summary.total_games:,} |")
        lines.append(f"| Valid Games | {self.summary.valid_games:,} |")
        lines.append(f"| Expected Games | {self.summary.expected_games:,} |")
        lines.append(f"| Coverage | {self.summary.game_coverage:.1f}% |")
        lines.append(f"| Manual Review | {self.summary.review_count} |")
        lines.append(f"| Unresolved Teams | {self.summary.unresolved_teams} |")
        lines.append(f"| Unresolved Stadiums | {self.summary.unresolved_stadiums} |")
        lines.append(f"| Duplicate Games | {self.summary.duplicate_games} |")
        lines.append(f"| Missing Data | {self.summary.missing_data} |")
        lines.append("")
        # Manual review section -- grouped by reason, capped at 10 per group
        # so the report stays readable even on very noisy scrapes.
        if self.review_items:
            lines.append("## Manual Review Required")
            lines.append("")
            # Group by reason
            by_reason: dict[ReviewReason, list[ManualReviewItem]] = {}
            for item in self.review_items:
                if item.reason not in by_reason:
                    by_reason[item.reason] = []
                by_reason[item.reason].append(item)
            for reason, items in sorted(by_reason.items(), key=lambda x: x[0].value):
                reason_title = reason.value.replace("_", " ").title()
                lines.append(f"### {reason_title} ({len(items)})")
                lines.append("")
                for item in items[:10]:  # Limit to first 10 per category
                    lines.append(item.to_markdown())
                if len(items) > 10:
                    lines.append(f"*... and {len(items) - 10} more items*")
                lines.append("")
        # Teams section -- alphabetical, capped at 20 rows.
        lines.append("## Teams")
        lines.append("")
        lines.append(f"Total teams: {len(self.teams)}")
        lines.append("")
        if self.teams:
            lines.append("| ID | Full Name | City | Conference | Division |")
            lines.append("|-----|-----------|------|------------|----------|")
            for team in sorted(self.teams, key=lambda t: t.full_name)[:20]:
                lines.append(
                    f"| `{team.id}` | {team.full_name} | {team.city} | "
                    f"{team.conference or '-'} | {team.division or '-'} |"
                )
            if len(self.teams) > 20:
                lines.append(f"*... and {len(self.teams) - 20} more teams*")
            lines.append("")
        # Stadiums section -- alphabetical, capped at 20 rows.
        lines.append("## Stadiums")
        lines.append("")
        lines.append(f"Total stadiums: {len(self.stadiums)}")
        lines.append("")
        if self.stadiums:
            lines.append("| ID | Name | City | State |")
            lines.append("|-----|------|------|-------|")
            for stadium in sorted(self.stadiums, key=lambda s: s.name)[:20]:
                lines.append(
                    f"| `{stadium.id}` | {stadium.name} | "
                    f"{stadium.city} | {stadium.state} |"
                )
            if len(self.stadiums) > 20:
                lines.append(f"*... and {len(self.stadiums) - 20} more stadiums*")
            lines.append("")
        # Game samples section
        lines.append("## Game Samples")
        lines.append("")
        if self.games:
            # Show first 10 games
            lines.append("### First 10 Games")
            lines.append("")
            lines.append("| ID | Date | Away | Home | Status |")
            lines.append("|----|------|------|------|--------|")
            for game in self.games[:10]:
                date_str = game.game_date.strftime("%Y-%m-%d")
                lines.append(
                    f"| `{game.id}` | {date_str} | {game.away_team_id} | "
                    f"{game.home_team_id} | {game.status} |"
                )
            lines.append("")
            # Show games with issues (unresolved stadium), capped at 10 rows.
            problem_games = [g for g in self.games if not g.stadium_id]
            if problem_games:
                lines.append("### Games Missing Stadium")
                lines.append("")
                lines.append("| ID | Date | Away | Home | Raw Stadium |")
                lines.append("|----|------|------|------|-------------|")
                for game in problem_games[:10]:
                    date_str = game.game_date.strftime("%Y-%m-%d")
                    lines.append(
                        f"| `{game.id}` | {date_str} | {game.away_team_id} | "
                        f"{game.home_team_id} | {game.raw_stadium or '-'} |"
                    )
                if len(problem_games) > 10:
                    lines.append(f"*... and {len(problem_games) - 10} more*")
                lines.append("")
        lines.append("---")
        lines.append("")
        lines.append("*Generated by sportstime-parser*")
        return "\n".join(lines)

    def save(self, output_dir: Optional[Path] = None) -> Path:
        """Save report to markdown file.

        Args:
            output_dir: Directory to save to (default: OUTPUT_DIR)

        Returns:
            Path to saved file
        """
        if output_dir is None:
            output_dir = OUTPUT_DIR
        output_dir.mkdir(parents=True, exist_ok=True)
        filename = f"validation_{self.sport}_{self.season}.md"
        filepath = output_dir / filename
        with open(filepath, "w", encoding="utf-8") as f:
            f.write(self.to_markdown())
        return filepath
def generate_report(
    sport: str,
    season: int,
    source: str,
    games: list[Game],
    teams: list[Team],
    stadiums: list[Stadium],
    review_items: list[ManualReviewItem],
) -> ValidationReport:
    """Build a ValidationReport (with a computed summary) from scraped data.

    Args:
        sport: Sport code
        season: Season start year
        source: Data source name
        games: List of scraped games
        teams: List of teams
        stadiums: List of stadiums
        review_items: Items requiring review

    Returns:
        Complete ValidationReport
    """
    summary = ValidationSummary(
        total_games=len(games),
        expected_games=EXPECTED_GAME_COUNTS.get(sport, 0),
        review_count=len(review_items),
    )
    # Tally review items into the matching per-reason counter on the summary.
    counter_for_reason = {
        ReviewReason.UNRESOLVED_TEAM: "unresolved_teams",
        ReviewReason.UNRESOLVED_STADIUM: "unresolved_stadiums",
        ReviewReason.DUPLICATE_GAME: "duplicate_games",
        ReviewReason.MISSING_DATA: "missing_data",
    }
    for item in review_items:
        attr = counter_for_reason.get(item.reason)
        if attr is not None:
            setattr(summary, attr, getattr(summary, attr) + 1)
    # A game counts as valid when both participating teams were resolved.
    summary.valid_games = sum(
        1 for game in games if game.home_team_id and game.away_team_id
    )
    return ValidationReport(
        sport=sport,
        season=season,
        source=source,
        summary=summary,
        review_items=review_items,
        games=games,
        teams=teams,
        stadiums=stadiums,
    )
def detect_duplicate_games(games: list[Game]) -> list[ManualReviewItem]:
    """Flag games that look like duplicates of an earlier entry.

    Two games are considered duplicates when they share the same home team,
    away team, and calendar date (time of day is ignored). A repeated game
    carrying a game_number is treated as a doubleheader and is not flagged.

    Args:
        games: List of games to check

    Returns:
        List of ManualReviewItems for duplicates
    """
    from uuid import uuid4

    first_seen: dict[str, Game] = {}
    flagged: list[ManualReviewItem] = []
    for game in games:
        day = game.game_date.strftime("%Y%m%d")
        key = f"{game.home_team_id}_{game.away_team_id}_{day}"
        prior = first_seen.get(key)
        if prior is None:
            first_seen[key] = game
            continue
        # Doubleheaders legitimately repeat the same matchup on one day.
        if game.game_number:
            continue
        flagged.append(
            ManualReviewItem(
                id=f"dup_{uuid4().hex[:8]}",
                reason=ReviewReason.DUPLICATE_GAME,
                sport=game.sport,
                raw_value=f"{game.id} vs {prior.id}",
                context={
                    "game1_id": prior.id,
                    "game2_id": game.id,
                    "date": game.game_date.strftime("%Y-%m-%d"),
                    "home": game.home_team_id,
                    "away": game.away_team_id,
                },
                game_date=game.game_date.date(),
            )
        )
    return flagged
def validate_games(games: list[Game]) -> list[ManualReviewItem]:
    """Validate games and return the issues found.

    Checks performed:
    - Games with no resolved stadium ID (reported as MISSING_DATA)
    - Potential duplicate games (via detect_duplicate_games)

    Args:
        games: List of games to validate

    Returns:
        List of ManualReviewItems for issues
    """
    from uuid import uuid4

    findings: list[ManualReviewItem] = []
    for game in (g for g in games if not g.stadium_id):
        findings.append(
            ManualReviewItem(
                id=f"missing_{uuid4().hex[:8]}",
                reason=ReviewReason.MISSING_DATA,
                sport=game.sport,
                raw_value=f"Game {game.id} missing stadium",
                context={
                    "game_id": game.id,
                    "field": "stadium_id",
                    "raw_stadium": game.raw_stadium,
                },
                source_url=game.source_url,
                game_date=game.game_date.date(),
            )
        )
    # Duplicate detections come after the missing-stadium findings so the
    # report ordering matches the original implementation.
    findings.extend(detect_duplicate_games(games))
    return findings

View File

@@ -0,0 +1,246 @@
"""JSON Schema validation for canonical output matching iOS app expectations.
This module defines schemas that match the Swift structs in BootstrapService.swift:
- JSONCanonicalStadium
- JSONCanonicalTeam
- JSONCanonicalGame
Validation is performed at runtime before outputting JSON to ensure
Python output matches what the iOS app expects.
"""
import re
from dataclasses import dataclass
from typing import Any, Callable, Optional, Union
class SchemaValidationError(Exception):
    """Raised when canonical output fails schema validation."""

    def __init__(self, model_type: str, errors: list[str]):
        # Keep the raw pieces so callers can inspect them programmatically
        # instead of parsing the formatted message.
        self.model_type = model_type
        self.errors = errors
        bullet_list = "\n".join(f"  - {e}" for e in errors)
        super().__init__(f"{model_type} schema validation failed:\n" + bullet_list)
# ISO8601 UTC datetime pattern: YYYY-MM-DDTHH:MM:SSZ.
# Strict: no fractional seconds, no numeric offset -- the literal trailing
# "Z" suffix is required, matching what the iOS decoder expects.
ISO8601_UTC_PATTERN = re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$")
# Season format patterns: split seasons span two calendar years,
# single seasons fit in one calendar year.
SEASON_SPLIT_PATTERN = re.compile(r"^\d{4}-\d{2}$")  # e.g., "2025-26"
SEASON_SINGLE_PATTERN = re.compile(r"^\d{4}$")  # e.g., "2025"
@dataclass
class FieldSpec:
    """Specification for a field in the canonical schema.

    Consumed by validate_field to check presence, type, and (optionally)
    a custom value constraint for one key of a canonical dict.
    """
    name: str  # key expected in the canonical dict
    required: bool  # if True, absence of the key is an error
    field_type: Union[type, tuple]  # type (or tuple of types) for isinstance()
    validator: Optional[Callable] = None  # extra check; truthy result == valid
# Schema definitions matching Swift structs in BootstrapService.swift.
# Optional fields list (X, type(None)) so an explicit null/None passes the
# isinstance check; required numeric fields accept both int and float where
# JSON may deliver either.
STADIUM_SCHEMA: list[FieldSpec] = [
    FieldSpec("canonical_id", required=True, field_type=str),
    FieldSpec("name", required=True, field_type=str),
    FieldSpec("city", required=True, field_type=str),
    FieldSpec("state", required=True, field_type=str),
    FieldSpec("latitude", required=True, field_type=(int, float)),
    FieldSpec("longitude", required=True, field_type=(int, float)),
    FieldSpec("capacity", required=True, field_type=int),
    FieldSpec("sport", required=True, field_type=str),
    FieldSpec("primary_team_abbrevs", required=True, field_type=list),
    FieldSpec("year_opened", required=False, field_type=(int, type(None))),
]
# Mirrors JSONCanonicalTeam; conference/division/colors are nullable.
TEAM_SCHEMA: list[FieldSpec] = [
    FieldSpec("canonical_id", required=True, field_type=str),
    FieldSpec("name", required=True, field_type=str),
    FieldSpec("abbreviation", required=True, field_type=str),
    FieldSpec("sport", required=True, field_type=str),
    FieldSpec("city", required=True, field_type=str),
    FieldSpec("stadium_canonical_id", required=True, field_type=str),
    FieldSpec("conference_id", required=False, field_type=(str, type(None))),
    FieldSpec("division_id", required=False, field_type=(str, type(None))),
    FieldSpec("primary_color", required=False, field_type=(str, type(None))),
    FieldSpec("secondary_color", required=False, field_type=(str, type(None))),
]
# Mirrors JSONCanonicalGame; season and game_datetime_utc carry extra
# format validators on top of the plain type check.
GAME_SCHEMA: list[FieldSpec] = [
    FieldSpec("canonical_id", required=True, field_type=str),
    FieldSpec("sport", required=True, field_type=str),
    FieldSpec(
        "season",
        required=True,
        field_type=str,
        # Accepts either the split ("2025-26") or single ("2025") season form.
        validator=lambda v: SEASON_SPLIT_PATTERN.match(v) or SEASON_SINGLE_PATTERN.match(v),
    ),
    FieldSpec(
        "game_datetime_utc",
        required=True,
        field_type=str,
        validator=lambda v: ISO8601_UTC_PATTERN.match(v),
    ),
    FieldSpec("home_team_canonical_id", required=True, field_type=str),
    FieldSpec("away_team_canonical_id", required=True, field_type=str),
    FieldSpec("stadium_canonical_id", required=True, field_type=str),
    FieldSpec("is_playoff", required=True, field_type=bool),
    FieldSpec("broadcast", required=False, field_type=(str, type(None))),
]
def validate_field(data: dict[str, Any], spec: FieldSpec) -> list[str]:
    """Validate a single field against its specification.

    Args:
        data: The dictionary to validate
        spec: The field specification

    Returns:
        List of error messages (empty if valid)
    """
    problems: list[str] = []
    try:
        value = data[spec.name]
    except KeyError:
        # Absence is only an error for required fields.
        if spec.required:
            problems.append(f"Missing required field: {spec.name}")
        return problems
    # Type check first; skip the custom validator on a type mismatch.
    if not isinstance(value, spec.field_type):
        if isinstance(spec.field_type, type):
            expected = spec.field_type.__name__
        else:
            expected = str(spec.field_type)
        actual = type(value).__name__
        problems.append(f"Field '{spec.name}' has wrong type: expected {expected}, got {actual} (value: {value!r})")
        return problems
    # Run the optional custom validator; None values are exempt so optional
    # fields typed (X, NoneType) need not handle None themselves.
    if spec.validator and value is not None and not spec.validator(value):
        problems.append(f"Field '{spec.name}' failed validation: {value!r}")
    return problems
def validate_canonical_stadium(data: dict[str, Any]) -> list[str]:
    """Validate a canonical stadium dictionary.

    Args:
        data: Stadium dictionary from to_canonical_dict()

    Returns:
        List of error messages (empty if valid)
    """
    errors = [e for spec in STADIUM_SCHEMA for e in validate_field(data, spec)]
    # The schema only checks that primary_team_abbrevs is a list; also make
    # sure every element inside it is a string.
    abbrevs = data.get("primary_team_abbrevs")
    if isinstance(abbrevs, list):
        for i, abbrev in enumerate(abbrevs):
            if not isinstance(abbrev, str):
                errors.append(f"primary_team_abbrevs[{i}] must be string, got {type(abbrev).__name__}")
    return errors
def validate_canonical_team(data: dict[str, Any]) -> list[str]:
    """Validate a canonical team dictionary.

    Args:
        data: Team dictionary from to_canonical_dict()

    Returns:
        List of error messages (empty if valid)
    """
    # Flatten the per-field error lists into a single list.
    return [e for spec in TEAM_SCHEMA for e in validate_field(data, spec)]
def validate_canonical_game(data: dict[str, Any]) -> list[str]:
    """Validate a canonical game dictionary.

    Args:
        data: Game dictionary from to_canonical_dict()

    Returns:
        List of error messages (empty if valid)
    """
    # Flatten the per-field error lists into a single list.
    return [e for spec in GAME_SCHEMA for e in validate_field(data, spec)]
def validate_and_raise(data: dict[str, Any], model_type: str) -> None:
    """Validate a canonical dictionary, raising if anything is wrong.

    Args:
        data: Dictionary from to_canonical_dict()
        model_type: One of 'stadium', 'team', 'game'

    Raises:
        SchemaValidationError: If validation fails
        ValueError: If model_type is unknown
    """
    dispatch = {
        "stadium": validate_canonical_stadium,
        "team": validate_canonical_team,
        "game": validate_canonical_game,
    }
    validator = dispatch.get(model_type)
    if validator is None:
        raise ValueError(f"Unknown model type: {model_type}")
    problems = validator(data)
    if problems:
        raise SchemaValidationError(model_type, problems)
def validate_batch(
    items: list[dict[str, Any]],
    model_type: str,
    fail_fast: bool = True,
) -> list[tuple[int, list[str]]]:
    """Validate a batch of canonical dictionaries.

    Args:
        items: List of dictionaries from to_canonical_dict()
        model_type: One of 'stadium', 'team', 'game'
        fail_fast: If True, raise on first error; if False, collect all errors

    Returns:
        List of (index, errors) tuples for items with validation errors

    Raises:
        SchemaValidationError: If fail_fast=True and validation fails
        ValueError: If model_type is unknown
    """
    dispatch = {
        "stadium": validate_canonical_stadium,
        "team": validate_canonical_team,
        "game": validate_canonical_game,
    }
    validator = dispatch.get(model_type)
    if validator is None:
        raise ValueError(f"Unknown model type: {model_type}")
    failures: list[tuple[int, list[str]]] = []
    for index, item in enumerate(items):
        item_errors = validator(item)
        if not item_errors:
            continue
        if fail_fast:
            # Prefix each message with the item index so the exception
            # pinpoints which element of the batch failed.
            raise SchemaValidationError(
                model_type,
                [f"Item {index}: {e}" for e in item_errors],
            )
        failures.append((index, item_errors))
    return failures