Files
2026-04-12 09:54:27 -05:00

199 lines
7.5 KiB
Python

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from app.database import get_db
from app.models import ApiKey
from app.schemas.api_key import ApiKeyCreate, ApiKeyUpdate, ApiKeyResponse
# Router on which every source-configuration endpoint in this module is registered.
router = APIRouter()
# Catalogue of the sources that can be configured through this router.
# Fields of each entry:
#   name            - identifier matched against the URL path and the stored row
#   label           - human-readable name
#   requires_secret - whether a secret accompanies the key
#   auth_type       - "none" (no auth), "api_key" (single key),
#                     "api_key_secret" (key + secret), or
#                     "oauth" (client_id + client_secret + access_token)
#   default_rate    - safe default number of requests per second for the API
AVAILABLE_SOURCES = [
    # Free, no auth required
    {
        "name": "gbif",
        "label": "GBIF",
        "requires_secret": False,
        "auth_type": "none",
        "default_rate": 1.0,
    },
    # 60/min limit
    {
        "name": "inaturalist",
        "label": "iNaturalist",
        "requires_secret": True,
        "auth_type": "api_key_secret",
        "default_rate": 1.0,
    },
    # 3600/hr shared limit
    {
        "name": "flickr",
        "label": "Flickr",
        "requires_secret": True,
        "auth_type": "api_key_secret",
        "default_rate": 0.5,
    },
    # generous limits
    {
        "name": "wikimedia",
        "label": "Wikimedia Commons",
        "requires_secret": True,
        "auth_type": "oauth",
        "default_rate": 1.0,
    },
    # 120/min limit
    {
        "name": "trefle",
        "label": "Trefle.io",
        "requires_secret": False,
        "auth_type": "api_key",
        "default_rate": 1.0,
    },
    # Web search, no API key
    {
        "name": "duckduckgo",
        "label": "DuckDuckGo",
        "requires_secret": False,
        "auth_type": "none",
        "default_rate": 0.5,
    },
    # Azure Cognitive Services
    {
        "name": "bing",
        "label": "Bing Image Search",
        "requires_secret": False,
        "auth_type": "api_key",
        "default_rate": 3.0,
    },
]
def mask_api_key(key: str) -> str:
    """Return ``key`` with all but its final four characters replaced by ``*``.

    A missing key, or one of four characters or fewer, yields the fixed
    placeholder ``"****"`` so that no part of a short key is shown.
    """
    visible = 4
    if key and len(key) > visible:
        hidden = len(key) - visible
        return f"{'*' * hidden}{key[-visible:]}"
    return "****"
@router.get("")
def list_sources(db: Session = Depends(get_db)):
    """List all available sources with their configuration status.

    Args:
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        One dict per entry of ``AVAILABLE_SOURCES``, in the same order. Each
        dict holds the static metadata (``name``, ``label``,
        ``requires_secret``, ``auth_type``, ``default_rate``) and the stored
        state (``configured``, ``enabled``, ``api_key_masked``,
        ``has_secret``, ``has_access_token``, ``rate_limit_per_sec``).
        ``api_key_masked`` is ``None`` when the source is not configured or
        when its auth type is ``"none"``.
    """
    # Load every stored row once and index it by source name.
    api_keys = {k.source: k for k in db.query(ApiKey).all()}
    result = []
    for source in AVAILABLE_SOURCES:
        api_key = api_keys.get(source["name"])
        auth_type = source.get("auth_type", "api_key")
        default_rate = source.get("default_rate", 1.0)
        # No-auth sources are saved with the "no-auth" placeholder instead of
        # a real key. Masking that placeholder would present it as a
        # credential, so report None, matching what update_source returns.
        if api_key and auth_type != "none":
            api_key_masked = mask_api_key(api_key.api_key)
        else:
            api_key_masked = None
        result.append({
            "name": source["name"],
            "label": source["label"],
            "requires_secret": source["requires_secret"],
            "auth_type": auth_type,
            "configured": api_key is not None,
            "enabled": api_key.enabled if api_key else False,
            "api_key_masked": api_key_masked,
            "has_secret": bool(api_key.api_secret) if api_key else False,
            "has_access_token": bool(getattr(api_key, 'access_token', None)) if api_key else False,
            # Unconfigured sources fall back to the catalogue default.
            "rate_limit_per_sec": api_key.rate_limit_per_sec if api_key else default_rate,
            "default_rate": default_rate,
        })
    return result
@router.get("/{source}")
def get_source(source: str, db: Session = Depends(get_db)):
    """Get the configuration status of a single source.

    Args:
        source: Name of an entry in ``AVAILABLE_SOURCES``.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A dict with the static metadata (``name``, ``label``,
        ``requires_secret``, ``auth_type``, ``default_rate``) and the stored
        state (``configured``, ``enabled``, ``api_key_masked``,
        ``has_secret``, ``has_access_token``, ``rate_limit_per_sec``).
        ``api_key_masked`` is ``None`` when the source is not configured or
        when its auth type is ``"none"``.

    Raises:
        HTTPException: 404 if ``source`` is not listed in
            ``AVAILABLE_SOURCES``.
    """
    source_info = next((s for s in AVAILABLE_SOURCES if s["name"] == source), None)
    if not source_info:
        raise HTTPException(status_code=404, detail="Unknown source")
    api_key = db.query(ApiKey).filter(ApiKey.source == source).first()
    auth_type = source_info.get("auth_type", "api_key")
    default_rate = source_info.get("default_rate", 1.0)
    # No-auth sources are saved with the "no-auth" placeholder instead of a
    # real key. Masking that placeholder would present it as a credential, so
    # report None, matching what update_source returns.
    if api_key and auth_type != "none":
        api_key_masked = mask_api_key(api_key.api_key)
    else:
        api_key_masked = None
    return {
        "name": source_info["name"],
        "label": source_info["label"],
        "requires_secret": source_info["requires_secret"],
        "auth_type": auth_type,
        "configured": api_key is not None,
        "enabled": api_key.enabled if api_key else False,
        "api_key_masked": api_key_masked,
        "has_secret": bool(api_key.api_secret) if api_key else False,
        "has_access_token": bool(getattr(api_key, 'access_token', None)) if api_key else False,
        # Unconfigured sources fall back to the catalogue default.
        "rate_limit_per_sec": api_key.rate_limit_per_sec if api_key else default_rate,
        "default_rate": default_rate,
    }
@router.put("/{source}")
def update_source(
    source: str,
    config: ApiKeyCreate,
    db: Session = Depends(get_db),
):
    """Save the configuration for ``source``, inserting or overwriting its row.

    Args:
        source: Name of an entry in ``AVAILABLE_SOURCES``.
        config: Submitted settings: key, optional secret and access token,
            rate limit and enabled flag.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A summary dict of the saved row. ``api_key_masked`` is ``None`` for
        sources whose auth type is ``"none"``.

    Raises:
        HTTPException: 404 if ``source`` is not in ``AVAILABLE_SOURCES``;
            400 if the source needs authentication and no API key was sent.
    """
    spec = None
    for candidate in AVAILABLE_SOURCES:
        if candidate["name"] == source:
            spec = candidate
            break
    if not spec:
        raise HTTPException(status_code=404, detail="Unknown source")

    # Every auth type other than "none" needs at least an API key.
    auth_type = spec.get("auth_type", "api_key")
    needs_key = auth_type != "none"
    if needs_key and not config.api_key:
        raise HTTPException(status_code=400, detail="API key is required for this source")

    record = db.query(ApiKey).filter(ApiKey.source == source).first()

    # No-auth sources are stored with a fixed placeholder instead of a key.
    stored_key = config.api_key or "no-auth"

    if not record:
        # First configuration of this source: insert a new row.
        record = ApiKey(
            source=source,
            api_key=stored_key,
            api_secret=config.api_secret,
            access_token=config.access_token,
            rate_limit_per_sec=config.rate_limit_per_sec,
            enabled=config.enabled,
        )
        db.add(record)
    else:
        # Overwrite the existing row. Secret and token are replaced only when
        # a new value was supplied, so omitting them keeps the stored ones.
        record.api_key = stored_key
        for optional in ("api_secret", "access_token"):
            supplied = getattr(config, optional)
            if supplied:
                setattr(record, optional, supplied)
        record.rate_limit_per_sec = config.rate_limit_per_sec
        record.enabled = config.enabled

    db.commit()
    db.refresh(record)

    masked = mask_api_key(record.api_key) if needs_key else None
    return {
        "name": source,
        "configured": True,
        "enabled": record.enabled,
        "api_key_masked": masked,
        "has_secret": bool(record.api_secret),
        "has_access_token": bool(record.access_token),
        "rate_limit_per_sec": record.rate_limit_per_sec,
    }
@router.patch("/{source}")
def patch_source(
    source: str,
    config: ApiKeyUpdate,
    db: Session = Depends(get_db),
):
    """Partially update the stored configuration of a source.

    Only the fields explicitly present in the request body are applied.

    Args:
        source: Name of the source whose stored row is updated.
        config: Fields to change; fields that were not sent are left alone.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A summary dict of the saved row. ``api_key_masked`` is ``None`` for
        sources whose auth type is ``"none"``.

    Raises:
        HTTPException: 404 if no configuration is stored for ``source``;
            400 if the request sends an empty or null API key for a source
            that requires one.
    """
    api_key = db.query(ApiKey).filter(ApiKey.source == source).first()
    if not api_key:
        raise HTTPException(status_code=404, detail="Source not configured")

    # A stored row whose source is not in the catalogue is treated as
    # key-authenticated, the same default used when "auth_type" is missing.
    source_info = next((s for s in AVAILABLE_SOURCES if s["name"] == source), None)
    auth_type = source_info.get("auth_type", "api_key") if source_info else "api_key"

    update_data = config.model_dump(exclude_unset=True)
    # exclude_unset keeps fields the client sent explicitly, including null or
    # empty ones. Apply the same rule as update_source: an auth-requiring
    # source may not have its key blanked, and a no-auth source keeps the
    # "no-auth" placeholder.
    if "api_key" in update_data and not update_data["api_key"]:
        if auth_type != "none":
            raise HTTPException(status_code=400, detail="API key is required for this source")
        update_data["api_key"] = "no-auth"

    for field, value in update_data.items():
        setattr(api_key, field, value)
    db.commit()
    db.refresh(api_key)
    return {
        "name": source,
        "configured": True,
        "enabled": api_key.enabled,
        # Do not present the "no-auth" placeholder as a masked credential.
        "api_key_masked": mask_api_key(api_key.api_key) if auth_type != "none" else None,
        "has_secret": bool(api_key.api_secret),
        "has_access_token": bool(api_key.access_token),
        "rate_limit_per_sec": api_key.rate_limit_per_sec,
    }
@router.delete("/{source}")
def delete_source(source: str, db: Session = Depends(get_db)):
    """Remove the stored configuration row of ``source``.

    Args:
        source: Name of the source whose stored row is removed.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        ``{"status": "deleted"}`` once the deletion has been committed.

    Raises:
        HTTPException: 404 if no configuration is stored for ``source``.
    """
    lookup = db.query(ApiKey).filter(ApiKey.source == source)
    row = lookup.first()
    if not row:
        raise HTTPException(status_code=404, detail="Source not configured")

    db.delete(row)
    db.commit()
    return {"status": "deleted"}
@router.post("/{source}/test")
def test_source(source: str, db: Session = Depends(get_db)):
    """Run the scraper's connection check with the stored credentials.

    Args:
        source: Name of the source to test.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        ``{"status": "success", "message": <check result>}`` when the check
        completes, or ``{"status": "error", "message": <exception text>}``
        when it raises. A failing check is reported in the body, not raised.

    Raises:
        HTTPException: 404 if no configuration is stored for ``source``;
            400 if ``get_scraper`` returns nothing for it.
    """
    credentials = db.query(ApiKey).filter(ApiKey.source == source).first()
    if not credentials:
        raise HTTPException(status_code=404, detail="Source not configured")

    # Deferred import: the scraper package is loaded only when a test runs.
    from app.scrapers import get_scraper

    scraper = get_scraper(source)
    if not scraper:
        raise HTTPException(status_code=400, detail="No scraper for this source")

    try:
        outcome = scraper.test_connection(credentials)
    except Exception as exc:
        # Any failure of the check is turned into an error payload.
        return {"status": "error", "message": str(exc)}
    else:
        return {"status": "success", "message": outcome}