Files
PlantGuideScraper/backend/app/workers/stats_tasks.py
2026-04-12 09:54:27 -05:00

194 lines
6.8 KiB
Python

import json
import os
from datetime import datetime
from pathlib import Path
from sqlalchemy import func, case, text
from app.workers.celery_app import celery_app
from app.database import SessionLocal
from app.models import Species, Image, Job
from app.models.cached_stats import CachedStats
from app.config import get_settings
def get_directory_size_fast(path: str) -> int:
"""Get directory size in bytes using fast os.scandir."""
total = 0
try:
with os.scandir(path) as it:
for entry in it:
try:
if entry.is_file(follow_symlinks=False):
total += entry.stat(follow_symlinks=False).st_size
elif entry.is_dir(follow_symlinks=False):
total += get_directory_size_fast(entry.path)
except (OSError, PermissionError):
pass
except (OSError, PermissionError):
pass
return total
@celery_app.task
def refresh_stats():
"""Calculate and cache dashboard statistics."""
print("=== STATS TASK: Starting refresh ===", flush=True)
db = SessionLocal()
try:
# Use raw SQL for maximum performance on SQLite
# All counts in a single query
counts_sql = text("""
SELECT
(SELECT COUNT(*) FROM species) as total_species,
(SELECT COUNT(*) FROM images) as total_images,
(SELECT COUNT(*) FROM images WHERE status = 'downloaded') as images_downloaded,
(SELECT COUNT(*) FROM images WHERE status = 'pending') as images_pending,
(SELECT COUNT(*) FROM images WHERE status = 'rejected') as images_rejected
""")
counts = db.execute(counts_sql).fetchone()
total_species = counts[0] or 0
total_images = counts[1] or 0
images_downloaded = counts[2] or 0
images_pending = counts[3] or 0
images_rejected = counts[4] or 0
# Per-source stats - single query with GROUP BY
source_sql = text("""
SELECT
source,
COUNT(*) as total,
SUM(CASE WHEN status = 'downloaded' THEN 1 ELSE 0 END) as downloaded,
SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending,
SUM(CASE WHEN status = 'rejected' THEN 1 ELSE 0 END) as rejected
FROM images
GROUP BY source
""")
source_stats_raw = db.execute(source_sql).fetchall()
sources = [
{
"source": s[0],
"image_count": s[1],
"downloaded": s[2] or 0,
"pending": s[3] or 0,
"rejected": s[4] or 0,
}
for s in source_stats_raw
]
# Per-license stats - single indexed query
license_sql = text("""
SELECT license, COUNT(*) as count
FROM images
WHERE status = 'downloaded'
GROUP BY license
""")
license_stats_raw = db.execute(license_sql).fetchall()
licenses = [
{"license": l[0], "count": l[1]}
for l in license_stats_raw
]
# Job stats - single query
job_sql = text("""
SELECT
SUM(CASE WHEN status = 'running' THEN 1 ELSE 0 END) as running,
SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending,
SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed,
SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed
FROM jobs
""")
job_counts = db.execute(job_sql).fetchone()
jobs = {
"running": job_counts[0] or 0,
"pending": job_counts[1] or 0,
"completed": job_counts[2] or 0,
"failed": job_counts[3] or 0,
}
# Top species by image count - optimized with index
top_sql = text("""
SELECT s.id, s.scientific_name, s.common_name, COUNT(i.id) as image_count
FROM species s
INNER JOIN images i ON i.species_id = s.id AND i.status = 'downloaded'
GROUP BY s.id
ORDER BY image_count DESC
LIMIT 10
""")
top_species_raw = db.execute(top_sql).fetchall()
top_species = [
{
"id": s[0],
"scientific_name": s[1],
"common_name": s[2],
"image_count": s[3],
}
for s in top_species_raw
]
# Under-represented species - use pre-computed counts
under_sql = text("""
SELECT s.id, s.scientific_name, s.common_name, COALESCE(img_counts.cnt, 0) as image_count
FROM species s
LEFT JOIN (
SELECT species_id, COUNT(*) as cnt
FROM images
WHERE status = 'downloaded'
GROUP BY species_id
) img_counts ON img_counts.species_id = s.id
WHERE COALESCE(img_counts.cnt, 0) < 100
ORDER BY image_count ASC
LIMIT 10
""")
under_rep_raw = db.execute(under_sql).fetchall()
under_represented = [
{
"id": s[0],
"scientific_name": s[1],
"common_name": s[2],
"image_count": s[3],
}
for s in under_rep_raw
]
# Calculate disk usage (fast recursive scan)
settings = get_settings()
disk_usage_bytes = get_directory_size_fast(settings.images_path)
disk_usage_mb = round(disk_usage_bytes / (1024 * 1024), 2)
# Build the stats object
stats = {
"total_species": total_species,
"total_images": total_images,
"images_downloaded": images_downloaded,
"images_pending": images_pending,
"images_rejected": images_rejected,
"disk_usage_mb": disk_usage_mb,
"sources": sources,
"licenses": licenses,
"jobs": jobs,
"top_species": top_species,
"under_represented": under_represented,
}
# Store in database
cached = db.query(CachedStats).filter(CachedStats.key == "dashboard_stats").first()
if cached:
cached.value = json.dumps(stats)
cached.updated_at = datetime.utcnow()
else:
cached = CachedStats(key="dashboard_stats", value=json.dumps(stats))
db.add(cached)
db.commit()
print(f"=== STATS TASK: Refreshed (species={total_species}, images={total_images}) ===", flush=True)
return {"status": "success", "total_species": total_species, "total_images": total_images}
except Exception as e:
print(f"=== STATS TASK ERROR: {e} ===", flush=True)
raise
finally:
db.close()