Complete rewrite of Django REST API to Go with: - Gin web framework for HTTP routing - GORM for database operations - GoAdmin for admin panel - Gorush integration for push notifications - Redis for caching and job queues Features implemented: - User authentication (login, register, logout, password reset) - Residence management (CRUD, sharing, share codes) - Task management (CRUD, kanban board, completions) - Contractor management (CRUD, specialties) - Document management (CRUD, warranties) - Notifications (preferences, push notifications) - Subscription management (tiers, limits) Infrastructure: - Docker Compose for local development - Database migrations and seed data - Admin panel for data management 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
240 lines
6.6 KiB
Go
240 lines
6.6 KiB
Go
package worker
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/hibiken/asynq"
|
|
"github.com/rs/zerolog/log"
|
|
)
|
|
|
|
// Task types
|
|
const (
|
|
TypeWelcomeEmail = "email:welcome"
|
|
TypeVerificationEmail = "email:verification"
|
|
TypePasswordResetEmail = "email:password_reset"
|
|
TypePasswordChangedEmail = "email:password_changed"
|
|
TypeTaskCompletionEmail = "email:task_completion"
|
|
TypeGeneratePDFReport = "pdf:generate_report"
|
|
TypeUpdateContractorRating = "contractor:update_rating"
|
|
TypeDailyNotifications = "notifications:daily"
|
|
TypeTaskReminders = "notifications:task_reminders"
|
|
TypeOverdueReminders = "notifications:overdue_reminders"
|
|
)
|
|
|
|
// EmailPayload is the base payload for email tasks
|
|
type EmailPayload struct {
|
|
To string `json:"to"`
|
|
FirstName string `json:"first_name"`
|
|
}
|
|
|
|
// WelcomeEmailPayload is the payload for welcome emails
|
|
type WelcomeEmailPayload struct {
|
|
EmailPayload
|
|
ConfirmationCode string `json:"confirmation_code"`
|
|
}
|
|
|
|
// VerificationEmailPayload is the payload for verification emails
|
|
type VerificationEmailPayload struct {
|
|
EmailPayload
|
|
Code string `json:"code"`
|
|
}
|
|
|
|
// PasswordResetEmailPayload is the payload for password reset emails
|
|
type PasswordResetEmailPayload struct {
|
|
EmailPayload
|
|
Code string `json:"code"`
|
|
ResetToken string `json:"reset_token"`
|
|
}
|
|
|
|
// TaskClient wraps the asynq client for enqueuing tasks
|
|
type TaskClient struct {
|
|
client *asynq.Client
|
|
}
|
|
|
|
// NewTaskClient creates a new task client
|
|
func NewTaskClient(redisAddr string) *TaskClient {
|
|
client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr})
|
|
return &TaskClient{client: client}
|
|
}
|
|
|
|
// Close closes the task client
|
|
func (c *TaskClient) Close() error {
|
|
return c.client.Close()
|
|
}
|
|
|
|
// EnqueueWelcomeEmail enqueues a welcome email task
|
|
func (c *TaskClient) EnqueueWelcomeEmail(to, firstName, code string) error {
|
|
payload, err := json.Marshal(WelcomeEmailPayload{
|
|
EmailPayload: EmailPayload{To: to, FirstName: firstName},
|
|
ConfirmationCode: code,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
task := asynq.NewTask(TypeWelcomeEmail, payload)
|
|
_, err = c.client.Enqueue(task, asynq.Queue("default"), asynq.MaxRetry(3))
|
|
if err != nil {
|
|
log.Error().Err(err).Str("to", to).Msg("Failed to enqueue welcome email")
|
|
return err
|
|
}
|
|
|
|
log.Debug().Str("to", to).Msg("Welcome email task enqueued")
|
|
return nil
|
|
}
|
|
|
|
// EnqueueVerificationEmail enqueues a verification email task
|
|
func (c *TaskClient) EnqueueVerificationEmail(to, firstName, code string) error {
|
|
payload, err := json.Marshal(VerificationEmailPayload{
|
|
EmailPayload: EmailPayload{To: to, FirstName: firstName},
|
|
Code: code,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
task := asynq.NewTask(TypeVerificationEmail, payload)
|
|
_, err = c.client.Enqueue(task, asynq.Queue("default"), asynq.MaxRetry(3))
|
|
if err != nil {
|
|
log.Error().Err(err).Str("to", to).Msg("Failed to enqueue verification email")
|
|
return err
|
|
}
|
|
|
|
log.Debug().Str("to", to).Msg("Verification email task enqueued")
|
|
return nil
|
|
}
|
|
|
|
// EnqueuePasswordResetEmail enqueues a password reset email task
|
|
func (c *TaskClient) EnqueuePasswordResetEmail(to, firstName, code, resetToken string) error {
|
|
payload, err := json.Marshal(PasswordResetEmailPayload{
|
|
EmailPayload: EmailPayload{To: to, FirstName: firstName},
|
|
Code: code,
|
|
ResetToken: resetToken,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
task := asynq.NewTask(TypePasswordResetEmail, payload)
|
|
_, err = c.client.Enqueue(task, asynq.Queue("default"), asynq.MaxRetry(3))
|
|
if err != nil {
|
|
log.Error().Err(err).Str("to", to).Msg("Failed to enqueue password reset email")
|
|
return err
|
|
}
|
|
|
|
log.Debug().Str("to", to).Msg("Password reset email task enqueued")
|
|
return nil
|
|
}
|
|
|
|
// EnqueuePasswordChangedEmail enqueues a password changed confirmation email
|
|
func (c *TaskClient) EnqueuePasswordChangedEmail(to, firstName string) error {
|
|
payload, err := json.Marshal(EmailPayload{To: to, FirstName: firstName})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
task := asynq.NewTask(TypePasswordChangedEmail, payload)
|
|
_, err = c.client.Enqueue(task, asynq.Queue("default"), asynq.MaxRetry(3))
|
|
if err != nil {
|
|
log.Error().Err(err).Str("to", to).Msg("Failed to enqueue password changed email")
|
|
return err
|
|
}
|
|
|
|
log.Debug().Str("to", to).Msg("Password changed email task enqueued")
|
|
return nil
|
|
}
|
|
|
|
// WorkerServer manages the asynq worker server
|
|
type WorkerServer struct {
|
|
server *asynq.Server
|
|
scheduler *asynq.Scheduler
|
|
}
|
|
|
|
// NewWorkerServer creates a new worker server
|
|
func NewWorkerServer(redisAddr string, concurrency int) *WorkerServer {
|
|
srv := asynq.NewServer(
|
|
asynq.RedisClientOpt{Addr: redisAddr},
|
|
asynq.Config{
|
|
Concurrency: concurrency,
|
|
Queues: map[string]int{
|
|
"critical": 6,
|
|
"default": 3,
|
|
"low": 1,
|
|
},
|
|
ErrorHandler: asynq.ErrorHandlerFunc(func(ctx context.Context, task *asynq.Task, err error) {
|
|
log.Error().
|
|
Err(err).
|
|
Str("type", task.Type()).
|
|
Bytes("payload", task.Payload()).
|
|
Msg("Task failed")
|
|
}),
|
|
},
|
|
)
|
|
|
|
// Create scheduler for periodic tasks
|
|
loc, _ := time.LoadLocation("UTC")
|
|
scheduler := asynq.NewScheduler(
|
|
asynq.RedisClientOpt{Addr: redisAddr},
|
|
&asynq.SchedulerOpts{
|
|
Location: loc,
|
|
},
|
|
)
|
|
|
|
return &WorkerServer{
|
|
server: srv,
|
|
scheduler: scheduler,
|
|
}
|
|
}
|
|
|
|
// RegisterHandlers registers task handlers
|
|
func (w *WorkerServer) RegisterHandlers(mux *asynq.ServeMux) {
|
|
// Handlers will be registered by the main worker process
|
|
}
|
|
|
|
// RegisterScheduledTasks registers periodic tasks
|
|
func (w *WorkerServer) RegisterScheduledTasks() error {
|
|
// Task reminders - 8:00 PM UTC daily
|
|
_, err := w.scheduler.Register("0 20 * * *", asynq.NewTask(TypeTaskReminders, nil))
|
|
if err != nil {
|
|
return fmt.Errorf("failed to register task reminders: %w", err)
|
|
}
|
|
|
|
// Overdue reminders - 9:00 AM UTC daily
|
|
_, err = w.scheduler.Register("0 9 * * *", asynq.NewTask(TypeOverdueReminders, nil))
|
|
if err != nil {
|
|
return fmt.Errorf("failed to register overdue reminders: %w", err)
|
|
}
|
|
|
|
// Daily notifications - 11:00 AM UTC daily
|
|
_, err = w.scheduler.Register("0 11 * * *", asynq.NewTask(TypeDailyNotifications, nil))
|
|
if err != nil {
|
|
return fmt.Errorf("failed to register daily notifications: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Start starts the worker server and scheduler
|
|
func (w *WorkerServer) Start(mux *asynq.ServeMux) error {
|
|
// Start scheduler
|
|
if err := w.scheduler.Start(); err != nil {
|
|
return fmt.Errorf("failed to start scheduler: %w", err)
|
|
}
|
|
|
|
// Start server
|
|
if err := w.server.Start(mux); err != nil {
|
|
return fmt.Errorf("failed to start worker server: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Shutdown gracefully shuts down the worker server
|
|
func (w *WorkerServer) Shutdown() {
|
|
w.scheduler.Shutdown()
|
|
w.server.Shutdown()
|
|
}
|