"""HTTP utilities with rate limiting and exponential backoff.""" import random import time from typing import Optional from urllib.parse import urlparse import requests from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry from ..config import ( DEFAULT_REQUEST_DELAY, MAX_RETRIES, BACKOFF_FACTOR, INITIAL_BACKOFF, ) from .logging import get_logger, log_warning # User agents for rotation to avoid blocks USER_AGENTS = [ "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Safari/605.1.15", "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:121.0) Gecko/20100101 Firefox/121.0", ] class RateLimitedSession: """HTTP session with rate limiting and exponential backoff. Features: - Configurable delay between requests - Automatic 429 detection with exponential backoff - User-agent rotation - Connection pooling - Automatic retries for transient errors """ def __init__( self, delay: float = DEFAULT_REQUEST_DELAY, max_retries: int = MAX_RETRIES, backoff_factor: float = BACKOFF_FACTOR, initial_backoff: float = INITIAL_BACKOFF, ): """Initialize the rate-limited session. Args: delay: Minimum delay between requests in seconds max_retries: Maximum number of retry attempts backoff_factor: Multiplier for exponential backoff initial_backoff: Initial backoff duration in seconds """ self.delay = delay self.max_retries = max_retries self.backoff_factor = backoff_factor self.initial_backoff = initial_backoff self.last_request_time: float = 0.0 self._domain_delays: dict[str, float] = {} # Create session with retry adapter self.session = requests.Session() # Configure automatic retries for connection errors retry_strategy = Retry( total=max_retries, backoff_factor=0.5, status_forcelist=[500, 502, 503, 504], allowed_methods=["GET", "HEAD"], ) adapter = HTTPAdapter(max_retries=retry_strategy, pool_maxsize=10) self.session.mount("http://", adapter) self.session.mount("https://", adapter) self._logger = get_logger() def _get_user_agent(self) -> str: """Get a random user agent.""" return random.choice(USER_AGENTS) def _get_domain(self, url: str) -> str: """Extract domain from URL.""" parsed = urlparse(url) return parsed.netloc def _wait_for_rate_limit(self, url: str) -> None: """Wait to respect rate limiting.""" domain = self._get_domain(url) # Get domain-specific delay (if 429 was received) domain_delay = self._domain_delays.get(domain, 0.0) effective_delay = max(self.delay, domain_delay) elapsed = time.time() - self.last_request_time if elapsed < effective_delay: sleep_time = effective_delay - elapsed self._logger.debug(f"Rate limiting: sleeping {sleep_time:.2f}s") time.sleep(sleep_time) def _handle_429(self, url: str, attempt: int) -> float: """Handle 429 Too Many Requests with exponential backoff. Returns the backoff duration in seconds. 
""" domain = self._get_domain(url) backoff = self.initial_backoff * (self.backoff_factor ** attempt) # Add jitter to prevent thundering herd backoff += random.uniform(0, 1) # Update domain-specific delay self._domain_delays[domain] = min(backoff * 2, 60.0) # Cap at 60s log_warning(f"Rate limited (429) for {domain}, backing off {backoff:.1f}s") return backoff def get( self, url: str, headers: Optional[dict] = None, params: Optional[dict] = None, timeout: float = 30.0, ) -> requests.Response: """Make a rate-limited GET request with automatic retries. Args: url: URL to fetch headers: Additional headers to include params: Query parameters timeout: Request timeout in seconds Returns: Response object Raises: requests.RequestException: If all retries fail """ # Prepare headers with user agent request_headers = {"User-Agent": self._get_user_agent()} if headers: request_headers.update(headers) last_exception: Optional[Exception] = None for attempt in range(self.max_retries + 1): try: # Wait for rate limit self._wait_for_rate_limit(url) # Make request self.last_request_time = time.time() response = self.session.get( url, headers=request_headers, params=params, timeout=timeout, ) # Handle 429 if response.status_code == 429: if attempt < self.max_retries: backoff = self._handle_429(url, attempt) time.sleep(backoff) continue else: response.raise_for_status() # Return successful response return response except requests.RequestException as e: last_exception = e if attempt < self.max_retries: backoff = self.initial_backoff * (self.backoff_factor ** attempt) self._logger.warning( f"Request failed (attempt {attempt + 1}): {e}, retrying in {backoff:.1f}s" ) time.sleep(backoff) else: raise # Should not reach here, but just in case if last_exception: raise last_exception raise requests.RequestException("Max retries exceeded") def get_json( self, url: str, headers: Optional[dict] = None, params: Optional[dict] = None, timeout: float = 30.0, ) -> dict: """Make a rate-limited GET request and parse JSON response. Args: url: URL to fetch headers: Additional headers to include params: Query parameters timeout: Request timeout in seconds Returns: Parsed JSON as dictionary Raises: requests.RequestException: If request fails ValueError: If response is not valid JSON """ response = self.get(url, headers=headers, params=params, timeout=timeout) response.raise_for_status() return response.json() def get_html( self, url: str, headers: Optional[dict] = None, params: Optional[dict] = None, timeout: float = 30.0, ) -> str: """Make a rate-limited GET request and return HTML text. 
Args: url: URL to fetch headers: Additional headers to include params: Query parameters timeout: Request timeout in seconds Returns: HTML text content Raises: requests.RequestException: If request fails """ response = self.get(url, headers=headers, params=params, timeout=timeout) response.raise_for_status() return response.text def reset_domain_delays(self) -> None: """Reset domain-specific delays (e.g., after a long pause).""" self._domain_delays.clear() def close(self) -> None: """Close the session and release resources.""" self.session.close() # Global session instance (lazy initialized) _global_session: Optional[RateLimitedSession] = None def get_session() -> RateLimitedSession: """Get the global rate-limited session instance.""" global _global_session if _global_session is None: _global_session = RateLimitedSession() return _global_session def fetch_url(url: str, **kwargs) -> requests.Response: """Convenience function to fetch a URL with rate limiting.""" return get_session().get(url, **kwargs) def fetch_json(url: str, **kwargs) -> dict: """Convenience function to fetch JSON with rate limiting.""" return get_session().get_json(url, **kwargs) def fetch_html(url: str, **kwargs) -> str: """Convenience function to fetch HTML with rate limiting.""" return get_session().get_html(url, **kwargs)
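

# ---------------------------------------------------------------------------
# Example usage (illustrative sketch only, kept as a comment so it is not
# executed on import). It assumes this module is importable from its parent
# package (here written as the placeholder `mypackage.utils.http`) and that
# `..config` supplies the imported constants; the URLs below are placeholders,
# not real endpoints.
#
#     from mypackage.utils.http import RateLimitedSession, fetch_json
#
#     # The module-level helpers share one lazily created global session:
#     data = fetch_json("https://api.example.com/items", params={"page": 1})
#
#     # Or manage a dedicated session explicitly, e.g. with a slower crawl
#     # rate, and close it when done:
#     session = RateLimitedSession(delay=2.0, max_retries=5)
#     try:
#         html = session.get_html("https://example.com/page")
#     finally:
#         session.close()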