225 lines
7.0 KiB
Python
225 lines
7.0 KiB
Python
import os
|
|
from pathlib import Path
|
|
|
|
import httpx
|
|
from PIL import Image as PILImage
|
|
import imagehash
|
|
import numpy as np
|
|
from scipy import ndimage
|
|
|
|
from app.workers.celery_app import celery_app
|
|
from app.database import SessionLocal
|
|
from app.models import Image
|
|
from app.config import get_settings
|
|
|
|
settings = get_settings()
|
|
|
|
|
|
def calculate_blur_score(image_path: str) -> float:
|
|
"""Calculate blur score using Laplacian variance. Higher = sharper."""
|
|
try:
|
|
img = PILImage.open(image_path).convert("L")
|
|
img_array = np.array(img)
|
|
laplacian = ndimage.laplace(img_array)
|
|
return float(np.var(laplacian))
|
|
except Exception:
|
|
return 0.0
|
|
|
|
|
|
def calculate_phash(image_path: str) -> str:
|
|
"""Calculate perceptual hash for deduplication."""
|
|
try:
|
|
img = PILImage.open(image_path)
|
|
return str(imagehash.phash(img))
|
|
except Exception:
|
|
return ""
|
|
|
|
|
|
def check_color_distribution(image_path: str) -> tuple[bool, str]:
|
|
"""Check if image has healthy color distribution for a plant photo.
|
|
|
|
Returns (passed, reason) tuple.
|
|
Rejects:
|
|
- Low color variance (mean channel std < 25): herbarium specimens (brown on white)
|
|
- No green + low variance (green ratio < 5% AND mean std < 40): monochrome illustrations
|
|
"""
|
|
try:
|
|
img = PILImage.open(image_path).convert("RGB")
|
|
arr = np.array(img, dtype=np.float64)
|
|
|
|
# Per-channel standard deviation
|
|
channel_stds = arr.std(axis=(0, 1)) # [R_std, G_std, B_std]
|
|
mean_std = float(channel_stds.mean())
|
|
|
|
if mean_std < 25:
|
|
return False, f"Low color variance ({mean_std:.1f})"
|
|
|
|
# Check green ratio
|
|
channel_means = arr.mean(axis=(0, 1))
|
|
total = channel_means.sum()
|
|
green_ratio = channel_means[1] / total if total > 0 else 0
|
|
|
|
if green_ratio < 0.05 and mean_std < 40:
|
|
return False, f"No green ({green_ratio:.2%}) + low variance ({mean_std:.1f})"
|
|
|
|
return True, ""
|
|
except Exception:
|
|
return True, "" # Don't reject on error
|
|
|
|
|
|
def resize_image(image_path: str, target_size: int = 512) -> bool:
|
|
"""Resize image to target size while maintaining aspect ratio."""
|
|
try:
|
|
img = PILImage.open(image_path)
|
|
img.thumbnail((target_size, target_size), PILImage.Resampling.LANCZOS)
|
|
img.save(image_path, quality=95)
|
|
return True
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
@celery_app.task
|
|
def download_and_process_image(image_id: int):
|
|
"""Download image, check quality, dedupe, and resize."""
|
|
db = SessionLocal()
|
|
try:
|
|
image = db.query(Image).filter(Image.id == image_id).first()
|
|
if not image:
|
|
return {"error": "Image not found"}
|
|
|
|
# Create directory for species
|
|
species = image.species
|
|
species_dir = Path(settings.images_path) / species.scientific_name.replace(" ", "_")
|
|
species_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Download image
|
|
filename = f"{image.source}_{image.source_id or image.id}.jpg"
|
|
local_path = species_dir / filename
|
|
|
|
try:
|
|
headers = {
|
|
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_3) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15"
|
|
}
|
|
with httpx.Client(timeout=30, headers=headers, follow_redirects=True) as client:
|
|
response = client.get(image.url)
|
|
response.raise_for_status()
|
|
|
|
with open(local_path, "wb") as f:
|
|
f.write(response.content)
|
|
except Exception as e:
|
|
image.status = "rejected"
|
|
db.commit()
|
|
return {"error": f"Download failed: {e}"}
|
|
|
|
# Check minimum size
|
|
try:
|
|
with PILImage.open(local_path) as img:
|
|
width, height = img.size
|
|
if width < 256 or height < 256:
|
|
os.remove(local_path)
|
|
image.status = "rejected"
|
|
db.commit()
|
|
return {"error": "Image too small"}
|
|
image.width = width
|
|
image.height = height
|
|
except Exception as e:
|
|
if local_path.exists():
|
|
os.remove(local_path)
|
|
image.status = "rejected"
|
|
db.commit()
|
|
return {"error": f"Invalid image: {e}"}
|
|
|
|
# Calculate perceptual hash for deduplication
|
|
phash = calculate_phash(str(local_path))
|
|
if phash:
|
|
# Check for duplicates
|
|
existing = db.query(Image).filter(
|
|
Image.phash == phash,
|
|
Image.id != image.id,
|
|
Image.status == "downloaded"
|
|
).first()
|
|
|
|
if existing:
|
|
os.remove(local_path)
|
|
image.status = "rejected"
|
|
image.phash = phash
|
|
db.commit()
|
|
return {"error": "Duplicate image"}
|
|
|
|
image.phash = phash
|
|
|
|
# Calculate blur score
|
|
quality_score = calculate_blur_score(str(local_path))
|
|
image.quality_score = quality_score
|
|
|
|
# Reject very blurry images (threshold can be tuned)
|
|
if quality_score < 100: # Low variance = blurry
|
|
os.remove(local_path)
|
|
image.status = "rejected"
|
|
db.commit()
|
|
return {"error": "Image too blurry"}
|
|
|
|
# Check color distribution (reject herbarium specimens, illustrations)
|
|
color_ok, color_reason = check_color_distribution(str(local_path))
|
|
if not color_ok:
|
|
os.remove(local_path)
|
|
image.status = "rejected"
|
|
db.commit()
|
|
return {"error": f"Non-photo content: {color_reason}"}
|
|
|
|
# Resize to 512x512 max
|
|
resize_image(str(local_path))
|
|
|
|
# Update image record
|
|
image.local_path = str(local_path)
|
|
image.status = "downloaded"
|
|
db.commit()
|
|
|
|
return {
|
|
"status": "success",
|
|
"path": str(local_path),
|
|
"quality_score": quality_score,
|
|
}
|
|
|
|
except Exception as e:
|
|
if image:
|
|
image.status = "rejected"
|
|
db.commit()
|
|
return {"error": str(e)}
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@celery_app.task(bind=True)
|
|
def batch_process_pending_images(self, source: str = None, chunk_size: int = 500):
|
|
"""Process ALL pending images in chunks, with progress tracking."""
|
|
db = SessionLocal()
|
|
try:
|
|
query = db.query(Image).filter(Image.status == "pending")
|
|
if source:
|
|
query = query.filter(Image.source == source)
|
|
|
|
total = query.count()
|
|
queued = 0
|
|
offset = 0
|
|
|
|
while offset < total:
|
|
chunk = query.order_by(Image.id).offset(offset).limit(chunk_size).all()
|
|
if not chunk:
|
|
break
|
|
|
|
for image in chunk:
|
|
download_and_process_image.delay(image.id)
|
|
queued += 1
|
|
|
|
offset += len(chunk)
|
|
|
|
self.update_state(
|
|
state="PROGRESS",
|
|
meta={"queued": queued, "total": total},
|
|
)
|
|
|
|
return {"queued": queued, "total": total}
|
|
finally:
|
|
db.close()
|