Fix 113 hardening issues across entire Go backend
Security: - Replace all binding: tags with validate: + c.Validate() in admin handlers - Add rate limiting to auth endpoints (login, register, password reset) - Add security headers (HSTS, XSS protection, nosniff, frame options) - Wire Google Pub/Sub token verification into webhook handler - Replace ParseUnverified with proper OIDC/JWKS key verification - Verify inner Apple JWS signatures in webhook handler - Add io.LimitReader (1MB) to all webhook body reads - Add ownership verification to file deletion - Move hardcoded admin credentials to env vars - Add uniqueIndex to User.Email - Hide ConfirmationCode from JSON serialization - Mask confirmation codes in admin responses - Use http.DetectContentType for upload validation - Fix path traversal in storage service - Replace os.Getenv with Viper in stripe service - Sanitize Redis URLs before logging - Separate DEBUG_FIXED_CODES from DEBUG flag - Reject weak SECRET_KEY in production - Add host check on /_next/* proxy routes - Use explicit localhost CORS origins in debug mode - Replace err.Error() with generic messages in all admin error responses Critical fixes: - Rewrite FCM to HTTP v1 API with OAuth 2.0 service account auth - Fix user_customuser -> auth_user table names in raw SQL - Fix dashboard verified query to use UserProfile model - Add escapeLikeWildcards() to prevent SQL wildcard injection Bug fixes: - Add bounds checks for days/expiring_soon query params (1-3650) - Add receipt_data/transaction_id empty-check to RestoreSubscription - Change Active bool -> *bool in device handler - Check all unchecked GORM/FindByIDWithProfile errors - Add validation for notification hour fields (0-23) - Add max=10000 validation on task description updates Transactions & data integrity: - Wrap registration flow in transaction - Wrap QuickComplete in transaction - Move image creation inside completion transaction - Wrap SetSpecialties in transaction - Wrap GetOrCreateToken in transaction - Wrap completion+image deletion in transaction Performance: - Batch completion summaries (2 queries vs 2N) - Reuse single http.Client in IAP validation - Cache dashboard counts (30s TTL) - Batch COUNT queries in admin user list - Add Limit(500) to document queries - Add reminder_stage+due_date filters to reminder queries - Parse AllowedTypes once at init - In-memory user cache in auth middleware (30s TTL) - Timezone change detection cache - Optimize P95 with per-endpoint sorted buffers - Replace crypto/md5 with hash/fnv for ETags Code quality: - Add sync.Once to all monitoring Stop()/Close() methods - Replace 8 fmt.Printf with zerolog in auth service - Log previously discarded errors - Standardize delete response shapes - Route hardcoded English through i18n - Remove FileURL from DocumentResponse (keep MediaURL only) - Thread user timezone through kanban board responses - Initialize empty slices to prevent null JSON - Extract shared field map for task Update/UpdateTx - Delete unused SoftDeleteModel, min(), formatCron, legacy handlers Worker & jobs: - Wire Asynq email infrastructure into worker - Register HandleReminderLogCleanup with daily 3AM cron - Use per-user timezone in HandleSmartReminder - Replace direct DB queries with repository calls - Delete legacy reminder handlers (~200 lines) - Delete unused task type constants Dependencies: - Replace archived jung-kurt/gofpdf with go-pdf/fpdf - Replace unmaintained gomail.v2 with wneessen/go-mail - Add TODO for Echo jwt v3 transitive dep removal Test infrastructure: - Fix MakeRequest/SeedLookupData error handling - Replace os.Exit(0) with t.Skip() in scope/consistency tests - Add 11 new FCM v1 tests Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -20,8 +20,6 @@ import (
|
||||
|
||||
// Task types
|
||||
const (
|
||||
TypeTaskReminder = "notification:task_reminder"
|
||||
TypeOverdueReminder = "notification:overdue_reminder"
|
||||
TypeSmartReminder = "notification:smart_reminder" // Frequency-aware reminders
|
||||
TypeDailyDigest = "notification:daily_digest"
|
||||
TypeSendEmail = "email:send"
|
||||
@@ -36,6 +34,7 @@ type Handler struct {
|
||||
taskRepo *repositories.TaskRepository
|
||||
residenceRepo *repositories.ResidenceRepository
|
||||
reminderRepo *repositories.ReminderRepository
|
||||
notificationRepo *repositories.NotificationRepository
|
||||
pushClient *push.Client
|
||||
emailService *services.EmailService
|
||||
notificationService *services.NotificationService
|
||||
@@ -56,6 +55,7 @@ func NewHandler(db *gorm.DB, pushClient *push.Client, emailService *services.Ema
|
||||
taskRepo: repositories.NewTaskRepository(db),
|
||||
residenceRepo: repositories.NewResidenceRepository(db),
|
||||
reminderRepo: repositories.NewReminderRepository(db),
|
||||
notificationRepo: repositories.NewNotificationRepository(db),
|
||||
pushClient: pushClient,
|
||||
emailService: emailService,
|
||||
notificationService: notificationService,
|
||||
@@ -64,218 +64,6 @@ func NewHandler(db *gorm.DB, pushClient *push.Client, emailService *services.Ema
|
||||
}
|
||||
}
|
||||
|
||||
// TaskReminderData represents a task due soon for reminder notifications
|
||||
type TaskReminderData struct {
|
||||
TaskID uint
|
||||
TaskTitle string
|
||||
DueDate time.Time
|
||||
UserID uint
|
||||
UserEmail string
|
||||
UserName string
|
||||
ResidenceName string
|
||||
}
|
||||
|
||||
// HandleTaskReminder processes task reminder notifications for tasks due today or tomorrow with actionable buttons
|
||||
func (h *Handler) HandleTaskReminder(ctx context.Context, task *asynq.Task) error {
|
||||
log.Info().Msg("Processing task reminder notifications...")
|
||||
|
||||
now := time.Now().UTC()
|
||||
currentHour := now.Hour()
|
||||
systemDefaultHour := h.config.Worker.TaskReminderHour
|
||||
|
||||
log.Info().Int("current_hour", currentHour).Int("system_default_hour", systemDefaultHour).Msg("Task reminder check")
|
||||
|
||||
// Step 1: Find users who should receive notifications THIS hour
|
||||
// Logic: Each user gets notified ONCE per day at exactly ONE hour:
|
||||
// - If user has custom hour set: notify ONLY at that custom hour
|
||||
// - If user has NO custom hour (NULL): notify ONLY at system default hour
|
||||
// This prevents duplicates: a user with custom hour is NEVER notified at default hour
|
||||
var eligibleUserIDs []uint
|
||||
|
||||
query := h.db.Model(&models.NotificationPreference{}).
|
||||
Select("user_id").
|
||||
Where("task_due_soon = true")
|
||||
|
||||
if currentHour == systemDefaultHour {
|
||||
// At system default hour: notify users who have NO custom hour (NULL) OR whose custom hour equals default
|
||||
query = query.Where("task_due_soon_hour IS NULL OR task_due_soon_hour = ?", currentHour)
|
||||
} else {
|
||||
// At non-default hour: only notify users who have this specific custom hour set
|
||||
// Exclude users with NULL (they get notified at default hour only)
|
||||
query = query.Where("task_due_soon_hour = ?", currentHour)
|
||||
}
|
||||
|
||||
err := query.Pluck("user_id", &eligibleUserIDs).Error
|
||||
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Failed to query eligible users for task reminders")
|
||||
return err
|
||||
}
|
||||
|
||||
// Early exit if no users need notifications this hour
|
||||
if len(eligibleUserIDs) == 0 {
|
||||
log.Debug().Int("hour", currentHour).Msg("No users scheduled for task reminder notifications this hour")
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Info().Int("eligible_users", len(eligibleUserIDs)).Msg("Found users eligible for task reminders this hour")
|
||||
|
||||
// Step 2: Query tasks due today or tomorrow using the single-purpose repository function
|
||||
// Uses the same scopes as kanban for consistency, with IncludeInProgress=true
|
||||
// so users still get notified about in-progress tasks that are due soon.
|
||||
opts := repositories.TaskFilterOptions{
|
||||
UserIDs: eligibleUserIDs,
|
||||
IncludeInProgress: true, // Notifications should include in-progress tasks
|
||||
PreloadResidence: true,
|
||||
PreloadCompletions: true,
|
||||
}
|
||||
|
||||
// Due soon = due within 2 days (today and tomorrow)
|
||||
dueSoonTasks, err := h.taskRepo.GetDueSoonTasks(now, 2, opts)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Failed to query tasks due soon")
|
||||
return err
|
||||
}
|
||||
|
||||
log.Info().Int("count", len(dueSoonTasks)).Msg("Found tasks due today/tomorrow for eligible users")
|
||||
|
||||
// Build set for O(1) eligibility lookups instead of O(N) linear scan
|
||||
eligibleSet := make(map[uint]bool, len(eligibleUserIDs))
|
||||
for _, id := range eligibleUserIDs {
|
||||
eligibleSet[id] = true
|
||||
}
|
||||
|
||||
// Group tasks by user (assigned_to or residence owner)
|
||||
userTasks := make(map[uint][]models.Task)
|
||||
for _, t := range dueSoonTasks {
|
||||
var userID uint
|
||||
if t.AssignedToID != nil {
|
||||
userID = *t.AssignedToID
|
||||
} else if t.Residence.ID != 0 {
|
||||
userID = t.Residence.OwnerID
|
||||
} else {
|
||||
continue
|
||||
}
|
||||
// Only include if user is in eligible set (O(1) lookup)
|
||||
if eligibleSet[userID] {
|
||||
userTasks[userID] = append(userTasks[userID], t)
|
||||
}
|
||||
}
|
||||
|
||||
// Step 3: Send notifications (no need to check preferences again - already filtered)
|
||||
// Send individual task-specific notification for each task (all tasks, no limit)
|
||||
for userID, taskList := range userTasks {
|
||||
for _, t := range taskList {
|
||||
if err := h.notificationService.CreateAndSendTaskNotification(ctx, userID, models.NotificationTaskDueSoon, &t); err != nil {
|
||||
log.Error().Err(err).Uint("user_id", userID).Uint("task_id", t.ID).Msg("Failed to send task reminder notification")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log.Info().Int("users_notified", len(userTasks)).Msg("Task reminder notifications completed")
|
||||
return nil
|
||||
}
|
||||
|
||||
// HandleOverdueReminder processes overdue task notifications with actionable buttons
|
||||
func (h *Handler) HandleOverdueReminder(ctx context.Context, task *asynq.Task) error {
|
||||
log.Info().Msg("Processing overdue task notifications...")
|
||||
|
||||
now := time.Now().UTC()
|
||||
currentHour := now.Hour()
|
||||
systemDefaultHour := h.config.Worker.OverdueReminderHour
|
||||
|
||||
log.Info().Int("current_hour", currentHour).Int("system_default_hour", systemDefaultHour).Msg("Overdue reminder check")
|
||||
|
||||
// Step 1: Find users who should receive notifications THIS hour
|
||||
// Logic: Each user gets notified ONCE per day at exactly ONE hour:
|
||||
// - If user has custom hour set: notify ONLY at that custom hour
|
||||
// - If user has NO custom hour (NULL): notify ONLY at system default hour
|
||||
// This prevents duplicates: a user with custom hour is NEVER notified at default hour
|
||||
var eligibleUserIDs []uint
|
||||
|
||||
query := h.db.Model(&models.NotificationPreference{}).
|
||||
Select("user_id").
|
||||
Where("task_overdue = true")
|
||||
|
||||
if currentHour == systemDefaultHour {
|
||||
// At system default hour: notify users who have NO custom hour (NULL) OR whose custom hour equals default
|
||||
query = query.Where("task_overdue_hour IS NULL OR task_overdue_hour = ?", currentHour)
|
||||
} else {
|
||||
// At non-default hour: only notify users who have this specific custom hour set
|
||||
// Exclude users with NULL (they get notified at default hour only)
|
||||
query = query.Where("task_overdue_hour = ?", currentHour)
|
||||
}
|
||||
|
||||
err := query.Pluck("user_id", &eligibleUserIDs).Error
|
||||
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Failed to query eligible users for overdue reminders")
|
||||
return err
|
||||
}
|
||||
|
||||
// Early exit if no users need notifications this hour
|
||||
if len(eligibleUserIDs) == 0 {
|
||||
log.Debug().Int("hour", currentHour).Msg("No users scheduled for overdue notifications this hour")
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Info().Int("eligible_users", len(eligibleUserIDs)).Msg("Found users eligible for overdue reminders this hour")
|
||||
|
||||
// Step 2: Query overdue tasks using the single-purpose repository function
|
||||
// Uses the same scopes as kanban for consistency, with IncludeInProgress=true
|
||||
// so users still get notified about in-progress tasks that are overdue.
|
||||
opts := repositories.TaskFilterOptions{
|
||||
UserIDs: eligibleUserIDs,
|
||||
IncludeInProgress: true, // Notifications should include in-progress tasks
|
||||
PreloadResidence: true,
|
||||
PreloadCompletions: true,
|
||||
}
|
||||
|
||||
overdueTasks, err := h.taskRepo.GetOverdueTasks(now, opts)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Failed to query overdue tasks")
|
||||
return err
|
||||
}
|
||||
|
||||
log.Info().Int("count", len(overdueTasks)).Msg("Found overdue tasks for eligible users")
|
||||
|
||||
// Build set for O(1) eligibility lookups instead of O(N) linear scan
|
||||
eligibleSet := make(map[uint]bool, len(eligibleUserIDs))
|
||||
for _, id := range eligibleUserIDs {
|
||||
eligibleSet[id] = true
|
||||
}
|
||||
|
||||
// Group tasks by user (assigned_to or residence owner)
|
||||
userTasks := make(map[uint][]models.Task)
|
||||
for _, t := range overdueTasks {
|
||||
var userID uint
|
||||
if t.AssignedToID != nil {
|
||||
userID = *t.AssignedToID
|
||||
} else if t.Residence.ID != 0 {
|
||||
userID = t.Residence.OwnerID
|
||||
} else {
|
||||
continue
|
||||
}
|
||||
// Only include if user is in eligible set (O(1) lookup)
|
||||
if eligibleSet[userID] {
|
||||
userTasks[userID] = append(userTasks[userID], t)
|
||||
}
|
||||
}
|
||||
|
||||
// Step 3: Send notifications (no need to check preferences again - already filtered)
|
||||
// Send individual task-specific notification for each task (all tasks, no limit)
|
||||
for userID, taskList := range userTasks {
|
||||
for _, t := range taskList {
|
||||
if err := h.notificationService.CreateAndSendTaskNotification(ctx, userID, models.NotificationTaskOverdue, &t); err != nil {
|
||||
log.Error().Err(err).Uint("user_id", userID).Uint("task_id", t.ID).Msg("Failed to send overdue notification")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log.Info().Int("users_notified", len(userTasks)).Msg("Overdue task notifications completed")
|
||||
return nil
|
||||
}
|
||||
|
||||
// HandleDailyDigest processes daily digest notifications with task statistics
|
||||
func (h *Handler) HandleDailyDigest(ctx context.Context, task *asynq.Task) error {
|
||||
log.Info().Msg("Processing daily digest notifications...")
|
||||
@@ -328,8 +116,7 @@ func (h *Handler) HandleDailyDigest(ctx context.Context, task *asynq.Task) error
|
||||
// Get user's timezone from notification preferences for accurate overdue calculation
|
||||
// This ensures the daily digest matches what the user sees in the kanban UI
|
||||
var userNow time.Time
|
||||
var prefs models.NotificationPreference
|
||||
if err := h.db.Where("user_id = ?", userID).First(&prefs).Error; err == nil && prefs.Timezone != nil {
|
||||
if prefs, err := h.notificationRepo.FindPreferencesByUser(userID); err == nil && prefs.Timezone != nil {
|
||||
if loc, err := time.LoadLocation(*prefs.Timezone); err == nil {
|
||||
// Use start of day in user's timezone (matches kanban behavior)
|
||||
userNowInTz := time.Now().In(loc)
|
||||
@@ -481,22 +268,11 @@ func (h *Handler) sendPushToUser(ctx context.Context, userID uint, title, messag
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get iOS device tokens
|
||||
var iosTokens []string
|
||||
err := h.db.Model(&models.APNSDevice{}).
|
||||
Where("user_id = ? AND active = ?", userID, true).
|
||||
Pluck("registration_id", &iosTokens).Error
|
||||
// Get active device tokens via repository
|
||||
iosTokens, androidTokens, err := h.notificationRepo.GetActiveTokensForUser(userID)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Uint("user_id", userID).Msg("Failed to get iOS tokens")
|
||||
}
|
||||
|
||||
// Get Android device tokens
|
||||
var androidTokens []string
|
||||
err = h.db.Model(&models.GCMDevice{}).
|
||||
Where("user_id = ? AND active = ?", userID, true).
|
||||
Pluck("registration_id", &androidTokens).Error
|
||||
if err != nil {
|
||||
log.Error().Err(err).Uint("user_id", userID).Msg("Failed to get Android tokens")
|
||||
log.Error().Err(err).Uint("user_id", userID).Msg("Failed to get device tokens")
|
||||
return err
|
||||
}
|
||||
|
||||
if len(iosTokens) == 0 && len(androidTokens) == 0 {
|
||||
@@ -561,18 +337,17 @@ func (h *Handler) HandleOnboardingEmails(ctx context.Context, task *asynq.Task)
|
||||
}
|
||||
|
||||
// Send no-residence emails (users without any residences after 2 days)
|
||||
noResCount, err := h.onboardingService.CheckAndSendNoResidenceEmails()
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Failed to process no-residence onboarding emails")
|
||||
// Continue to next type, don't return error
|
||||
noResCount, noResErr := h.onboardingService.CheckAndSendNoResidenceEmails()
|
||||
if noResErr != nil {
|
||||
log.Error().Err(noResErr).Msg("Failed to process no-residence onboarding emails")
|
||||
} else {
|
||||
log.Info().Int("count", noResCount).Msg("Sent no-residence onboarding emails")
|
||||
}
|
||||
|
||||
// Send no-tasks emails (users with residence but no tasks after 5 days)
|
||||
noTasksCount, err := h.onboardingService.CheckAndSendNoTasksEmails()
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Failed to process no-tasks onboarding emails")
|
||||
noTasksCount, noTasksErr := h.onboardingService.CheckAndSendNoTasksEmails()
|
||||
if noTasksErr != nil {
|
||||
log.Error().Err(noTasksErr).Msg("Failed to process no-tasks onboarding emails")
|
||||
} else {
|
||||
log.Info().Int("count", noTasksCount).Msg("Sent no-tasks onboarding emails")
|
||||
}
|
||||
@@ -582,6 +357,11 @@ func (h *Handler) HandleOnboardingEmails(ctx context.Context, task *asynq.Task)
|
||||
Int("no_tasks_sent", noTasksCount).
|
||||
Msg("Onboarding email processing completed")
|
||||
|
||||
// If all sub-tasks failed, return an error so Asynq retries
|
||||
if noResErr != nil && noTasksErr != nil {
|
||||
return fmt.Errorf("all onboarding email sub-tasks failed: no-residence: %w, no-tasks: %v", noResErr, noTasksErr)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -603,7 +383,6 @@ func (h *Handler) HandleSmartReminder(ctx context.Context, task *asynq.Task) err
|
||||
log.Info().Msg("Processing smart task reminders...")
|
||||
|
||||
now := time.Now().UTC()
|
||||
today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)
|
||||
currentHour := now.Hour()
|
||||
|
||||
dueSoonDefault := h.config.Worker.TaskReminderHour
|
||||
@@ -673,6 +452,22 @@ func (h *Handler) HandleSmartReminder(ctx context.Context, task *asynq.Task) err
|
||||
Int("want_overdue", len(userWantsOverdue)).
|
||||
Msg("Found users eligible for reminders")
|
||||
|
||||
// Build per-user "today" using their timezone preference
|
||||
// This ensures reminder stage calculations (overdue, due-soon) match the user's local date
|
||||
userToday := make(map[uint]time.Time, len(allUserIDs))
|
||||
utcToday := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)
|
||||
for _, uid := range allUserIDs {
|
||||
prefs, err := h.notificationRepo.FindPreferencesByUser(uid)
|
||||
if err == nil && prefs.Timezone != nil {
|
||||
if loc, locErr := time.LoadLocation(*prefs.Timezone); locErr == nil {
|
||||
userNowInTz := time.Now().In(loc)
|
||||
userToday[uid] = time.Date(userNowInTz.Year(), userNowInTz.Month(), userNowInTz.Day(), 0, 0, 0, 0, loc)
|
||||
continue
|
||||
}
|
||||
}
|
||||
userToday[uid] = utcToday // Fallback to UTC
|
||||
}
|
||||
|
||||
// Step 2: Single query to get ALL active tasks (both due-soon and overdue) for these users
|
||||
opts := repositories.TaskFilterOptions{
|
||||
UserIDs: allUserIDs,
|
||||
@@ -734,7 +529,8 @@ func (h *Handler) HandleSmartReminder(ctx context.Context, task *asynq.Task) err
|
||||
frequencyDays = &days
|
||||
}
|
||||
|
||||
// Determine which reminder stage applies today
|
||||
// Determine which reminder stage applies today using the user's local date
|
||||
today := userToday[userID]
|
||||
stage := notifications.GetReminderStageForToday(effectiveDate, frequencyDays, today)
|
||||
if stage == "" {
|
||||
continue
|
||||
@@ -800,6 +596,11 @@ func (h *Handler) HandleSmartReminder(ctx context.Context, task *asynq.Task) err
|
||||
notificationType = models.NotificationTaskDueSoon
|
||||
}
|
||||
|
||||
// Log the reminder BEFORE sending so we have a record even if the send crashes
|
||||
if _, err := h.reminderRepo.LogReminder(t.ID, c.userID, c.effectiveDate, c.reminderStage, nil); err != nil {
|
||||
log.Error().Err(err).Uint("task_id", t.ID).Str("stage", c.stage).Msg("Failed to log reminder")
|
||||
}
|
||||
|
||||
// Send notification
|
||||
if err := h.notificationService.CreateAndSendTaskNotification(ctx, c.userID, notificationType, &t); err != nil {
|
||||
log.Error().Err(err).
|
||||
@@ -810,11 +611,6 @@ func (h *Handler) HandleSmartReminder(ctx context.Context, task *asynq.Task) err
|
||||
continue
|
||||
}
|
||||
|
||||
// Log the reminder
|
||||
if _, err := h.reminderRepo.LogReminder(t.ID, c.userID, c.effectiveDate, c.reminderStage, nil); err != nil {
|
||||
log.Error().Err(err).Uint("task_id", t.ID).Str("stage", c.stage).Msg("Failed to log reminder")
|
||||
}
|
||||
|
||||
if c.isOverdue {
|
||||
overdueSent++
|
||||
} else {
|
||||
|
||||
@@ -1,27 +1,18 @@
|
||||
package worker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
// Task types
|
||||
// Task types for email jobs
|
||||
const (
|
||||
TypeWelcomeEmail = "email:welcome"
|
||||
TypeVerificationEmail = "email:verification"
|
||||
TypePasswordResetEmail = "email:password_reset"
|
||||
TypePasswordChangedEmail = "email:password_changed"
|
||||
TypeTaskCompletionEmail = "email:task_completion"
|
||||
TypeGeneratePDFReport = "pdf:generate_report"
|
||||
TypeUpdateContractorRating = "contractor:update_rating"
|
||||
TypeDailyNotifications = "notifications:daily"
|
||||
TypeTaskReminders = "notifications:task_reminders"
|
||||
TypeOverdueReminders = "notifications:overdue_reminders"
|
||||
TypeWelcomeEmail = "email:welcome"
|
||||
TypeVerificationEmail = "email:verification"
|
||||
TypePasswordResetEmail = "email:password_reset"
|
||||
TypePasswordChangedEmail = "email:password_changed"
|
||||
)
|
||||
|
||||
// EmailPayload is the base payload for email tasks
|
||||
@@ -146,94 +137,3 @@ func (c *TaskClient) EnqueuePasswordChangedEmail(to, firstName string) error {
|
||||
log.Debug().Str("to", to).Msg("Password changed email task enqueued")
|
||||
return nil
|
||||
}
|
||||
|
||||
// WorkerServer manages the asynq worker server
|
||||
type WorkerServer struct {
|
||||
server *asynq.Server
|
||||
scheduler *asynq.Scheduler
|
||||
}
|
||||
|
||||
// NewWorkerServer creates a new worker server
|
||||
func NewWorkerServer(redisAddr string, concurrency int) *WorkerServer {
|
||||
srv := asynq.NewServer(
|
||||
asynq.RedisClientOpt{Addr: redisAddr},
|
||||
asynq.Config{
|
||||
Concurrency: concurrency,
|
||||
Queues: map[string]int{
|
||||
"critical": 6,
|
||||
"default": 3,
|
||||
"low": 1,
|
||||
},
|
||||
ErrorHandler: asynq.ErrorHandlerFunc(func(ctx context.Context, task *asynq.Task, err error) {
|
||||
log.Error().
|
||||
Err(err).
|
||||
Str("type", task.Type()).
|
||||
Bytes("payload", task.Payload()).
|
||||
Msg("Task failed")
|
||||
}),
|
||||
},
|
||||
)
|
||||
|
||||
// Create scheduler for periodic tasks
|
||||
loc, _ := time.LoadLocation("UTC")
|
||||
scheduler := asynq.NewScheduler(
|
||||
asynq.RedisClientOpt{Addr: redisAddr},
|
||||
&asynq.SchedulerOpts{
|
||||
Location: loc,
|
||||
},
|
||||
)
|
||||
|
||||
return &WorkerServer{
|
||||
server: srv,
|
||||
scheduler: scheduler,
|
||||
}
|
||||
}
|
||||
|
||||
// RegisterHandlers registers task handlers
|
||||
func (w *WorkerServer) RegisterHandlers(mux *asynq.ServeMux) {
|
||||
// Handlers will be registered by the main worker process
|
||||
}
|
||||
|
||||
// RegisterScheduledTasks registers periodic tasks
|
||||
func (w *WorkerServer) RegisterScheduledTasks() error {
|
||||
// Task reminders - 8:00 PM UTC daily
|
||||
_, err := w.scheduler.Register("0 20 * * *", asynq.NewTask(TypeTaskReminders, nil))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to register task reminders: %w", err)
|
||||
}
|
||||
|
||||
// Overdue reminders - 9:00 AM UTC daily
|
||||
_, err = w.scheduler.Register("0 9 * * *", asynq.NewTask(TypeOverdueReminders, nil))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to register overdue reminders: %w", err)
|
||||
}
|
||||
|
||||
// Daily notifications - 11:00 AM UTC daily
|
||||
_, err = w.scheduler.Register("0 11 * * *", asynq.NewTask(TypeDailyNotifications, nil))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to register daily notifications: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Start starts the worker server and scheduler
|
||||
func (w *WorkerServer) Start(mux *asynq.ServeMux) error {
|
||||
// Start scheduler
|
||||
if err := w.scheduler.Start(); err != nil {
|
||||
return fmt.Errorf("failed to start scheduler: %w", err)
|
||||
}
|
||||
|
||||
// Start server
|
||||
if err := w.server.Start(mux); err != nil {
|
||||
return fmt.Errorf("failed to start worker server: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Shutdown gracefully shuts down the worker server
|
||||
func (w *WorkerServer) Shutdown() {
|
||||
w.scheduler.Shutdown()
|
||||
w.server.Shutdown()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user