"""NWSL scraper implementation with multi-source fallback.""" from datetime import datetime, date, timedelta from typing import Optional from .base import BaseScraper, RawGameData, ScrapeResult from ..models.game import Game from ..models.team import Team from ..models.stadium import Stadium from ..models.aliases import ManualReviewItem from ..normalizers.canonical_id import generate_game_id from ..normalizers.team_resolver import ( TeamResolver, TEAM_MAPPINGS, get_team_resolver, ) from ..normalizers.stadium_resolver import ( StadiumResolver, STADIUM_MAPPINGS, get_stadium_resolver, ) from ..utils.logging import get_logger, log_game, log_warning class NWSLScraper(BaseScraper): """NWSL schedule scraper with multi-source fallback. Sources (in priority order): 1. ESPN API - Most reliable for NWSL Source Timezones: - espn: UTC - ISO 8601 format with "Z" suffix """ def __init__(self, season: int, **kwargs): """Initialize NWSL scraper. Args: season: Season year (e.g., 2026 for 2026 season) """ super().__init__("nwsl", season, **kwargs) self._team_resolver = get_team_resolver("nwsl") self._stadium_resolver = get_stadium_resolver("nwsl") def _get_sources(self) -> list[str]: """Return source list in priority order.""" return ["espn"] def _get_source_url(self, source: str, **kwargs) -> str: """Build URL for a source.""" if source == "espn": date_str = kwargs.get("date", "") return f"https://site.api.espn.com/apis/site/v2/sports/soccer/usa.nwsl/scoreboard?dates={date_str}" raise ValueError(f"Unknown source: {source}") def _get_season_months(self) -> list[tuple[int, int]]: """Get the months to scrape for NWSL season. NWSL season runs March through November. """ months = [] # NWSL regular season + playoffs for month in range(3, 12): # March-Nov months.append((self.season, month)) return months def _scrape_games_from_source(self, source: str) -> list[RawGameData]: """Scrape games from a specific source.""" if source == "espn": return self._scrape_espn() else: raise ValueError(f"Unknown source: {source}") def _scrape_espn(self) -> list[RawGameData]: """Scrape games from ESPN API using date range query.""" # Build date range for entire season (March-November) season_months = self._get_season_months() start_year, start_month = season_months[0] end_year, end_month = season_months[-1] # Get last day of end month if end_month == 12: end_date = date(end_year + 1, 1, 1) - timedelta(days=1) else: end_date = date(end_year, end_month + 1, 1) - timedelta(days=1) start_date = date(start_year, start_month, 1) date_range = f"{start_date.strftime('%Y%m%d')}-{end_date.strftime('%Y%m%d')}" url = f"https://site.api.espn.com/apis/site/v2/sports/soccer/usa.nwsl/scoreboard?limit=1000&dates={date_range}" self._logger.info(f"Fetching NWSL schedule: {date_range}") try: data = self.session.get_json(url) return self._parse_espn_response(data, url) except Exception as e: self._logger.error(f"ESPN error: {e}") return [] def _parse_espn_response( self, data: dict, source_url: str, ) -> list[RawGameData]: """Parse ESPN API response.""" games: list[RawGameData] = [] events = data.get("events", []) for event in events: try: game = self._parse_espn_event(event, source_url) if game: games.append(game) except Exception as e: self._logger.debug(f"Failed to parse ESPN event: {e}") continue return games def _parse_espn_event( self, event: dict, source_url: str, ) -> Optional[RawGameData]: """Parse a single ESPN event.""" # Get date date_str = event.get("date", "") if not date_str: return None try: game_date = datetime.fromisoformat(date_str.replace("Z", "+00:00")) except ValueError: return None # Get competitions competitions = event.get("competitions", []) if not competitions: return None competition = competitions[0] # Get teams competitors = competition.get("competitors", []) if len(competitors) != 2: return None home_team = None away_team = None home_score = None away_score = None for competitor in competitors: team_info = competitor.get("team", {}) team_name = team_info.get("displayName", "") is_home = competitor.get("homeAway") == "home" score = competitor.get("score") if score: try: score = int(score) except (ValueError, TypeError): score = None if is_home: home_team = team_name home_score = score else: away_team = team_name away_score = score if not home_team or not away_team: return None # Get venue venue = competition.get("venue", {}) stadium = venue.get("fullName") # Get status status_info = competition.get("status", {}) status_type = status_info.get("type", {}) status_name = status_type.get("name", "").lower() if status_name == "status_final": status = "final" elif status_name == "status_postponed": status = "postponed" elif status_name == "status_canceled": status = "cancelled" else: status = "scheduled" return RawGameData( game_date=game_date, home_team_raw=home_team, away_team_raw=away_team, stadium_raw=stadium, home_score=home_score, away_score=away_score, status=status, source_url=source_url, ) def _normalize_games( self, raw_games: list[RawGameData], ) -> tuple[list[Game], list[ManualReviewItem]]: """Normalize raw games to Game objects with canonical IDs.""" games: list[Game] = [] review_items: list[ManualReviewItem] = [] for raw in raw_games: game, item_reviews = self._normalize_single_game(raw) if game: games.append(game) log_game( self.sport, game.id, game.home_team_id, game.away_team_id, game.game_date.strftime("%Y-%m-%d"), game.status, ) review_items.extend(item_reviews) return games, review_items def _normalize_single_game( self, raw: RawGameData, ) -> tuple[Optional[Game], list[ManualReviewItem]]: """Normalize a single raw game.""" review_items: list[ManualReviewItem] = [] # Resolve home team home_result = self._team_resolver.resolve( raw.home_team_raw, check_date=raw.game_date.date(), source_url=raw.source_url, ) if home_result.review_item: review_items.append(home_result.review_item) if not home_result.canonical_id: log_warning(f"Could not resolve home team: {raw.home_team_raw}") return None, review_items # Resolve away team away_result = self._team_resolver.resolve( raw.away_team_raw, check_date=raw.game_date.date(), source_url=raw.source_url, ) if away_result.review_item: review_items.append(away_result.review_item) if not away_result.canonical_id: log_warning(f"Could not resolve away team: {raw.away_team_raw}") return None, review_items # Resolve stadium stadium_id = None if raw.stadium_raw: stadium_result = self._stadium_resolver.resolve( raw.stadium_raw, check_date=raw.game_date.date(), source_url=raw.source_url, ) if stadium_result.review_item: review_items.append(stadium_result.review_item) stadium_id = stadium_result.canonical_id # Get abbreviations for game ID home_abbrev = self._get_abbreviation(home_result.canonical_id) away_abbrev = self._get_abbreviation(away_result.canonical_id) # Generate canonical game ID game_id = generate_game_id( sport=self.sport, season=self.season, away_abbrev=away_abbrev, home_abbrev=home_abbrev, game_date=raw.game_date, game_number=None, ) game = Game( id=game_id, sport=self.sport, season=self.season, home_team_id=home_result.canonical_id, away_team_id=away_result.canonical_id, stadium_id=stadium_id or "", game_date=raw.game_date, game_number=None, home_score=raw.home_score, away_score=raw.away_score, status=raw.status, source_url=raw.source_url, raw_home_team=raw.home_team_raw, raw_away_team=raw.away_team_raw, raw_stadium=raw.stadium_raw, ) return game, review_items def _get_abbreviation(self, team_id: str) -> str: """Extract abbreviation from team ID.""" parts = team_id.split("_") return parts[-1] if parts else "" def scrape_teams(self) -> list[Team]: """Get all NWSL teams from hardcoded mappings.""" teams: list[Team] = [] seen: set[str] = set() for abbrev, (team_id, full_name, city, stadium_id) in TEAM_MAPPINGS.get("nwsl", {}).items(): if team_id in seen: continue seen.add(team_id) # Parse team name team_name = full_name team = Team( id=team_id, sport="nwsl", city=city, name=team_name, full_name=full_name, abbreviation=abbrev, conference=None, # NWSL uses single table division=None, stadium_id=stadium_id, ) teams.append(team) return teams def scrape_stadiums(self) -> list[Stadium]: """Get all NWSL stadiums from hardcoded mappings.""" stadiums: list[Stadium] = [] nwsl_stadiums = STADIUM_MAPPINGS.get("nwsl", {}) for stadium_id, info in nwsl_stadiums.items(): stadium = Stadium( id=stadium_id, sport="nwsl", name=info.name, city=info.city, state=info.state, country=info.country, latitude=info.latitude, longitude=info.longitude, timezone=info.timezone, surface="grass", roof_type="open", ) stadiums.append(stadium) return stadiums def create_nwsl_scraper(season: int) -> NWSLScraper: """Factory function to create an NWSL scraper.""" return NWSLScraper(season=season)