"""Flickr image scraper: searches the Flickr REST API for commercially-safe
photos of a species and queues them for download/processing."""

import logging
import time
from typing import Dict, Optional

import httpx
from sqlalchemy.orm import Session

from app.models import ApiKey, Image, Species
from app.scrapers.base import BaseScraper
from app.workers.quality_tasks import download_and_process_image

# Module-level fallback logger, used when the caller does not supply one.
logger = logging.getLogger(__name__)


class FlickrScraper(BaseScraper):
    """Scraper for Flickr images via their API."""

    name = "flickr"
    requires_api_key = True

    BASE_URL = "https://api.flickr.com/services/rest/"
    HEADERS = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_3) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15"
    }

    # Commercial-safe license IDs
    # 4 = CC BY 2.0, 7 = No known copyright, 8 = US Gov, 9 = CC0
    ALLOWED_LICENSES = "4,7,8,9"
    LICENSE_MAP = {
        "4": "CC-BY",
        "7": "NO-KNOWN-COPYRIGHT",
        "8": "US-GOV",
        "9": "CC0",
    }

    def scrape_species(
        self, species: Species, db: Session, logger: Optional[logging.Logger] = None
    ) -> Dict[str, int]:
        """Scrape images from Flickr for a species.

        Searches the Flickr API by scientific name (first page only,
        up to 100 results), skips photos without a usable URL or an
        allowed license, deduplicates against existing ``Image`` rows,
        and queues each new image for async download/processing.

        Args:
            species: Species row whose ``scientific_name`` is the search text.
            db: Active SQLAlchemy session; new ``Image`` rows are committed here.
            logger: Optional logger for error reporting; falls back to the
                module logger. (Previously accepted but never used — errors
                went to ``print()``.)

        Returns:
            ``{"downloaded": n, "rejected": m}``, plus an ``"error"`` key when
            the API key is missing or the API reports a failure. On an
            unexpected exception the partial counts are returned (best-effort).
        """
        log = logger or logging.getLogger(__name__)

        api_key = self.get_api_key(db)
        if not api_key:
            return {"downloaded": 0, "rejected": 0, "error": "No API key configured"}

        # Guard: a zero/negative configured rate limit would raise
        # ZeroDivisionError (or produce a nonsense negative sleep) below.
        # Fall back to a conservative 1 request/sec.
        rate_limit = api_key.rate_limit_per_sec
        delay = 1.0 / rate_limit if rate_limit and rate_limit > 0 else 1.0

        downloaded = 0
        rejected = 0
        try:
            params = {
                "method": "flickr.photos.search",
                "api_key": api_key.api_key,
                "text": species.scientific_name,
                "license": self.ALLOWED_LICENSES,
                "content_type": 1,  # Photos only
                "media": "photos",
                "extras": "license,url_l,url_o,owner_name",
                "per_page": 100,
                "format": "json",
                "nojsoncallback": 1,
            }

            with httpx.Client(timeout=30, headers=self.HEADERS) as client:
                response = client.get(self.BASE_URL, params=params)
                response.raise_for_status()
                data = response.json()

                if data.get("stat") != "ok":
                    return {"downloaded": 0, "rejected": 0, "error": data.get("message")}

                photos = data.get("photos", {}).get("photo", [])

                for photo in photos:
                    # Prefer the original-size URL, fall back to large.
                    url = photo.get("url_o") or photo.get("url_l")
                    if not url:
                        rejected += 1
                        continue

                    # Reject anything outside the allowed license set.
                    license_id = str(photo.get("license", ""))
                    license_code = self.LICENSE_MAP.get(license_id, "UNKNOWN")
                    if license_code == "UNKNOWN":
                        rejected += 1
                        continue

                    # Skip photos we already have (dedupe on source + source_id).
                    source_id = str(photo.get("id"))
                    existing = db.query(Image).filter(
                        Image.source == self.name,
                        Image.source_id == source_id,
                    ).first()
                    if existing:
                        continue

                    # Build attribution
                    owner = photo.get("ownername", "Unknown")
                    attribution = f"Photo by {owner} on Flickr ({license_code})"

                    # Create image record
                    image = Image(
                        species_id=species.id,
                        source=self.name,
                        source_id=source_id,
                        url=url,
                        license=license_code,
                        attribution=attribution,
                        status="pending",
                    )
                    db.add(image)
                    # Commit before queuing so the row has a primary key
                    # for the Celery task.
                    db.commit()

                    download_and_process_image.delay(image.id)
                    downloaded += 1

                    # Rate limiting between queued downloads.
                    time.sleep(delay)

        except Exception:
            # Best-effort scrape: log with traceback (previously a bare
            # print() that discarded the stack trace) and return partial
            # counts rather than crash the whole run.
            log.exception(
                "Error scraping Flickr for %s", species.scientific_name
            )

        return {"downloaded": downloaded, "rejected": rejected}

    def test_connection(self, api_key: ApiKey) -> str:
        """Test Flickr API connection.

        Calls ``flickr.test.echo`` with the given key.

        Returns:
            A human-readable success message.

        Raises:
            RuntimeError: If the API responds with a non-ok status.
            httpx.HTTPStatusError: If the HTTP request itself fails.
        """
        params = {
            "method": "flickr.test.echo",
            "api_key": api_key.api_key,
            "format": "json",
            "nojsoncallback": 1,
        }

        with httpx.Client(timeout=10, headers=self.HEADERS) as client:
            response = client.get(self.BASE_URL, params=params)
            response.raise_for_status()
            data = response.json()

            # RuntimeError is a subclass of Exception, so existing callers
            # catching Exception are unaffected.
            if data.get("stat") != "ok":
                raise RuntimeError(data.get("message", "API test failed"))

        return "Flickr API connection successful"