Files
Sportstime/Scripts/validate_data.py
Trey t 63fb06c41a fix: update pipeline imports to use sport modules
After Phase 1 refactoring moved scraper functions to sport-specific
modules (nba.py, mlb.py, etc.), these pipeline scripts still imported
from scrape_schedules.py.

- run_pipeline.py: import from core.py and sport modules
- validate_data.py: import from core.py and sport modules
- run_canonicalization_pipeline.py: import from core.py and sport modules

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-10 10:52:13 -06:00

600 lines
20 KiB
Python

#!/usr/bin/env python3
"""
Cross-Validation System for SportsTime App
Compares scraped data from multiple sources and flags discrepancies.
Usage:
python validate_data.py --data-dir ./data
python validate_data.py --scrape-and-validate --season 2025
"""
import argparse
import json
from datetime import datetime
from pathlib import Path
from dataclasses import dataclass, asdict, field
from typing import Optional
from collections import defaultdict
# Import from core module
from core import Game, Stadium, assign_stable_ids
# Import from sport modules
from nba import scrape_nba_basketball_reference, NBA_TEAMS
from mlb import scrape_mlb_statsapi, scrape_mlb_baseball_reference, MLB_TEAMS
from nhl import scrape_nhl_hockey_reference, NHL_TEAMS
from nfl import scrape_nfl_espn, NFL_TEAMS
# Import secondary sports from scrape_schedules (stubs)
from scrape_schedules import (
scrape_wnba_espn, scrape_mls_espn, scrape_nwsl_espn, scrape_cbb_espn,
WNBA_TEAMS, MLS_TEAMS, NWSL_TEAMS,
)
# =============================================================================
# VALIDATION DATA CLASSES
# =============================================================================
@dataclass
class Discrepancy:
"""Represents a discrepancy between sources."""
game_key: str
field: str # 'date', 'time', 'venue', 'teams', 'missing'
source1: str
source2: str
value1: str
value2: str
severity: str # 'high', 'medium', 'low'
@dataclass
class ValidationReport:
"""Summary of validation results."""
sport: str
season: str
sources: list
total_games_source1: int = 0
total_games_source2: int = 0
games_matched: int = 0
games_missing_source1: int = 0
games_missing_source2: int = 0
discrepancies: list = field(default_factory=list)
def to_dict(self):
return {
'sport': self.sport,
'season': self.season,
'sources': self.sources,
'total_games_source1': self.total_games_source1,
'total_games_source2': self.total_games_source2,
'games_matched': self.games_matched,
'games_missing_source1': self.games_missing_source1,
'games_missing_source2': self.games_missing_source2,
'discrepancies': [asdict(d) for d in self.discrepancies],
'discrepancy_summary': self.get_summary()
}
def get_summary(self):
by_field = defaultdict(int)
by_severity = defaultdict(int)
for d in self.discrepancies:
by_field[d.field] += 1
by_severity[d.severity] += 1
return {
'by_field': dict(by_field),
'by_severity': dict(by_severity)
}
# =============================================================================
# GAME KEY GENERATION
# =============================================================================
def normalize_abbrev(abbrev: str, sport: str) -> str:
"""Normalize team abbreviations across different sources."""
abbrev = abbrev.upper().strip()
if sport == 'MLB':
# MLB abbreviation mappings between sources
mlb_mappings = {
'AZ': 'ARI', 'ARI': 'ARI', # Arizona
'ATH': 'OAK', 'OAK': 'OAK', # Oakland/Athletics
'CWS': 'CHW', 'CHW': 'CHW', # Chicago White Sox
'KC': 'KCR', 'KCR': 'KCR', # Kansas City
'SD': 'SDP', 'SDP': 'SDP', # San Diego
'SF': 'SFG', 'SFG': 'SFG', # San Francisco
'TB': 'TBR', 'TBR': 'TBR', # Tampa Bay
'WSH': 'WSN', 'WSN': 'WSN', # Washington
}
return mlb_mappings.get(abbrev, abbrev)
elif sport == 'NBA':
nba_mappings = {
'PHX': 'PHO', 'PHO': 'PHO', # Phoenix
'BKN': 'BRK', 'BRK': 'BRK', # Brooklyn
'CHA': 'CHO', 'CHO': 'CHO', # Charlotte
'NOP': 'NOP', 'NO': 'NOP', # New Orleans
}
return nba_mappings.get(abbrev, abbrev)
elif sport == 'NHL':
nhl_mappings = {
'ARI': 'UTA', 'UTA': 'UTA', # Arizona moved to Utah
'VGS': 'VGK', 'VGK': 'VGK', # Vegas
}
return nhl_mappings.get(abbrev, abbrev)
return abbrev
def generate_game_key(game: Game) -> str:
"""
Generate a unique key for matching games across sources.
Uses date + normalized team abbreviations (sorted) to match.
"""
home = normalize_abbrev(game.home_team_abbrev, game.sport)
away = normalize_abbrev(game.away_team_abbrev, game.sport)
teams = sorted([home, away])
return f"{game.date}_{teams[0]}_{teams[1]}"
def normalize_team_name(name: str, sport: str) -> str:
"""Normalize team name variations."""
teams = {
'NBA': NBA_TEAMS, 'MLB': MLB_TEAMS, 'NHL': NHL_TEAMS,
'WNBA': WNBA_TEAMS, 'MLS': MLS_TEAMS, 'NWSL': NWSL_TEAMS,
'NFL': NFL_TEAMS,
}.get(sport, {})
name_lower = name.lower().strip()
# Check against known team names
for abbrev, info in teams.items():
if name_lower == info['name'].lower():
return abbrev
# Check city match
if name_lower == info['city'].lower():
return abbrev
# Check partial match
if name_lower in info['name'].lower() or info['name'].lower() in name_lower:
return abbrev
return name[:3].upper()
def normalize_venue(venue: str) -> str:
"""Normalize venue name for comparison."""
# Remove common variations
normalized = venue.lower().strip()
# Remove sponsorship prefixes that change
replacements = [
('at ', ''),
('the ', ''),
(' stadium', ''),
(' arena', ''),
(' center', ''),
(' field', ''),
(' park', ''),
('.com', ''),
('crypto', 'crypto.com'),
]
for old, new in replacements:
normalized = normalized.replace(old, new)
return normalized.strip()
def normalize_time(time_str: Optional[str]) -> Optional[str]:
"""Normalize time format to HH:MM."""
if not time_str:
return None
time_str = time_str.strip().lower()
# Handle various formats
if 'pm' in time_str or 'am' in time_str:
# 12-hour format
try:
for fmt in ['%I:%M%p', '%I:%M %p', '%I%p']:
try:
dt = datetime.strptime(time_str.replace(' ', ''), fmt)
return dt.strftime('%H:%M')
except:
continue
except:
pass
# Already 24-hour or just numbers
if ':' in time_str:
parts = time_str.split(':')
if len(parts) >= 2:
try:
hour = int(parts[0])
minute = int(parts[1][:2])
return f"{hour:02d}:{minute:02d}"
except:
pass
return time_str
# =============================================================================
# CROSS-VALIDATION LOGIC
# =============================================================================
def validate_games(
games1: list[Game],
games2: list[Game],
source1_name: str,
source2_name: str,
sport: str,
season: str
) -> ValidationReport:
"""
Compare two lists of games and find discrepancies.
"""
report = ValidationReport(
sport=sport,
season=season,
sources=[source1_name, source2_name],
total_games_source1=len(games1),
total_games_source2=len(games2)
)
# Index games by key
games1_by_key = {}
for g in games1:
key = generate_game_key(g)
games1_by_key[key] = g
games2_by_key = {}
for g in games2:
key = generate_game_key(g)
games2_by_key[key] = g
# Find matches and discrepancies
all_keys = set(games1_by_key.keys()) | set(games2_by_key.keys())
for key in all_keys:
g1 = games1_by_key.get(key)
g2 = games2_by_key.get(key)
if g1 and g2:
# Both sources have this game - compare fields
report.games_matched += 1
# Compare dates (should match by key, but double-check)
if g1.date != g2.date:
report.discrepancies.append(Discrepancy(
game_key=key,
field='date',
source1=source1_name,
source2=source2_name,
value1=g1.date,
value2=g2.date,
severity='high'
))
# Compare times
time1 = normalize_time(g1.time)
time2 = normalize_time(g2.time)
if time1 and time2 and time1 != time2:
# Check if times are close (within 1 hour - could be timezone)
try:
t1 = datetime.strptime(time1, '%H:%M')
t2 = datetime.strptime(time2, '%H:%M')
diff_minutes = abs((t1 - t2).total_seconds() / 60)
severity = 'low' if diff_minutes <= 60 else 'medium'
except:
severity = 'medium'
report.discrepancies.append(Discrepancy(
game_key=key,
field='time',
source1=source1_name,
source2=source2_name,
value1=time1 or '',
value2=time2 or '',
severity=severity
))
# Compare venues
venue1 = normalize_venue(g1.venue) if g1.venue else ''
venue2 = normalize_venue(g2.venue) if g2.venue else ''
if venue1 and venue2 and venue1 != venue2:
# Check for partial match
if venue1 not in venue2 and venue2 not in venue1:
report.discrepancies.append(Discrepancy(
game_key=key,
field='venue',
source1=source1_name,
source2=source2_name,
value1=g1.venue,
value2=g2.venue,
severity='low'
))
elif g1 and not g2:
# Game only in source 1
report.games_missing_source2 += 1
# Determine severity based on date
# Spring training (March before ~25th) and playoffs (Oct+) are expected differences
severity = 'high'
try:
game_date = datetime.strptime(g1.date, '%Y-%m-%d')
month = game_date.month
day = game_date.day
if month == 3 and day < 26: # Spring training
severity = 'medium'
elif month >= 10: # Playoffs/postseason
severity = 'medium'
except:
pass
report.discrepancies.append(Discrepancy(
game_key=key,
field='missing',
source1=source1_name,
source2=source2_name,
value1=f"{g1.away_team} @ {g1.home_team}",
value2='NOT FOUND',
severity=severity
))
else:
# Game only in source 2
report.games_missing_source1 += 1
# Determine severity based on date
severity = 'high'
try:
game_date = datetime.strptime(g2.date, '%Y-%m-%d')
month = game_date.month
day = game_date.day
if month == 3 and day < 26: # Spring training
severity = 'medium'
elif month >= 10: # Playoffs/postseason
severity = 'medium'
except:
pass
report.discrepancies.append(Discrepancy(
game_key=key,
field='missing',
source1=source1_name,
source2=source2_name,
value1='NOT FOUND',
value2=f"{g2.away_team} @ {g2.home_team}",
severity=severity
))
return report
def validate_stadiums(stadiums: list[Stadium]) -> list[dict]:
"""
Validate stadium data for completeness and accuracy.
"""
issues = []
for s in stadiums:
# Check for missing coordinates
if s.latitude == 0 or s.longitude == 0:
issues.append({
'stadium': s.name,
'sport': s.sport,
'issue': 'Missing coordinates',
'severity': 'high'
})
# Check for missing capacity
if s.capacity == 0:
issues.append({
'stadium': s.name,
'sport': s.sport,
'issue': 'Missing capacity',
'severity': 'low'
})
# Check coordinate bounds (roughly North America)
if s.latitude != 0:
if not (24 < s.latitude < 55):
issues.append({
'stadium': s.name,
'sport': s.sport,
'issue': f'Latitude {s.latitude} outside expected range',
'severity': 'medium'
})
if s.longitude != 0:
if not (-130 < s.longitude < -60):
issues.append({
'stadium': s.name,
'sport': s.sport,
'issue': f'Longitude {s.longitude} outside expected range',
'severity': 'medium'
})
return issues
# =============================================================================
# MULTI-SOURCE SCRAPING
# =============================================================================
def scrape_nba_all_sources(season: int) -> dict:
"""Scrape NBA from all available sources."""
nba_season = f"{season-1}-{str(season)[2:]}"
games = scrape_nba_basketball_reference(season)
games = assign_stable_ids(games, 'NBA', nba_season)
return {
'basketball-reference': games,
# ESPN requires JS rendering, skip for now
}
def scrape_mlb_all_sources(season: int) -> dict:
"""Scrape MLB from all available sources."""
mlb_season = str(season)
# MLB API uses official gamePk - already stable
api_games = scrape_mlb_statsapi(season)
# Baseball-Reference needs stable IDs
br_games = scrape_mlb_baseball_reference(season)
br_games = assign_stable_ids(br_games, 'MLB', mlb_season)
return {
'statsapi.mlb.com': api_games,
'baseball-reference': br_games,
}
def scrape_nhl_all_sources(season: int) -> dict:
"""Scrape NHL from all available sources."""
nhl_season = f"{season-1}-{str(season)[2:]}"
games = scrape_nhl_hockey_reference(season)
games = assign_stable_ids(games, 'NHL', nhl_season)
return {
'hockey-reference': games,
# NHL API requires date iteration, skip for now
}
# =============================================================================
# MAIN
# =============================================================================
def main():
parser = argparse.ArgumentParser(description='Validate sports data')
parser.add_argument('--data-dir', type=str, default='./data', help='Data directory')
parser.add_argument('--scrape-and-validate', action='store_true', help='Scrape fresh and validate')
parser.add_argument('--season', type=int, default=2025, help='Season year')
parser.add_argument('--sport', choices=['nba', 'mlb', 'nhl', 'nfl', 'wnba', 'mls', 'nwsl', 'cbb', 'all'], default='all')
parser.add_argument('--output', type=str, default='./data/validation_report.json')
args = parser.parse_args()
reports = []
stadium_issues = []
if args.scrape_and_validate:
print("\n" + "="*60)
print("CROSS-VALIDATION MODE")
print("="*60)
# MLB has two good sources - validate
if args.sport in ['mlb', 'all']:
print(f"\n--- MLB {args.season} ---")
mlb_sources = scrape_mlb_all_sources(args.season)
source_names = list(mlb_sources.keys())
if len(source_names) >= 2:
games1 = mlb_sources[source_names[0]]
games2 = mlb_sources[source_names[1]]
if games1 and games2:
report = validate_games(
games1, games2,
source_names[0], source_names[1],
'MLB', str(args.season)
)
reports.append(report)
print(f" Compared {report.total_games_source1} vs {report.total_games_source2} games")
print(f" Matched: {report.games_matched}")
print(f" Discrepancies: {len(report.discrepancies)}")
# NBA (single source for now, but validate data quality)
if args.sport in ['nba', 'all']:
print(f"\n--- NBA {args.season} ---")
nba_sources = scrape_nba_all_sources(args.season)
games = nba_sources.get('basketball-reference', [])
print(f" Got {len(games)} games from Basketball-Reference")
# Validate internal consistency
teams_seen = defaultdict(int)
for g in games:
teams_seen[g.home_team_abbrev] += 1
teams_seen[g.away_team_abbrev] += 1
# Each team should have ~82 games
for team, count in teams_seen.items():
if count < 70 or count > 95:
print(f" Warning: {team} has {count} games (expected ~82)")
else:
# Load existing data and validate
data_dir = Path(args.data_dir)
# Load games
games_file = data_dir / 'games.json'
if games_file.exists():
with open(games_file) as f:
games_data = json.load(f)
print(f"\nLoaded {len(games_data)} games from {games_file}")
# Group by sport and validate counts
by_sport = defaultdict(list)
for g in games_data:
by_sport[g['sport']].append(g)
for sport, sport_games in by_sport.items():
print(f" {sport}: {len(sport_games)} games")
# Load and validate stadiums
stadiums_file = data_dir / 'stadiums.json'
if stadiums_file.exists():
with open(stadiums_file) as f:
stadiums_data = json.load(f)
stadiums = [Stadium(**s) for s in stadiums_data]
print(f"\nLoaded {len(stadiums)} stadiums from {stadiums_file}")
stadium_issues = validate_stadiums(stadiums)
if stadium_issues:
print(f"\nStadium validation issues ({len(stadium_issues)}):")
for issue in stadium_issues[:10]:
print(f" [{issue['severity'].upper()}] {issue['stadium']}: {issue['issue']}")
if len(stadium_issues) > 10:
print(f" ... and {len(stadium_issues) - 10} more")
# Save validation report
output_path = Path(args.output)
output_path.parent.mkdir(parents=True, exist_ok=True)
full_report = {
'generated_at': datetime.now().isoformat(),
'season': args.season,
'game_validations': [r.to_dict() for r in reports],
'stadium_issues': stadium_issues
}
with open(output_path, 'w') as f:
json.dump(full_report, f, indent=2)
print(f"\n Validation report saved to {output_path}")
# Summary
print("\n" + "="*60)
print("VALIDATION SUMMARY")
print("="*60)
total_discrepancies = sum(len(r.discrepancies) for r in reports)
high_severity = sum(
1 for r in reports
for d in r.discrepancies
if d.severity == 'high'
)
print(f"Total game validation reports: {len(reports)}")
print(f"Total discrepancies found: {total_discrepancies}")
print(f"High severity issues: {high_severity}")
print(f"Stadium data issues: {len(stadium_issues)}")
if __name__ == '__main__':
main()