Compare commits

..

2 Commits

Author SHA1 Message Date
Trey t 52bf1ff3c7 perf(task): offload completion notification fan-out to Asynq worker
Backend CI / Test (push) Has been cancelled
Backend CI / Contract Tests (push) Has been cancelled
Backend CI / Lint (push) Has been cancelled
Backend CI / Secret Scanning (push) Has been cancelled
Backend CI / Build (push) Has been cancelled
POST /api/task-completions/ was spending ~1.5-1.75s synchronously on
APNs push + SMTP email + B2 image fetches inside sendTaskCompletedNotification.
Per-user loop made it scale linearly with residence membership; one image
attached + one residence user is the 1.75s baseline observed in the live
honeydue-eli5-overview Grafana panel.

Replace the inline call (and the fire-and-forget goroutine in QuickComplete,
which violated the project's "no goroutines in handlers" rule) with an
Asynq job:

  - new task type notification:task_completed (worker/scheduler.go)
  - new payload {task_id, completion_id} — IDs only, worker re-reads
    canonical state from Postgres so concurrent edits between enqueue
    and dequeue are reflected
  - new HandleTaskCompletedNotification on jobs.Handler delegates to
    TaskService.SendTaskCompletedNotificationByID
  - new dispatchTaskCompletedNotification in task_service.go picks
    between enqueue (preferred) and inline (fallback) when Redis is
    unreachable or the enqueuer isn't wired (tests / local dev)

Other changes required to wire it up:

  - widen worker.NewTaskClient signature to accept asynq.RedisClientOpt
    so the file-mounted Redis password (audit HIGH-1) can be supplied;
    no prior callers, no breakage
  - extend worker.Enqueuer interface with EnqueueTaskCompletedNotification
  - add TaskEnqueuer field to router.Dependencies; wire from cmd/api/main.go
    with the standard typed-nil interface guard
  - wire a worker-side TaskService in cmd/worker/main.go so the handler
    can use the shared SendTaskCompletedNotificationByID implementation
    (storage service shared with the existing upload-cleanup wiring)

Expected impact on POST /api/task-completions/ p50:
  ~1.75s -> ~120-170ms (DB + tx + Asynq enqueue only)

Notifications still deliver; they just go via the worker instead of in
the request path. MaxRetry=3; "row not found" returns nil so a deleted
task/completion doesn't churn the retry loop.

All 31 test packages pass. No DB migrations.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-03 09:34:52 -05:00
Trey t e448ec66dc docs(runbook): rewrite for OVH BHS cluster + Tier-3 observability TODOs
Brings the runbook in line with the 2026-06-03 Hetzner → OVH cutover:

- Section 1-5: topology, machines (3x OVH VPS-1 BHS), software versions,
  network/firewall, DNS, filesystem layout — all reflect the live OVH
  install instead of the historical Hetzner setup.
- Section 6: canonical install-from-clean-boxes procedure (the literal
  commands run on 2026-06-03), so anyone can stand up a backup cluster
  by following along.
- Section 9: keeps existing gotchas (vmagent NetPol, token-blown-away,
  healthy-but-empty) and adds four new ones discovered during the OVH
  build: rbac.yaml not in 03-deploy.sh, namespace label missing from api
  metrics (use service="api"), cluster-label collision when two clusters
  push concurrently, worker double-firing on cutover.
- Section 11.1: enumerates Tier-3 observability gaps surfaced while
  building the honeydue-eli5-overview dashboard (node-exporter not
  deployed, Traefik metrics off, push success counters absent, worker
  /metrics endpoint absent, cache hit rate uninstrumented, APNs latency
  uninstrumented).
- Section 12: dated audit trail of cluster changes.

Pure documentation; no code or manifest changes.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-03 09:34:35 -05:00
8 changed files with 1175 additions and 211 deletions
+30
View File
@@ -9,6 +9,7 @@ import (
"syscall" "syscall"
"time" "time"
"github.com/hibiken/asynq"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"gorm.io/gorm" "gorm.io/gorm"
@@ -20,6 +21,7 @@ import (
"github.com/treytartt/honeydue-api/internal/router" "github.com/treytartt/honeydue-api/internal/router"
"github.com/treytartt/honeydue-api/internal/services" "github.com/treytartt/honeydue-api/internal/services"
"github.com/treytartt/honeydue-api/internal/tracing" "github.com/treytartt/honeydue-api/internal/tracing"
"github.com/treytartt/honeydue-api/internal/worker"
"github.com/treytartt/honeydue-api/pkg/utils" "github.com/treytartt/honeydue-api/pkg/utils"
) )
@@ -194,6 +196,28 @@ func main() {
Msg("Push notification client initialized") Msg("Push notification client initialized")
} }
// Initialize Asynq enqueuer (api-side). Used by services that move
// long-running work off the request path (currently: task-completion
// notification fan-out). Same Redis as cmd/worker — file-mounted password
// applied separately because cfg.Redis.URL does not embed it (audit HIGH-1).
var taskEnqueuer *worker.TaskClient
if redisOpt, parseErr := asynq.ParseRedisURI(cfg.Redis.URL); parseErr != nil {
log.Warn().Err(parseErr).Msg("Failed to parse Redis URL for Asynq enqueuer — completion notifications will run inline")
} else if clientOpt, ok := redisOpt.(asynq.RedisClientOpt); ok {
if cfg.Redis.Password != "" {
clientOpt.Password = cfg.Redis.Password
}
taskEnqueuer = worker.NewTaskClient(clientOpt)
defer func() {
if cerr := taskEnqueuer.Close(); cerr != nil {
log.Warn().Err(cerr).Msg("Failed to close Asynq enqueuer on shutdown")
}
}()
log.Info().Msg("Asynq enqueuer initialized")
} else {
log.Warn().Msg("Redis opt is not RedisClientOpt — Asynq enqueuer skipped; completion notifications will run inline")
}
// Setup router with dependencies (includes admin panel at /admin) // Setup router with dependencies (includes admin panel at /admin)
deps := &router.Dependencies{ deps := &router.Dependencies{
DB: db, DB: db,
@@ -205,6 +229,12 @@ func main() {
StorageService: storageService, StorageService: storageService,
MonitoringService: monitoringService, MonitoringService: monitoringService,
} }
// Only assign the enqueuer when we actually constructed one. Assigning a
// nil *worker.TaskClient directly would create a typed-nil interface that
// fails the `if deps.TaskEnqueuer != nil` check in router.SetupRouter.
if taskEnqueuer != nil {
deps.TaskEnqueuer = taskEnqueuer
}
e := router.SetupRouter(deps) e := router.SetupRouter(deps)
// Create HTTP server // Create HTTP server
+29 -4
View File
@@ -23,6 +23,7 @@ import (
"github.com/treytartt/honeydue-api/internal/repositories" "github.com/treytartt/honeydue-api/internal/repositories"
"github.com/treytartt/honeydue-api/internal/services" "github.com/treytartt/honeydue-api/internal/services"
"github.com/treytartt/honeydue-api/internal/tracing" "github.com/treytartt/honeydue-api/internal/tracing"
"github.com/treytartt/honeydue-api/internal/worker"
"github.com/treytartt/honeydue-api/internal/worker/jobs" "github.com/treytartt/honeydue-api/internal/worker/jobs"
"github.com/treytartt/honeydue-api/pkg/utils" "github.com/treytartt/honeydue-api/pkg/utils"
) )
@@ -180,11 +181,15 @@ func main() {
// Create job handler // Create job handler
jobHandler := jobs.NewHandler(db, pushClient, emailService, notificationService, cfg) jobHandler := jobs.NewHandler(db, pushClient, emailService, notificationService, cfg)
// Wire upload service for the pending_uploads cleanup cron. Storage may // Wire upload service for the pending_uploads cleanup cron AND share the
// be local-disk (no S3 backend), in which case the upload service stays // underlying storage service with the TaskService below so the worker can
// nil and the cleanup handler no-ops. Cache is optional — the cleanup // load completion images for email embedding. Storage may be local-disk
// path doesn't rate-limit and works fine with a nil cache. // (no S3 backend), in which case the upload service stays nil and the
// cleanup handler no-ops. Cache is optional — the cleanup path doesn't
// rate-limit and works fine with a nil cache.
var sharedStorageService *services.StorageService
if storageService, sErr := services.NewStorageService(&cfg.Storage); sErr == nil { if storageService, sErr := services.NewStorageService(&cfg.Storage); sErr == nil {
sharedStorageService = storageService
if s3 := storageService.S3Backend(); s3 != nil { if s3 := storageService.S3Backend(); s3 != nil {
pendingUploadRepo := repositories.NewPendingUploadRepository(db) pendingUploadRepo := repositories.NewPendingUploadRepository(db)
uploadService := services.NewUploadService(pendingUploadRepo, s3, &cfg.Storage, nil) uploadService := services.NewUploadService(pendingUploadRepo, s3, &cfg.Storage, nil)
@@ -194,6 +199,25 @@ func main() {
log.Warn().Err(sErr).Msg("Failed to initialize storage service for upload cleanup; cleanup cron will no-op") log.Warn().Err(sErr).Msg("Failed to initialize storage service for upload cleanup; cleanup cron will no-op")
} }
// Wire a TaskService for the task-completed notification handler. The
// worker re-creates this (vs. importing the api's wired instance) because
// each binary owns its own dependency graph. The handler is fully nil-safe
// — if any of the wired services are absent, the corresponding side of
// notification delivery (push or email) is skipped.
taskRepo := repositories.NewTaskRepository(db)
residenceRepo := repositories.NewResidenceRepository(db)
workerTaskService := services.NewTaskService(taskRepo, residenceRepo)
if notificationService != nil {
workerTaskService.SetNotificationService(notificationService)
}
if emailService != nil {
workerTaskService.SetEmailService(emailService)
}
if sharedStorageService != nil {
workerTaskService.SetStorageService(sharedStorageService)
}
jobHandler.SetTaskService(workerTaskService)
// Create Asynq mux and register handlers // Create Asynq mux and register handlers
mux := asynq.NewServeMux() mux := asynq.NewServeMux()
@@ -208,6 +232,7 @@ func main() {
mux.HandleFunc(jobs.TypeOnboardingEmails, jobHandler.HandleOnboardingEmails) mux.HandleFunc(jobs.TypeOnboardingEmails, jobHandler.HandleOnboardingEmails)
mux.HandleFunc(jobs.TypeReminderLogCleanup, jobHandler.HandleReminderLogCleanup) mux.HandleFunc(jobs.TypeReminderLogCleanup, jobHandler.HandleReminderLogCleanup)
mux.HandleFunc(jobs.TypeUploadCleanup, jobHandler.HandleUploadCleanup) mux.HandleFunc(jobs.TypeUploadCleanup, jobHandler.HandleUploadCleanup)
mux.HandleFunc(worker.TypeTaskCompletedNotification, jobHandler.HandleTaskCompletedNotification)
// Register email job handlers (welcome, verification, password reset, password changed) // Register email job handlers (welcome, verification, password reset, password changed)
if emailService != nil { if emailService != nil {
+895 -191
View File
File diff suppressed because it is too large Load Diff
+13
View File
@@ -30,6 +30,7 @@ import (
"github.com/treytartt/honeydue-api/internal/repositories" "github.com/treytartt/honeydue-api/internal/repositories"
"github.com/treytartt/honeydue-api/internal/services" "github.com/treytartt/honeydue-api/internal/services"
customvalidator "github.com/treytartt/honeydue-api/internal/validator" customvalidator "github.com/treytartt/honeydue-api/internal/validator"
"github.com/treytartt/honeydue-api/internal/worker"
"github.com/treytartt/honeydue-api/pkg/utils" "github.com/treytartt/honeydue-api/pkg/utils"
) )
@@ -45,6 +46,11 @@ type Dependencies struct {
PushClient *push.Client // Direct APNs/FCM client PushClient *push.Client // Direct APNs/FCM client
StorageService *services.StorageService StorageService *services.StorageService
MonitoringService *monitoring.Service MonitoringService *monitoring.Service
// TaskEnqueuer is the Asynq client used to push background work onto the
// shared Redis queue. Optional — when nil, services that would enqueue
// (currently: task-completion notification fan-out) fall back to their
// inline implementation. Tests can omit it; production must wire it.
TaskEnqueuer worker.Enqueuer
} }
// SetupRouter creates and configures the Echo router // SetupRouter creates and configures the Echo router
@@ -215,6 +221,13 @@ func SetupRouter(deps *Dependencies) *echo.Echo {
taskService.SetEmailService(deps.EmailService) taskService.SetEmailService(deps.EmailService)
taskService.SetResidenceService(residenceService) // For including TotalSummary in CRUD responses taskService.SetResidenceService(residenceService) // For including TotalSummary in CRUD responses
taskService.SetStorageService(deps.StorageService) // For reading completion images for email taskService.SetStorageService(deps.StorageService) // For reading completion images for email
if deps.TaskEnqueuer != nil {
// Offload completion notifications (push + email + B2 image fetches)
// to the Asynq worker so POST /api/task-completions/ doesn't pay for
// them in the response path. When the enqueuer is absent (tests),
// task_service falls back to the inline implementation.
taskService.SetTaskCompletedNotificationEnqueuer(deps.TaskEnqueuer)
}
subscriptionService := services.NewSubscriptionService(subscriptionRepo, residenceRepo, taskRepo, contractorRepo, documentRepo) subscriptionService := services.NewSubscriptionService(subscriptionRepo, residenceRepo, taskRepo, contractorRepo, documentRepo)
residenceService.SetSubscriptionService(subscriptionService) // Wire up subscription service for tier limit enforcement residenceService.SetSubscriptionService(subscriptionService) // Wire up subscription service for tier limit enforcement
+85 -11
View File
@@ -29,6 +29,16 @@ var (
ErrCompletionNotFound = errors.New("task completion not found") ErrCompletionNotFound = errors.New("task completion not found")
) )
// TaskCompletedNotificationEnqueuer is the narrow contract TaskService needs
// from the Asynq client to offload completion-notification fan-out (push +
// email + B2 image fetches) into the background worker. Implemented by
// internal/worker.TaskClient; nil when the api is started without a worker
// queue (tests / local dev), in which case CreateCompletion falls back to
// the inline synchronous path.
type TaskCompletedNotificationEnqueuer interface {
EnqueueTaskCompletedNotification(taskID, completionID uint) error
}
// TaskService handles task business logic // TaskService handles task business logic
type TaskService struct { type TaskService struct {
taskRepo *repositories.TaskRepository taskRepo *repositories.TaskRepository
@@ -39,6 +49,7 @@ type TaskService struct {
storageService *StorageService storageService *StorageService
uploadService *UploadService // optional — only set when S3 storage is configured uploadService *UploadService // optional — only set when S3 storage is configured
cache *CacheService cache *CacheService
notifEnqueuer TaskCompletedNotificationEnqueuer
} }
// SetUploadService wires the presigned-URL upload service so CreateCompletion // SetUploadService wires the presigned-URL upload service so CreateCompletion
@@ -82,6 +93,14 @@ func (s *TaskService) SetStorageService(ss *StorageService) {
s.storageService = ss s.storageService = ss
} }
// SetTaskCompletedNotificationEnqueuer wires the Asynq enqueuer. When set,
// CreateCompletion/QuickComplete schedule the notification fan-out as a
// background job instead of running it inline (saving ~1-1.5s on the
// user-facing response). When nil, the inline path runs.
func (s *TaskService) SetTaskCompletedNotificationEnqueuer(e TaskCompletedNotificationEnqueuer) {
s.notifEnqueuer = e
}
// getSummaryForUser returns an empty summary placeholder. // getSummaryForUser returns an empty summary placeholder.
// DEPRECATED: Summary calculation has been removed from CRUD responses for performance. // DEPRECATED: Summary calculation has been removed from CRUD responses for performance.
// Clients should calculate summary from kanban data instead (which already includes all tasks). // Clients should calculate summary from kanban data instead (which already includes all tasks).
@@ -769,8 +788,11 @@ func (s *TaskService) CreateCompletion(ctx context.Context, req *requests.Create
}, nil }, nil
} }
// Send notification to residence owner and other users // Dispatch notification fan-out to the Asynq worker so the api response
s.sendTaskCompletedNotification(ctx, task, completion) // returns without waiting on APNs + SMTP + B2 image fetches (which cost
// ~1-1.5s end-to-end). Falls back to the inline path when no enqueuer is
// wired (tests / local dev) or when Redis is unreachable.
s.dispatchTaskCompletedNotification(ctx, task, completion)
// Return completion with updated task (includes kanban_column for UI update) // Return completion with updated task (includes kanban_column for UI update)
resp := responses.NewTaskCompletionWithTaskResponseWithTime(completion, task, 30, now) resp := responses.NewTaskCompletionWithTaskResponseWithTime(completion, task, 30, now)
@@ -876,19 +898,71 @@ func (s *TaskService) QuickComplete(ctx context.Context, taskID uint, userID uin
} }
log.Info().Uint("task_id", task.ID).Msg("QuickComplete: Task updated successfully") log.Info().Uint("task_id", task.ID).Msg("QuickComplete: Task updated successfully")
// Send notification (fire and forget with panic recovery) // Dispatch notification fan-out to the Asynq worker. Replaces the previous
go func() { // fire-and-forget goroutine — which violated the project rule against
defer func() { // spawning goroutines in request handlers and was unbounded under load.
if r := recover(); r != nil { // Falls back to inline send when no enqueuer is wired.
log.Error().Interface("panic", r).Uint("task_id", task.ID).Msg("Panic in quick-complete notification goroutine") s.dispatchTaskCompletedNotification(ctx, task, completion)
}
}()
s.sendTaskCompletedNotification(ctx, task, completion)
}()
return nil return nil
} }
// dispatchTaskCompletedNotification routes the notification fan-out to either
// the Asynq worker (preferred — keeps the api request path fast) or runs it
// inline as a fallback. The fallback covers:
// - tests / local dev where no enqueuer is wired (notifEnqueuer == nil)
// - Redis outages where Enqueue returns an error
//
// Inline fallback is intentionally synchronous (no goroutines) per the
// project's rule against unbounded goroutine spawning in handlers. The
// caller is expected to be in a request goroutine and accepting the cost.
func (s *TaskService) dispatchTaskCompletedNotification(ctx context.Context, task *models.Task, completion *models.TaskCompletion) {
if s.notifEnqueuer != nil {
if err := s.notifEnqueuer.EnqueueTaskCompletedNotification(task.ID, completion.ID); err == nil {
return
} else {
log.Warn().
Err(err).
Uint("task_id", task.ID).
Uint("completion_id", completion.ID).
Msg("Failed to enqueue completion notification; falling back to inline send")
}
}
s.sendTaskCompletedNotification(ctx, task, completion)
}
// SendTaskCompletedNotificationByID is the public entry point used by the
// Asynq worker. It re-reads the canonical Task + TaskCompletion rows from
// Postgres (cheap with Neon ~10ms away) so the worker reflects any concurrent
// edits between enqueue and dequeue, then delegates to the shared
// sendTaskCompletedNotification implementation.
//
// Returns nil for "row not found" cases (task or completion was deleted before
// the job ran) so Asynq's retry loop doesn't churn on a permanent miss. All
// other errors propagate to Asynq for retry per the queue's MaxRetry setting.
func (s *TaskService) SendTaskCompletedNotificationByID(ctx context.Context, taskID, completionID uint) error {
task, err := s.taskRepo.WithContext(ctx).FindByID(taskID)
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
log.Warn().Uint("task_id", taskID).Msg("task not found for completion notification (likely deleted between enqueue and dequeue)")
return nil
}
return err
}
completion, err := s.taskRepo.WithContext(ctx).FindCompletionByID(completionID)
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
log.Warn().Uint("completion_id", completionID).Msg("completion not found for notification (likely deleted between enqueue and dequeue)")
return nil
}
return err
}
s.sendTaskCompletedNotification(ctx, task, completion)
return nil
}
// sendTaskCompletedNotification sends notifications when a task is completed // sendTaskCompletedNotification sends notifications when a task is completed
func (s *TaskService) sendTaskCompletedNotification(ctx context.Context, task *models.Task, completion *models.TaskCompletion) { func (s *TaskService) sendTaskCompletedNotification(ctx context.Context, task *models.Task, completion *models.TaskCompletion) {
// Get all users with access to this residence // Get all users with access to this residence
+14 -1
View File
@@ -2,12 +2,16 @@ package worker
import "encoding/json" import "encoding/json"
// Enqueuer defines the interface for enqueuing background email tasks. // Enqueuer defines the interface for enqueuing background email + notification
// tasks from the api request path. Implementations are expected to be cheap to
// call and non-blocking (Asynq's client batches over a persistent Redis
// connection).
type Enqueuer interface { type Enqueuer interface {
EnqueueWelcomeEmail(to, firstName, code string) error EnqueueWelcomeEmail(to, firstName, code string) error
EnqueueVerificationEmail(to, firstName, code string) error EnqueueVerificationEmail(to, firstName, code string) error
EnqueuePasswordResetEmail(to, firstName, code, resetToken string) error EnqueuePasswordResetEmail(to, firstName, code, resetToken string) error
EnqueuePasswordChangedEmail(to, firstName string) error EnqueuePasswordChangedEmail(to, firstName string) error
EnqueueTaskCompletedNotification(taskID, completionID uint) error
} }
// Verify TaskClient satisfies the interface at compile time. // Verify TaskClient satisfies the interface at compile time.
@@ -42,3 +46,12 @@ func BuildPasswordResetEmailPayload(to, firstName, code, resetToken string) ([]b
func BuildPasswordChangedEmailPayload(to, firstName string) ([]byte, error) { func BuildPasswordChangedEmailPayload(to, firstName string) ([]byte, error) {
return json.Marshal(EmailPayload{To: to, FirstName: firstName}) return json.Marshal(EmailPayload{To: to, FirstName: firstName})
} }
// BuildTaskCompletedNotificationPayload marshals a TaskCompletedNotificationPayload
// to JSON bytes for the Asynq queue.
func BuildTaskCompletedNotificationPayload(taskID, completionID uint) ([]byte, error) {
return json.Marshal(TaskCompletedNotificationPayload{
TaskID: taskID,
CompletionID: completionID,
})
}
+53
View File
@@ -16,6 +16,7 @@ import (
"github.com/treytartt/honeydue-api/internal/push" "github.com/treytartt/honeydue-api/internal/push"
"github.com/treytartt/honeydue-api/internal/repositories" "github.com/treytartt/honeydue-api/internal/repositories"
"github.com/treytartt/honeydue-api/internal/services" "github.com/treytartt/honeydue-api/internal/services"
"github.com/treytartt/honeydue-api/internal/worker"
) )
// Task types // Task types
@@ -41,6 +42,7 @@ type Handler struct {
notificationService NotificationSender notificationService NotificationSender
onboardingService OnboardingEmailSender onboardingService OnboardingEmailSender
uploadService *services.UploadService uploadService *services.UploadService
taskService *services.TaskService
config *config.Config config *config.Config
} }
@@ -51,6 +53,14 @@ func (h *Handler) SetUploadService(us *services.UploadService) {
h.uploadService = us h.uploadService = us
} }
// SetTaskService wires the api-side TaskService so HandleTaskCompletedNotification
// can re-use the same SendTaskCompletedNotificationByID logic the inline path
// used to call. Required for the task-completed notification job; without it
// the handler logs a warning and no-ops (notifications silently dropped).
func (h *Handler) SetTaskService(ts *services.TaskService) {
h.taskService = ts
}
// NewHandler creates a new job handler // NewHandler creates a new job handler
func NewHandler(db *gorm.DB, pushClient *push.Client, emailService *services.EmailService, notificationService *services.NotificationService, cfg *config.Config) *Handler { func NewHandler(db *gorm.DB, pushClient *push.Client, emailService *services.EmailService, notificationService *services.NotificationService, cfg *config.Config) *Handler {
h := &Handler{ h := &Handler{
@@ -677,3 +687,46 @@ func (h *Handler) HandleUploadCleanup(ctx context.Context, task *asynq.Task) err
log.Info().Int("reaped", reaped).Msg("Pending uploads cleanup completed") log.Info().Int("reaped", reaped).Msg("Pending uploads cleanup completed")
return nil return nil
} }
// HandleTaskCompletedNotification fans out push + email notifications for a
// completed task. Enqueued by the api request handler (POST
// /api/task-completions/) so the synchronous chain of APNs + SMTP + B2 image
// fetches happens here instead of in the user-facing request path.
//
// The payload only carries IDs; canonical state is re-read from Postgres so
// the worker reflects any concurrent edits to the Task or Completion that
// happened between enqueue and dequeue.
//
// Asynq retries on returned error; we return nil for "row not found" cases
// (task or completion got deleted before the job ran) so retries don't
// loop forever on a permanent miss.
func (h *Handler) HandleTaskCompletedNotification(ctx context.Context, t *asynq.Task) error {
var p worker.TaskCompletedNotificationPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return fmt.Errorf("unmarshal task_completed_notification payload: %w", err)
}
if h.taskService == nil {
log.Warn().
Uint("task_id", p.TaskID).
Uint("completion_id", p.CompletionID).
Msg("task_completed_notification handler invoked without TaskService wired — dropping job")
return nil
}
log.Info().
Uint("task_id", p.TaskID).
Uint("completion_id", p.CompletionID).
Msg("Processing task completion notification")
if err := h.taskService.SendTaskCompletedNotificationByID(ctx, p.TaskID, p.CompletionID); err != nil {
log.Error().
Err(err).
Uint("task_id", p.TaskID).
Uint("completion_id", p.CompletionID).
Msg("Failed to deliver task completion notification")
return err
}
return nil
}
+56 -4
View File
@@ -13,6 +13,25 @@ const (
TypePasswordChangedEmail = "email:password_changed" TypePasswordChangedEmail = "email:password_changed"
) )
// Task types for in-app notifications enqueued by the api request path.
// Handlers live in internal/worker/jobs.
const (
// TypeTaskCompletedNotification is emitted after a task completion is
// persisted; the worker fans out push + email to all residence users.
// Moves the ~1-1.5s of synchronous APNs+SMTP+B2-fetch work out of the
// POST /api/task-completions/ request path.
TypeTaskCompletedNotification = "notification:task_completed"
)
// TaskCompletedNotificationPayload carries only the IDs needed for the
// worker to re-fetch the canonical Task + TaskCompletion rows. Keeping the
// payload to IDs (vs. full model graphs) keeps the Redis queue cheap and
// avoids serialising preloaded relations through JSON.
type TaskCompletedNotificationPayload struct {
TaskID uint `json:"task_id"`
CompletionID uint `json:"completion_id"`
}
// EmailPayload is the base payload for email tasks // EmailPayload is the base payload for email tasks
type EmailPayload struct { type EmailPayload struct {
To string `json:"to"` To string `json:"to"`
@@ -43,10 +62,12 @@ type TaskClient struct {
client *asynq.Client client *asynq.Client
} }
// NewTaskClient creates a new task client // NewTaskClient creates a new task client. Accepts a full RedisClientOpt so
func NewTaskClient(redisAddr string) *TaskClient { // callers can supply Addr + Password (Redis is requirepass-protected; the
client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr}) // password lives in a file-mounted secret, not the URL — see audit HIGH-1
return &TaskClient{client: client} // in cmd/worker/main.go).
func NewTaskClient(opt asynq.RedisClientOpt) *TaskClient {
return &TaskClient{client: asynq.NewClient(opt)}
} }
// Close closes the task client // Close closes the task client
@@ -125,3 +146,34 @@ func (c *TaskClient) EnqueuePasswordChangedEmail(to, firstName string) error {
log.Debug().Str("to", to).Msg("Password changed email task enqueued") log.Debug().Str("to", to).Msg("Password changed email task enqueued")
return nil return nil
} }
// EnqueueTaskCompletedNotification queues fan-out push + email delivery for a
// completed task. The api request handler calls this so the response can
// return ~immediately instead of waiting on APNs + SMTP + B2 image fetches.
//
// Queue: "default". MaxRetry: 3 (Asynq retries on handler error; notifications
// are idempotent enough at the user-perception level that a few duplicate
// pushes are preferable to silent drops).
func (c *TaskClient) EnqueueTaskCompletedNotification(taskID, completionID uint) error {
payload, err := BuildTaskCompletedNotificationPayload(taskID, completionID)
if err != nil {
return err
}
task := asynq.NewTask(TypeTaskCompletedNotification, payload)
_, err = c.client.Enqueue(task, asynq.Queue("default"), asynq.MaxRetry(3))
if err != nil {
log.Error().
Err(err).
Uint("task_id", taskID).
Uint("completion_id", completionID).
Msg("Failed to enqueue task completion notification")
return err
}
log.Debug().
Uint("task_id", taskID).
Uint("completion_id", completionID).
Msg("Task completion notification enqueued")
return nil
}