Initial commit — PlantGuideScraper project

This commit is contained in:
Trey T
2026-04-12 09:54:27 -05:00
commit 6926f502c5
87 changed files with 29120 additions and 0 deletions

View File

@@ -0,0 +1,41 @@
from typing import Optional
from app.scrapers.base import BaseScraper
from app.scrapers.inaturalist import INaturalistScraper
from app.scrapers.flickr import FlickrScraper
from app.scrapers.wikimedia import WikimediaScraper
from app.scrapers.trefle import TrefleScraper
from app.scrapers.gbif import GBIFScraper
from app.scrapers.duckduckgo import DuckDuckGoScraper
from app.scrapers.bing import BingScraper
def get_scraper(source: str) -> Optional[BaseScraper]:
    """Return a fresh scraper instance for *source*, or None if the
    source name is not registered."""
    registry = {
        "inaturalist": INaturalistScraper,
        "flickr": FlickrScraper,
        "wikimedia": WikimediaScraper,
        "trefle": TrefleScraper,
        "gbif": GBIFScraper,
        "duckduckgo": DuckDuckGoScraper,
        "bing": BingScraper,
    }
    scraper_cls = registry.get(source)
    return scraper_cls() if scraper_cls is not None else None
# Public API of the scrapers package: the factory plus every scraper class.
# NOTE(review): BHLScraper and EOLScraper exist in this package (bhl.py,
# eol.py) but are neither imported, registered in get_scraper, nor listed
# here — confirm that omission is intentional.
__all__ = [
    "get_scraper",
    "BaseScraper",
    "INaturalistScraper",
    "FlickrScraper",
    "WikimediaScraper",
    "TrefleScraper",
    "GBIFScraper",
    "DuckDuckGoScraper",
    "BingScraper",
]

View File

@@ -0,0 +1,57 @@
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional
import logging
from sqlalchemy.orm import Session
from app.models import Species, ApiKey
class BaseScraper(ABC):
    """Base class for all image scrapers.

    Subclasses set ``name`` (must match an ``ApiKey.source`` value) and
    ``requires_api_key``, and implement ``scrape_species`` and
    ``test_connection``.
    """
    # Source identifier; used to look up the matching ApiKey row.
    name: str = "base"
    # Whether the scraper needs a configured API key to operate.
    requires_api_key: bool = True

    @abstractmethod
    def scrape_species(
        self,
        species: Species,
        db: Session,
        logger: Optional[logging.Logger] = None
    ) -> Dict[str, int]:
        """
        Scrape images for a species.

        Args:
            species: The species to scrape images for
            db: Database session
            logger: Optional logger for debugging

        Returns:
            Dict with 'downloaded' and 'rejected' counts
        """
        pass

    @abstractmethod
    def test_connection(self, api_key: ApiKey) -> str:
        """
        Test API connection.

        Args:
            api_key: The API key configuration

        Returns:
            Success message

        Raises:
            Exception if connection fails
        """
        pass

    def get_api_key(self, db: Session) -> Optional[ApiKey]:
        """Return the enabled ApiKey row for this scraper's source, or
        None when no enabled key is configured (`.first()` may return None)."""
        # `== True` is a SQLAlchemy column comparison, not a Python identity
        # test — do not "fix" it to `is True`.
        return db.query(ApiKey).filter(
            ApiKey.source == self.name,
            ApiKey.enabled == True
        ).first()

228
backend/app/scrapers/bhl.py Normal file
View File

@@ -0,0 +1,228 @@
import time
import logging
from typing import Dict, Optional
import httpx
from sqlalchemy.orm import Session
from app.scrapers.base import BaseScraper
from app.models import Species, Image, ApiKey
from app.workers.quality_tasks import download_and_process_image
class BHLScraper(BaseScraper):
    """Scraper for Biodiversity Heritage Library (BHL) images.

    BHL provides access to digitized biodiversity literature and illustrations.
    Most content is public domain (pre-1927) or CC-licensed.

    Note: BHL images are primarily historical botanical illustrations,
    which may differ from photographs but are valuable for training.
    """
    name = "bhl"
    requires_api_key = True  # BHL requires free API key
    BASE_URL = "https://www.biodiversitylibrary.org/api3"
    HEADERS = {
        "User-Agent": "PlantGuideScraper/1.0 (Plant image collection for ML training)",
        "Accept": "application/json",
    }
    # BHL content is mostly public domain
    # NOTE(review): ALLOWED_LICENSES is never consulted anywhere in this
    # class — either wire it into the license check below or remove it.
    ALLOWED_LICENSES = {"CC0", "CC-BY", "CC-BY-SA", "PD"}

    def scrape_species(
        self,
        species: Species,
        db: Session,
        logger: Optional[logging.Logger] = None
    ) -> Dict[str, int]:
        """Scrape images from BHL for a species.

        Flow: NameSearch -> NameGetDetail per name match -> GetPageMetadata
        per title -> queue illustration/plate pages for download. Caps at
        50 queued images per species (triple break below).

        Returns a dict with 'downloaded' and 'rejected' counts (an 'error'
        key is added when no API key is configured).
        """
        api_key = self.get_api_key(db)
        if not api_key:
            return {"downloaded": 0, "rejected": 0, "error": "No API key configured"}
        # NOTE(review): the `if api_key else 0.5` branch is dead — we already
        # returned above when api_key is falsy. Also assumes
        # rate_limit_per_sec is a positive number; 0/None would make the
        # 1.0 / rate_limit sleeps below raise (caught by the broad except).
        rate_limit = api_key.rate_limit_per_sec if api_key else 0.5
        downloaded = 0
        # NOTE(review): `rejected` is never incremented in this scraper.
        rejected = 0

        def log(level: str, msg: str):
            # Dispatch to logger.info/debug/error etc. only when a logger
            # was supplied; otherwise messages are dropped.
            if logger:
                getattr(logger, level)(msg)
        try:
            # Disable SSL verification - some Docker environments lack proper CA certificates
            # NOTE(review): verify=False disables TLS certificate checking for
            # every BHL request — prefer shipping CA certs in the image.
            with httpx.Client(timeout=30, headers=self.HEADERS, verify=False) as client:
                # Search for name in BHL
                search_response = client.get(
                    f"{self.BASE_URL}",
                    params={
                        "op": "NameSearch",
                        "name": species.scientific_name,
                        "format": "json",
                        "apikey": api_key.api_key,
                    },
                )
                search_response.raise_for_status()
                search_data = search_response.json()
                results = search_data.get("Result", [])
                if not results:
                    log("info", f"  Species not found in BHL: {species.scientific_name}")
                    return {"downloaded": 0, "rejected": 0}
                time.sleep(1.0 / rate_limit)
                # Get pages with illustrations for each name result
                for name_result in results[:5]:  # Limit to top 5 matches
                    # NOTE(review): field/op names (NameBankID, NameGetDetail,
                    # GetPageMetadata with "titleid") are assumed — verify
                    # against the BHL api3 documentation.
                    name_bank_id = name_result.get("NameBankID")
                    if not name_bank_id:
                        continue
                    # Get publications with this name
                    pub_response = client.get(
                        f"{self.BASE_URL}",
                        params={
                            "op": "NameGetDetail",
                            "namebankid": name_bank_id,
                            "format": "json",
                            "apikey": api_key.api_key,
                        },
                    )
                    pub_response.raise_for_status()
                    pub_data = pub_response.json()
                    time.sleep(1.0 / rate_limit)
                    # Extract titles and get page images
                    for title in pub_data.get("Result", []):
                        title_id = title.get("TitleID")
                        if not title_id:
                            continue
                        # Get pages for this title
                        pages_response = client.get(
                            f"{self.BASE_URL}",
                            params={
                                "op": "GetPageMetadata",
                                "titleid": title_id,
                                "format": "json",
                                "apikey": api_key.api_key,
                                "ocr": "false",
                                "names": "false",
                            },
                        )
                        # Best-effort: skip titles whose page metadata fails.
                        if pages_response.status_code != 200:
                            continue
                        pages_data = pages_response.json()
                        pages = pages_data.get("Result", [])
                        time.sleep(1.0 / rate_limit)
                        # Look for pages that are likely illustrations
                        for page in pages[:100]:  # Limit pages per title
                            page_types = page.get("PageTypes", [])
                            # Only get illustration/plate pages; pages with NO
                            # type metadata are allowed through (see below).
                            is_illustration = any(
                                pt.get("PageTypeName", "").lower() in ["illustration", "plate", "figure", "map"]
                                for pt in page_types
                            ) if page_types else False
                            # Skip typed pages that are not illustrations;
                            # untyped pages (empty page_types) fall through.
                            if not is_illustration and page_types:
                                continue
                            page_id = page.get("PageID")
                            if not page_id:
                                continue
                            # Construct image URL
                            # BHL provides multiple image sizes
                            image_url = f"https://www.biodiversitylibrary.org/pageimage/{page_id}"
                            # Check if already exists
                            source_id = str(page_id)
                            existing = db.query(Image).filter(
                                Image.source == self.name,
                                Image.source_id == source_id,
                            ).first()
                            if existing:
                                continue
                            # Determine license - BHL content is usually public domain
                            # NOTE(review): item_url is computed but never used.
                            item_url = page.get("ItemUrl", "")
                            year = None
                            try:
                                # Try to extract year from ItemUrl or other fields
                                if "Year" in page:
                                    year = int(page.get("Year", 0))
                            except (ValueError, TypeError):
                                pass
                            # Content before 1927 is public domain in US
                            # NOTE(review): the US public-domain cutoff rolls
                            # forward each year (95-year term); 1927 is stale.
                            if year and year < 1927:
                                license_code = "PD"
                            else:
                                license_code = "CC0"  # BHL default for older works
                            # Build attribution
                            title_name = title.get("ShortTitle", title.get("FullTitle", "Unknown"))
                            attribution = f"From '{title_name}' via Biodiversity Heritage Library ({license_code})"
                            # Create image record
                            image = Image(
                                species_id=species.id,
                                source=self.name,
                                source_id=source_id,
                                url=image_url,
                                license=license_code,
                                attribution=attribution,
                                status="pending",
                            )
                            db.add(image)
                            # Commit per image so the id exists before queueing.
                            db.commit()
                            # Queue for download
                            download_and_process_image.delay(image.id)
                            downloaded += 1
                            # Limit total per species (break out of all three loops)
                            if downloaded >= 50:
                                break
                        if downloaded >= 50:
                            break
                    if downloaded >= 50:
                        break
        except httpx.HTTPStatusError as e:
            log("error", f"  HTTP error for {species.scientific_name}: {e.response.status_code}")
        except Exception as e:
            log("error", f"  Error scraping BHL for {species.scientific_name}: {e}")
        return {"downloaded": downloaded, "rejected": rejected}

    def test_connection(self, api_key: ApiKey) -> str:
        """Test BHL API connection with a NameSearch for 'Rosa'.

        Raises on HTTP errors via raise_for_status; returns a summary string
        on success.
        """
        with httpx.Client(timeout=10, headers=self.HEADERS, verify=False) as client:
            response = client.get(
                f"{self.BASE_URL}",
                params={
                    "op": "NameSearch",
                    "name": "Rosa",
                    "format": "json",
                    "apikey": api_key.api_key,
                },
            )
            response.raise_for_status()
            data = response.json()
            results = data.get("Result", [])
            return f"BHL API connection successful ({len(results)} results for 'Rosa')"

View File

@@ -0,0 +1,135 @@
import hashlib
import time
import logging
from typing import Dict, Optional
import httpx
from sqlalchemy.orm import Session
from app.scrapers.base import BaseScraper
from app.models import Species, Image, ApiKey
from app.workers.quality_tasks import download_and_process_image
class BingScraper(BaseScraper):
    """Scraper for Bing Image Search v7 API (Azure Cognitive Services)."""
    name = "bing"
    requires_api_key = True
    BASE_URL = "https://api.bing.microsoft.com/v7.0/images/search"
    NEGATIVE_TERMS = "-herbarium -specimen -illustration -drawing -diagram -dried -pressed"
    # Bing license filter values mapped to our short license codes.
    LICENSE_MAP = {
        "Public": "CC0",
        "Share": "CC-BY-SA",
        "ShareCommercially": "CC-BY",
        "Modify": "CC-BY-SA",
        "ModifyCommercially": "CC-BY",
    }

    def _build_queries(self, species: Species) -> list[str]:
        """Search strings for the species: scientific name always, common
        name when available; both carry the negative-term filter."""
        built = [f'"{species.scientific_name}" plant photo {self.NEGATIVE_TERMS}']
        if species.common_name:
            built.append(f'"{species.common_name}" houseplant photo {self.NEGATIVE_TERMS}')
        return built

    def scrape_species(
        self,
        species: Species,
        db: Session,
        logger: Optional[logging.Logger] = None,
    ) -> Dict[str, int]:
        """Run each query against Bing, record unseen images as pending, and
        queue them for download. Returns 'downloaded'/'rejected' counts."""
        key_row = self.get_api_key(db)
        if not key_row:
            return {"downloaded": 0, "rejected": 0}
        # Default to 3 req/s when the configured limit is missing or zero.
        requests_per_sec = key_row.rate_limit_per_sec or 3.0
        queued = 0
        skipped = 0
        seen_links: set = set()
        auth = {
            "Ocp-Apim-Subscription-Key": key_row.api_key,
        }
        try:
            with httpx.Client(timeout=30, headers=auth) as client:
                for query in self._build_queries(species):
                    reply = client.get(
                        self.BASE_URL,
                        params={
                            "q": query,
                            "imageType": "Photo",
                            "license": "ShareCommercially",
                            "count": 50,
                        },
                    )
                    reply.raise_for_status()
                    for item in reply.json().get("value", []):
                        link = item.get("contentUrl")
                        if not link or link in seen_links:
                            continue
                        seen_links.add(link)
                        # Prefer Bing's imageId; otherwise hash the URL.
                        sid = item.get("imageId") or hashlib.md5(link.encode()).hexdigest()[:16]
                        duplicate = db.query(Image).filter(
                            Image.source == self.name,
                            Image.source_id == sid,
                        ).first()
                        if duplicate:
                            continue
                        # Translate Bing's license value to our short code.
                        code = self.LICENSE_MAP.get(item.get("license", ""), "UNKNOWN")
                        host = item.get("hostPageDisplayUrl", "")
                        credit = f"via Bing ({host})" if host else "via Bing Image Search"
                        record = Image(
                            species_id=species.id,
                            source=self.name,
                            source_id=sid,
                            url=link,
                            width=item.get("width"),
                            height=item.get("height"),
                            license=code,
                            attribution=credit,
                            status="pending",
                        )
                        db.add(record)
                        db.commit()
                        download_and_process_image.delay(record.id)
                        queued += 1
                    # One pause per query keeps us under the configured rate.
                    time.sleep(1.0 / requests_per_sec)
        except Exception as e:
            if logger:
                logger.error(f"Error scraping Bing for {species.scientific_name}: {e}")
            else:
                print(f"Error scraping Bing for {species.scientific_name}: {e}")
        return {"downloaded": queued, "rejected": skipped}

    def test_connection(self, api_key: ApiKey) -> str:
        """Issue a one-result probe query to verify the subscription key."""
        auth = {"Ocp-Apim-Subscription-Key": api_key.api_key}
        with httpx.Client(timeout=10, headers=auth) as client:
            probe = client.get(
                self.BASE_URL,
                params={"q": "Monstera deliciosa plant", "count": 1},
            )
            probe.raise_for_status()
            matches = probe.json().get("totalEstimatedMatches", 0)
            return f"Bing Image Search working ({matches:,} estimated matches)"

View File

@@ -0,0 +1,101 @@
import hashlib
import time
import logging
from typing import Dict, Optional
from duckduckgo_search import DDGS
from sqlalchemy.orm import Session
from app.scrapers.base import BaseScraper
from app.models import Species, Image, ApiKey
from app.workers.quality_tasks import download_and_process_image
class DuckDuckGoScraper(BaseScraper):
    """Scraper for DuckDuckGo image search. No API key required."""
    name = "duckduckgo"
    requires_api_key = False
    NEGATIVE_TERMS = "-herbarium -specimen -illustration -drawing -diagram -dried -pressed"

    def _build_queries(self, species: Species) -> list[str]:
        """Search strings: scientific name always, common name when set."""
        terms = [f'"{species.scientific_name}" plant photo {self.NEGATIVE_TERMS}']
        if species.common_name:
            terms.append(f'"{species.common_name}" houseplant photo {self.NEGATIVE_TERMS}')
        return terms

    def scrape_species(
        self,
        species: Species,
        db: Session,
        logger: Optional[logging.Logger] = None,
    ) -> Dict[str, int]:
        """Search DDG images for the species and queue unseen hits for
        download. License is always UNKNOWN (DDG exposes none)."""
        key_row = self.get_api_key(db)
        # An ApiKey row is optional here; fall back to 0.5 req/s without one.
        rate = key_row.rate_limit_per_sec if key_row else 0.5
        queued = 0
        skipped = 0
        seen_links: set = set()
        try:
            with DDGS() as ddgs:
                for query in self._build_queries(species):
                    hits = ddgs.images(
                        keywords=query,
                        type_image="photo",
                        max_results=50,
                    )
                    for hit in hits:
                        link = hit.get("image")
                        if not link or link in seen_links:
                            continue
                        seen_links.add(link)
                        # DDG has no stable result id; derive one from the URL.
                        sid = hashlib.md5(link.encode()).hexdigest()[:16]
                        # Skip URLs we already recorded in a previous run.
                        duplicate = db.query(Image).filter(
                            Image.source == self.name,
                            Image.source_id == sid,
                        ).first()
                        if duplicate:
                            continue
                        caption = hit.get("title", "")
                        credit = f"{caption} via DuckDuckGo" if caption else "via DuckDuckGo"
                        record = Image(
                            species_id=species.id,
                            source=self.name,
                            source_id=sid,
                            url=link,
                            license="UNKNOWN",
                            attribution=credit,
                            status="pending",
                        )
                        db.add(record)
                        db.commit()
                        download_and_process_image.delay(record.id)
                        queued += 1
                    time.sleep(1.0 / rate)
        except Exception as e:
            if logger:
                logger.error(f"Error scraping DuckDuckGo for {species.scientific_name}: {e}")
            else:
                print(f"Error scraping DuckDuckGo for {species.scientific_name}: {e}")
        return {"downloaded": queued, "rejected": skipped}

    def test_connection(self, api_key: ApiKey) -> str:
        """Run a single-result search to confirm DDG responds."""
        with DDGS() as ddgs:
            sample = ddgs.images(keywords="Monstera deliciosa plant", max_results=1)
            found = len(list(sample))
        return f"DuckDuckGo search working ({found} test result)"

226
backend/app/scrapers/eol.py Normal file
View File

@@ -0,0 +1,226 @@
import time
import logging
from typing import Dict, Optional
import httpx
from sqlalchemy.orm import Session
from app.scrapers.base import BaseScraper
from app.models import Species, Image, ApiKey
from app.workers.quality_tasks import download_and_process_image
class EOLScraper(BaseScraper):
    """Scraper for Encyclopedia of Life (EOL) images.

    EOL aggregates biodiversity data from many sources and provides
    a free API with no authentication required.
    """
    name = "eol"
    requires_api_key = False
    BASE_URL = "https://eol.org/api"
    HEADERS = {
        "User-Agent": "PlantGuideScraper/1.0 (Plant image collection for ML training)",
        "Accept": "application/json",
    }
    # Map EOL license URLs (http and https forms) plus two plain-text
    # markers to short codes.  The plain-text keys ("pd", "public domain")
    # are matched exactly / as phrases only — see scrape_species.
    LICENSE_MAP = {
        "http://creativecommons.org/publicdomain/zero/1.0/": "CC0",
        "http://creativecommons.org/publicdomain/mark/1.0/": "CC0",
        "http://creativecommons.org/licenses/by/2.0/": "CC-BY",
        "http://creativecommons.org/licenses/by/3.0/": "CC-BY",
        "http://creativecommons.org/licenses/by/4.0/": "CC-BY",
        "http://creativecommons.org/licenses/by-sa/2.0/": "CC-BY-SA",
        "http://creativecommons.org/licenses/by-sa/3.0/": "CC-BY-SA",
        "http://creativecommons.org/licenses/by-sa/4.0/": "CC-BY-SA",
        "https://creativecommons.org/publicdomain/zero/1.0/": "CC0",
        "https://creativecommons.org/publicdomain/mark/1.0/": "CC0",
        "https://creativecommons.org/licenses/by/2.0/": "CC-BY",
        "https://creativecommons.org/licenses/by/3.0/": "CC-BY",
        "https://creativecommons.org/licenses/by/4.0/": "CC-BY",
        "https://creativecommons.org/licenses/by-sa/2.0/": "CC-BY-SA",
        "https://creativecommons.org/licenses/by-sa/3.0/": "CC-BY-SA",
        "https://creativecommons.org/licenses/by-sa/4.0/": "CC-BY-SA",
        "pd": "CC0",  # Public domain
        "public domain": "CC0",
    }
    # Commercial-safe licenses
    ALLOWED_LICENSES = {"CC0", "CC-BY", "CC-BY-SA"}

    def scrape_species(
        self,
        species: Species,
        db: Session,
        logger: Optional[logging.Logger] = None
    ) -> Dict[str, int]:
        """Scrape images from EOL for a species.

        Flow: exact-name search -> page detail with media -> filter to
        commercially-safe licensed still images -> queue for download.

        Returns:
            Dict with 'downloaded' and 'rejected' counts.
        """
        import hashlib  # local: only needed for the source-id fallback below
        api_key = self.get_api_key(db)
        # An ApiKey row is optional (EOL needs no key); default to 0.5 req/s.
        rate_limit = api_key.rate_limit_per_sec if api_key else 0.5
        downloaded = 0
        rejected = 0

        def log(level: str, msg: str):
            # Forward to logger.<level>() only when a logger was supplied.
            if logger:
                getattr(logger, level)(msg)
        try:
            # Disable SSL verification - EOL is a trusted source and some Docker
            # environments lack proper CA certificates
            # NOTE(review): verify=False disables TLS certificate checking;
            # prefer shipping CA certs in the image.
            with httpx.Client(timeout=30, headers=self.HEADERS, verify=False) as client:
                # Step 1: Search for the species
                search_response = client.get(
                    f"{self.BASE_URL}/search/1.0.json",
                    params={
                        "q": species.scientific_name,
                        "page": 1,
                        "exact": "true",
                    },
                )
                search_response.raise_for_status()
                search_data = search_response.json()
                results = search_data.get("results", [])
                if not results:
                    log("info", f"  Species not found in EOL: {species.scientific_name}")
                    return {"downloaded": 0, "rejected": 0}
                # Get the EOL page ID
                eol_page_id = results[0].get("id")
                if not eol_page_id:
                    return {"downloaded": 0, "rejected": 0}
                time.sleep(1.0 / rate_limit)
                # Step 2: Get page details with images
                page_response = client.get(
                    f"{self.BASE_URL}/pages/1.0/{eol_page_id}.json",
                    params={
                        "images_per_page": 75,
                        "images_page": 1,
                        "videos_per_page": 0,
                        "sounds_per_page": 0,
                        "maps_per_page": 0,
                        "texts_per_page": 0,
                        "details": "true",
                        # cc-by-nc is requested so NC items show up in the
                        # rejected count below rather than being hidden.
                        "licenses": "cc-by|cc-by-sa|pd|cc-by-nc",
                    },
                )
                page_response.raise_for_status()
                page_data = page_response.json()
                data_objects = page_data.get("dataObjects", [])
                log("debug", f"  Found {len(data_objects)} media objects")
                for obj in data_objects:
                    # Only process images
                    media_type = obj.get("dataType", "")
                    if "image" not in media_type.lower() and "stillimage" not in media_type.lower():
                        continue
                    # Get image URL
                    image_url = obj.get("eolMediaURL") or obj.get("mediaURL")
                    if not image_url:
                        rejected += 1
                        continue
                    # Check license: exact match first, then substring match
                    # for URL patterns only.  Restricting substring matching
                    # to http(s) patterns prevents the short "pd" key from
                    # false-matching any license URL that merely contains the
                    # letters "pd" (which would wrongly grant CC0).
                    license_url = obj.get("license", "").lower()
                    license_code = self.LICENSE_MAP.get(license_url)
                    if not license_code:
                        for pattern, code in self.LICENSE_MAP.items():
                            if (pattern.startswith("http") or pattern == "public domain") \
                                    and pattern in license_url:
                                license_code = code
                                break
                    if not license_code:
                        # Check for NC licenses which we reject
                        if "-nc" in license_url:
                            rejected += 1
                            continue
                        # Unknown license, skip
                        log("debug", f"  Rejected: unknown license {license_url}")
                        rejected += 1
                        continue
                    if license_code not in self.ALLOWED_LICENSES:
                        rejected += 1
                        continue
                    # Create unique source ID.  The md5 fallback replaces the
                    # previous hash(image_url): Python's hash() is randomized
                    # per process (PYTHONHASHSEED), so hash-derived ids changed
                    # between runs and broke de-duplication.
                    source_id = str(
                        obj.get("dataObjectVersionID")
                        or obj.get("identifier")
                        or hashlib.md5(image_url.encode()).hexdigest()[:16]
                    )
                    # Check if already exists
                    existing = db.query(Image).filter(
                        Image.source == self.name,
                        Image.source_id == source_id,
                    ).first()
                    if existing:
                        continue
                    # Build attribution from the agents list (photographer and
                    # rights holder when present).
                    agents = obj.get("agents", [])
                    photographer = None
                    rights_holder = None
                    for agent in agents:
                        role = agent.get("role", "").lower()
                        name = agent.get("full_name", "")
                        if role == "photographer":
                            photographer = name
                        elif role == "owner" or role == "rights holder":
                            rights_holder = name
                    attribution_parts = []
                    if photographer:
                        attribution_parts.append(f"Photo by {photographer}")
                    if rights_holder and rights_holder != photographer:
                        attribution_parts.append(f"Rights: {rights_holder}")
                    attribution_parts.append(f"via EOL ({license_code})")
                    attribution = " | ".join(attribution_parts)
                    # Create image record
                    image = Image(
                        species_id=species.id,
                        source=self.name,
                        source_id=source_id,
                        url=image_url,
                        license=license_code,
                        attribution=attribution,
                        status="pending",
                    )
                    db.add(image)
                    # Commit per image so the id exists before queueing.
                    db.commit()
                    # Queue for download
                    download_and_process_image.delay(image.id)
                    downloaded += 1
                    time.sleep(1.0 / rate_limit)
        except httpx.HTTPStatusError as e:
            log("error", f"  HTTP error for {species.scientific_name}: {e.response.status_code}")
        except Exception as e:
            log("error", f"  Error scraping EOL for {species.scientific_name}: {e}")
        return {"downloaded": downloaded, "rejected": rejected}

    def test_connection(self, api_key: ApiKey) -> str:
        """Test EOL API connection with a search for 'Rosa'.

        Raises on HTTP errors; returns a summary string on success.
        """
        with httpx.Client(timeout=10, headers=self.HEADERS, verify=False) as client:
            response = client.get(
                f"{self.BASE_URL}/search/1.0.json",
                params={"q": "Rosa", "page": 1},
            )
            response.raise_for_status()
            data = response.json()
            total = data.get("totalResults", 0)
            return f"EOL API connection successful ({total} results for 'Rosa')"

View File

@@ -0,0 +1,146 @@
import time
import logging
from typing import Dict, Optional
import httpx
from sqlalchemy.orm import Session
from app.scrapers.base import BaseScraper
from app.models import Species, Image, ApiKey
from app.workers.quality_tasks import download_and_process_image
class FlickrScraper(BaseScraper):
    """Scraper for Flickr images via their API."""
    name = "flickr"
    requires_api_key = True
    BASE_URL = "https://api.flickr.com/services/rest/"
    HEADERS = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_3) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15"
    }
    # Commercial-safe license IDs
    # 4 = CC BY 2.0, 7 = No known copyright, 8 = US Gov, 9 = CC0
    ALLOWED_LICENSES = "4,7,8,9"
    LICENSE_MAP = {
        "4": "CC-BY",
        "7": "NO-KNOWN-COPYRIGHT",
        "8": "US-GOV",
        "9": "CC0",
    }

    def scrape_species(
        self,
        species: Species,
        db: Session,
        logger: Optional[logging.Logger] = None
    ) -> Dict[str, int]:
        """Scrape images from Flickr for a species.

        Searches photos.search restricted to commercial-safe licenses,
        records unseen photos as pending, and queues them for download.

        Returns:
            Dict with 'downloaded' and 'rejected' counts (plus 'error'
            when no API key is configured or the API reports failure).
        """
        api_key = self.get_api_key(db)
        if not api_key:
            return {"downloaded": 0, "rejected": 0, "error": "No API key configured"}
        # Fix: guard against a NULL/zero rate_limit_per_sec column — the bare
        # attribute previously made `1.0 / rate_limit` below raise TypeError /
        # ZeroDivisionError.  Default to 1 request/sec (matches GBIF/iNat).
        rate_limit = api_key.rate_limit_per_sec or 1.0
        downloaded = 0
        rejected = 0
        try:
            params = {
                "method": "flickr.photos.search",
                "api_key": api_key.api_key,
                "text": species.scientific_name,
                "license": self.ALLOWED_LICENSES,
                "content_type": 1,  # Photos only
                "media": "photos",
                "extras": "license,url_l,url_o,owner_name",
                "per_page": 100,
                "format": "json",
                "nojsoncallback": 1,
            }
            with httpx.Client(timeout=30, headers=self.HEADERS) as client:
                response = client.get(self.BASE_URL, params=params)
                response.raise_for_status()
                data = response.json()
                if data.get("stat") != "ok":
                    return {"downloaded": 0, "rejected": 0, "error": data.get("message")}
                photos = data.get("photos", {}).get("photo", [])
                for photo in photos:
                    # Get best URL (original or large)
                    url = photo.get("url_o") or photo.get("url_l")
                    if not url:
                        rejected += 1
                        continue
                    # Map the numeric license id to our short code; anything
                    # outside LICENSE_MAP is rejected.
                    license_id = str(photo.get("license", ""))
                    license_code = self.LICENSE_MAP.get(license_id, "UNKNOWN")
                    if license_code == "UNKNOWN":
                        rejected += 1
                        continue
                    # Check if already exists
                    source_id = str(photo.get("id"))
                    existing = db.query(Image).filter(
                        Image.source == self.name,
                        Image.source_id == source_id,
                    ).first()
                    if existing:
                        continue
                    # Build attribution
                    owner = photo.get("ownername", "Unknown")
                    attribution = f"Photo by {owner} on Flickr ({license_code})"
                    # Create image record
                    image = Image(
                        species_id=species.id,
                        source=self.name,
                        source_id=source_id,
                        url=url,
                        license=license_code,
                        attribution=attribution,
                        status="pending",
                    )
                    db.add(image)
                    # Commit per image so the id exists before queueing.
                    db.commit()
                    # Queue for download
                    download_and_process_image.delay(image.id)
                    downloaded += 1
                    # Rate limiting
                    time.sleep(1.0 / rate_limit)
        except Exception as e:
            # Fix: use the supplied logger (previously accepted but ignored),
            # consistent with the other scrapers; print remains the fallback.
            if logger:
                logger.error(f"Error scraping Flickr for {species.scientific_name}: {e}")
            else:
                print(f"Error scraping Flickr for {species.scientific_name}: {e}")
        return {"downloaded": downloaded, "rejected": rejected}

    def test_connection(self, api_key: ApiKey) -> str:
        """Test Flickr API connection via flickr.test.echo.

        Raises:
            Exception: when the HTTP call fails or Flickr reports stat != ok.
        """
        params = {
            "method": "flickr.test.echo",
            "api_key": api_key.api_key,
            "format": "json",
            "nojsoncallback": 1,
        }
        with httpx.Client(timeout=10, headers=self.HEADERS) as client:
            response = client.get(self.BASE_URL, params=params)
            response.raise_for_status()
            data = response.json()
            if data.get("stat") != "ok":
                raise Exception(data.get("message", "API test failed"))
            return "Flickr API connection successful"

View File

@@ -0,0 +1,159 @@
import time
import logging
from typing import Dict, Optional
import httpx
from sqlalchemy.orm import Session
from app.scrapers.base import BaseScraper
from app.models import Species, Image, ApiKey
from app.workers.quality_tasks import download_and_process_image
class GBIFScraper(BaseScraper):
    """Scraper for GBIF (Global Biodiversity Information Facility) images."""
    name = "gbif"
    requires_api_key = False  # GBIF is free to use
    BASE_URL = "https://api.gbif.org/v1"
    HEADERS = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_3) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15"
    }
    # Map GBIF license URLs to short codes
    LICENSE_MAP = {
        "http://creativecommons.org/publicdomain/zero/1.0/legalcode": "CC0",
        "http://creativecommons.org/licenses/by/4.0/legalcode": "CC-BY",
        "http://creativecommons.org/licenses/by-nc/4.0/legalcode": "CC-BY-NC",
        "http://creativecommons.org/publicdomain/zero/1.0/": "CC0",
        "http://creativecommons.org/licenses/by/4.0/": "CC-BY",
        "http://creativecommons.org/licenses/by-nc/4.0/": "CC-BY-NC",
        "https://creativecommons.org/publicdomain/zero/1.0/legalcode": "CC0",
        "https://creativecommons.org/licenses/by/4.0/legalcode": "CC-BY",
        "https://creativecommons.org/licenses/by-nc/4.0/legalcode": "CC-BY-NC",
        "https://creativecommons.org/publicdomain/zero/1.0/": "CC0",
        "https://creativecommons.org/licenses/by/4.0/": "CC-BY",
        "https://creativecommons.org/licenses/by-nc/4.0/": "CC-BY-NC",
    }
    # Only allow commercial-safe licenses
    ALLOWED_LICENSES = {"CC0", "CC-BY"}

    def scrape_species(
        self,
        species: Species,
        db: Session,
        logger: Optional[logging.Logger] = None
    ) -> Dict[str, int]:
        """Scrape images from GBIF occurrence records for a species.

        Searches occurrences with StillImage media, keeps commercial-safe
        licensed images, and queues unseen ones for download.

        Returns:
            Dict with 'downloaded' and 'rejected' counts.
        """
        import hashlib  # local: only needed for the stable url hash below
        # GBIF doesn't require API key, but we still respect rate limits
        api_key = self.get_api_key(db)
        # Fix: `or 1.0` also covers a key row with a NULL/zero
        # rate_limit_per_sec, which previously crashed the sleep below.
        rate_limit = (api_key.rate_limit_per_sec if api_key else None) or 1.0
        downloaded = 0
        rejected = 0
        try:
            params = {
                "scientificName": species.scientific_name,
                "mediaType": "StillImage",
                "limit": 100,
            }
            with httpx.Client(timeout=30, headers=self.HEADERS) as client:
                response = client.get(
                    f"{self.BASE_URL}/occurrence/search",
                    params=params,
                )
                response.raise_for_status()
                data = response.json()
                results = data.get("results", [])
                for occurrence in results:
                    media_list = occurrence.get("media", [])
                    for media in media_list:
                        # Only process still images
                        if media.get("type") != "StillImage":
                            continue
                        url = media.get("identifier")
                        if not url:
                            rejected += 1
                            continue
                        # Check license
                        license_url = media.get("license", "")
                        license_code = self.LICENSE_MAP.get(license_url)
                        if not license_code or license_code not in self.ALLOWED_LICENSES:
                            rejected += 1
                            continue
                        # Create unique source ID from occurrence key and media URL.
                        occurrence_key = occurrence.get("key", "")
                        # Fix: use md5 instead of str(hash(url)) — Python's
                        # hash() is randomized per process (PYTHONHASHSEED),
                        # so the old ids changed between runs and broke
                        # de-duplication.  (Existing rows with old-style ids
                        # will be re-fetched once under the new scheme.)
                        url_hash = hashlib.md5(url.encode()).hexdigest()[:8]
                        source_id = f"{occurrence_key}_{url_hash}"
                        # Check if already exists
                        existing = db.query(Image).filter(
                            Image.source == self.name,
                            Image.source_id == source_id,
                        ).first()
                        if existing:
                            continue
                        # Build attribution
                        creator = media.get("creator", "")
                        rights_holder = media.get("rightsHolder", "")
                        attribution_parts = []
                        if creator:
                            attribution_parts.append(f"Photo by {creator}")
                        if rights_holder and rights_holder != creator:
                            attribution_parts.append(f"Rights: {rights_holder}")
                        attribution_parts.append(f"via GBIF ({license_code})")
                        attribution = " | ".join(attribution_parts) if attribution_parts else f"GBIF ({license_code})"
                        # Create image record
                        image = Image(
                            species_id=species.id,
                            source=self.name,
                            source_id=source_id,
                            url=url,
                            license=license_code,
                            attribution=attribution,
                            status="pending",
                        )
                        db.add(image)
                        # Commit per image so the id exists before queueing.
                        db.commit()
                        # Queue for download
                        download_and_process_image.delay(image.id)
                        downloaded += 1
                        # Rate limiting
                        time.sleep(1.0 / rate_limit)
        except Exception as e:
            # Fix: use the supplied logger (previously accepted but ignored),
            # consistent with the other scrapers; print remains the fallback.
            if logger:
                logger.error(f"Error scraping GBIF for {species.scientific_name}: {e}")
            else:
                print(f"Error scraping GBIF for {species.scientific_name}: {e}")
        return {"downloaded": downloaded, "rejected": rejected}

    def test_connection(self, api_key: ApiKey) -> str:
        """Test GBIF API connection (no authentication required)."""
        # GBIF doesn't require authentication, just test the endpoint
        with httpx.Client(timeout=10, headers=self.HEADERS) as client:
            response = client.get(
                f"{self.BASE_URL}/occurrence/search",
                params={"limit": 1},
            )
            response.raise_for_status()
            data = response.json()
            count = data.get("count", 0)
            return f"GBIF API connection successful ({count:,} total occurrences available)"

View File

@@ -0,0 +1,144 @@
import time
import logging
from typing import Dict, Optional
import httpx
from sqlalchemy.orm import Session
from app.scrapers.base import BaseScraper
from app.models import Species, Image, ApiKey
from app.workers.quality_tasks import download_and_process_image
class INaturalistScraper(BaseScraper):
    """Scraper for iNaturalist observations via their API."""
    name = "inaturalist"
    requires_api_key = False  # Public API, but rate limited
    BASE_URL = "https://api.inaturalist.org/v1"
    HEADERS = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_3) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15"
    }
    # Commercial-safe licenses (CC0, CC-BY)
    ALLOWED_LICENSES = ["cc0", "cc-by"]

    def scrape_species(
        self,
        species: Species,
        db: Session,
        logger: Optional[logging.Logger] = None
    ) -> Dict[str, int]:
        """Scrape images from iNaturalist for a species.

        Queries research-grade observations filtered to commercial-safe
        licenses, records each unseen photo as pending, and queues it
        for download.  Returns 'downloaded'/'rejected' counts.
        """
        api_key = self.get_api_key(db)
        # NOTE(review): assumes rate_limit_per_sec is a positive number when a
        # key row exists; None/0 would make the 1.0 / rate_limit sleep below
        # raise (caught by the broad except) — confirm the column is non-null.
        rate_limit = api_key.rate_limit_per_sec if api_key else 1.0
        downloaded = 0
        rejected = 0

        def log(level: str, msg: str):
            # Forward to logger.<level>() only when a logger was supplied.
            if logger:
                getattr(logger, level)(msg)
        try:
            # Search for observations of this species
            params = {
                "taxon_name": species.scientific_name,
                "quality_grade": "research",  # Only research-grade
                "photos": True,
                "per_page": 200,
                "order_by": "votes",
                "license": ",".join(self.ALLOWED_LICENSES),
            }
            log("debug", f"  API request params: {params}")
            with httpx.Client(timeout=30, headers=self.HEADERS) as client:
                response = client.get(
                    f"{self.BASE_URL}/observations",
                    params=params,
                )
                log("debug", f"  API response status: {response.status_code}")
                response.raise_for_status()
                data = response.json()
                observations = data.get("results", [])
                total_results = data.get("total_results", 0)
                log("debug", f"  Found {len(observations)} observations (total: {total_results})")
                if not observations:
                    log("info", f"  No observations found for {species.scientific_name}")
                    return {"downloaded": 0, "rejected": 0}
                for obs in observations:
                    photos = obs.get("photos", [])
                    for photo in photos:
                        # Check license: the search filter above should already
                        # restrict results, but re-check per photo defensively.
                        license_code = photo.get("license_code", "").lower() if photo.get("license_code") else ""
                        if license_code not in self.ALLOWED_LICENSES:
                            log("debug", f"  Rejected photo {photo.get('id')}: license={license_code}")
                            rejected += 1
                            continue
                        # Get image URL (medium size for initial download)
                        url = photo.get("url", "")
                        if not url:
                            log("debug", f"  Skipped photo {photo.get('id')}: no URL")
                            continue
                        # Convert to larger size
                        # NOTE(review): replaces every occurrence of "square"
                        # in the URL, not just the size segment — harmless for
                        # typical iNat URLs but worth confirming.
                        url = url.replace("square", "large")
                        # Check if already exists
                        source_id = str(photo.get("id"))
                        existing = db.query(Image).filter(
                            Image.source == self.name,
                            Image.source_id == source_id,
                        ).first()
                        if existing:
                            log("debug", f"  Skipped photo {source_id}: already exists")
                            continue
                        # Create image record
                        image = Image(
                            species_id=species.id,
                            source=self.name,
                            source_id=source_id,
                            url=url,
                            license=license_code.upper(),
                            attribution=photo.get("attribution", ""),
                            status="pending",
                        )
                        db.add(image)
                        # Commit per image so the id exists before queueing.
                        db.commit()
                        # Queue for download
                        download_and_process_image.delay(image.id)
                        downloaded += 1
                        log("debug", f"  Queued photo {source_id} for download")
                        # Rate limiting
                        time.sleep(1.0 / rate_limit)
        except httpx.HTTPStatusError as e:
            log("error", f"  HTTP error for {species.scientific_name}: {e.response.status_code} - {e.response.text}")
        except httpx.RequestError as e:
            log("error", f"  Request error for {species.scientific_name}: {e}")
        except Exception as e:
            log("error", f"  Error scraping iNaturalist for {species.scientific_name}: {e}")
        return {"downloaded": downloaded, "rejected": rejected}

    def test_connection(self, api_key: ApiKey) -> str:
        """Test iNaturalist API connection with a one-result query."""
        with httpx.Client(timeout=10, headers=self.HEADERS) as client:
            response = client.get(
                f"{self.BASE_URL}/observations",
                params={"per_page": 1},
            )
            response.raise_for_status()
            return "iNaturalist API connection successful"

View File

@@ -0,0 +1,154 @@
import time
import logging
from typing import Dict, Optional
import httpx
from sqlalchemy.orm import Session
from app.scrapers.base import BaseScraper
from app.models import Species, Image, ApiKey
from app.workers.quality_tasks import download_and_process_image
class TrefleScraper(BaseScraper):
    """Scraper for the Trefle.io plant database.

    Trefle requires an API token, passed on every request as the
    ``token`` query parameter. New images are stored as ``pending``
    rows and queued for asynchronous download.
    """

    name = "trefle"
    requires_api_key = True
    BASE_URL = "https://trefle.io/api/v1"
    # Browser-style User-Agent sent with every request.
    HEADERS = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_3) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15"
    }

    def _queue_image(
        self,
        db: Session,
        species: Species,
        source_id: str,
        url: str,
        attribution: str,
    ) -> bool:
        """Insert a pending Image row and queue it for download.

        Returns:
            True when a new image was stored and queued; False when an
            image with this (source, source_id) pair already exists.
        """
        existing = db.query(Image).filter(
            Image.source == self.name,
            Image.source_id == source_id,
        ).first()
        if existing:
            return False
        image = Image(
            species_id=species.id,
            source=self.name,
            source_id=source_id,
            url=url,
            license="TREFLE",  # Trefle's own license
            attribution=attribution,
            status="pending",
        )
        db.add(image)
        db.commit()
        download_and_process_image.delay(image.id)
        return True

    def scrape_species(
        self,
        species: Species,
        db: Session,
        logger: Optional[logging.Logger] = None
    ) -> Dict[str, int]:
        """Scrape images from Trefle for a species.

        Searches Trefle by scientific name, then fetches each matching
        plant's detail record to collect its main image and any typed
        additional image lists.

        Args:
            species: The species to scrape images for.
            db: Database session.
            logger: Optional logger; falls back to stdout when omitted.

        Returns:
            Dict with 'downloaded' and 'rejected' counts ('error' is
            added when no API key is configured).
        """
        def log(level: str, message: str) -> None:
            # Route output through the supplied logger so worker runs are
            # captured; keep the old print() behavior when no logger given.
            if logger:
                getattr(logger, level)(message)
            else:
                print(message)

        api_key = self.get_api_key(db)
        if not api_key:
            return {"downloaded": 0, "rejected": 0, "error": "No API key configured"}
        rate_limit = api_key.rate_limit_per_sec
        downloaded = 0
        rejected = 0
        try:
            # Search for the species by scientific name.
            params = {
                "token": api_key.api_key,
                "q": species.scientific_name,
            }
            with httpx.Client(timeout=30, headers=self.HEADERS) as client:
                response = client.get(
                    f"{self.BASE_URL}/plants/search",
                    params=params,
                )
                response.raise_for_status()
                data = response.json()
                plants = data.get("data", [])
                for plant in plants:
                    # Fetch the plant's detail record for its image set.
                    plant_id = plant.get("id")
                    if not plant_id:
                        continue
                    detail_response = client.get(
                        f"{self.BASE_URL}/plants/{plant_id}",
                        params={"token": api_key.api_key},
                    )
                    if detail_response.status_code != 200:
                        continue
                    plant_detail = detail_response.json().get("data", {})
                    # Main image, stored under a synthetic "main_<id>" id.
                    main_image = plant_detail.get("image_url")
                    if main_image and self._queue_image(
                        db, species, f"main_{plant_id}", main_image,
                        "Trefle.io Plant Database",
                    ):
                        downloaded += 1
                    # Additional images grouped by type (keys of the
                    # "images" mapping; non-list values are skipped).
                    images = plant_detail.get("images", {})
                    for image_type, image_list in images.items():
                        if not isinstance(image_list, list):
                            continue
                        for img in image_list:
                            url = img.get("image_url")
                            if not url:
                                continue
                            # Fall back to the URL's last path segment when
                            # the entry carries no id of its own.
                            img_id = img.get("id", url.split("/")[-1])
                            source_id = f"{image_type}_{img_id}"
                            copyright_info = img.get("copyright", "")
                            if self._queue_image(
                                db, species, source_id, url,
                                copyright_info or "Trefle.io",
                            ):
                                downloaded += 1
                    # Rate limiting between plant-detail requests; guard
                    # against a zero/unset rate to avoid ZeroDivisionError.
                    if rate_limit:
                        time.sleep(1.0 / rate_limit)
        except httpx.HTTPStatusError as e:
            log("error", f"HTTP error scraping Trefle for {species.scientific_name}: {e.response.status_code}")
        except httpx.RequestError as e:
            log("error", f"Request error scraping Trefle for {species.scientific_name}: {e}")
        except Exception as e:
            log("error", f"Error scraping Trefle for {species.scientific_name}: {e}")
        return {"downloaded": downloaded, "rejected": rejected}

    def test_connection(self, api_key: ApiKey) -> str:
        """Test Trefle API connection.

        Issues a plain plant-list request with the configured token; a
        non-2xx response raises via ``raise_for_status``.
        """
        params = {"token": api_key.api_key}
        with httpx.Client(timeout=10, headers=self.HEADERS) as client:
            response = client.get(
                f"{self.BASE_URL}/plants",
                params=params,
            )
            response.raise_for_status()
        return "Trefle API connection successful"

View File

@@ -0,0 +1,146 @@
import time
import logging
from typing import Dict, Optional
import httpx
from sqlalchemy.orm import Session
from app.scrapers.base import BaseScraper
from app.models import Species, Image, ApiKey
from app.workers.quality_tasks import download_and_process_image
class WikimediaScraper(BaseScraper):
    """Scraper for Wikimedia Commons images.

    Uses the public MediaWiki API (no key required) and keeps only
    images under commercial-safe licenses: CC0, public domain, and
    CC BY / CC BY-SA without NC or ND restrictions.
    """

    name = "wikimedia"
    requires_api_key = False
    BASE_URL = "https://commons.wikimedia.org/w/api.php"
    # Browser-style User-Agent sent with every request.
    HEADERS = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_3) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15"
    }

    def scrape_species(
        self,
        species: Species,
        db: Session,
        logger: Optional[logging.Logger] = None
    ) -> Dict[str, int]:
        """Scrape images from Wikimedia Commons for a species.

        Runs a file-namespace search for the scientific name, rejects
        images smaller than 256px on either side or without a
        commercial-safe license, and queues the rest for download.

        Args:
            species: The species to scrape images for.
            db: Database session.
            logger: Optional logger; falls back to stdout when omitted.

        Returns:
            Dict with 'downloaded' and 'rejected' counts.
        """
        import re  # hoisted out of the per-page loop below

        def log(level: str, message: str) -> None:
            # Route output through the supplied logger so worker runs are
            # captured; keep the old print() behavior when no logger given.
            if logger:
                getattr(logger, level)(message)
            else:
                print(message)

        api_key = self.get_api_key(db)
        rate_limit = api_key.rate_limit_per_sec if api_key else 1.0
        downloaded = 0
        rejected = 0
        try:
            # Search for bitmap files matching the scientific name.
            search_term = species.scientific_name
            params = {
                "action": "query",
                "format": "json",
                "generator": "search",
                "gsrsearch": f"filetype:bitmap {search_term}",
                "gsrnamespace": 6,  # File namespace
                "gsrlimit": 50,
                "prop": "imageinfo",
                "iiprop": "url|extmetadata|size",
            }
            with httpx.Client(timeout=30, headers=self.HEADERS) as client:
                response = client.get(self.BASE_URL, params=params)
                response.raise_for_status()
                data = response.json()
                pages = data.get("query", {}).get("pages", {})
                for page_id, page in pages.items():
                    # Negative ids are placeholder/missing pages.
                    if int(page_id) < 0:
                        continue
                    imageinfo = page.get("imageinfo", [{}])[0]
                    url = imageinfo.get("url", "")
                    if not url:
                        continue
                    # Reject images too small on either dimension.
                    width = imageinfo.get("width", 0)
                    height = imageinfo.get("height", 0)
                    if width < 256 or height < 256:
                        rejected += 1
                        continue
                    # License comes from the extmetadata block.
                    metadata = imageinfo.get("extmetadata", {})
                    license_info = metadata.get("LicenseShortName", {}).get("value", "")
                    # Commercial-safe filter. NOTE: a bare `"CC BY" in x`
                    # check also matches "CC BY-NC"/"CC BY-ND", which are
                    # NOT commercial-safe, so NC/ND variants are excluded
                    # explicitly.
                    license_upper = license_info.upper()
                    cc_by_ok = (
                        "CC BY" in license_upper
                        and "NC" not in license_upper
                        and "ND" not in license_upper
                    )
                    if cc_by_ok or "CC0" in license_upper or "PUBLIC DOMAIN" in license_upper:
                        license_code = license_info
                    else:
                        rejected += 1
                        continue
                    # Skip images already recorded for this source.
                    source_id = str(page_id)
                    existing = db.query(Image).filter(
                        Image.source == self.name,
                        Image.source_id == source_id,
                    ).first()
                    if existing:
                        continue
                    # Attribution: artist field may contain HTML markup.
                    artist = metadata.get("Artist", {}).get("value", "Unknown")
                    if "<" in artist:
                        artist = re.sub(r"<[^>]+>", "", artist).strip()
                    attribution = f"{artist} via Wikimedia Commons ({license_code})"
                    # Create image record and queue it for download.
                    image = Image(
                        species_id=species.id,
                        source=self.name,
                        source_id=source_id,
                        url=url,
                        license=license_code,
                        attribution=attribution,
                        width=width,
                        height=height,
                        status="pending",
                    )
                    db.add(image)
                    db.commit()
                    download_and_process_image.delay(image.id)
                    downloaded += 1
                    # Rate limiting between queued images; guard against
                    # a zero/unset rate to avoid ZeroDivisionError.
                    if rate_limit:
                        time.sleep(1.0 / rate_limit)
        except httpx.HTTPStatusError as e:
            log("error", f"HTTP error scraping Wikimedia for {species.scientific_name}: {e.response.status_code}")
        except httpx.RequestError as e:
            log("error", f"Request error scraping Wikimedia for {species.scientific_name}: {e}")
        except Exception as e:
            log("error", f"Error scraping Wikimedia for {species.scientific_name}: {e}")
        return {"downloaded": downloaded, "rejected": rejected}

    def test_connection(self, api_key: ApiKey) -> str:
        """Test Wikimedia API connection.

        Issues a minimal siteinfo query; a non-2xx response raises via
        ``raise_for_status``.
        """
        params = {
            "action": "query",
            "format": "json",
            "meta": "siteinfo",
        }
        with httpx.Client(timeout=10, headers=self.HEADERS) as client:
            response = client.get(self.BASE_URL, params=params)
            response.raise_for_status()
        return "Wikimedia Commons API connection successful"