# 174 lines, 4.8 KiB, Python
import json
|
|
from typing import Optional
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Query
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.database import get_db
|
|
from app.models import Job
|
|
from app.schemas.job import JobCreate, JobResponse, JobListResponse
|
|
from app.workers.scrape_tasks import run_scrape_job
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
@router.get("", response_model=JobListResponse)
def list_jobs(
    status: Optional[str] = None,
    source: Optional[str] = None,
    limit: int = Query(50, ge=1, le=200),
    offset: int = Query(0, ge=0),
    db: Session = Depends(get_db),
):
    """List jobs, optionally filtered by status and/or source.

    Args:
        status: If given, only jobs whose ``status`` column matches exactly.
        source: If given, only jobs whose ``source`` column matches exactly.
        limit: Maximum number of rows returned (1-200, default 50).
        offset: Rows to skip before the first returned row, for pagination
            (default 0 — preserves the original single-page behavior).
        db: Injected database session.

    Returns:
        JobListResponse with one page of jobs (newest first) and the total
        count across the whole filtered set.
    """
    query = db.query(Job)

    if status:
        query = query.filter(Job.status == status)
    if source:
        query = query.filter(Job.source == source)

    # Count before paginating so `total` reflects the full filtered set,
    # not just the returned page.
    total = query.count()
    jobs = (
        query.order_by(Job.created_at.desc())
        .offset(offset)
        .limit(limit)
        .all()
    )

    return JobListResponse(
        items=[JobResponse.model_validate(j) for j in jobs],
        total=total,
    )
|
|
|
|
|
|
@router.post("", response_model=JobResponse)
def create_job(job: JobCreate, db: Session = Depends(get_db)):
    """Create a job row and dispatch the scrape task to Celery.

    Args:
        job: Validated creation payload (name, source, optional species
            filter, image options).
        db: Injected database session.

    Returns:
        The persisted job, including its Celery task id.

    Raises:
        HTTPException: 503 if the task cannot be enqueued; the job row is
            marked "failed" so it is not stranded in "pending".
    """
    # species_ids is persisted as a JSON-encoded string (the column appears
    # to be text — species_filter holds json.dumps output).
    species_filter = json.dumps(job.species_ids) if job.species_ids else None

    db_job = Job(
        name=job.name,
        source=job.source,
        species_filter=species_filter,
        only_without_images=job.only_without_images,
        max_images=job.max_images,
        status="pending",
    )
    db.add(db_job)
    db.commit()
    db.refresh(db_job)  # populate server-side defaults (id, created_at)

    # Dispatch the Celery task. If the broker is unreachable, .delay()
    # raises — previously that left an orphaned "pending" row with no
    # task id that nothing would ever pick up.
    try:
        task = run_scrape_job.delay(db_job.id)
    except Exception as exc:
        db_job.status = "failed"
        db_job.error_message = f"Failed to enqueue task: {exc}"
        db.commit()
        raise HTTPException(
            status_code=503, detail="Task queue unavailable"
        ) from exc

    db_job.celery_task_id = task.id
    db.commit()

    return JobResponse.model_validate(db_job)
|
|
|
|
|
|
@router.get("/{job_id}", response_model=JobResponse)
def get_job(job_id: int, db: Session = Depends(get_db)):
    """Fetch a single job by primary key; 404 when no such row exists."""
    found = db.query(Job).filter(Job.id == job_id).first()
    if found is None:
        raise HTTPException(status_code=404, detail="Job not found")
    return JobResponse.model_validate(found)
|
|
|
|
|
|
@router.get("/{job_id}/progress")
def get_job_progress(job_id: int, db: Session = Depends(get_db)):
    """Report job progress, preferring live Celery state over the DB row.

    Falls back to the persisted progress columns whenever the job has no
    Celery task id or the task is not currently reporting PROGRESS.
    """
    from app.workers.celery_app import celery_app

    job = db.query(Job).filter(Job.id == job_id).first()
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found")

    # Snapshot of the persisted progress — the answer whenever no live
    # Celery progress is available.
    persisted = {
        "status": job.status,
        "progress_current": job.progress_current,
        "progress_total": job.progress_total,
    }

    if not job.celery_task_id:
        return persisted

    # Ask Celery for the task's live state.
    result = celery_app.AsyncResult(job.celery_task_id)
    if result.state != "PROGRESS":
        return persisted

    # In the PROGRESS state, result.info carries the meta dict the worker
    # published via update_state.
    meta = result.info
    return {
        "status": "running",
        "progress_current": meta.get("current", 0),
        "progress_total": meta.get("total", 0),
        "current_species": meta.get("species", ""),
    }
|
|
|
|
|
|
@router.post("/{job_id}/pause")
def pause_job(job_id: int, db: Session = Depends(get_db)):
    """Pause a running job by revoking its Celery task.

    Raises:
        HTTPException: 404 for an unknown job, 400 if it is not running.
    """
    from app.workers.celery_app import celery_app

    job = db.query(Job).filter(Job.id == job_id).first()
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found")
    if job.status != "running":
        raise HTTPException(status_code=400, detail="Job is not running")

    # Stop the worker; terminate=True also kills a task that has already
    # started executing, not just one waiting in the queue.
    task_id = job.celery_task_id
    if task_id:
        celery_app.control.revoke(task_id, terminate=True)

    job.status = "paused"
    db.commit()

    return {"status": "paused"}
|
|
|
|
|
|
@router.post("/{job_id}/resume")
def resume_job(job_id: int, db: Session = Depends(get_db)):
    """Resume a paused job by enqueueing a fresh Celery task.

    Raises:
        HTTPException: 404 for an unknown job, 400 if it is not paused.
    """
    job = db.query(Job).filter(Job.id == job_id).first()
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found")
    if job.status != "paused":
        raise HTTPException(status_code=400, detail="Job is not paused")

    # A revoked task cannot be restarted, so dispatch a brand-new one and
    # track its id. NOTE(review): presumably the worker resumes from
    # persisted progress rather than restarting from scratch — confirm
    # against run_scrape_job.
    new_task = run_scrape_job.delay(job.id)
    job.celery_task_id = new_task.id
    job.status = "pending"
    db.commit()

    return {"status": "resumed"}
|
|
|
|
|
|
@router.post("/{job_id}/cancel")
def cancel_job(job_id: int, db: Session = Depends(get_db)):
    """Cancel a job that has not yet finished, revoking any live task.

    Raises:
        HTTPException: 404 for an unknown job, 400 if it already finished.
    """
    from app.workers.celery_app import celery_app

    job = db.query(Job).filter(Job.id == job_id).first()
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found")
    if job.status in ("completed", "failed"):
        raise HTTPException(status_code=400, detail="Job already finished")

    task_id = job.celery_task_id
    if task_id:
        # terminate=True also kills a task that is mid-execution.
        celery_app.control.revoke(task_id, terminate=True)

    # NOTE(review): the row is persisted as "failed" while the response
    # says "cancelled" — presumably the status column has no dedicated
    # cancelled value. Verify before introducing one; list_jobs filters
    # on these exact strings.
    job.status = "failed"
    job.error_message = "Cancelled by user"
    db.commit()

    return {"status": "cancelled"}
|