import json
import os
from datetime import datetime, timezone
from pathlib import Path

from sqlalchemy import func, case, text

from app.workers.celery_app import celery_app
from app.database import SessionLocal
from app.models import Species, Image, Job
from app.models.cached_stats import CachedStats
from app.config import get_settings


def get_directory_size_fast(path: str) -> int:
    """Return the total size in bytes of all regular files under *path*.

    Walks the tree with ``os.scandir`` using an explicit stack (iterative,
    so arbitrarily deep trees cannot hit the recursion limit). Symlinks are
    never followed, and unreadable entries or directories are silently
    skipped — this is deliberately a best-effort disk-usage estimate.
    """
    total = 0
    stack = [path]
    while stack:
        current = stack.pop()
        try:
            with os.scandir(current) as it:
                for entry in it:
                    try:
                        if entry.is_file(follow_symlinks=False):
                            total += entry.stat(follow_symlinks=False).st_size
                        elif entry.is_dir(follow_symlinks=False):
                            stack.append(entry.path)
                    except (OSError, PermissionError):
                        # Entry vanished or is unreadable -- skip it.
                        pass
        except (OSError, PermissionError):
            # Directory vanished or is unreadable -- skip it.
            pass
    return total


def _overall_counts(db) -> dict:
    """Overall species/image totals plus per-status image counts, in one SELECT.

    A single multi-subquery statement is used for maximum performance on SQLite.
    """
    counts_sql = text("""
        SELECT
            (SELECT COUNT(*) FROM species) as total_species,
            (SELECT COUNT(*) FROM images) as total_images,
            (SELECT COUNT(*) FROM images WHERE status = 'downloaded') as images_downloaded,
            (SELECT COUNT(*) FROM images WHERE status = 'pending') as images_pending,
            (SELECT COUNT(*) FROM images WHERE status = 'rejected') as images_rejected
    """)
    row = db.execute(counts_sql).fetchone()
    # `or 0` guards the NULLs an empty table would yield.
    return {
        "total_species": row[0] or 0,
        "total_images": row[1] or 0,
        "images_downloaded": row[2] or 0,
        "images_pending": row[3] or 0,
        "images_rejected": row[4] or 0,
    }


def _source_stats(db) -> list:
    """Per-source image counts broken down by status (single GROUP BY query)."""
    source_sql = text("""
        SELECT source, COUNT(*) as total,
            SUM(CASE WHEN status = 'downloaded' THEN 1 ELSE 0 END) as downloaded,
            SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending,
            SUM(CASE WHEN status = 'rejected' THEN 1 ELSE 0 END) as rejected
        FROM images
        GROUP BY source
    """)
    return [
        {
            "source": row[0],
            "image_count": row[1],
            "downloaded": row[2] or 0,
            "pending": row[3] or 0,
            "rejected": row[4] or 0,
        }
        for row in db.execute(source_sql).fetchall()
    ]


def _license_stats(db) -> list:
    """Counts of downloaded images grouped by license (single indexed query)."""
    license_sql = text("""
        SELECT license, COUNT(*) as count
        FROM images
        WHERE status = 'downloaded'
        GROUP BY license
    """)
    return [
        {"license": row[0], "count": row[1]}
        for row in db.execute(license_sql).fetchall()
    ]


def _job_stats(db) -> dict:
    """Job counts per status (single aggregate query over the jobs table)."""
    job_sql = text("""
        SELECT
            SUM(CASE WHEN status = 'running' THEN 1 ELSE 0 END) as running,
            SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending,
            SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed,
            SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed
        FROM jobs
    """)
    row = db.execute(job_sql).fetchone()
    # SUM over an empty table is NULL, hence the `or 0` fallbacks.
    return {
        "running": row[0] or 0,
        "pending": row[1] or 0,
        "completed": row[2] or 0,
        "failed": row[3] or 0,
    }


def _species_rows_to_dicts(rows) -> list:
    """Convert (id, scientific_name, common_name, image_count) rows to dicts."""
    return [
        {
            "id": row[0],
            "scientific_name": row[1],
            "common_name": row[2],
            "image_count": row[3],
        }
        for row in rows
    ]


def _top_species(db) -> list:
    """Top 10 species by downloaded-image count."""
    top_sql = text("""
        SELECT s.id, s.scientific_name, s.common_name, COUNT(i.id) as image_count
        FROM species s
        INNER JOIN images i ON i.species_id = s.id AND i.status = 'downloaded'
        GROUP BY s.id
        ORDER BY image_count DESC
        LIMIT 10
    """)
    return _species_rows_to_dicts(db.execute(top_sql).fetchall())


def _under_represented(db) -> list:
    """Ten species with fewer than 100 downloaded images, fewest first.

    A LEFT JOIN against pre-aggregated counts keeps species with zero
    downloaded images in the result (COALESCE maps missing counts to 0).
    """
    under_sql = text("""
        SELECT s.id, s.scientific_name, s.common_name,
               COALESCE(img_counts.cnt, 0) as image_count
        FROM species s
        LEFT JOIN (
            SELECT species_id, COUNT(*) as cnt
            FROM images
            WHERE status = 'downloaded'
            GROUP BY species_id
        ) img_counts ON img_counts.species_id = s.id
        WHERE COALESCE(img_counts.cnt, 0) < 100
        ORDER BY image_count ASC
        LIMIT 10
    """)
    return _species_rows_to_dicts(db.execute(under_sql).fetchall())


def _store_stats(db, stats: dict) -> None:
    """Upsert the JSON-encoded stats under the 'dashboard_stats' cache key."""
    payload = json.dumps(stats)  # serialize once, reuse in both branches
    cached = db.query(CachedStats).filter(CachedStats.key == "dashboard_stats").first()
    if cached:
        cached.value = payload
        # datetime.utcnow() is deprecated since Python 3.12; this produces the
        # same naive-UTC timestamp, so the stored value is unchanged.
        cached.updated_at = datetime.now(timezone.utc).replace(tzinfo=None)
    else:
        # NOTE(review): updated_at is not set here — presumably the model has a
        # column default; confirm against CachedStats.
        cached = CachedStats(key="dashboard_stats", value=payload)
        db.add(cached)
    db.commit()


@celery_app.task
def refresh_stats():
    """Calculate and cache dashboard statistics.

    Gathers overall counts, per-source/per-license breakdowns, job counts,
    top and under-represented species, and image-directory disk usage, then
    stores the JSON blob in the CachedStats table under 'dashboard_stats'.

    Returns a small status dict; re-raises any exception after logging so
    Celery records the failure. The DB session is always closed.
    """
    print("=== STATS TASK: Starting refresh ===", flush=True)
    db = SessionLocal()
    try:
        counts = _overall_counts(db)

        settings = get_settings()
        disk_usage_bytes = get_directory_size_fast(settings.images_path)

        # Key order matches the historical payload layout.
        stats = {
            **counts,
            "disk_usage_mb": round(disk_usage_bytes / (1024 * 1024), 2),
            "sources": _source_stats(db),
            "licenses": _license_stats(db),
            "jobs": _job_stats(db),
            "top_species": _top_species(db),
            "under_represented": _under_represented(db),
        }

        _store_stats(db, stats)

        total_species = counts["total_species"]
        total_images = counts["total_images"]
        print(f"=== STATS TASK: Refreshed (species={total_species}, images={total_images}) ===", flush=True)
        return {"status": "success", "total_species": total_species, "total_images": total_images}
    except Exception as e:
        print(f"=== STATS TASK ERROR: {e} ===", flush=True)
        raise
    finally:
        db.close()