package worker import ( "github.com/hibiken/asynq" "github.com/rs/zerolog/log" ) // Task types for email jobs const ( TypeWelcomeEmail = "email:welcome" TypeVerificationEmail = "email:verification" TypePasswordResetEmail = "email:password_reset" TypePasswordChangedEmail = "email:password_changed" ) // Task types for in-app notifications enqueued by the api request path. // Handlers live in internal/worker/jobs. const ( // TypeTaskCompletedNotification is emitted after a task completion is // persisted; the worker fans out push + email to all residence users. // Moves the ~1-1.5s of synchronous APNs+SMTP+B2-fetch work out of the // POST /api/task-completions/ request path. TypeTaskCompletedNotification = "notification:task_completed" ) // TaskCompletedNotificationPayload carries only the IDs needed for the // worker to re-fetch the canonical Task + TaskCompletion rows. Keeping the // payload to IDs (vs. full model graphs) keeps the Redis queue cheap and // avoids serialising preloaded relations through JSON. type TaskCompletedNotificationPayload struct { TaskID uint `json:"task_id"` CompletionID uint `json:"completion_id"` } // EmailPayload is the base payload for email tasks type EmailPayload struct { To string `json:"to"` FirstName string `json:"first_name"` } // WelcomeEmailPayload is the payload for welcome emails type WelcomeEmailPayload struct { EmailPayload ConfirmationCode string `json:"confirmation_code"` } // VerificationEmailPayload is the payload for verification emails type VerificationEmailPayload struct { EmailPayload Code string `json:"code"` } // PasswordResetEmailPayload is the payload for password reset emails type PasswordResetEmailPayload struct { EmailPayload Code string `json:"code"` ResetToken string `json:"reset_token"` } // TaskClient wraps the asynq client for enqueuing tasks type TaskClient struct { client *asynq.Client } // NewTaskClient creates a new task client. Accepts a full RedisClientOpt so // callers can supply Addr + Password (Redis is requirepass-protected; the // password lives in a file-mounted secret, not the URL — see audit HIGH-1 // in cmd/worker/main.go). func NewTaskClient(opt asynq.RedisClientOpt) *TaskClient { return &TaskClient{client: asynq.NewClient(opt)} } // Close closes the task client func (c *TaskClient) Close() error { return c.client.Close() } // EnqueueWelcomeEmail enqueues a welcome email task func (c *TaskClient) EnqueueWelcomeEmail(to, firstName, code string) error { payload, err := BuildWelcomeEmailPayload(to, firstName, code) if err != nil { return err } task := asynq.NewTask(TypeWelcomeEmail, payload) _, err = c.client.Enqueue(task, asynq.Queue("default"), asynq.MaxRetry(3)) if err != nil { log.Error().Err(err).Str("to", to).Msg("Failed to enqueue welcome email") return err } log.Debug().Str("to", to).Msg("Welcome email task enqueued") return nil } // EnqueueVerificationEmail enqueues a verification email task func (c *TaskClient) EnqueueVerificationEmail(to, firstName, code string) error { payload, err := BuildVerificationEmailPayload(to, firstName, code) if err != nil { return err } task := asynq.NewTask(TypeVerificationEmail, payload) _, err = c.client.Enqueue(task, asynq.Queue("default"), asynq.MaxRetry(3)) if err != nil { log.Error().Err(err).Str("to", to).Msg("Failed to enqueue verification email") return err } log.Debug().Str("to", to).Msg("Verification email task enqueued") return nil } // EnqueuePasswordResetEmail enqueues a password reset email task func (c *TaskClient) EnqueuePasswordResetEmail(to, firstName, code, resetToken string) error { payload, err := BuildPasswordResetEmailPayload(to, firstName, code, resetToken) if err != nil { return err } task := asynq.NewTask(TypePasswordResetEmail, payload) _, err = c.client.Enqueue(task, asynq.Queue("default"), asynq.MaxRetry(3)) if err != nil { log.Error().Err(err).Str("to", to).Msg("Failed to enqueue password reset email") return err } log.Debug().Str("to", to).Msg("Password reset email task enqueued") return nil } // EnqueuePasswordChangedEmail enqueues a password changed confirmation email func (c *TaskClient) EnqueuePasswordChangedEmail(to, firstName string) error { payload, err := BuildPasswordChangedEmailPayload(to, firstName) if err != nil { return err } task := asynq.NewTask(TypePasswordChangedEmail, payload) _, err = c.client.Enqueue(task, asynq.Queue("default"), asynq.MaxRetry(3)) if err != nil { log.Error().Err(err).Str("to", to).Msg("Failed to enqueue password changed email") return err } log.Debug().Str("to", to).Msg("Password changed email task enqueued") return nil } // EnqueueTaskCompletedNotification queues fan-out push + email delivery for a // completed task. The api request handler calls this so the response can // return ~immediately instead of waiting on APNs + SMTP + B2 image fetches. // // Queue: "default". MaxRetry: 3 (Asynq retries on handler error; notifications // are idempotent enough at the user-perception level that a few duplicate // pushes are preferable to silent drops). func (c *TaskClient) EnqueueTaskCompletedNotification(taskID, completionID uint) error { payload, err := BuildTaskCompletedNotificationPayload(taskID, completionID) if err != nil { return err } task := asynq.NewTask(TypeTaskCompletedNotification, payload) _, err = c.client.Enqueue(task, asynq.Queue("default"), asynq.MaxRetry(3)) if err != nil { log.Error(). Err(err). Uint("task_id", taskID). Uint("completion_id", completionID). Msg("Failed to enqueue task completion notification") return err } log.Debug(). Uint("task_id", taskID). Uint("completion_id", completionID). Msg("Task completion notification enqueued") return nil }