Files
2026-04-12 09:54:27 -05:00

174 lines
4.8 KiB
Python

import json
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from app.database import get_db
from app.models import Job
from app.schemas.job import JobCreate, JobResponse, JobListResponse
from app.workers.scrape_tasks import run_scrape_job
# Router for the job endpoints defined below. Route paths are relative ("" and
# "/{job_id}..."), so the including app presumably mounts this under a prefix
# such as /jobs — confirm at the include_router call site.
router = APIRouter()
@router.get("", response_model=JobListResponse)
def list_jobs(
    status: Optional[str] = None,
    source: Optional[str] = None,
    limit: int = Query(50, ge=1, le=200),
    db: Session = Depends(get_db),
):
    """Return the newest jobs, optionally narrowed by status and/or source.

    ``total`` counts every job matching the filters, while ``items`` holds at
    most ``limit`` of them ordered by ``created_at`` descending. A filter that
    is missing or empty is not applied.
    """
    filtered = db.query(Job)
    for column, wanted in ((Job.status, status), (Job.source, source)):
        if wanted:
            filtered = filtered.filter(column == wanted)

    # Count before limiting so ``total`` reflects all matches, not just this page.
    matching_count = filtered.count()
    newest = filtered.order_by(Job.created_at.desc()).limit(limit).all()
    return JobListResponse(
        items=list(map(JobResponse.model_validate, newest)),
        total=matching_count,
    )
@router.post("", response_model=JobResponse)
def create_job(job: JobCreate, db: Session = Depends(get_db)):
    """Create a scrape job row and dispatch it to Celery.

    The row is committed with status ``"pending"`` *before* the task is
    enqueued, so the worker can load it by id. If enqueueing fails (for
    example, the broker is unreachable), the row is marked ``"failed"`` with an
    explanatory ``error_message``. It is not left ``"pending"`` with no task
    attached.

    Args:
        job: Validated creation payload.
        db: Request-scoped SQLAlchemy session.

    Returns:
        The created job, including its Celery task id.

    Raises:
        HTTPException: 503 if the scrape task could not be enqueued.
    """
    # Species ids are stored JSON-encoded; a missing or empty list is stored as NULL.
    species_filter = json.dumps(job.species_ids) if job.species_ids else None

    db_job = Job(
        name=job.name,
        source=job.source,
        species_filter=species_filter,
        only_without_images=job.only_without_images,
        max_images=job.max_images,
        status="pending",
    )
    db.add(db_job)
    db.commit()
    db.refresh(db_job)

    try:
        task = run_scrape_job.delay(db_job.id)
    except Exception as exc:
        # Boundary handler: record why the job never started instead of
        # leaving an orphaned "pending" row, then surface a clear error.
        db_job.status = "failed"
        db_job.error_message = (
            f"Failed to enqueue scrape task ({type(exc).__name__})"
        )
        db.commit()
        raise HTTPException(
            status_code=503, detail="Could not start scrape job"
        ) from exc

    db_job.celery_task_id = task.id
    db.commit()
    return JobResponse.model_validate(db_job)
@router.get("/{job_id}", response_model=JobResponse)
def get_job(job_id: int, db: Session = Depends(get_db)):
    """Return the stored state of a single job; 404 if the id is unknown."""
    record = db.query(Job).filter(Job.id == job_id).first()
    if record:
        return JobResponse.model_validate(record)
    raise HTTPException(status_code=404, detail="Job not found")
@router.get("/{job_id}/progress")
def get_job_progress(job_id: int, db: Session = Depends(get_db)):
    """Report a job's progress, preferring live Celery state over the DB row.

    If the job has a Celery task whose state is ``"PROGRESS"`` and whose meta
    is a dict, the counters (and the current species) come from that meta, and
    the status is reported as ``"running"``. In every other case the values
    stored on the job row are returned.

    Raises:
        HTTPException: 404 if no job with ``job_id`` exists.
    """
    from app.workers.celery_app import celery_app

    job = db.query(Job).filter(Job.id == job_id).first()
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")

    if job.celery_task_id:
        result = celery_app.AsyncResult(job.celery_task_id)
        if result.state == "PROGRESS":
            # ``info`` is whatever meta the worker attached (possibly None), and
            # it may be read separately from ``state`` (the task can finish in
            # between), so only trust it when it is actually a dict.
            meta = result.info
            if isinstance(meta, dict):
                return {
                    "status": "running",
                    "progress_current": meta.get("current", 0),
                    "progress_total": meta.get("total", 0),
                    "current_species": meta.get("species", ""),
                }

    # No task, task not reporting progress, or unusable meta: use the DB row.
    return {
        "status": job.status,
        "progress_current": job.progress_current,
        "progress_total": job.progress_total,
    }
@router.post("/{job_id}/pause")
def pause_job(job_id: int, db: Session = Depends(get_db)):
    """Stop a running job's Celery task and mark the job as paused.

    Responds 404 for an unknown job and 400 unless the job's status is
    ``"running"``. If a task id is recorded, that task is revoked with
    ``terminate=True`` (stopped mid-run, not merely dropped from the queue).
    """
    from app.workers.celery_app import celery_app

    target = db.query(Job).filter(Job.id == job_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="Job not found")
    if target.status != "running":
        raise HTTPException(status_code=400, detail="Job is not running")

    task_id = target.celery_task_id
    if task_id:
        celery_app.control.revoke(task_id, terminate=True)

    target.status = "paused"
    db.commit()
    return {"status": "paused"}
@router.post("/{job_id}/resume")
def resume_job(job_id: int, db: Session = Depends(get_db)):
    """Resume a paused job by dispatching a fresh Celery task.

    The ``"paused" -> "pending"`` transition is committed *before* the task is
    enqueued, mirroring ``create_job``. Otherwise a fast worker could mark the
    job ``"running"`` and have that overwritten back to ``"pending"`` by this
    request's later commit, which makes ``pause_job`` reject a job that is
    actually running.

    Raises:
        HTTPException: 404 if the job does not exist, 400 if it is not paused,
            503 if the new task could not be enqueued (the job stays paused).
    """
    job = db.query(Job).filter(Job.id == job_id).first()
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")
    if job.status != "paused":
        raise HTTPException(status_code=400, detail="Job is not paused")

    job.status = "pending"
    db.commit()

    try:
        task = run_scrape_job.delay(job_id)
    except Exception as exc:
        # Nothing was enqueued, so roll the job back to its paused state.
        job.status = "paused"
        db.commit()
        raise HTTPException(
            status_code=503, detail="Could not resume job"
        ) from exc

    # Only the task id is modified here, so this commit does not rewrite the
    # status column a worker may already have updated.
    job.celery_task_id = task.id
    db.commit()
    return {"status": "resumed"}
@router.post("/{job_id}/cancel")
def cancel_job(job_id: int, db: Session = Depends(get_db)):
    """Cancel a job that has not finished yet.

    Any recorded Celery task is revoked with ``terminate=True``. The job is then
    persisted with status ``"failed"`` and error message ``"Cancelled by user"``.
    Responds 404 for an unknown job and 400 if the job is already
    ``"completed"`` or ``"failed"``.
    """
    from app.workers.celery_app import celery_app

    target = db.query(Job).filter(Job.id == job_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="Job not found")

    finished_states = ("completed", "failed")
    if target.status in finished_states:
        raise HTTPException(status_code=400, detail="Job already finished")

    task_id = target.celery_task_id
    if task_id:
        celery_app.control.revoke(task_id, terminate=True)

    # Cancellation is recorded as a failure with an explanatory message.
    target.status = "failed"
    target.error_message = "Cancelled by user"
    db.commit()
    return {"status": "cancelled"}