import json
from datetime import datetime

from app.workers.celery_app import celery_app
from app.database import SessionLocal
from app.models import Job, Species, Image
from app.utils.logging import get_job_logger


@celery_app.task(bind=True)
def run_scrape_job(self, job_id: int):
    """Main scrape task that dispatches to source-specific scrapers."""
    logger = get_job_logger(job_id)
    logger.info(f"Starting scrape job {job_id}")

    db = SessionLocal()
    job = None
    try:
        job = db.query(Job).filter(Job.id == job_id).first()
        if not job:
            logger.error(f"Job {job_id} not found")
            return {"error": "Job not found"}

        logger.info(f"Job: {job.name}, Source: {job.source}")

        # Update job status
        job.status = "running"
        job.started_at = datetime.utcnow()
        job.celery_task_id = self.request.id
        db.commit()

        # Get species to scrape
        if job.species_filter:
            species_ids = json.loads(job.species_filter)
            query = db.query(Species).filter(Species.id.in_(species_ids))
            logger.info(f"Filtered to species IDs: {species_ids}")
        else:
            query = db.query(Species)
            logger.info("Scraping all species")

        # Filter by image count if requested
        if job.only_without_images or job.max_images:
            from sqlalchemy import func

            # Subquery to count downloaded images per species
            image_count_subquery = (
                db.query(Image.species_id, func.count(Image.id).label("count"))
                .filter(Image.status == "downloaded")
                .group_by(Image.species_id)
                .subquery()
            )

            # Left join with the count subquery
            query = query.outerjoin(
                image_count_subquery,
                Species.id == image_count_subquery.c.species_id,
            )

            if job.only_without_images:
                # Keep species whose count is NULL (never joined) or 0
                query = query.filter(
                    image_count_subquery.c.count.is_(None)
                    | (image_count_subquery.c.count == 0)
                )
                logger.info("Filtering to species without images")
            elif job.max_images:
                # Keep species whose count is NULL or below max_images
                query = query.filter(
                    image_count_subquery.c.count.is_(None)
                    | (image_count_subquery.c.count < job.max_images)
                )
                logger.info(f"Filtering to species with fewer than {job.max_images} images")

        species_list = query.all()
        logger.info(f"Total species to scrape: {len(species_list)}")
        job.progress_total = len(species_list)
        db.commit()

        # Import scraper based on source
        from app.scrapers import get_scraper
        scraper = get_scraper(job.source)
        if not scraper:
            error_msg = f"Unknown source: {job.source}"
            logger.error(error_msg)
            job.status = "failed"
            job.error_message = error_msg
            job.completed_at = datetime.utcnow()
            db.commit()
            return {"error": error_msg}

        logger.info(f"Using scraper: {scraper.name}")

        # Scrape each species
        for i, species in enumerate(species_list):
            try:
                # Update progress
                job.progress_current = i + 1
                db.commit()

                logger.info(f"[{i + 1}/{len(species_list)}] Scraping: {species.scientific_name}")

                # Update task state for real-time monitoring
                self.update_state(
                    state="PROGRESS",
                    meta={
                        "current": i + 1,
                        "total": len(species_list),
                        "species": species.scientific_name,
                    },
                )

                # Run scraper for this species
                results = scraper.scrape_species(species, db, logger)

                downloaded = results.get("downloaded", 0)
                rejected = results.get("rejected", 0)
                job.images_downloaded += downloaded
                job.images_rejected += rejected
                db.commit()

                logger.info(f"  -> Downloaded: {downloaded}, Rejected: {rejected}")

            except Exception as e:
                # Log error but continue with other species
                logger.error(f"Error scraping {species.scientific_name}: {e}", exc_info=True)
                continue

        # Mark job complete
        job.status = "completed"
        job.completed_at = datetime.utcnow()
        db.commit()

        logger.info(
            f"Job {job_id} completed. "
            f"Total downloaded: {job.images_downloaded}, rejected: {job.images_rejected}"
        )

        return {
            "status": "completed",
            "downloaded": job.images_downloaded,
            "rejected": job.images_rejected,
        }

    except Exception as e:
        logger.error(f"Job {job_id} failed with error: {e}", exc_info=True)
        if job:
            job.status = "failed"
            job.error_message = str(e)
            job.completed_at = datetime.utcnow()
            db.commit()
        raise
    finally:
        db.close()


@celery_app.task
def pause_scrape_job(job_id: int):
    """Pause a running scrape job."""
    db = SessionLocal()
    try:
        job = db.query(Job).filter(Job.id == job_id).first()
        if job and job.status == "running":
            job.status = "paused"
            db.commit()
            # Revoke the Celery task
            if job.celery_task_id:
                celery_app.control.revoke(job.celery_task_id, terminate=True)
            return {"status": "paused"}
        # Be explicit when there is nothing to pause (job missing or not running)
        return {"status": "not_running"}
    finally:
        db.close()
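

# ---------------------------------------------------------------------------
# Scraper contract assumed by run_scrape_job (a sketch, not the real base
# class, which lives in app.scrapers): each scraper exposes a `name` and a
# `scrape_species(species, db, logger)` that returns per-species counts,
# since the loop above reads results.get("downloaded") / results.get("rejected").
#
#   from typing import Protocol
#
#   class Scraper(Protocol):
#       name: str
#
#       def scrape_species(self, species, db, logger) -> dict:
#           """Return {"downloaded": int, "rejected": int}."""
#           ...
# ---------------------------------------------------------------------------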
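
# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of this module's API): these tasks are
# meant to be enqueued from an API layer via Celery's standard `.delay()` /
# `.apply_async()` calls rather than invoked directly. The import path below
# is an assumption; adjust to wherever this module actually lives.
#
#   from app.workers.scrape_tasks import run_scrape_job, pause_scrape_job
#
#   # Enqueue a scrape; run_scrape_job records self.request.id on the Job row
#   # so pause_scrape_job can revoke it later.
#   run_scrape_job.delay(job.id)
#
#   # Later, pause it (marks the row "paused" and revokes the running task):
#   pause_scrape_job.delay(job.id)
# ---------------------------------------------------------------------------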