perf(task): offload completion notification fan-out to Asynq worker
POST /api/task-completions/ was spending ~1.5-1.75s synchronously on
APNs push + SMTP email + B2 image fetches inside sendTaskCompletedNotification.
Per-user loop made it scale linearly with residence membership; one image
attached + one residence user is the 1.75s baseline observed in the live
honeydue-eli5-overview Grafana panel.
Replace the inline call (and the fire-and-forget goroutine in QuickComplete,
which violated the project's "no goroutines in handlers" rule) with an
Asynq job:
- new task type notification:task_completed (worker/scheduler.go)
- new payload {task_id, completion_id} — IDs only, worker re-reads
canonical state from Postgres so concurrent edits between enqueue
and dequeue are reflected
- new HandleTaskCompletedNotification on jobs.Handler delegates to
TaskService.SendTaskCompletedNotificationByID
- new dispatchTaskCompletedNotification in task_service.go picks
between enqueue (preferred) and inline (fallback) when Redis is
unreachable or the enqueuer isn't wired (tests / local dev)
Other changes required to wire it up:
- widen worker.NewTaskClient signature to accept asynq.RedisClientOpt
so the file-mounted Redis password (audit HIGH-1) can be supplied;
no prior callers, no breakage
- extend worker.Enqueuer interface with EnqueueTaskCompletedNotification
- add TaskEnqueuer field to router.Dependencies; wire from cmd/api/main.go
with the standard typed-nil interface guard
- wire a worker-side TaskService in cmd/worker/main.go so the handler
can use the shared SendTaskCompletedNotificationByID implementation
(storage service shared with the existing upload-cleanup wiring)
Expected impact on POST /api/task-completions/ p50:
~1.75s -> ~120-170ms (DB + tx + Asynq enqueue only)
Notifications still deliver; they just go via the worker instead of in
the request path. MaxRetry=3; "row not found" returns nil so a deleted
task/completion doesn't churn the retry loop.
All 31 test packages pass. No DB migrations.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -9,6 +9,7 @@ import (
|
|||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/hibiken/asynq"
|
||||||
"github.com/rs/zerolog/log"
|
"github.com/rs/zerolog/log"
|
||||||
"gorm.io/gorm"
|
"gorm.io/gorm"
|
||||||
|
|
||||||
@@ -20,6 +21,7 @@ import (
|
|||||||
"github.com/treytartt/honeydue-api/internal/router"
|
"github.com/treytartt/honeydue-api/internal/router"
|
||||||
"github.com/treytartt/honeydue-api/internal/services"
|
"github.com/treytartt/honeydue-api/internal/services"
|
||||||
"github.com/treytartt/honeydue-api/internal/tracing"
|
"github.com/treytartt/honeydue-api/internal/tracing"
|
||||||
|
"github.com/treytartt/honeydue-api/internal/worker"
|
||||||
"github.com/treytartt/honeydue-api/pkg/utils"
|
"github.com/treytartt/honeydue-api/pkg/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -194,6 +196,28 @@ func main() {
|
|||||||
Msg("Push notification client initialized")
|
Msg("Push notification client initialized")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Initialize Asynq enqueuer (api-side). Used by services that move
|
||||||
|
// long-running work off the request path (currently: task-completion
|
||||||
|
// notification fan-out). Same Redis as cmd/worker — file-mounted password
|
||||||
|
// applied separately because cfg.Redis.URL does not embed it (audit HIGH-1).
|
||||||
|
var taskEnqueuer *worker.TaskClient
|
||||||
|
if redisOpt, parseErr := asynq.ParseRedisURI(cfg.Redis.URL); parseErr != nil {
|
||||||
|
log.Warn().Err(parseErr).Msg("Failed to parse Redis URL for Asynq enqueuer — completion notifications will run inline")
|
||||||
|
} else if clientOpt, ok := redisOpt.(asynq.RedisClientOpt); ok {
|
||||||
|
if cfg.Redis.Password != "" {
|
||||||
|
clientOpt.Password = cfg.Redis.Password
|
||||||
|
}
|
||||||
|
taskEnqueuer = worker.NewTaskClient(clientOpt)
|
||||||
|
defer func() {
|
||||||
|
if cerr := taskEnqueuer.Close(); cerr != nil {
|
||||||
|
log.Warn().Err(cerr).Msg("Failed to close Asynq enqueuer on shutdown")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
log.Info().Msg("Asynq enqueuer initialized")
|
||||||
|
} else {
|
||||||
|
log.Warn().Msg("Redis opt is not RedisClientOpt — Asynq enqueuer skipped; completion notifications will run inline")
|
||||||
|
}
|
||||||
|
|
||||||
// Setup router with dependencies (includes admin panel at /admin)
|
// Setup router with dependencies (includes admin panel at /admin)
|
||||||
deps := &router.Dependencies{
|
deps := &router.Dependencies{
|
||||||
DB: db,
|
DB: db,
|
||||||
@@ -205,6 +229,12 @@ func main() {
|
|||||||
StorageService: storageService,
|
StorageService: storageService,
|
||||||
MonitoringService: monitoringService,
|
MonitoringService: monitoringService,
|
||||||
}
|
}
|
||||||
|
// Only assign the enqueuer when we actually constructed one. Assigning a
|
||||||
|
// nil *worker.TaskClient directly would create a typed-nil interface that
|
||||||
|
// fails the `if deps.TaskEnqueuer != nil` check in router.SetupRouter.
|
||||||
|
if taskEnqueuer != nil {
|
||||||
|
deps.TaskEnqueuer = taskEnqueuer
|
||||||
|
}
|
||||||
e := router.SetupRouter(deps)
|
e := router.SetupRouter(deps)
|
||||||
|
|
||||||
// Create HTTP server
|
// Create HTTP server
|
||||||
|
|||||||
+29
-4
@@ -23,6 +23,7 @@ import (
|
|||||||
"github.com/treytartt/honeydue-api/internal/repositories"
|
"github.com/treytartt/honeydue-api/internal/repositories"
|
||||||
"github.com/treytartt/honeydue-api/internal/services"
|
"github.com/treytartt/honeydue-api/internal/services"
|
||||||
"github.com/treytartt/honeydue-api/internal/tracing"
|
"github.com/treytartt/honeydue-api/internal/tracing"
|
||||||
|
"github.com/treytartt/honeydue-api/internal/worker"
|
||||||
"github.com/treytartt/honeydue-api/internal/worker/jobs"
|
"github.com/treytartt/honeydue-api/internal/worker/jobs"
|
||||||
"github.com/treytartt/honeydue-api/pkg/utils"
|
"github.com/treytartt/honeydue-api/pkg/utils"
|
||||||
)
|
)
|
||||||
@@ -180,11 +181,15 @@ func main() {
|
|||||||
// Create job handler
|
// Create job handler
|
||||||
jobHandler := jobs.NewHandler(db, pushClient, emailService, notificationService, cfg)
|
jobHandler := jobs.NewHandler(db, pushClient, emailService, notificationService, cfg)
|
||||||
|
|
||||||
// Wire upload service for the pending_uploads cleanup cron. Storage may
|
// Wire upload service for the pending_uploads cleanup cron AND share the
|
||||||
// be local-disk (no S3 backend), in which case the upload service stays
|
// underlying storage service with the TaskService below so the worker can
|
||||||
// nil and the cleanup handler no-ops. Cache is optional — the cleanup
|
// load completion images for email embedding. Storage may be local-disk
|
||||||
// path doesn't rate-limit and works fine with a nil cache.
|
// (no S3 backend), in which case the upload service stays nil and the
|
||||||
|
// cleanup handler no-ops. Cache is optional — the cleanup path doesn't
|
||||||
|
// rate-limit and works fine with a nil cache.
|
||||||
|
var sharedStorageService *services.StorageService
|
||||||
if storageService, sErr := services.NewStorageService(&cfg.Storage); sErr == nil {
|
if storageService, sErr := services.NewStorageService(&cfg.Storage); sErr == nil {
|
||||||
|
sharedStorageService = storageService
|
||||||
if s3 := storageService.S3Backend(); s3 != nil {
|
if s3 := storageService.S3Backend(); s3 != nil {
|
||||||
pendingUploadRepo := repositories.NewPendingUploadRepository(db)
|
pendingUploadRepo := repositories.NewPendingUploadRepository(db)
|
||||||
uploadService := services.NewUploadService(pendingUploadRepo, s3, &cfg.Storage, nil)
|
uploadService := services.NewUploadService(pendingUploadRepo, s3, &cfg.Storage, nil)
|
||||||
@@ -194,6 +199,25 @@ func main() {
|
|||||||
log.Warn().Err(sErr).Msg("Failed to initialize storage service for upload cleanup; cleanup cron will no-op")
|
log.Warn().Err(sErr).Msg("Failed to initialize storage service for upload cleanup; cleanup cron will no-op")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Wire a TaskService for the task-completed notification handler. The
|
||||||
|
// worker re-creates this (vs. importing the api's wired instance) because
|
||||||
|
// each binary owns its own dependency graph. The handler is fully nil-safe
|
||||||
|
// — if any of the wired services are absent, the corresponding side of
|
||||||
|
// notification delivery (push or email) is skipped.
|
||||||
|
taskRepo := repositories.NewTaskRepository(db)
|
||||||
|
residenceRepo := repositories.NewResidenceRepository(db)
|
||||||
|
workerTaskService := services.NewTaskService(taskRepo, residenceRepo)
|
||||||
|
if notificationService != nil {
|
||||||
|
workerTaskService.SetNotificationService(notificationService)
|
||||||
|
}
|
||||||
|
if emailService != nil {
|
||||||
|
workerTaskService.SetEmailService(emailService)
|
||||||
|
}
|
||||||
|
if sharedStorageService != nil {
|
||||||
|
workerTaskService.SetStorageService(sharedStorageService)
|
||||||
|
}
|
||||||
|
jobHandler.SetTaskService(workerTaskService)
|
||||||
|
|
||||||
// Create Asynq mux and register handlers
|
// Create Asynq mux and register handlers
|
||||||
mux := asynq.NewServeMux()
|
mux := asynq.NewServeMux()
|
||||||
|
|
||||||
@@ -208,6 +232,7 @@ func main() {
|
|||||||
mux.HandleFunc(jobs.TypeOnboardingEmails, jobHandler.HandleOnboardingEmails)
|
mux.HandleFunc(jobs.TypeOnboardingEmails, jobHandler.HandleOnboardingEmails)
|
||||||
mux.HandleFunc(jobs.TypeReminderLogCleanup, jobHandler.HandleReminderLogCleanup)
|
mux.HandleFunc(jobs.TypeReminderLogCleanup, jobHandler.HandleReminderLogCleanup)
|
||||||
mux.HandleFunc(jobs.TypeUploadCleanup, jobHandler.HandleUploadCleanup)
|
mux.HandleFunc(jobs.TypeUploadCleanup, jobHandler.HandleUploadCleanup)
|
||||||
|
mux.HandleFunc(worker.TypeTaskCompletedNotification, jobHandler.HandleTaskCompletedNotification)
|
||||||
|
|
||||||
// Register email job handlers (welcome, verification, password reset, password changed)
|
// Register email job handlers (welcome, verification, password reset, password changed)
|
||||||
if emailService != nil {
|
if emailService != nil {
|
||||||
|
|||||||
@@ -30,6 +30,7 @@ import (
|
|||||||
"github.com/treytartt/honeydue-api/internal/repositories"
|
"github.com/treytartt/honeydue-api/internal/repositories"
|
||||||
"github.com/treytartt/honeydue-api/internal/services"
|
"github.com/treytartt/honeydue-api/internal/services"
|
||||||
customvalidator "github.com/treytartt/honeydue-api/internal/validator"
|
customvalidator "github.com/treytartt/honeydue-api/internal/validator"
|
||||||
|
"github.com/treytartt/honeydue-api/internal/worker"
|
||||||
"github.com/treytartt/honeydue-api/pkg/utils"
|
"github.com/treytartt/honeydue-api/pkg/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -45,6 +46,11 @@ type Dependencies struct {
|
|||||||
PushClient *push.Client // Direct APNs/FCM client
|
PushClient *push.Client // Direct APNs/FCM client
|
||||||
StorageService *services.StorageService
|
StorageService *services.StorageService
|
||||||
MonitoringService *monitoring.Service
|
MonitoringService *monitoring.Service
|
||||||
|
// TaskEnqueuer is the Asynq client used to push background work onto the
|
||||||
|
// shared Redis queue. Optional — when nil, services that would enqueue
|
||||||
|
// (currently: task-completion notification fan-out) fall back to their
|
||||||
|
// inline implementation. Tests can omit it; production must wire it.
|
||||||
|
TaskEnqueuer worker.Enqueuer
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetupRouter creates and configures the Echo router
|
// SetupRouter creates and configures the Echo router
|
||||||
@@ -215,6 +221,13 @@ func SetupRouter(deps *Dependencies) *echo.Echo {
|
|||||||
taskService.SetEmailService(deps.EmailService)
|
taskService.SetEmailService(deps.EmailService)
|
||||||
taskService.SetResidenceService(residenceService) // For including TotalSummary in CRUD responses
|
taskService.SetResidenceService(residenceService) // For including TotalSummary in CRUD responses
|
||||||
taskService.SetStorageService(deps.StorageService) // For reading completion images for email
|
taskService.SetStorageService(deps.StorageService) // For reading completion images for email
|
||||||
|
if deps.TaskEnqueuer != nil {
|
||||||
|
// Offload completion notifications (push + email + B2 image fetches)
|
||||||
|
// to the Asynq worker so POST /api/task-completions/ doesn't pay for
|
||||||
|
// them in the response path. When the enqueuer is absent (tests),
|
||||||
|
// task_service falls back to the inline implementation.
|
||||||
|
taskService.SetTaskCompletedNotificationEnqueuer(deps.TaskEnqueuer)
|
||||||
|
}
|
||||||
subscriptionService := services.NewSubscriptionService(subscriptionRepo, residenceRepo, taskRepo, contractorRepo, documentRepo)
|
subscriptionService := services.NewSubscriptionService(subscriptionRepo, residenceRepo, taskRepo, contractorRepo, documentRepo)
|
||||||
residenceService.SetSubscriptionService(subscriptionService) // Wire up subscription service for tier limit enforcement
|
residenceService.SetSubscriptionService(subscriptionService) // Wire up subscription service for tier limit enforcement
|
||||||
|
|
||||||
|
|||||||
@@ -29,6 +29,16 @@ var (
|
|||||||
ErrCompletionNotFound = errors.New("task completion not found")
|
ErrCompletionNotFound = errors.New("task completion not found")
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// TaskCompletedNotificationEnqueuer is the narrow contract TaskService needs
|
||||||
|
// from the Asynq client to offload completion-notification fan-out (push +
|
||||||
|
// email + B2 image fetches) into the background worker. Implemented by
|
||||||
|
// internal/worker.TaskClient; nil when the api is started without a worker
|
||||||
|
// queue (tests / local dev), in which case CreateCompletion falls back to
|
||||||
|
// the inline synchronous path.
|
||||||
|
type TaskCompletedNotificationEnqueuer interface {
|
||||||
|
EnqueueTaskCompletedNotification(taskID, completionID uint) error
|
||||||
|
}
|
||||||
|
|
||||||
// TaskService handles task business logic
|
// TaskService handles task business logic
|
||||||
type TaskService struct {
|
type TaskService struct {
|
||||||
taskRepo *repositories.TaskRepository
|
taskRepo *repositories.TaskRepository
|
||||||
@@ -39,6 +49,7 @@ type TaskService struct {
|
|||||||
storageService *StorageService
|
storageService *StorageService
|
||||||
uploadService *UploadService // optional — only set when S3 storage is configured
|
uploadService *UploadService // optional — only set when S3 storage is configured
|
||||||
cache *CacheService
|
cache *CacheService
|
||||||
|
notifEnqueuer TaskCompletedNotificationEnqueuer
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetUploadService wires the presigned-URL upload service so CreateCompletion
|
// SetUploadService wires the presigned-URL upload service so CreateCompletion
|
||||||
@@ -82,6 +93,14 @@ func (s *TaskService) SetStorageService(ss *StorageService) {
|
|||||||
s.storageService = ss
|
s.storageService = ss
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetTaskCompletedNotificationEnqueuer wires the Asynq enqueuer. When set,
|
||||||
|
// CreateCompletion/QuickComplete schedule the notification fan-out as a
|
||||||
|
// background job instead of running it inline (saving ~1-1.5s on the
|
||||||
|
// user-facing response). When nil, the inline path runs.
|
||||||
|
func (s *TaskService) SetTaskCompletedNotificationEnqueuer(e TaskCompletedNotificationEnqueuer) {
|
||||||
|
s.notifEnqueuer = e
|
||||||
|
}
|
||||||
|
|
||||||
// getSummaryForUser returns an empty summary placeholder.
|
// getSummaryForUser returns an empty summary placeholder.
|
||||||
// DEPRECATED: Summary calculation has been removed from CRUD responses for performance.
|
// DEPRECATED: Summary calculation has been removed from CRUD responses for performance.
|
||||||
// Clients should calculate summary from kanban data instead (which already includes all tasks).
|
// Clients should calculate summary from kanban data instead (which already includes all tasks).
|
||||||
@@ -769,8 +788,11 @@ func (s *TaskService) CreateCompletion(ctx context.Context, req *requests.Create
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send notification to residence owner and other users
|
// Dispatch notification fan-out to the Asynq worker so the api response
|
||||||
s.sendTaskCompletedNotification(ctx, task, completion)
|
// returns without waiting on APNs + SMTP + B2 image fetches (which cost
|
||||||
|
// ~1-1.5s end-to-end). Falls back to the inline path when no enqueuer is
|
||||||
|
// wired (tests / local dev) or when Redis is unreachable.
|
||||||
|
s.dispatchTaskCompletedNotification(ctx, task, completion)
|
||||||
|
|
||||||
// Return completion with updated task (includes kanban_column for UI update)
|
// Return completion with updated task (includes kanban_column for UI update)
|
||||||
resp := responses.NewTaskCompletionWithTaskResponseWithTime(completion, task, 30, now)
|
resp := responses.NewTaskCompletionWithTaskResponseWithTime(completion, task, 30, now)
|
||||||
@@ -876,19 +898,71 @@ func (s *TaskService) QuickComplete(ctx context.Context, taskID uint, userID uin
|
|||||||
}
|
}
|
||||||
log.Info().Uint("task_id", task.ID).Msg("QuickComplete: Task updated successfully")
|
log.Info().Uint("task_id", task.ID).Msg("QuickComplete: Task updated successfully")
|
||||||
|
|
||||||
// Send notification (fire and forget with panic recovery)
|
// Dispatch notification fan-out to the Asynq worker. Replaces the previous
|
||||||
go func() {
|
// fire-and-forget goroutine — which violated the project rule against
|
||||||
defer func() {
|
// spawning goroutines in request handlers and was unbounded under load.
|
||||||
if r := recover(); r != nil {
|
// Falls back to inline send when no enqueuer is wired.
|
||||||
log.Error().Interface("panic", r).Uint("task_id", task.ID).Msg("Panic in quick-complete notification goroutine")
|
s.dispatchTaskCompletedNotification(ctx, task, completion)
|
||||||
}
|
|
||||||
}()
|
|
||||||
s.sendTaskCompletedNotification(ctx, task, completion)
|
|
||||||
}()
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// dispatchTaskCompletedNotification routes the notification fan-out to either
|
||||||
|
// the Asynq worker (preferred — keeps the api request path fast) or runs it
|
||||||
|
// inline as a fallback. The fallback covers:
|
||||||
|
// - tests / local dev where no enqueuer is wired (notifEnqueuer == nil)
|
||||||
|
// - Redis outages where Enqueue returns an error
|
||||||
|
//
|
||||||
|
// Inline fallback is intentionally synchronous (no goroutines) per the
|
||||||
|
// project's rule against unbounded goroutine spawning in handlers. The
|
||||||
|
// caller is expected to be in a request goroutine and accepting the cost.
|
||||||
|
func (s *TaskService) dispatchTaskCompletedNotification(ctx context.Context, task *models.Task, completion *models.TaskCompletion) {
|
||||||
|
if s.notifEnqueuer != nil {
|
||||||
|
if err := s.notifEnqueuer.EnqueueTaskCompletedNotification(task.ID, completion.ID); err == nil {
|
||||||
|
return
|
||||||
|
} else {
|
||||||
|
log.Warn().
|
||||||
|
Err(err).
|
||||||
|
Uint("task_id", task.ID).
|
||||||
|
Uint("completion_id", completion.ID).
|
||||||
|
Msg("Failed to enqueue completion notification; falling back to inline send")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s.sendTaskCompletedNotification(ctx, task, completion)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SendTaskCompletedNotificationByID is the public entry point used by the
|
||||||
|
// Asynq worker. It re-reads the canonical Task + TaskCompletion rows from
|
||||||
|
// Postgres (cheap with Neon ~10ms away) so the worker reflects any concurrent
|
||||||
|
// edits between enqueue and dequeue, then delegates to the shared
|
||||||
|
// sendTaskCompletedNotification implementation.
|
||||||
|
//
|
||||||
|
// Returns nil for "row not found" cases (task or completion was deleted before
|
||||||
|
// the job ran) so Asynq's retry loop doesn't churn on a permanent miss. All
|
||||||
|
// other errors propagate to Asynq for retry per the queue's MaxRetry setting.
|
||||||
|
func (s *TaskService) SendTaskCompletedNotificationByID(ctx context.Context, taskID, completionID uint) error {
|
||||||
|
task, err := s.taskRepo.WithContext(ctx).FindByID(taskID)
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||||
|
log.Warn().Uint("task_id", taskID).Msg("task not found for completion notification (likely deleted between enqueue and dequeue)")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
completion, err := s.taskRepo.WithContext(ctx).FindCompletionByID(completionID)
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||||
|
log.Warn().Uint("completion_id", completionID).Msg("completion not found for notification (likely deleted between enqueue and dequeue)")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
s.sendTaskCompletedNotification(ctx, task, completion)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// sendTaskCompletedNotification sends notifications when a task is completed
|
// sendTaskCompletedNotification sends notifications when a task is completed
|
||||||
func (s *TaskService) sendTaskCompletedNotification(ctx context.Context, task *models.Task, completion *models.TaskCompletion) {
|
func (s *TaskService) sendTaskCompletedNotification(ctx context.Context, task *models.Task, completion *models.TaskCompletion) {
|
||||||
// Get all users with access to this residence
|
// Get all users with access to this residence
|
||||||
|
|||||||
@@ -2,12 +2,16 @@ package worker
|
|||||||
|
|
||||||
import "encoding/json"
|
import "encoding/json"
|
||||||
|
|
||||||
// Enqueuer defines the interface for enqueuing background email tasks.
|
// Enqueuer defines the interface for enqueuing background email + notification
|
||||||
|
// tasks from the api request path. Implementations are expected to be cheap to
|
||||||
|
// call and non-blocking (Asynq's client batches over a persistent Redis
|
||||||
|
// connection).
|
||||||
type Enqueuer interface {
|
type Enqueuer interface {
|
||||||
EnqueueWelcomeEmail(to, firstName, code string) error
|
EnqueueWelcomeEmail(to, firstName, code string) error
|
||||||
EnqueueVerificationEmail(to, firstName, code string) error
|
EnqueueVerificationEmail(to, firstName, code string) error
|
||||||
EnqueuePasswordResetEmail(to, firstName, code, resetToken string) error
|
EnqueuePasswordResetEmail(to, firstName, code, resetToken string) error
|
||||||
EnqueuePasswordChangedEmail(to, firstName string) error
|
EnqueuePasswordChangedEmail(to, firstName string) error
|
||||||
|
EnqueueTaskCompletedNotification(taskID, completionID uint) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify TaskClient satisfies the interface at compile time.
|
// Verify TaskClient satisfies the interface at compile time.
|
||||||
@@ -42,3 +46,12 @@ func BuildPasswordResetEmailPayload(to, firstName, code, resetToken string) ([]b
|
|||||||
func BuildPasswordChangedEmailPayload(to, firstName string) ([]byte, error) {
|
func BuildPasswordChangedEmailPayload(to, firstName string) ([]byte, error) {
|
||||||
return json.Marshal(EmailPayload{To: to, FirstName: firstName})
|
return json.Marshal(EmailPayload{To: to, FirstName: firstName})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// BuildTaskCompletedNotificationPayload marshals a TaskCompletedNotificationPayload
|
||||||
|
// to JSON bytes for the Asynq queue.
|
||||||
|
func BuildTaskCompletedNotificationPayload(taskID, completionID uint) ([]byte, error) {
|
||||||
|
return json.Marshal(TaskCompletedNotificationPayload{
|
||||||
|
TaskID: taskID,
|
||||||
|
CompletionID: completionID,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ import (
|
|||||||
"github.com/treytartt/honeydue-api/internal/push"
|
"github.com/treytartt/honeydue-api/internal/push"
|
||||||
"github.com/treytartt/honeydue-api/internal/repositories"
|
"github.com/treytartt/honeydue-api/internal/repositories"
|
||||||
"github.com/treytartt/honeydue-api/internal/services"
|
"github.com/treytartt/honeydue-api/internal/services"
|
||||||
|
"github.com/treytartt/honeydue-api/internal/worker"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Task types
|
// Task types
|
||||||
@@ -41,6 +42,7 @@ type Handler struct {
|
|||||||
notificationService NotificationSender
|
notificationService NotificationSender
|
||||||
onboardingService OnboardingEmailSender
|
onboardingService OnboardingEmailSender
|
||||||
uploadService *services.UploadService
|
uploadService *services.UploadService
|
||||||
|
taskService *services.TaskService
|
||||||
config *config.Config
|
config *config.Config
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -51,6 +53,14 @@ func (h *Handler) SetUploadService(us *services.UploadService) {
|
|||||||
h.uploadService = us
|
h.uploadService = us
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetTaskService wires the api-side TaskService so HandleTaskCompletedNotification
|
||||||
|
// can re-use the same SendTaskCompletedNotificationByID logic the inline path
|
||||||
|
// used to call. Required for the task-completed notification job; without it
|
||||||
|
// the handler logs a warning and no-ops (notifications silently dropped).
|
||||||
|
func (h *Handler) SetTaskService(ts *services.TaskService) {
|
||||||
|
h.taskService = ts
|
||||||
|
}
|
||||||
|
|
||||||
// NewHandler creates a new job handler
|
// NewHandler creates a new job handler
|
||||||
func NewHandler(db *gorm.DB, pushClient *push.Client, emailService *services.EmailService, notificationService *services.NotificationService, cfg *config.Config) *Handler {
|
func NewHandler(db *gorm.DB, pushClient *push.Client, emailService *services.EmailService, notificationService *services.NotificationService, cfg *config.Config) *Handler {
|
||||||
h := &Handler{
|
h := &Handler{
|
||||||
@@ -677,3 +687,46 @@ func (h *Handler) HandleUploadCleanup(ctx context.Context, task *asynq.Task) err
|
|||||||
log.Info().Int("reaped", reaped).Msg("Pending uploads cleanup completed")
|
log.Info().Int("reaped", reaped).Msg("Pending uploads cleanup completed")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HandleTaskCompletedNotification fans out push + email notifications for a
|
||||||
|
// completed task. Enqueued by the api request handler (POST
|
||||||
|
// /api/task-completions/) so the synchronous chain of APNs + SMTP + B2 image
|
||||||
|
// fetches happens here instead of in the user-facing request path.
|
||||||
|
//
|
||||||
|
// The payload only carries IDs; canonical state is re-read from Postgres so
|
||||||
|
// the worker reflects any concurrent edits to the Task or Completion that
|
||||||
|
// happened between enqueue and dequeue.
|
||||||
|
//
|
||||||
|
// Asynq retries on returned error; we return nil for "row not found" cases
|
||||||
|
// (task or completion got deleted before the job ran) so retries don't
|
||||||
|
// loop forever on a permanent miss.
|
||||||
|
func (h *Handler) HandleTaskCompletedNotification(ctx context.Context, t *asynq.Task) error {
|
||||||
|
var p worker.TaskCompletedNotificationPayload
|
||||||
|
if err := json.Unmarshal(t.Payload(), &p); err != nil {
|
||||||
|
return fmt.Errorf("unmarshal task_completed_notification payload: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if h.taskService == nil {
|
||||||
|
log.Warn().
|
||||||
|
Uint("task_id", p.TaskID).
|
||||||
|
Uint("completion_id", p.CompletionID).
|
||||||
|
Msg("task_completed_notification handler invoked without TaskService wired — dropping job")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info().
|
||||||
|
Uint("task_id", p.TaskID).
|
||||||
|
Uint("completion_id", p.CompletionID).
|
||||||
|
Msg("Processing task completion notification")
|
||||||
|
|
||||||
|
if err := h.taskService.SendTaskCompletedNotificationByID(ctx, p.TaskID, p.CompletionID); err != nil {
|
||||||
|
log.Error().
|
||||||
|
Err(err).
|
||||||
|
Uint("task_id", p.TaskID).
|
||||||
|
Uint("completion_id", p.CompletionID).
|
||||||
|
Msg("Failed to deliver task completion notification")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -13,6 +13,25 @@ const (
|
|||||||
TypePasswordChangedEmail = "email:password_changed"
|
TypePasswordChangedEmail = "email:password_changed"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Task types for in-app notifications enqueued by the api request path.
|
||||||
|
// Handlers live in internal/worker/jobs.
|
||||||
|
const (
|
||||||
|
// TypeTaskCompletedNotification is emitted after a task completion is
|
||||||
|
// persisted; the worker fans out push + email to all residence users.
|
||||||
|
// Moves the ~1-1.5s of synchronous APNs+SMTP+B2-fetch work out of the
|
||||||
|
// POST /api/task-completions/ request path.
|
||||||
|
TypeTaskCompletedNotification = "notification:task_completed"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TaskCompletedNotificationPayload carries only the IDs needed for the
|
||||||
|
// worker to re-fetch the canonical Task + TaskCompletion rows. Keeping the
|
||||||
|
// payload to IDs (vs. full model graphs) keeps the Redis queue cheap and
|
||||||
|
// avoids serialising preloaded relations through JSON.
|
||||||
|
type TaskCompletedNotificationPayload struct {
|
||||||
|
TaskID uint `json:"task_id"`
|
||||||
|
CompletionID uint `json:"completion_id"`
|
||||||
|
}
|
||||||
|
|
||||||
// EmailPayload is the base payload for email tasks
|
// EmailPayload is the base payload for email tasks
|
||||||
type EmailPayload struct {
|
type EmailPayload struct {
|
||||||
To string `json:"to"`
|
To string `json:"to"`
|
||||||
@@ -43,10 +62,12 @@ type TaskClient struct {
|
|||||||
client *asynq.Client
|
client *asynq.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewTaskClient creates a new task client
|
// NewTaskClient creates a new task client. Accepts a full RedisClientOpt so
|
||||||
func NewTaskClient(redisAddr string) *TaskClient {
|
// callers can supply Addr + Password (Redis is requirepass-protected; the
|
||||||
client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr})
|
// password lives in a file-mounted secret, not the URL — see audit HIGH-1
|
||||||
return &TaskClient{client: client}
|
// in cmd/worker/main.go).
|
||||||
|
func NewTaskClient(opt asynq.RedisClientOpt) *TaskClient {
|
||||||
|
return &TaskClient{client: asynq.NewClient(opt)}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close closes the task client
|
// Close closes the task client
|
||||||
@@ -125,3 +146,34 @@ func (c *TaskClient) EnqueuePasswordChangedEmail(to, firstName string) error {
|
|||||||
log.Debug().Str("to", to).Msg("Password changed email task enqueued")
|
log.Debug().Str("to", to).Msg("Password changed email task enqueued")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// EnqueueTaskCompletedNotification queues fan-out push + email delivery for a
|
||||||
|
// completed task. The api request handler calls this so the response can
|
||||||
|
// return ~immediately instead of waiting on APNs + SMTP + B2 image fetches.
|
||||||
|
//
|
||||||
|
// Queue: "default". MaxRetry: 3 (Asynq retries on handler error; notifications
|
||||||
|
// are idempotent enough at the user-perception level that a few duplicate
|
||||||
|
// pushes are preferable to silent drops).
|
||||||
|
func (c *TaskClient) EnqueueTaskCompletedNotification(taskID, completionID uint) error {
|
||||||
|
payload, err := BuildTaskCompletedNotificationPayload(taskID, completionID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
task := asynq.NewTask(TypeTaskCompletedNotification, payload)
|
||||||
|
_, err = c.client.Enqueue(task, asynq.Queue("default"), asynq.MaxRetry(3))
|
||||||
|
if err != nil {
|
||||||
|
log.Error().
|
||||||
|
Err(err).
|
||||||
|
Uint("task_id", taskID).
|
||||||
|
Uint("completion_id", completionID).
|
||||||
|
Msg("Failed to enqueue task completion notification")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debug().
|
||||||
|
Uint("task_id", taskID).
|
||||||
|
Uint("completion_id", completionID).
|
||||||
|
Msg("Task completion notification enqueued")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user