Files
Sportstime/Scripts/sportstime_parser/scrapers/nwsl.py
Trey t eeaf900e5a feat(scripts): rewrite parser as modular Python CLI
Replace monolithic scraping scripts with sportstime_parser package:

- Multi-source scrapers with automatic fallback for 7 sports
- Canonical ID generation for games, teams, and stadiums
- Fuzzy matching with configurable thresholds for name resolution
- CloudKit Web Services uploader with JWT auth, diff-based updates
- Resumable uploads with checkpoint state persistence
- Validation reports with manual review items and suggested matches
- Comprehensive test suite (249 tests)

CLI: sportstime-parser scrape|validate|upload|status|retry|clear

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-10 21:06:12 -06:00

386 lines
12 KiB
Python

"""NWSL scraper implementation with multi-source fallback."""
from datetime import datetime, date
from typing import Optional
from .base import BaseScraper, RawGameData, ScrapeResult
from ..models.game import Game
from ..models.team import Team
from ..models.stadium import Stadium
from ..models.aliases import ManualReviewItem
from ..normalizers.canonical_id import generate_game_id
from ..normalizers.team_resolver import (
TeamResolver,
TEAM_MAPPINGS,
get_team_resolver,
)
from ..normalizers.stadium_resolver import (
StadiumResolver,
STADIUM_MAPPINGS,
get_stadium_resolver,
)
from ..utils.logging import get_logger, log_game, log_warning
class NWSLScraper(BaseScraper):
    """NWSL schedule scraper backed by the ESPN scoreboard API.

    Sources (in priority order):
    1. ESPN API - the only source currently returned by ``_get_sources``
    """

    # Lower-cased ESPN status names treated as "match has been played".
    # NOTE(review): ESPN soccer feeds appear to report completed matches as
    # STATUS_FULL_TIME (plus AET / penalty variants) rather than STATUS_FINAL;
    # confirm against a live usa.nwsl scoreboard payload.
    _FINAL_STATUS_NAMES = frozenset(
        {
            "status_final",
            "status_full_time",
            "status_final_aet",
            "status_final_pen",
        }
    )

    def __init__(self, season: int, **kwargs):
        """Initialize NWSL scraper.

        Args:
            season: Season year (e.g., 2026 for 2026 season)
            **kwargs: Forwarded unchanged to ``BaseScraper.__init__``.
        """
        super().__init__("nwsl", season, **kwargs)
        self._team_resolver = get_team_resolver("nwsl")
        self._stadium_resolver = get_stadium_resolver("nwsl")

    def _get_sources(self) -> list[str]:
        """Return source list in priority order."""
        return ["espn"]

    def _get_source_url(self, source: str, **kwargs) -> str:
        """Build URL for a source.

        Args:
            source: Source name; only ``"espn"`` is supported.
            **kwargs: ``date`` - a ``YYYYMMDD`` string placed in the ``dates``
                query parameter (empty string when omitted).

        Raises:
            ValueError: If ``source`` is not a known source.
        """
        if source == "espn":
            date_str = kwargs.get("date", "")
            return f"https://site.api.espn.com/apis/site/v2/sports/soccer/usa.nwsl/scoreboard?dates={date_str}"
        raise ValueError(f"Unknown source: {source}")

    def _get_season_months(self) -> list[tuple[int, int]]:
        """Get the months to scrape for NWSL season.

        NWSL season runs March through November (regular season + playoffs).

        Returns:
            ``(year, month)`` tuples for months 3 through 11 of ``self.season``.
        """
        return [(self.season, month) for month in range(3, 12)]

    def _scrape_games_from_source(self, source: str) -> list[RawGameData]:
        """Scrape games from a specific source.

        Raises:
            ValueError: If ``source`` is not a known source.
        """
        if source == "espn":
            return self._scrape_espn()
        raise ValueError(f"Unknown source: {source}")

    def _scrape_espn(self) -> list[RawGameData]:
        """Scrape games from ESPN API, one scoreboard request per calendar day.

        A failure on any single day is logged at debug level and skipped so
        the remaining days are still scraped (best effort).
        """
        all_games: list[RawGameData] = []
        for year, month in self._get_season_months():
            # Days in the month = distance from the 1st to the 1st of the
            # following month (rolling over the year for December).
            if month == 12:
                next_month = date(year + 1, 1, 1)
            else:
                next_month = date(year, month + 1, 1)
            days_in_month = (next_month - date(year, month, 1)).days

            for day in range(1, days_in_month + 1):
                date_str = date(year, month, day).strftime("%Y%m%d")
                url = self._get_source_url("espn", date=date_str)
                try:
                    data = self.session.get_json(url)
                    all_games.extend(self._parse_espn_response(data, url))
                except Exception as e:
                    self._logger.debug(f"ESPN error for {year}-{month}-{day}: {e}")
                    continue
        return all_games

    def _parse_espn_response(
        self,
        data: dict,
        source_url: str,
    ) -> list[RawGameData]:
        """Parse ESPN API response.

        Args:
            data: Decoded scoreboard JSON; games are read from ``"events"``.
            source_url: URL the payload was fetched from, recorded on each game.

        Returns:
            Successfully parsed games; events that fail to parse are logged
            at debug level and skipped.
        """
        games: list[RawGameData] = []
        # ``or []`` also covers an explicit JSON null for "events".
        for event in data.get("events") or []:
            try:
                game = self._parse_espn_event(event, source_url)
            except Exception as e:
                self._logger.debug(f"Failed to parse ESPN event: {e}")
                continue
            if game:
                games.append(game)
        return games

    @staticmethod
    def _parse_score(value) -> Optional[int]:
        """Convert a raw ESPN score value to ``int``, or ``None`` if unusable.

        Missing, empty, and non-numeric values all normalize to ``None`` so
        callers never receive an unconverted raw value.
        """
        if value is None or value == "":
            return None
        try:
            return int(value)
        except (ValueError, TypeError):
            return None

    def _parse_espn_event(
        self,
        event: dict,
        source_url: str,
    ) -> Optional[RawGameData]:
        """Parse a single ESPN event.

        Returns:
            A ``RawGameData`` for the event, or ``None`` when the event has no
            parseable date, no competition, not exactly two competitors, or a
            missing home/away team name.
        """
        # Get date
        date_str = event.get("date", "")
        if not date_str:
            return None
        try:
            # Rewrite a trailing "Z" as an explicit UTC offset for fromisoformat.
            game_date = datetime.fromisoformat(date_str.replace("Z", "+00:00"))
        except ValueError:
            return None

        # Get competitions (only the first one is used)
        competitions = event.get("competitions", [])
        if not competitions:
            return None
        competition = competitions[0]

        # Get teams
        competitors = competition.get("competitors", [])
        if len(competitors) != 2:
            return None

        home_team = None
        away_team = None
        home_score = None
        away_score = None
        for competitor in competitors:
            team_info = competitor.get("team") or {}
            team_name = team_info.get("displayName", "")
            score = self._parse_score(competitor.get("score"))
            # Anything not flagged "home" is treated as the away side.
            if competitor.get("homeAway") == "home":
                home_team = team_name
                home_score = score
            else:
                away_team = team_name
                away_score = score

        if not home_team or not away_team:
            return None

        # Get venue
        venue = competition.get("venue") or {}
        stadium = venue.get("fullName")

        # Get status
        status_info = competition.get("status") or {}
        status_type = status_info.get("type") or {}
        status_name = (status_type.get("name") or "").lower()
        if status_name in self._FINAL_STATUS_NAMES:
            status = "final"
        elif status_name == "status_postponed":
            status = "postponed"
        elif status_name == "status_canceled":
            status = "cancelled"
        else:
            # Any other (or missing) status is treated as scheduled.
            status = "scheduled"

        return RawGameData(
            game_date=game_date,
            home_team_raw=home_team,
            away_team_raw=away_team,
            stadium_raw=stadium,
            home_score=home_score,
            away_score=away_score,
            status=status,
            source_url=source_url,
        )

    def _normalize_games(
        self,
        raw_games: list[RawGameData],
    ) -> tuple[list[Game], list[ManualReviewItem]]:
        """Normalize raw games to Game objects with canonical IDs.

        Returns:
            ``(games, review_items)``: the games that could be normalized and
            every manual-review item produced along the way, including items
            from games that were dropped.
        """
        games: list[Game] = []
        review_items: list[ManualReviewItem] = []
        for raw in raw_games:
            game, item_reviews = self._normalize_single_game(raw)
            if game:
                games.append(game)
                log_game(
                    self.sport,
                    game.id,
                    game.home_team_id,
                    game.away_team_id,
                    game.game_date.strftime("%Y-%m-%d"),
                    game.status,
                )
            review_items.extend(item_reviews)
        return games, review_items

    def _normalize_single_game(
        self,
        raw: RawGameData,
    ) -> tuple[Optional[Game], list[ManualReviewItem]]:
        """Normalize a single raw game.

        Returns:
            ``(game, review_items)``. ``game`` is ``None`` when either team
            cannot be resolved to a canonical ID; an unresolved stadium does
            not drop the game (its ``stadium_id`` becomes ``""``).
        """
        review_items: list[ManualReviewItem] = []
        check_date = raw.game_date.date()

        # Resolve home team
        home_result = self._team_resolver.resolve(
            raw.home_team_raw,
            check_date=check_date,
            source_url=raw.source_url,
        )
        if home_result.review_item:
            review_items.append(home_result.review_item)
        if not home_result.canonical_id:
            log_warning(f"Could not resolve home team: {raw.home_team_raw}")
            return None, review_items

        # Resolve away team
        away_result = self._team_resolver.resolve(
            raw.away_team_raw,
            check_date=check_date,
            source_url=raw.source_url,
        )
        if away_result.review_item:
            review_items.append(away_result.review_item)
        if not away_result.canonical_id:
            log_warning(f"Could not resolve away team: {raw.away_team_raw}")
            return None, review_items

        # Resolve stadium (optional)
        stadium_id = None
        if raw.stadium_raw:
            stadium_result = self._stadium_resolver.resolve(
                raw.stadium_raw,
                check_date=check_date,
                source_url=raw.source_url,
            )
            if stadium_result.review_item:
                review_items.append(stadium_result.review_item)
            stadium_id = stadium_result.canonical_id

        # Get abbreviations for game ID
        home_abbrev = self._get_abbreviation(home_result.canonical_id)
        away_abbrev = self._get_abbreviation(away_result.canonical_id)

        # Generate canonical game ID
        game_id = generate_game_id(
            sport=self.sport,
            season=self.season,
            away_abbrev=away_abbrev,
            home_abbrev=home_abbrev,
            game_date=raw.game_date,
            game_number=None,
        )

        game = Game(
            id=game_id,
            sport=self.sport,
            season=self.season,
            home_team_id=home_result.canonical_id,
            away_team_id=away_result.canonical_id,
            stadium_id=stadium_id or "",
            game_date=raw.game_date,
            game_number=None,
            home_score=raw.home_score,
            away_score=raw.away_score,
            status=raw.status,
            source_url=raw.source_url,
            raw_home_team=raw.home_team_raw,
            raw_away_team=raw.away_team_raw,
            raw_stadium=raw.stadium_raw,
        )
        return game, review_items

    def _get_abbreviation(self, team_id: str) -> str:
        """Extract abbreviation from team ID.

        Returns the text after the last underscore, or the whole ID when it
        contains no underscore.
        """
        return team_id.rsplit("_", 1)[-1]

    @staticmethod
    def _find_stadium_for_city(city: str, stadiums: dict) -> Optional[str]:
        """Return the ID of the first stadium whose city overlaps ``city``.

        Matching is a case-insensitive substring test in either direction.
        Empty city names never match: an empty string is a substring of every
        string and would otherwise select the first stadium.
        """
        city_key = (city or "").lower()
        if not city_key:
            return None
        for stadium_id, info in stadiums.items():
            stadium_city = (info.city or "").lower()
            if stadium_city and (city_key in stadium_city or stadium_city in city_key):
                return stadium_id
        return None

    def scrape_teams(self) -> list[Team]:
        """Get all NWSL teams from hardcoded mappings.

        Each distinct team ID in ``TEAM_MAPPINGS["nwsl"]`` yields one Team,
        using the first abbreviation that maps to it.
        """
        teams: list[Team] = []
        seen: set[str] = set()
        # Loop-invariant: look the stadium table up once.
        nwsl_stadiums = STADIUM_MAPPINGS.get("nwsl", {})

        for abbrev, (team_id, full_name, city) in TEAM_MAPPINGS.get("nwsl", {}).items():
            if team_id in seen:
                continue
            seen.add(team_id)

            team = Team(
                id=team_id,
                sport="nwsl",
                city=city,
                # The mapping carries no separate short name; reuse full name.
                name=full_name,
                full_name=full_name,
                abbreviation=abbrev,
                conference=None,  # NWSL uses single table
                division=None,
                stadium_id=self._find_stadium_for_city(city, nwsl_stadiums),
            )
            teams.append(team)
        return teams

    def scrape_stadiums(self) -> list[Stadium]:
        """Get all NWSL stadiums from hardcoded mappings.

        Surface and roof type are not read from the mapping; every stadium is
        emitted as ``"grass"`` / ``"open"``.
        """
        nwsl_stadiums = STADIUM_MAPPINGS.get("nwsl", {})
        return [
            Stadium(
                id=stadium_id,
                sport="nwsl",
                name=info.name,
                city=info.city,
                state=info.state,
                country=info.country,
                latitude=info.latitude,
                longitude=info.longitude,
                surface="grass",
                roof_type="open",
            )
            for stadium_id, info in nwsl_stadiums.items()
        ]
def create_nwsl_scraper(season: int) -> NWSLScraper:
    """Build an NWSL scraper for one season.

    Args:
        season: Season year passed to ``NWSLScraper`` (e.g., 2026).

    Returns:
        A newly constructed ``NWSLScraper``.
    """
    scraper = NWSLScraper(season=season)
    return scraper