import logging
import re
import time
from typing import Dict, Optional

import httpx
from sqlalchemy.orm import Session

from app.scrapers.base import BaseScraper
from app.models import Species, Image, ApiKey
from app.workers.quality_tasks import download_and_process_image


class WikimediaScraper(BaseScraper):
    """Scraper for Wikimedia Commons images.

    Searches the Commons ``File:`` namespace for bitmap images matching a
    species' scientific name, rejects images that are too small or not
    commercially licensed, records the survivors as ``pending`` Image rows,
    and queues each one for asynchronous download/processing.
    """

    name = "wikimedia"
    requires_api_key = False

    BASE_URL = "https://commons.wikimedia.org/w/api.php"
    HEADERS = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_3) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15"
    }

    # License short-names that permit commercial reuse (matched against the
    # upper-cased license string, so substring checks are case-insensitive).
    _SAFE_LICENSE_MARKERS = ("CC BY", "CC0", "PUBLIC DOMAIN")

    # Compiled once at import time; strips HTML tags from attribution text.
    _TAG_RE = re.compile(r"<[^>]+>")

    def scrape_species(
        self,
        species: Species,
        db: Session,
        logger: Optional[logging.Logger] = None,
    ) -> Dict[str, int]:
        """Scrape images from Wikimedia Commons for a species.

        Args:
            species: Species whose ``scientific_name`` drives the search.
            db: SQLAlchemy session used for dedup checks and inserts.
            logger: Optional logger; falls back to this module's logger.

        Returns:
            Dict with ``downloaded`` (images queued) and ``rejected``
            (images filtered out by size or license) counts. On any error
            the partial counts gathered so far are returned.
        """
        log = logger or logging.getLogger(__name__)

        api_key = self.get_api_key(db)
        # `or 1.0` also guards a configured rate of 0/None, which would
        # otherwise raise ZeroDivisionError in the sleep below.
        rate_limit = (api_key.rate_limit_per_sec if api_key else None) or 1.0

        downloaded = 0
        rejected = 0

        try:
            # Search for bitmap files matching the scientific name.
            params = {
                "action": "query",
                "format": "json",
                "generator": "search",
                "gsrsearch": f"filetype:bitmap {species.scientific_name}",
                "gsrnamespace": 6,  # File namespace
                "gsrlimit": 50,
                "prop": "imageinfo",
                "iiprop": "url|extmetadata|size",
            }

            with httpx.Client(timeout=30, headers=self.HEADERS) as client:
                response = client.get(self.BASE_URL, params=params)
                response.raise_for_status()
                data = response.json()

            pages = data.get("query", {}).get("pages", {})
            for page_id, page in pages.items():
                # Negative ids mark missing/placeholder pages; skip them.
                if int(page_id) < 0:
                    continue

                imageinfo = page.get("imageinfo", [{}])[0]
                url = imageinfo.get("url", "")
                if not url:
                    continue

                # Reject thumbnails / low-resolution images.
                width = imageinfo.get("width", 0)
                height = imageinfo.get("height", 0)
                if width < 256 or height < 256:
                    rejected += 1
                    continue

                # Keep only commercial-safe licenses.
                metadata = imageinfo.get("extmetadata", {})
                license_info = metadata.get("LicenseShortName", {}).get("value", "")
                license_upper = license_info.upper()
                if not any(m in license_upper for m in self._SAFE_LICENSE_MARKERS):
                    rejected += 1
                    continue
                license_code = license_info

                # Skip images already recorded for this source.
                source_id = str(page_id)
                existing = db.query(Image).filter(
                    Image.source == self.name,
                    Image.source_id == source_id,
                ).first()
                if existing:
                    continue

                # Build attribution; the Artist field frequently contains
                # HTML (wiki links), so strip any tags.
                artist = metadata.get("Artist", {}).get("value", "Unknown")
                if "<" in artist:
                    artist = self._TAG_RE.sub("", artist).strip()
                attribution = f"{artist} via Wikimedia Commons ({license_code})"

                image = Image(
                    species_id=species.id,
                    source=self.name,
                    source_id=source_id,
                    url=url,
                    license=license_code,
                    attribution=attribution,
                    width=width,
                    height=height,
                    status="pending",
                )
                db.add(image)
                # Commit per image so the queued task can see the row.
                db.commit()

                download_and_process_image.delay(image.id)
                downloaded += 1

                # Honor the configured requests-per-second rate limit.
                time.sleep(1.0 / rate_limit)
        except Exception:
            # Best-effort scrape: log with traceback and return partial
            # counts rather than aborting the whole scraping run.
            log.exception(
                "Error scraping Wikimedia for %s", species.scientific_name
            )

        return {"downloaded": downloaded, "rejected": rejected}

    def test_connection(self, api_key: ApiKey) -> str:
        """Test Wikimedia API connection.

        Issues a lightweight ``siteinfo`` query; raises
        ``httpx.HTTPStatusError`` on a non-2xx response.
        """
        params = {
            "action": "query",
            "format": "json",
            "meta": "siteinfo",
        }
        with httpx.Client(timeout=10, headers=self.HEADERS) as client:
            response = client.get(self.BASE_URL, params=params)
            response.raise_for_status()
        return "Wikimedia Commons API connection successful"