diff --git a/cmd/api/main.go b/cmd/api/main.go index c65db81..caa63e0 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -9,6 +9,7 @@ import ( "syscall" "time" + "github.com/hibiken/asynq" "github.com/rs/zerolog/log" "gorm.io/gorm" @@ -20,6 +21,7 @@ import ( "github.com/treytartt/honeydue-api/internal/router" "github.com/treytartt/honeydue-api/internal/services" "github.com/treytartt/honeydue-api/internal/tracing" + "github.com/treytartt/honeydue-api/internal/worker" "github.com/treytartt/honeydue-api/pkg/utils" ) @@ -194,6 +196,28 @@ func main() { Msg("Push notification client initialized") } + // Initialize Asynq enqueuer (api-side). Used by services that move + // long-running work off the request path (currently: task-completion + // notification fan-out). Same Redis as cmd/worker — file-mounted password + // applied separately because cfg.Redis.URL does not embed it (audit HIGH-1). + var taskEnqueuer *worker.TaskClient + if redisOpt, parseErr := asynq.ParseRedisURI(cfg.Redis.URL); parseErr != nil { + log.Warn().Err(parseErr).Msg("Failed to parse Redis URL for Asynq enqueuer — completion notifications will run inline") + } else if clientOpt, ok := redisOpt.(asynq.RedisClientOpt); ok { + if cfg.Redis.Password != "" { + clientOpt.Password = cfg.Redis.Password + } + taskEnqueuer = worker.NewTaskClient(clientOpt) + defer func() { + if cerr := taskEnqueuer.Close(); cerr != nil { + log.Warn().Err(cerr).Msg("Failed to close Asynq enqueuer on shutdown") + } + }() + log.Info().Msg("Asynq enqueuer initialized") + } else { + log.Warn().Msg("Redis opt is not RedisClientOpt — Asynq enqueuer skipped; completion notifications will run inline") + } + // Setup router with dependencies (includes admin panel at /admin) deps := &router.Dependencies{ DB: db, @@ -205,6 +229,12 @@ func main() { StorageService: storageService, MonitoringService: monitoringService, } + // Only assign the enqueuer when we actually constructed one. Assigning a + // nil *worker.TaskClient directly would create a typed-nil interface that + // fails the `if deps.TaskEnqueuer != nil` check in router.SetupRouter. + if taskEnqueuer != nil { + deps.TaskEnqueuer = taskEnqueuer + } e := router.SetupRouter(deps) // Create HTTP server diff --git a/cmd/worker/main.go b/cmd/worker/main.go index c3c0af6..1db3156 100644 --- a/cmd/worker/main.go +++ b/cmd/worker/main.go @@ -23,6 +23,7 @@ import ( "github.com/treytartt/honeydue-api/internal/repositories" "github.com/treytartt/honeydue-api/internal/services" "github.com/treytartt/honeydue-api/internal/tracing" + "github.com/treytartt/honeydue-api/internal/worker" "github.com/treytartt/honeydue-api/internal/worker/jobs" "github.com/treytartt/honeydue-api/pkg/utils" ) @@ -180,11 +181,15 @@ func main() { // Create job handler jobHandler := jobs.NewHandler(db, pushClient, emailService, notificationService, cfg) - // Wire upload service for the pending_uploads cleanup cron. Storage may - // be local-disk (no S3 backend), in which case the upload service stays - // nil and the cleanup handler no-ops. Cache is optional — the cleanup - // path doesn't rate-limit and works fine with a nil cache. + // Wire upload service for the pending_uploads cleanup cron AND share the + // underlying storage service with the TaskService below so the worker can + // load completion images for email embedding. Storage may be local-disk + // (no S3 backend), in which case the upload service stays nil and the + // cleanup handler no-ops. Cache is optional — the cleanup path doesn't + // rate-limit and works fine with a nil cache. + var sharedStorageService *services.StorageService if storageService, sErr := services.NewStorageService(&cfg.Storage); sErr == nil { + sharedStorageService = storageService if s3 := storageService.S3Backend(); s3 != nil { pendingUploadRepo := repositories.NewPendingUploadRepository(db) uploadService := services.NewUploadService(pendingUploadRepo, s3, &cfg.Storage, nil) @@ -194,6 +199,25 @@ func main() { log.Warn().Err(sErr).Msg("Failed to initialize storage service for upload cleanup; cleanup cron will no-op") } + // Wire a TaskService for the task-completed notification handler. The + // worker re-creates this (vs. importing the api's wired instance) because + // each binary owns its own dependency graph. The handler is fully nil-safe + // — if any of the wired services are absent, the corresponding side of + // notification delivery (push or email) is skipped. + taskRepo := repositories.NewTaskRepository(db) + residenceRepo := repositories.NewResidenceRepository(db) + workerTaskService := services.NewTaskService(taskRepo, residenceRepo) + if notificationService != nil { + workerTaskService.SetNotificationService(notificationService) + } + if emailService != nil { + workerTaskService.SetEmailService(emailService) + } + if sharedStorageService != nil { + workerTaskService.SetStorageService(sharedStorageService) + } + jobHandler.SetTaskService(workerTaskService) + // Create Asynq mux and register handlers mux := asynq.NewServeMux() @@ -208,6 +232,7 @@ func main() { mux.HandleFunc(jobs.TypeOnboardingEmails, jobHandler.HandleOnboardingEmails) mux.HandleFunc(jobs.TypeReminderLogCleanup, jobHandler.HandleReminderLogCleanup) mux.HandleFunc(jobs.TypeUploadCleanup, jobHandler.HandleUploadCleanup) + mux.HandleFunc(worker.TypeTaskCompletedNotification, jobHandler.HandleTaskCompletedNotification) // Register email job handlers (welcome, verification, password reset, password changed) if emailService != nil { diff --git a/internal/router/router.go b/internal/router/router.go index 3d57002..fb1033c 100644 --- a/internal/router/router.go +++ b/internal/router/router.go @@ -30,6 +30,7 @@ import ( "github.com/treytartt/honeydue-api/internal/repositories" "github.com/treytartt/honeydue-api/internal/services" customvalidator "github.com/treytartt/honeydue-api/internal/validator" + "github.com/treytartt/honeydue-api/internal/worker" "github.com/treytartt/honeydue-api/pkg/utils" ) @@ -45,6 +46,11 @@ type Dependencies struct { PushClient *push.Client // Direct APNs/FCM client StorageService *services.StorageService MonitoringService *monitoring.Service + // TaskEnqueuer is the Asynq client used to push background work onto the + // shared Redis queue. Optional — when nil, services that would enqueue + // (currently: task-completion notification fan-out) fall back to their + // inline implementation. Tests can omit it; production must wire it. + TaskEnqueuer worker.Enqueuer } // SetupRouter creates and configures the Echo router @@ -215,6 +221,13 @@ func SetupRouter(deps *Dependencies) *echo.Echo { taskService.SetEmailService(deps.EmailService) taskService.SetResidenceService(residenceService) // For including TotalSummary in CRUD responses taskService.SetStorageService(deps.StorageService) // For reading completion images for email + if deps.TaskEnqueuer != nil { + // Offload completion notifications (push + email + B2 image fetches) + // to the Asynq worker so POST /api/task-completions/ doesn't pay for + // them in the response path. When the enqueuer is absent (tests), + // task_service falls back to the inline implementation. + taskService.SetTaskCompletedNotificationEnqueuer(deps.TaskEnqueuer) + } subscriptionService := services.NewSubscriptionService(subscriptionRepo, residenceRepo, taskRepo, contractorRepo, documentRepo) residenceService.SetSubscriptionService(subscriptionService) // Wire up subscription service for tier limit enforcement diff --git a/internal/services/task_service.go b/internal/services/task_service.go index 243fcc0..a61d952 100644 --- a/internal/services/task_service.go +++ b/internal/services/task_service.go @@ -29,6 +29,16 @@ var ( ErrCompletionNotFound = errors.New("task completion not found") ) +// TaskCompletedNotificationEnqueuer is the narrow contract TaskService needs +// from the Asynq client to offload completion-notification fan-out (push + +// email + B2 image fetches) into the background worker. Implemented by +// internal/worker.TaskClient; nil when the api is started without a worker +// queue (tests / local dev), in which case CreateCompletion falls back to +// the inline synchronous path. +type TaskCompletedNotificationEnqueuer interface { + EnqueueTaskCompletedNotification(taskID, completionID uint) error +} + // TaskService handles task business logic type TaskService struct { taskRepo *repositories.TaskRepository @@ -39,6 +49,7 @@ type TaskService struct { storageService *StorageService uploadService *UploadService // optional — only set when S3 storage is configured cache *CacheService + notifEnqueuer TaskCompletedNotificationEnqueuer } // SetUploadService wires the presigned-URL upload service so CreateCompletion @@ -82,6 +93,14 @@ func (s *TaskService) SetStorageService(ss *StorageService) { s.storageService = ss } +// SetTaskCompletedNotificationEnqueuer wires the Asynq enqueuer. When set, +// CreateCompletion/QuickComplete schedule the notification fan-out as a +// background job instead of running it inline (saving ~1-1.5s on the +// user-facing response). When nil, the inline path runs. +func (s *TaskService) SetTaskCompletedNotificationEnqueuer(e TaskCompletedNotificationEnqueuer) { + s.notifEnqueuer = e +} + // getSummaryForUser returns an empty summary placeholder. // DEPRECATED: Summary calculation has been removed from CRUD responses for performance. // Clients should calculate summary from kanban data instead (which already includes all tasks). @@ -769,8 +788,11 @@ func (s *TaskService) CreateCompletion(ctx context.Context, req *requests.Create }, nil } - // Send notification to residence owner and other users - s.sendTaskCompletedNotification(ctx, task, completion) + // Dispatch notification fan-out to the Asynq worker so the api response + // returns without waiting on APNs + SMTP + B2 image fetches (which cost + // ~1-1.5s end-to-end). Falls back to the inline path when no enqueuer is + // wired (tests / local dev) or when Redis is unreachable. + s.dispatchTaskCompletedNotification(ctx, task, completion) // Return completion with updated task (includes kanban_column for UI update) resp := responses.NewTaskCompletionWithTaskResponseWithTime(completion, task, 30, now) @@ -876,19 +898,71 @@ func (s *TaskService) QuickComplete(ctx context.Context, taskID uint, userID uin } log.Info().Uint("task_id", task.ID).Msg("QuickComplete: Task updated successfully") - // Send notification (fire and forget with panic recovery) - go func() { - defer func() { - if r := recover(); r != nil { - log.Error().Interface("panic", r).Uint("task_id", task.ID).Msg("Panic in quick-complete notification goroutine") - } - }() - s.sendTaskCompletedNotification(ctx, task, completion) - }() + // Dispatch notification fan-out to the Asynq worker. Replaces the previous + // fire-and-forget goroutine — which violated the project rule against + // spawning goroutines in request handlers and was unbounded under load. + // Falls back to inline send when no enqueuer is wired. + s.dispatchTaskCompletedNotification(ctx, task, completion) return nil } +// dispatchTaskCompletedNotification routes the notification fan-out to either +// the Asynq worker (preferred — keeps the api request path fast) or runs it +// inline as a fallback. The fallback covers: +// - tests / local dev where no enqueuer is wired (notifEnqueuer == nil) +// - Redis outages where Enqueue returns an error +// +// Inline fallback is intentionally synchronous (no goroutines) per the +// project's rule against unbounded goroutine spawning in handlers. The +// caller is expected to be in a request goroutine and accepting the cost. +func (s *TaskService) dispatchTaskCompletedNotification(ctx context.Context, task *models.Task, completion *models.TaskCompletion) { + if s.notifEnqueuer != nil { + if err := s.notifEnqueuer.EnqueueTaskCompletedNotification(task.ID, completion.ID); err == nil { + return + } else { + log.Warn(). + Err(err). + Uint("task_id", task.ID). + Uint("completion_id", completion.ID). + Msg("Failed to enqueue completion notification; falling back to inline send") + } + } + s.sendTaskCompletedNotification(ctx, task, completion) +} + +// SendTaskCompletedNotificationByID is the public entry point used by the +// Asynq worker. It re-reads the canonical Task + TaskCompletion rows from +// Postgres (cheap with Neon ~10ms away) so the worker reflects any concurrent +// edits between enqueue and dequeue, then delegates to the shared +// sendTaskCompletedNotification implementation. +// +// Returns nil for "row not found" cases (task or completion was deleted before +// the job ran) so Asynq's retry loop doesn't churn on a permanent miss. All +// other errors propagate to Asynq for retry per the queue's MaxRetry setting. +func (s *TaskService) SendTaskCompletedNotificationByID(ctx context.Context, taskID, completionID uint) error { + task, err := s.taskRepo.WithContext(ctx).FindByID(taskID) + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + log.Warn().Uint("task_id", taskID).Msg("task not found for completion notification (likely deleted between enqueue and dequeue)") + return nil + } + return err + } + + completion, err := s.taskRepo.WithContext(ctx).FindCompletionByID(completionID) + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + log.Warn().Uint("completion_id", completionID).Msg("completion not found for notification (likely deleted between enqueue and dequeue)") + return nil + } + return err + } + + s.sendTaskCompletedNotification(ctx, task, completion) + return nil +} + // sendTaskCompletedNotification sends notifications when a task is completed func (s *TaskService) sendTaskCompletedNotification(ctx context.Context, task *models.Task, completion *models.TaskCompletion) { // Get all users with access to this residence diff --git a/internal/worker/enqueuer.go b/internal/worker/enqueuer.go index 425277c..5991dd4 100644 --- a/internal/worker/enqueuer.go +++ b/internal/worker/enqueuer.go @@ -2,12 +2,16 @@ package worker import "encoding/json" -// Enqueuer defines the interface for enqueuing background email tasks. +// Enqueuer defines the interface for enqueuing background email + notification +// tasks from the api request path. Implementations are expected to be cheap to +// call and non-blocking (Asynq's client batches over a persistent Redis +// connection). type Enqueuer interface { EnqueueWelcomeEmail(to, firstName, code string) error EnqueueVerificationEmail(to, firstName, code string) error EnqueuePasswordResetEmail(to, firstName, code, resetToken string) error EnqueuePasswordChangedEmail(to, firstName string) error + EnqueueTaskCompletedNotification(taskID, completionID uint) error } // Verify TaskClient satisfies the interface at compile time. @@ -42,3 +46,12 @@ func BuildPasswordResetEmailPayload(to, firstName, code, resetToken string) ([]b func BuildPasswordChangedEmailPayload(to, firstName string) ([]byte, error) { return json.Marshal(EmailPayload{To: to, FirstName: firstName}) } + +// BuildTaskCompletedNotificationPayload marshals a TaskCompletedNotificationPayload +// to JSON bytes for the Asynq queue. +func BuildTaskCompletedNotificationPayload(taskID, completionID uint) ([]byte, error) { + return json.Marshal(TaskCompletedNotificationPayload{ + TaskID: taskID, + CompletionID: completionID, + }) +} diff --git a/internal/worker/jobs/handler.go b/internal/worker/jobs/handler.go index 9426af2..37a94fd 100644 --- a/internal/worker/jobs/handler.go +++ b/internal/worker/jobs/handler.go @@ -16,6 +16,7 @@ import ( "github.com/treytartt/honeydue-api/internal/push" "github.com/treytartt/honeydue-api/internal/repositories" "github.com/treytartt/honeydue-api/internal/services" + "github.com/treytartt/honeydue-api/internal/worker" ) // Task types @@ -41,6 +42,7 @@ type Handler struct { notificationService NotificationSender onboardingService OnboardingEmailSender uploadService *services.UploadService + taskService *services.TaskService config *config.Config } @@ -51,6 +53,14 @@ func (h *Handler) SetUploadService(us *services.UploadService) { h.uploadService = us } +// SetTaskService wires the api-side TaskService so HandleTaskCompletedNotification +// can re-use the same SendTaskCompletedNotificationByID logic the inline path +// used to call. Required for the task-completed notification job; without it +// the handler logs a warning and no-ops (notifications silently dropped). +func (h *Handler) SetTaskService(ts *services.TaskService) { + h.taskService = ts +} + // NewHandler creates a new job handler func NewHandler(db *gorm.DB, pushClient *push.Client, emailService *services.EmailService, notificationService *services.NotificationService, cfg *config.Config) *Handler { h := &Handler{ @@ -677,3 +687,46 @@ func (h *Handler) HandleUploadCleanup(ctx context.Context, task *asynq.Task) err log.Info().Int("reaped", reaped).Msg("Pending uploads cleanup completed") return nil } + +// HandleTaskCompletedNotification fans out push + email notifications for a +// completed task. Enqueued by the api request handler (POST +// /api/task-completions/) so the synchronous chain of APNs + SMTP + B2 image +// fetches happens here instead of in the user-facing request path. +// +// The payload only carries IDs; canonical state is re-read from Postgres so +// the worker reflects any concurrent edits to the Task or Completion that +// happened between enqueue and dequeue. +// +// Asynq retries on returned error; we return nil for "row not found" cases +// (task or completion got deleted before the job ran) so retries don't +// loop forever on a permanent miss. +func (h *Handler) HandleTaskCompletedNotification(ctx context.Context, t *asynq.Task) error { + var p worker.TaskCompletedNotificationPayload + if err := json.Unmarshal(t.Payload(), &p); err != nil { + return fmt.Errorf("unmarshal task_completed_notification payload: %w", err) + } + + if h.taskService == nil { + log.Warn(). + Uint("task_id", p.TaskID). + Uint("completion_id", p.CompletionID). + Msg("task_completed_notification handler invoked without TaskService wired — dropping job") + return nil + } + + log.Info(). + Uint("task_id", p.TaskID). + Uint("completion_id", p.CompletionID). + Msg("Processing task completion notification") + + if err := h.taskService.SendTaskCompletedNotificationByID(ctx, p.TaskID, p.CompletionID); err != nil { + log.Error(). + Err(err). + Uint("task_id", p.TaskID). + Uint("completion_id", p.CompletionID). + Msg("Failed to deliver task completion notification") + return err + } + + return nil +} diff --git a/internal/worker/scheduler.go b/internal/worker/scheduler.go index b478e68..a563b3a 100644 --- a/internal/worker/scheduler.go +++ b/internal/worker/scheduler.go @@ -13,6 +13,25 @@ const ( TypePasswordChangedEmail = "email:password_changed" ) +// Task types for in-app notifications enqueued by the api request path. +// Handlers live in internal/worker/jobs. +const ( + // TypeTaskCompletedNotification is emitted after a task completion is + // persisted; the worker fans out push + email to all residence users. + // Moves the ~1-1.5s of synchronous APNs+SMTP+B2-fetch work out of the + // POST /api/task-completions/ request path. + TypeTaskCompletedNotification = "notification:task_completed" +) + +// TaskCompletedNotificationPayload carries only the IDs needed for the +// worker to re-fetch the canonical Task + TaskCompletion rows. Keeping the +// payload to IDs (vs. full model graphs) keeps the Redis queue cheap and +// avoids serialising preloaded relations through JSON. +type TaskCompletedNotificationPayload struct { + TaskID uint `json:"task_id"` + CompletionID uint `json:"completion_id"` +} + // EmailPayload is the base payload for email tasks type EmailPayload struct { To string `json:"to"` @@ -43,10 +62,12 @@ type TaskClient struct { client *asynq.Client } -// NewTaskClient creates a new task client -func NewTaskClient(redisAddr string) *TaskClient { - client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr}) - return &TaskClient{client: client} +// NewTaskClient creates a new task client. Accepts a full RedisClientOpt so +// callers can supply Addr + Password (Redis is requirepass-protected; the +// password lives in a file-mounted secret, not the URL — see audit HIGH-1 +// in cmd/worker/main.go). +func NewTaskClient(opt asynq.RedisClientOpt) *TaskClient { + return &TaskClient{client: asynq.NewClient(opt)} } // Close closes the task client @@ -125,3 +146,34 @@ func (c *TaskClient) EnqueuePasswordChangedEmail(to, firstName string) error { log.Debug().Str("to", to).Msg("Password changed email task enqueued") return nil } + +// EnqueueTaskCompletedNotification queues fan-out push + email delivery for a +// completed task. The api request handler calls this so the response can +// return ~immediately instead of waiting on APNs + SMTP + B2 image fetches. +// +// Queue: "default". MaxRetry: 3 (Asynq retries on handler error; notifications +// are idempotent enough at the user-perception level that a few duplicate +// pushes are preferable to silent drops). +func (c *TaskClient) EnqueueTaskCompletedNotification(taskID, completionID uint) error { + payload, err := BuildTaskCompletedNotificationPayload(taskID, completionID) + if err != nil { + return err + } + + task := asynq.NewTask(TypeTaskCompletedNotification, payload) + _, err = c.client.Enqueue(task, asynq.Queue("default"), asynq.MaxRetry(3)) + if err != nil { + log.Error(). + Err(err). + Uint("task_id", taskID). + Uint("completion_id", completionID). + Msg("Failed to enqueue task completion notification") + return err + } + + log.Debug(). + Uint("task_id", taskID). + Uint("completion_id", completionID). + Msg("Task completion notification enqueued") + return nil +}