feat(scripts): rewrite parser as modular Python CLI

Replace monolithic scraping scripts with sportstime_parser package:

- Multi-source scrapers with automatic fallback for 7 sports
- Canonical ID generation for games, teams, and stadiums
- Fuzzy matching with configurable thresholds for name resolution
- CloudKit Web Services uploader with JWT auth, diff-based updates
- Resumable uploads with checkpoint state persistence
- Validation reports with manual review items and suggested matches
- Comprehensive test suite (249 tests)

CLI: sportstime-parser scrape|validate|upload|status|retry|clear

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Trey t
2026-01-10 21:06:12 -06:00
parent 284a10d9e1
commit eeaf900e5a
109 changed files with 18415 additions and 266211 deletions

View File

@@ -0,0 +1,409 @@
"""Validation report generator for scraped data."""
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

from ..config import EXPECTED_GAME_COUNTS, OUTPUT_DIR
from ..models.aliases import ManualReviewItem, ReviewReason
from ..models.game import Game
from ..models.stadium import Stadium
from ..models.team import Team
@dataclass
class ValidationSummary:
    """Summary statistics for validation report.

    Attributes:
        total_games: Total number of games scraped
        valid_games: Number of games with all data resolved
        review_count: Number of items needing manual review
        unresolved_teams: Count of unresolved team names
        unresolved_stadiums: Count of unresolved stadium names
        duplicate_games: Count of potential duplicate games
        missing_data: Count of games with missing required data
        expected_games: Expected number of games for this sport
    """

    total_games: int = 0
    valid_games: int = 0
    review_count: int = 0
    unresolved_teams: int = 0
    unresolved_stadiums: int = 0
    duplicate_games: int = 0
    missing_data: int = 0
    expected_games: int = 0

    @property
    def game_coverage(self) -> float:
        """Percentage of expected games scraped (100.0 when no expectation is set)."""
        if self.expected_games == 0:
            # No baseline to compare against; treat as full coverage.
            return 100.0
        return (self.total_games / self.expected_games) * 100

    @property
    def validity_rate(self) -> float:
        """Percentage of scraped games that are valid (100.0 for an empty set)."""
        if self.total_games == 0:
            return 100.0
        return (self.valid_games / self.total_games) * 100

    @property
    def needs_review(self) -> bool:
        """True when at least one item requires manual review."""
        return self.review_count > 0

    @property
    def status_emoji(self) -> str:
        """Get status emoji for report header.

        Bug fix: the success and failure branches previously returned empty
        strings (the emoji characters were evidently lost), so the report
        header showed no marker for "ready" or "failing" reports — only the
        warning case rendered. Restored to green check / red cross.
        """
        if self.review_count == 0 and self.game_coverage >= 95:
            return "✅"
        elif self.review_count <= 10 and self.game_coverage >= 80:
            return "⚠️"
        else:
            return "❌"
@dataclass
class ValidationReport:
    """Complete validation report for a sport/season.

    Attributes:
        sport: Sport code
        season: Season start year
        source: Name of the data source used
        summary: Summary statistics
        review_items: Items requiring manual review
        games: All scraped games
        teams: All teams
        stadiums: All stadiums
        generated_at: Timestamp of report generation (timezone-aware UTC)
    """

    sport: str
    season: int
    source: str
    summary: ValidationSummary
    review_items: list[ManualReviewItem] = field(default_factory=list)
    games: list[Game] = field(default_factory=list)
    teams: list[Team] = field(default_factory=list)
    stadiums: list[Stadium] = field(default_factory=list)
    # Bug fix: the report header labels this timestamp "UTC", but it was
    # previously produced with naive local time via datetime.now().
    generated_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

    def to_markdown(self) -> str:
        """Generate markdown report.

        Returns:
            Complete markdown report as string
        """
        lines: list[str] = []
        self._append_header(lines)
        self._append_summary(lines)
        self._append_review(lines)
        self._append_teams(lines)
        self._append_stadiums(lines)
        self._append_games(lines)
        lines.append("---")
        lines.append("")
        lines.append("*Generated by sportstime-parser*")
        return "\n".join(lines)

    def _append_header(self, lines: list[str]) -> None:
        """Append the report title and metadata header."""
        # Season renders as e.g. "2025-26".
        season_str = f"{self.season}-{str(self.season + 1)[-2:]}"
        lines.append(f"# Validation Report: {self.sport.upper()} {season_str}")
        lines.append("")
        lines.append(f"**Generated**: {self.generated_at.strftime('%Y-%m-%d %H:%M:%S')} UTC")
        lines.append(f"**Source**: {self.source}")
        lines.append(f"**Status**: {self.summary.status_emoji} {'Needs Review' if self.summary.needs_review else 'Ready'}")
        lines.append("")

    def _append_summary(self, lines: list[str]) -> None:
        """Append the summary metrics table."""
        lines.append("## Summary")
        lines.append("")
        lines.append("| Metric | Count |")
        lines.append("|--------|-------|")
        lines.append(f"| Total Games | {self.summary.total_games:,} |")
        lines.append(f"| Valid Games | {self.summary.valid_games:,} |")
        lines.append(f"| Expected Games | {self.summary.expected_games:,} |")
        lines.append(f"| Coverage | {self.summary.game_coverage:.1f}% |")
        lines.append(f"| Manual Review | {self.summary.review_count} |")
        lines.append(f"| Unresolved Teams | {self.summary.unresolved_teams} |")
        lines.append(f"| Unresolved Stadiums | {self.summary.unresolved_stadiums} |")
        lines.append(f"| Duplicate Games | {self.summary.duplicate_games} |")
        lines.append(f"| Missing Data | {self.summary.missing_data} |")
        lines.append("")

    def _append_review(self, lines: list[str]) -> None:
        """Append manual-review items grouped by reason (max 10 shown per group)."""
        if not self.review_items:
            return
        lines.append("## Manual Review Required")
        lines.append("")
        # Group by reason
        by_reason: dict[ReviewReason, list[ManualReviewItem]] = {}
        for item in self.review_items:
            by_reason.setdefault(item.reason, []).append(item)
        for reason, items in sorted(by_reason.items(), key=lambda x: x[0].value):
            reason_title = reason.value.replace("_", " ").title()
            lines.append(f"### {reason_title} ({len(items)})")
            lines.append("")
            for item in items[:10]:  # Limit to first 10 per category
                lines.append(item.to_markdown())
            if len(items) > 10:
                lines.append(f"*... and {len(items) - 10} more items*")
            lines.append("")

    def _append_teams(self, lines: list[str]) -> None:
        """Append the teams table (max 20 rows, sorted by full name)."""
        lines.append("## Teams")
        lines.append("")
        lines.append(f"Total teams: {len(self.teams)}")
        lines.append("")
        if self.teams:
            lines.append("| ID | Full Name | City | Conference | Division |")
            lines.append("|-----|-----------|------|------------|----------|")
            for team in sorted(self.teams, key=lambda t: t.full_name)[:20]:
                lines.append(
                    f"| `{team.id}` | {team.full_name} | {team.city} | "
                    f"{team.conference or '-'} | {team.division or '-'} |"
                )
            if len(self.teams) > 20:
                lines.append(f"*... and {len(self.teams) - 20} more teams*")
            lines.append("")

    def _append_stadiums(self, lines: list[str]) -> None:
        """Append the stadiums table (max 20 rows, sorted by name)."""
        lines.append("## Stadiums")
        lines.append("")
        lines.append(f"Total stadiums: {len(self.stadiums)}")
        lines.append("")
        if self.stadiums:
            lines.append("| ID | Name | City | State |")
            lines.append("|-----|------|------|-------|")
            for stadium in sorted(self.stadiums, key=lambda s: s.name)[:20]:
                lines.append(
                    f"| `{stadium.id}` | {stadium.name} | "
                    f"{stadium.city} | {stadium.state} |"
                )
            if len(self.stadiums) > 20:
                lines.append(f"*... and {len(self.stadiums) - 20} more stadiums*")
            lines.append("")

    def _append_games(self, lines: list[str]) -> None:
        """Append game samples: the first 10 games, plus games missing a stadium."""
        lines.append("## Game Samples")
        lines.append("")
        if self.games:
            # Show first 10 games
            lines.append("### First 10 Games")
            lines.append("")
            lines.append("| ID | Date | Away | Home | Status |")
            lines.append("|----|------|------|------|--------|")
            for game in self.games[:10]:
                date_str = game.game_date.strftime("%Y-%m-%d")
                lines.append(
                    f"| `{game.id}` | {date_str} | {game.away_team_id} | "
                    f"{game.home_team_id} | {game.status} |"
                )
            lines.append("")
            # Show games with issues
            problem_games = [g for g in self.games if not g.stadium_id]
            if problem_games:
                lines.append("### Games Missing Stadium")
                lines.append("")
                lines.append("| ID | Date | Away | Home | Raw Stadium |")
                lines.append("|----|------|------|------|-------------|")
                for game in problem_games[:10]:
                    date_str = game.game_date.strftime("%Y-%m-%d")
                    lines.append(
                        f"| `{game.id}` | {date_str} | {game.away_team_id} | "
                        f"{game.home_team_id} | {game.raw_stadium or '-'} |"
                    )
                if len(problem_games) > 10:
                    lines.append(f"*... and {len(problem_games) - 10} more*")
                lines.append("")

    def save(self, output_dir: Optional[Path] = None) -> Path:
        """Save report to markdown file.

        Args:
            output_dir: Directory to save to (default: OUTPUT_DIR)

        Returns:
            Path to saved file
        """
        if output_dir is None:
            output_dir = OUTPUT_DIR
        output_dir.mkdir(parents=True, exist_ok=True)
        filepath = output_dir / f"validation_{self.sport}_{self.season}.md"
        filepath.write_text(self.to_markdown(), encoding="utf-8")
        return filepath
def generate_report(
    sport: str,
    season: int,
    source: str,
    games: list[Game],
    teams: list[Team],
    stadiums: list[Stadium],
    review_items: list[ManualReviewItem],
) -> ValidationReport:
    """Generate a validation report from scraped data.

    Args:
        sport: Sport code
        season: Season start year
        source: Data source name
        games: List of scraped games
        teams: List of teams
        stadiums: List of stadiums
        review_items: Items requiring review

    Returns:
        Complete ValidationReport
    """
    summary = ValidationSummary(
        total_games=len(games),
        expected_games=EXPECTED_GAME_COUNTS.get(sport, 0),
        review_count=len(review_items),
    )

    # Break review items out by reason for the summary counters.
    reasons = [item.reason for item in review_items]
    summary.unresolved_teams = reasons.count(ReviewReason.UNRESOLVED_TEAM)
    summary.unresolved_stadiums = reasons.count(ReviewReason.UNRESOLVED_STADIUM)
    summary.duplicate_games = reasons.count(ReviewReason.DUPLICATE_GAME)
    summary.missing_data = reasons.count(ReviewReason.MISSING_DATA)

    # A game counts as valid when both team IDs resolved.
    summary.valid_games = sum(
        1 for game in games if game.home_team_id and game.away_team_id
    )

    return ValidationReport(
        sport=sport,
        season=season,
        source=source,
        summary=summary,
        review_items=review_items,
        games=games,
        teams=teams,
        stadiums=stadiums,
    )
def detect_duplicate_games(games: list[Game]) -> list[ManualReviewItem]:
    """Detect potential duplicate games.

    Duplicates are identified by having the same:
    - Home team
    - Away team
    - Date (ignoring time)

    Args:
        games: List of games to check

    Returns:
        List of ManualReviewItems for duplicates
    """
    from uuid import uuid4

    first_seen: dict[str, Game] = {}
    flagged: list[ManualReviewItem] = []

    for game in games:
        # Key on matchup + calendar day (time of day ignored).
        day = game.game_date.strftime("%Y%m%d")
        key = f"{game.home_team_id}_{game.away_team_id}_{day}"

        prior = first_seen.get(key)
        if prior is None:
            first_seen[key] = game
            continue

        # A doubleheader legitimately repeats the matchup on the same day;
        # a set game_number marks it, so it is not flagged.
        if game.game_number:
            continue

        flagged.append(
            ManualReviewItem(
                id=f"dup_{uuid4().hex[:8]}",
                reason=ReviewReason.DUPLICATE_GAME,
                sport=game.sport,
                raw_value=f"{game.id} vs {prior.id}",
                context={
                    "game1_id": prior.id,
                    "game2_id": game.id,
                    "date": game.game_date.strftime("%Y-%m-%d"),
                    "home": game.home_team_id,
                    "away": game.away_team_id,
                },
                game_date=game.game_date.date(),
            )
        )

    return flagged
def validate_games(games: list[Game]) -> list[ManualReviewItem]:
    """Validate games and return issues found.

    Checks:
    - Missing stadium IDs
    - Missing team IDs
    - Invalid dates
    - Duplicate games

    Args:
        games: List of games to validate

    Returns:
        List of ManualReviewItems for issues
    """
    from uuid import uuid4

    # One MISSING_DATA item per game that has no resolved stadium.
    issues: list[ManualReviewItem] = [
        ManualReviewItem(
            id=f"missing_{uuid4().hex[:8]}",
            reason=ReviewReason.MISSING_DATA,
            sport=game.sport,
            raw_value=f"Game {game.id} missing stadium",
            context={
                "game_id": game.id,
                "field": "stadium_id",
                "raw_stadium": game.raw_stadium,
            },
            source_url=game.source_url,
            game_date=game.game_date.date(),
        )
        for game in games
        if not game.stadium_id
    ]

    # Duplicate detection runs over the full list in a single pass.
    issues.extend(detect_duplicate_games(games))
    return issues