package worker import ( "context" "encoding/json" "fmt" "time" "github.com/hibiken/asynq" "github.com/rs/zerolog/log" ) // Task types const ( TypeWelcomeEmail = "email:welcome" TypeVerificationEmail = "email:verification" TypePasswordResetEmail = "email:password_reset" TypePasswordChangedEmail = "email:password_changed" TypeTaskCompletionEmail = "email:task_completion" TypeGeneratePDFReport = "pdf:generate_report" TypeUpdateContractorRating = "contractor:update_rating" TypeDailyNotifications = "notifications:daily" TypeTaskReminders = "notifications:task_reminders" TypeOverdueReminders = "notifications:overdue_reminders" ) // EmailPayload is the base payload for email tasks type EmailPayload struct { To string `json:"to"` FirstName string `json:"first_name"` } // WelcomeEmailPayload is the payload for welcome emails type WelcomeEmailPayload struct { EmailPayload ConfirmationCode string `json:"confirmation_code"` } // VerificationEmailPayload is the payload for verification emails type VerificationEmailPayload struct { EmailPayload Code string `json:"code"` } // PasswordResetEmailPayload is the payload for password reset emails type PasswordResetEmailPayload struct { EmailPayload Code string `json:"code"` ResetToken string `json:"reset_token"` } // TaskClient wraps the asynq client for enqueuing tasks type TaskClient struct { client *asynq.Client } // NewTaskClient creates a new task client func NewTaskClient(redisAddr string) *TaskClient { client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr}) return &TaskClient{client: client} } // Close closes the task client func (c *TaskClient) Close() error { return c.client.Close() } // EnqueueWelcomeEmail enqueues a welcome email task func (c *TaskClient) EnqueueWelcomeEmail(to, firstName, code string) error { payload, err := json.Marshal(WelcomeEmailPayload{ EmailPayload: EmailPayload{To: to, FirstName: firstName}, ConfirmationCode: code, }) if err != nil { return err } task := asynq.NewTask(TypeWelcomeEmail, payload) _, err = c.client.Enqueue(task, asynq.Queue("default"), asynq.MaxRetry(3)) if err != nil { log.Error().Err(err).Str("to", to).Msg("Failed to enqueue welcome email") return err } log.Debug().Str("to", to).Msg("Welcome email task enqueued") return nil } // EnqueueVerificationEmail enqueues a verification email task func (c *TaskClient) EnqueueVerificationEmail(to, firstName, code string) error { payload, err := json.Marshal(VerificationEmailPayload{ EmailPayload: EmailPayload{To: to, FirstName: firstName}, Code: code, }) if err != nil { return err } task := asynq.NewTask(TypeVerificationEmail, payload) _, err = c.client.Enqueue(task, asynq.Queue("default"), asynq.MaxRetry(3)) if err != nil { log.Error().Err(err).Str("to", to).Msg("Failed to enqueue verification email") return err } log.Debug().Str("to", to).Msg("Verification email task enqueued") return nil } // EnqueuePasswordResetEmail enqueues a password reset email task func (c *TaskClient) EnqueuePasswordResetEmail(to, firstName, code, resetToken string) error { payload, err := json.Marshal(PasswordResetEmailPayload{ EmailPayload: EmailPayload{To: to, FirstName: firstName}, Code: code, ResetToken: resetToken, }) if err != nil { return err } task := asynq.NewTask(TypePasswordResetEmail, payload) _, err = c.client.Enqueue(task, asynq.Queue("default"), asynq.MaxRetry(3)) if err != nil { log.Error().Err(err).Str("to", to).Msg("Failed to enqueue password reset email") return err } log.Debug().Str("to", to).Msg("Password reset email task enqueued") return nil } // EnqueuePasswordChangedEmail enqueues a password changed confirmation email func (c *TaskClient) EnqueuePasswordChangedEmail(to, firstName string) error { payload, err := json.Marshal(EmailPayload{To: to, FirstName: firstName}) if err != nil { return err } task := asynq.NewTask(TypePasswordChangedEmail, payload) _, err = c.client.Enqueue(task, asynq.Queue("default"), asynq.MaxRetry(3)) if err != nil { log.Error().Err(err).Str("to", to).Msg("Failed to enqueue password changed email") return err } log.Debug().Str("to", to).Msg("Password changed email task enqueued") return nil } // WorkerServer manages the asynq worker server type WorkerServer struct { server *asynq.Server scheduler *asynq.Scheduler } // NewWorkerServer creates a new worker server func NewWorkerServer(redisAddr string, concurrency int) *WorkerServer { srv := asynq.NewServer( asynq.RedisClientOpt{Addr: redisAddr}, asynq.Config{ Concurrency: concurrency, Queues: map[string]int{ "critical": 6, "default": 3, "low": 1, }, ErrorHandler: asynq.ErrorHandlerFunc(func(ctx context.Context, task *asynq.Task, err error) { log.Error(). Err(err). Str("type", task.Type()). Bytes("payload", task.Payload()). Msg("Task failed") }), }, ) // Create scheduler for periodic tasks loc, _ := time.LoadLocation("UTC") scheduler := asynq.NewScheduler( asynq.RedisClientOpt{Addr: redisAddr}, &asynq.SchedulerOpts{ Location: loc, }, ) return &WorkerServer{ server: srv, scheduler: scheduler, } } // RegisterHandlers registers task handlers func (w *WorkerServer) RegisterHandlers(mux *asynq.ServeMux) { // Handlers will be registered by the main worker process } // RegisterScheduledTasks registers periodic tasks func (w *WorkerServer) RegisterScheduledTasks() error { // Task reminders - 8:00 PM UTC daily _, err := w.scheduler.Register("0 20 * * *", asynq.NewTask(TypeTaskReminders, nil)) if err != nil { return fmt.Errorf("failed to register task reminders: %w", err) } // Overdue reminders - 9:00 AM UTC daily _, err = w.scheduler.Register("0 9 * * *", asynq.NewTask(TypeOverdueReminders, nil)) if err != nil { return fmt.Errorf("failed to register overdue reminders: %w", err) } // Daily notifications - 11:00 AM UTC daily _, err = w.scheduler.Register("0 11 * * *", asynq.NewTask(TypeDailyNotifications, nil)) if err != nil { return fmt.Errorf("failed to register daily notifications: %w", err) } return nil } // Start starts the worker server and scheduler func (w *WorkerServer) Start(mux *asynq.ServeMux) error { // Start scheduler if err := w.scheduler.Start(); err != nil { return fmt.Errorf("failed to start scheduler: %w", err) } // Start server if err := w.server.Start(mux); err != nil { return fmt.Errorf("failed to start worker server: %w", err) } return nil } // Shutdown gracefully shuts down the worker server func (w *WorkerServer) Shutdown() { w.scheduler.Shutdown() w.server.Shutdown() }