Files
honeyDueAPI/internal/worker/jobs/handler.go
T
Trey t 52bf1ff3c7
Backend CI / Test (push) Has been cancelled
Backend CI / Contract Tests (push) Has been cancelled
Backend CI / Lint (push) Has been cancelled
Backend CI / Secret Scanning (push) Has been cancelled
Backend CI / Build (push) Has been cancelled
perf(task): offload completion notification fan-out to Asynq worker
POST /api/task-completions/ was spending ~1.5-1.75s synchronously on
APNs push + SMTP email + B2 image fetches inside sendTaskCompletedNotification.
Per-user loop made it scale linearly with residence membership; one image
attached + one residence user is the 1.75s baseline observed in the live
honeydue-eli5-overview Grafana panel.

Replace the inline call (and the fire-and-forget goroutine in QuickComplete,
which violated the project's "no goroutines in handlers" rule) with an
Asynq job:

  - new task type notification:task_completed (worker/scheduler.go)
  - new payload {task_id, completion_id} — IDs only, worker re-reads
    canonical state from Postgres so concurrent edits between enqueue
    and dequeue are reflected
  - new HandleTaskCompletedNotification on jobs.Handler delegates to
    TaskService.SendTaskCompletedNotificationByID
  - new dispatchTaskCompletedNotification in task_service.go picks
    between enqueue (preferred) and inline (fallback) when Redis is
    unreachable or the enqueuer isn't wired (tests / local dev)

Other changes required to wire it up:

  - widen worker.NewTaskClient signature to accept asynq.RedisClientOpt
    so the file-mounted Redis password (audit HIGH-1) can be supplied;
    no prior callers, no breakage
  - extend worker.Enqueuer interface with EnqueueTaskCompletedNotification
  - add TaskEnqueuer field to router.Dependencies; wire from cmd/api/main.go
    with the standard typed-nil interface guard
  - wire a worker-side TaskService in cmd/worker/main.go so the handler
    can use the shared SendTaskCompletedNotificationByID implementation
    (storage service shared with the existing upload-cleanup wiring)

Expected impact on POST /api/task-completions/ p50:
  ~1.75s -> ~120-170ms (DB + tx + Asynq enqueue only)

Notifications still deliver; they just go via the worker instead of in
the request path. MaxRetry=3; "row not found" returns nil so a deleted
task/completion doesn't churn the retry loop.

All 31 test packages pass. No DB migrations.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-03 09:34:52 -05:00

733 lines
24 KiB
Go

package jobs
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/hibiken/asynq"
"github.com/rs/zerolog/log"
"gorm.io/gorm"
"github.com/treytartt/honeydue-api/internal/config"
"github.com/treytartt/honeydue-api/internal/models"
"github.com/treytartt/honeydue-api/internal/notifications"
"github.com/treytartt/honeydue-api/internal/push"
"github.com/treytartt/honeydue-api/internal/repositories"
"github.com/treytartt/honeydue-api/internal/services"
"github.com/treytartt/honeydue-api/internal/worker"
)
// Task types
const (
TypeSmartReminder = "notification:smart_reminder" // Frequency-aware reminders
TypeDailyDigest = "notification:daily_digest"
TypeSendEmail = "email:send"
TypeSendPush = "push:send"
TypeOnboardingEmails = "email:onboarding"
TypeReminderLogCleanup = "maintenance:reminder_log_cleanup"
TypeUploadCleanup = "maintenance:upload_cleanup" // Reaps expired pending_uploads
)
// Handler handles background job processing
type Handler struct {
db *gorm.DB
taskRepo TaskRepo
residenceRepo ResidenceRepo
reminderRepo ReminderRepo
notificationRepo NotificationRepo
pushClient PushSender
emailService EmailSender
notificationService NotificationSender
onboardingService OnboardingEmailSender
uploadService *services.UploadService
taskService *services.TaskService
config *config.Config
}
// SetUploadService wires the upload service so HandleUploadCleanup can reap
// expired pending_uploads rows. Optional; nil-safe — the cleanup handler
// no-ops when not configured (e.g. local-disk dev environments).
func (h *Handler) SetUploadService(us *services.UploadService) {
h.uploadService = us
}
// SetTaskService wires the api-side TaskService so HandleTaskCompletedNotification
// can re-use the same SendTaskCompletedNotificationByID logic the inline path
// used to call. Required for the task-completed notification job; without it
// the handler logs a warning and no-ops (notifications silently dropped).
func (h *Handler) SetTaskService(ts *services.TaskService) {
h.taskService = ts
}
// NewHandler creates a new job handler
func NewHandler(db *gorm.DB, pushClient *push.Client, emailService *services.EmailService, notificationService *services.NotificationService, cfg *config.Config) *Handler {
h := &Handler{
db: db,
taskRepo: repositories.NewTaskRepository(db),
residenceRepo: repositories.NewResidenceRepository(db),
reminderRepo: repositories.NewReminderRepository(db),
notificationRepo: repositories.NewNotificationRepository(db),
config: cfg,
}
// Assign interface fields only when concrete values are non-nil
// to preserve correct nil checks on the interface values.
if pushClient != nil {
h.pushClient = pushClient
}
if emailService != nil {
h.emailService = emailService
h.onboardingService = services.NewOnboardingEmailService(db, emailService, cfg.Server.BaseURL)
}
if notificationService != nil {
h.notificationService = notificationService
}
return h
}
// HandleDailyDigest processes daily digest notifications with task statistics
func (h *Handler) HandleDailyDigest(ctx context.Context, task *asynq.Task) error {
log.Info().Msg("Processing daily digest notifications...")
now := time.Now().UTC()
currentHour := now.Hour()
systemDefaultHour := h.config.Worker.DailyNotifHour
log.Info().Int("current_hour", currentHour).Int("system_default_hour", systemDefaultHour).Msg("Daily digest check")
// Step 1: Find users who should receive daily digest THIS hour
// Logic: Each user gets notified ONCE per day at exactly ONE hour:
// - If user has custom hour set: notify ONLY at that custom hour
// - If user has NO custom hour (NULL): notify ONLY at system default hour
var eligibleUserIDs []uint
query := h.db.Model(&models.NotificationPreference{}).
Select("user_id").
Where("daily_digest = true")
if currentHour == systemDefaultHour {
// At system default hour: notify users who have NO custom hour (NULL) OR whose custom hour equals default
query = query.Where("daily_digest_hour IS NULL OR daily_digest_hour = ?", currentHour)
} else {
// At non-default hour: only notify users who have this specific custom hour set
query = query.Where("daily_digest_hour = ?", currentHour)
}
err := query.Pluck("user_id", &eligibleUserIDs).Error
if err != nil {
log.Error().Err(err).Msg("Failed to query eligible users for daily digest")
return err
}
// Early exit if no users need notifications this hour
if len(eligibleUserIDs) == 0 {
log.Debug().Int("hour", currentHour).Msg("No users scheduled for daily digest notifications this hour")
return nil
}
log.Info().Int("eligible_users", len(eligibleUserIDs)).Msg("Found users eligible for daily digest this hour")
// Step 2: Get task statistics for each user using canonical repository functions
// This ensures consistency with kanban display - uses the same scopes:
// - task.ScopeOverdue (excludes in-progress tasks)
// - task.ScopeDueSoon (7 days for "due this week")
var usersNotified int
for _, userID := range eligibleUserIDs {
// Get user's timezone from notification preferences for accurate overdue calculation
// This ensures the daily digest matches what the user sees in the kanban UI
var userNow time.Time
if prefs, err := h.notificationRepo.FindPreferencesByUser(userID); err == nil && prefs.Timezone != nil {
if loc, err := time.LoadLocation(*prefs.Timezone); err == nil {
// Use start of day in user's timezone (matches kanban behavior)
userNowInTz := time.Now().In(loc)
userNow = time.Date(userNowInTz.Year(), userNowInTz.Month(), userNowInTz.Day(), 0, 0, 0, 0, loc)
log.Debug().Uint("user_id", userID).Str("timezone", *prefs.Timezone).Time("user_now", userNow).Msg("Using user timezone for daily digest")
} else {
userNow = now // Fallback to UTC
}
} else {
userNow = now // Fallback to UTC if no timezone stored
}
// Get user's residence IDs (owned + member)
residenceIDs, err := h.residenceRepo.FindResidenceIDsByUser(userID)
if err != nil {
log.Error().Err(err).Uint("user_id", userID).Msg("Failed to get user residences")
continue
}
if len(residenceIDs) == 0 {
continue // User has no residences
}
// Query overdue tasks using canonical scopes (excludes in-progress)
// Uses userNow (timezone-aware) for accurate overdue detection
opts := repositories.TaskFilterOptions{
ResidenceIDs: residenceIDs,
IncludeInProgress: false, // Match kanban: in-progress tasks not in overdue column
PreloadCompletions: true,
}
overdueTasks, err := h.taskRepo.GetOverdueTasks(userNow, opts)
if err != nil {
log.Error().Err(err).Uint("user_id", userID).Msg("Failed to get overdue tasks")
continue
}
// Query due-this-week tasks (7 days threshold)
dueSoonTasks, err := h.taskRepo.GetDueSoonTasks(userNow, 7, opts)
if err != nil {
log.Error().Err(err).Uint("user_id", userID).Msg("Failed to get due-soon tasks")
continue
}
overdueCount := len(overdueTasks)
dueThisWeekCount := len(dueSoonTasks)
// Skip users with no actionable items
if overdueCount == 0 && dueThisWeekCount == 0 {
continue
}
// Build notification message
title := "Daily Task Summary"
var body string
if overdueCount > 0 && dueThisWeekCount > 0 {
body = fmt.Sprintf("You have %d overdue task(s) and %d task(s) due this week", overdueCount, dueThisWeekCount)
} else if overdueCount > 0 {
body = fmt.Sprintf("You have %d overdue task(s) that need attention", overdueCount)
} else {
body = fmt.Sprintf("You have %d task(s) due this week", dueThisWeekCount)
}
// Send push notification
if err := h.sendPushToUser(ctx, userID, title, body, map[string]string{
"type": "daily_digest",
"overdue": fmt.Sprintf("%d", overdueCount),
"due_this_week": fmt.Sprintf("%d", dueThisWeekCount),
}); err != nil {
log.Error().Err(err).Uint("user_id", userID).Msg("Failed to send daily digest push")
} else {
usersNotified++
}
}
log.Info().Int("users_notified", usersNotified).Msg("Daily digest notifications completed")
return nil
}
// EmailPayload represents the payload for email tasks
type EmailPayload struct {
To string `json:"to"`
Subject string `json:"subject"`
HTMLBody string `json:"html_body"`
TextBody string `json:"text_body"`
}
// HandleSendEmail processes email sending tasks
func (h *Handler) HandleSendEmail(ctx context.Context, task *asynq.Task) error {
var payload EmailPayload
if err := json.Unmarshal(task.Payload(), &payload); err != nil {
return fmt.Errorf("failed to unmarshal payload: %w", err)
}
log.Info().
Str("to", payload.To).
Str("subject", payload.Subject).
Msg("Sending email...")
if h.emailService == nil {
log.Warn().Msg("Email service not configured, skipping email")
return nil
}
// Use the email service to send the email
if err := h.emailService.SendEmail(payload.To, payload.Subject, payload.HTMLBody, payload.TextBody); err != nil {
log.Error().Err(err).Str("to", payload.To).Msg("Failed to send email")
return err
}
log.Info().Str("to", payload.To).Msg("Email sent successfully")
return nil
}
// PushPayload represents the payload for push notification tasks
type PushPayload struct {
UserID uint `json:"user_id"`
Title string `json:"title"`
Message string `json:"message"`
Data map[string]string `json:"data,omitempty"`
}
// HandleSendPush processes push notification tasks
func (h *Handler) HandleSendPush(ctx context.Context, task *asynq.Task) error {
var payload PushPayload
if err := json.Unmarshal(task.Payload(), &payload); err != nil {
return fmt.Errorf("failed to unmarshal payload: %w", err)
}
log.Info().
Uint("user_id", payload.UserID).
Str("title", payload.Title).
Msg("Sending push notification...")
if err := h.sendPushToUser(ctx, payload.UserID, payload.Title, payload.Message, payload.Data); err != nil {
log.Error().Err(err).Uint("user_id", payload.UserID).Msg("Failed to send push notification")
return err
}
log.Info().Uint("user_id", payload.UserID).Msg("Push notification sent successfully")
return nil
}
// sendPushToUser sends a push notification to all of a user's devices
func (h *Handler) sendPushToUser(ctx context.Context, userID uint, title, message string, data map[string]string) error {
if h.pushClient == nil {
log.Warn().Msg("Push client not configured, skipping notification")
return nil
}
// Get active device tokens via repository
iosTokens, androidTokens, err := h.notificationRepo.GetActiveTokensForUser(userID)
if err != nil {
log.Error().Err(err).Uint("user_id", userID).Msg("Failed to get device tokens")
return err
}
if len(iosTokens) == 0 && len(androidTokens) == 0 {
log.Debug().Uint("user_id", userID).Msg("No device tokens found for user")
return nil
}
log.Debug().
Uint("user_id", userID).
Int("ios_tokens", len(iosTokens)).
Int("android_tokens", len(androidTokens)).
Msg("Sending push to user devices")
// Send to all devices
return h.pushClient.SendToAll(ctx, iosTokens, androidTokens, title, message, data)
}
// NewSendEmailTask creates a new email sending task
func NewSendEmailTask(to, subject, htmlBody, textBody string) (*asynq.Task, error) {
payload, err := json.Marshal(EmailPayload{
To: to,
Subject: subject,
HTMLBody: htmlBody,
TextBody: textBody,
})
if err != nil {
return nil, err
}
return asynq.NewTask(TypeSendEmail, payload), nil
}
// NewSendPushTask creates a new push notification task
func NewSendPushTask(userID uint, title, message string, data map[string]string) (*asynq.Task, error) {
payload, err := json.Marshal(PushPayload{
UserID: userID,
Title: title,
Message: message,
Data: data,
})
if err != nil {
return nil, err
}
return asynq.NewTask(TypeSendPush, payload), nil
}
// HandleOnboardingEmails processes onboarding email campaigns
// Sends emails to:
// 1. Users who registered 2+ days ago but haven't created a residence
// 2. Users who created a residence 5+ days ago but haven't created any tasks
// Each email type is only sent once per user, ever.
func (h *Handler) HandleOnboardingEmails(ctx context.Context, task *asynq.Task) error {
if !h.config.Features.OnboardingEmailsEnabled {
log.Debug().Msg("Onboarding emails disabled by feature flag, skipping")
return nil
}
log.Info().Msg("Processing onboarding emails...")
if h.onboardingService == nil {
log.Warn().Msg("Onboarding email service not configured, skipping")
return nil
}
// Send no-residence emails (users without any residences after 2 days)
noResCount, noResErr := h.onboardingService.CheckAndSendNoResidenceEmails()
if noResErr != nil {
log.Error().Err(noResErr).Msg("Failed to process no-residence onboarding emails")
} else {
log.Info().Int("count", noResCount).Msg("Sent no-residence onboarding emails")
}
// Send no-tasks emails (users with residence but no tasks after 5 days)
noTasksCount, noTasksErr := h.onboardingService.CheckAndSendNoTasksEmails()
if noTasksErr != nil {
log.Error().Err(noTasksErr).Msg("Failed to process no-tasks onboarding emails")
} else {
log.Info().Int("count", noTasksCount).Msg("Sent no-tasks onboarding emails")
}
log.Info().
Int("no_residence_sent", noResCount).
Int("no_tasks_sent", noTasksCount).
Msg("Onboarding email processing completed")
// If all sub-tasks failed, return an error so Asynq retries
if noResErr != nil && noTasksErr != nil {
return fmt.Errorf("all onboarding email sub-tasks failed: no-residence: %w, no-tasks: %v", noResErr, noTasksErr)
}
return nil
}
// userReminderPrefs holds a user's notification preferences for smart reminders
type userReminderPrefs struct {
UserID uint `gorm:"column:user_id"`
WantsDueSoon bool `gorm:"column:wants_due_soon"`
WantsOverdue bool `gorm:"column:wants_overdue"`
}
// HandleSmartReminder processes frequency-aware task reminders.
// Features:
// 1. Single query to get users who want either notification type at current hour
// 2. Single query to get both due-soon AND overdue tasks for those users
// 3. Uses frequency-based schedules (weekly = day-of only, annual = 30d, 14d, 7d, day-of)
// 4. Tracks sent reminders to prevent duplicates
// 5. Tapers off overdue reminders (daily for 3 days, then every 3 days, stop after 14)
func (h *Handler) HandleSmartReminder(ctx context.Context, task *asynq.Task) error {
log.Info().Msg("Processing smart task reminders...")
now := time.Now().UTC()
currentHour := now.Hour()
dueSoonDefault := h.config.Worker.TaskReminderHour
overdueDefault := h.config.Worker.OverdueReminderHour
log.Info().
Int("current_hour", currentHour).
Int("due_soon_default", dueSoonDefault).
Int("overdue_default", overdueDefault).
Msg("Smart reminder check")
// Step 1: Single query to get all users who want ANY notification type at this hour
// Each user gets flags for which types they want
var userPrefs []userReminderPrefs
// Build hour matching conditions (reused in SELECT and WHERE)
// User matches if: they set this hour explicitly, OR they have no preference and current hour is the default
dueSoonHourMatch := fmt.Sprintf(
"(task_due_soon_hour IS NULL AND %d = %d) OR task_due_soon_hour = %d",
currentHour, dueSoonDefault, currentHour,
)
overdueHourMatch := fmt.Sprintf(
"(task_overdue_hour IS NULL AND %d = %d) OR task_overdue_hour = %d",
currentHour, overdueDefault, currentHour,
)
query := fmt.Sprintf(`
SELECT
user_id,
(task_due_soon = true AND (%s)) as wants_due_soon,
(task_overdue = true AND (%s)) as wants_overdue
FROM notifications_notificationpreference
WHERE (task_due_soon = true AND (%s))
OR (task_overdue = true AND (%s))
`, dueSoonHourMatch, overdueHourMatch, dueSoonHourMatch, overdueHourMatch)
err := h.db.Raw(query).Scan(&userPrefs).Error
if err != nil {
log.Error().Err(err).Msg("Failed to query user notification preferences")
return err
}
if len(userPrefs) == 0 {
log.Debug().Int("hour", currentHour).Msg("No users scheduled for any reminder type this hour")
return nil
}
// Build lookup maps for quick access
userWantsDueSoon := make(map[uint]bool)
userWantsOverdue := make(map[uint]bool)
var allUserIDs []uint
for _, pref := range userPrefs {
allUserIDs = append(allUserIDs, pref.UserID)
if pref.WantsDueSoon {
userWantsDueSoon[pref.UserID] = true
}
if pref.WantsOverdue {
userWantsOverdue[pref.UserID] = true
}
}
log.Info().
Int("total_users", len(allUserIDs)).
Int("want_due_soon", len(userWantsDueSoon)).
Int("want_overdue", len(userWantsOverdue)).
Msg("Found users eligible for reminders")
// Build per-user "today" using their timezone preference
// This ensures reminder stage calculations (overdue, due-soon) match the user's local date
userToday := make(map[uint]time.Time, len(allUserIDs))
utcToday := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)
for _, uid := range allUserIDs {
prefs, err := h.notificationRepo.FindPreferencesByUser(uid)
if err == nil && prefs.Timezone != nil {
if loc, locErr := time.LoadLocation(*prefs.Timezone); locErr == nil {
userNowInTz := time.Now().In(loc)
userToday[uid] = time.Date(userNowInTz.Year(), userNowInTz.Month(), userNowInTz.Day(), 0, 0, 0, 0, loc)
continue
}
}
userToday[uid] = utcToday // Fallback to UTC
}
// Step 2: Single query to get ALL active tasks (both due-soon and overdue) for these users
opts := repositories.TaskFilterOptions{
UserIDs: allUserIDs,
IncludeInProgress: true,
PreloadResidence: true,
PreloadCompletions: true,
PreloadFrequency: true,
}
activeTasks, err := h.taskRepo.GetActiveTasksForUsers(now, opts)
if err != nil {
log.Error().Err(err).Msg("Failed to query active tasks")
return err
}
log.Info().Int("count", len(activeTasks)).Msg("Found active tasks for eligible users")
// Step 3: Pre-process tasks to determine stages and build batch reminder check
type candidateReminder struct {
taskIndex int
userID uint
effectiveDate time.Time
stage string
isOverdue bool
reminderStage models.ReminderStage
}
var candidates []candidateReminder
var reminderKeys []repositories.ReminderKey
for i, t := range activeTasks {
// Determine which user to notify
var userID uint
if t.AssignedToID != nil {
userID = *t.AssignedToID
} else if t.Residence.ID != 0 {
userID = t.Residence.OwnerID
} else {
continue
}
// Get the effective due date
var effectiveDate time.Time
if t.NextDueDate != nil {
effectiveDate = *t.NextDueDate
} else if t.DueDate != nil {
effectiveDate = *t.DueDate
} else {
continue
}
// Get frequency interval days
var frequencyDays *int
if t.Frequency != nil && t.Frequency.Days != nil {
days := int(*t.Frequency.Days)
frequencyDays = &days
} else if t.CustomIntervalDays != nil {
days := int(*t.CustomIntervalDays)
frequencyDays = &days
}
// Determine which reminder stage applies today using the user's local date
today := userToday[userID]
stage := notifications.GetReminderStageForToday(effectiveDate, frequencyDays, today)
if stage == "" {
continue
}
// Determine if this is an overdue or due-soon stage
isOverdueStage := len(stage) >= 7 && stage[:7] == "overdue"
// Check if user wants this notification type
if isOverdueStage && !userWantsOverdue[userID] {
continue
}
if !isOverdueStage && !userWantsDueSoon[userID] {
continue
}
reminderStage := models.ReminderStage(stage)
candidates = append(candidates, candidateReminder{
taskIndex: i,
userID: userID,
effectiveDate: effectiveDate,
stage: stage,
isOverdue: isOverdueStage,
reminderStage: reminderStage,
})
reminderKeys = append(reminderKeys, repositories.ReminderKey{
TaskID: t.ID,
UserID: userID,
DueDate: effectiveDate,
Stage: reminderStage,
})
}
// Batch check which reminders have already been sent (single query)
alreadySentMap, err := h.reminderRepo.HasSentReminderBatch(reminderKeys)
if err != nil {
log.Error().Err(err).Msg("Failed to batch check reminder logs")
return err
}
// Step 4: Send notifications for candidates that haven't been sent yet
var dueSoonSent, dueSoonSkipped, overdueSent, overdueSkipped int
for i, c := range candidates {
if alreadySentMap[i] {
if c.isOverdue {
overdueSkipped++
} else {
dueSoonSkipped++
}
continue
}
t := activeTasks[c.taskIndex]
// Determine notification type
var notificationType models.NotificationType
if c.isOverdue {
notificationType = models.NotificationTaskOverdue
} else {
notificationType = models.NotificationTaskDueSoon
}
// Log the reminder BEFORE sending so we have a record even if the send crashes
if _, err := h.reminderRepo.LogReminder(t.ID, c.userID, c.effectiveDate, c.reminderStage, nil); err != nil {
log.Error().Err(err).Uint("task_id", t.ID).Str("stage", c.stage).Msg("Failed to log reminder")
}
// Send notification
if err := h.notificationService.CreateAndSendTaskNotification(ctx, c.userID, notificationType, &t); err != nil {
log.Error().Err(err).
Uint("user_id", c.userID).
Uint("task_id", t.ID).
Str("stage", c.stage).
Msg("Failed to send smart reminder")
continue
}
if c.isOverdue {
overdueSent++
} else {
dueSoonSent++
}
}
log.Info().
Int("due_soon_sent", dueSoonSent).
Int("due_soon_skipped", dueSoonSkipped).
Int("overdue_sent", overdueSent).
Int("overdue_skipped", overdueSkipped).
Msg("Smart reminder notifications completed")
return nil
}
// HandleReminderLogCleanup cleans up old reminder logs to prevent table bloat
func (h *Handler) HandleReminderLogCleanup(ctx context.Context, task *asynq.Task) error {
log.Info().Msg("Processing reminder log cleanup...")
// Clean up logs older than 90 days
deleted, err := h.reminderRepo.CleanupOldLogs(90)
if err != nil {
log.Error().Err(err).Msg("Failed to cleanup old reminder logs")
return err
}
log.Info().Int64("deleted", deleted).Msg("Reminder log cleanup completed")
return nil
}
// HandleUploadCleanup reaps expired pending_uploads rows and their B2 objects.
//
// Runs hourly. Each tick processes up to 500 expired sessions; if the queue
// is deeper than that, the next hourly run picks up the rest. The B2 bucket
// also has a 7-day lifecycle rule on the uploads/ prefix as a backstop in
// case this worker is offline for long stretches.
func (h *Handler) HandleUploadCleanup(ctx context.Context, task *asynq.Task) error {
if h.uploadService == nil {
log.Debug().Msg("Upload cleanup skipped: upload service not configured (local-disk storage)")
return nil
}
log.Info().Msg("Processing pending_uploads cleanup...")
reaped, err := h.uploadService.CleanupExpired(ctx, 500)
if err != nil {
log.Error().Err(err).Msg("Pending uploads cleanup failed")
return err
}
log.Info().Int("reaped", reaped).Msg("Pending uploads cleanup completed")
return nil
}
// HandleTaskCompletedNotification fans out push + email notifications for a
// completed task. Enqueued by the api request handler (POST
// /api/task-completions/) so the synchronous chain of APNs + SMTP + B2 image
// fetches happens here instead of in the user-facing request path.
//
// The payload only carries IDs; canonical state is re-read from Postgres so
// the worker reflects any concurrent edits to the Task or Completion that
// happened between enqueue and dequeue.
//
// Asynq retries on returned error; we return nil for "row not found" cases
// (task or completion got deleted before the job ran) so retries don't
// loop forever on a permanent miss.
func (h *Handler) HandleTaskCompletedNotification(ctx context.Context, t *asynq.Task) error {
var p worker.TaskCompletedNotificationPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return fmt.Errorf("unmarshal task_completed_notification payload: %w", err)
}
if h.taskService == nil {
log.Warn().
Uint("task_id", p.TaskID).
Uint("completion_id", p.CompletionID).
Msg("task_completed_notification handler invoked without TaskService wired — dropping job")
return nil
}
log.Info().
Uint("task_id", p.TaskID).
Uint("completion_id", p.CompletionID).
Msg("Processing task completion notification")
if err := h.taskService.SendTaskCompletedNotificationByID(ctx, p.TaskID, p.CompletionID); err != nil {
log.Error().
Err(err).
Uint("task_id", p.TaskID).
Uint("completion_id", p.CompletionID).
Msg("Failed to deliver task completion notification")
return err
}
return nil
}