package jobs import ( "context" "encoding/json" "fmt" "time" "github.com/hibiken/asynq" "github.com/rs/zerolog/log" "gorm.io/gorm" "github.com/treytartt/casera-api/internal/config" "github.com/treytartt/casera-api/internal/models" "github.com/treytartt/casera-api/internal/push" "github.com/treytartt/casera-api/internal/services" ) // Task types const ( TypeTaskReminder = "notification:task_reminder" TypeOverdueReminder = "notification:overdue_reminder" TypeDailyDigest = "notification:daily_digest" TypeSendEmail = "email:send" TypeSendPush = "push:send" ) // Handler handles background job processing type Handler struct { db *gorm.DB pushClient *push.Client emailService *services.EmailService notificationService *services.NotificationService config *config.Config } // NewHandler creates a new job handler func NewHandler(db *gorm.DB, pushClient *push.Client, emailService *services.EmailService, notificationService *services.NotificationService, cfg *config.Config) *Handler { return &Handler{ db: db, pushClient: pushClient, emailService: emailService, notificationService: notificationService, config: cfg, } } // TaskReminderData represents a task due soon for reminder notifications type TaskReminderData struct { TaskID uint TaskTitle string DueDate time.Time UserID uint UserEmail string UserName string ResidenceName string } // HandleTaskReminder processes task reminder notifications for tasks due today or tomorrow with actionable buttons func (h *Handler) HandleTaskReminder(ctx context.Context, task *asynq.Task) error { log.Info().Msg("Processing task reminder notifications...") now := time.Now().UTC() currentHour := now.Hour() systemDefaultHour := h.config.Worker.TaskReminderHour today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC) dayAfterTomorrow := today.AddDate(0, 0, 2) log.Info().Int("current_hour", currentHour).Int("system_default_hour", systemDefaultHour).Msg("Task reminder check") // Step 1: Find users who should receive notifications THIS hour // Users with custom hour matching current hour, OR users with no custom hour when current hour is system default var eligibleUserIDs []uint // Build query based on whether current hour is system default query := h.db.Model(&models.NotificationPreference{}). Select("user_id"). Where("task_due_soon = true") if currentHour == systemDefaultHour { // Current hour is the system default, so include users with custom hour OR no custom hour (NULL) query = query.Where("(task_due_soon_hour = ? OR task_due_soon_hour IS NULL)", currentHour) } else { // Current hour is not system default, so only include users with this specific custom hour query = query.Where("task_due_soon_hour = ?", currentHour) } err := query.Pluck("user_id", &eligibleUserIDs).Error if err != nil { log.Error().Err(err).Msg("Failed to query eligible users for task reminders") return err } // Early exit if no users need notifications this hour if len(eligibleUserIDs) == 0 { log.Debug().Int("hour", currentHour).Msg("No users scheduled for task reminder notifications this hour") return nil } log.Info().Int("eligible_users", len(eligibleUserIDs)).Msg("Found users eligible for task reminders this hour") // Step 2: Query tasks due today or tomorrow only for eligible users // A task is considered "completed" (and should be excluded) if: // - NextDueDate IS NULL AND it has at least one completion record // This matches the kanban categorization logic var dueSoonTasks []models.Task err = h.db.Preload("Status").Preload("Completions").Preload("Residence"). Where("(due_date >= ? AND due_date < ?) OR (next_due_date >= ? AND next_due_date < ?)", today, dayAfterTomorrow, today, dayAfterTomorrow). Where("is_cancelled = false"). Where("is_archived = false"). // Exclude completed tasks: tasks with no next_due_date AND at least one completion Where("NOT (next_due_date IS NULL AND EXISTS (SELECT 1 FROM task_taskcompletion tc WHERE tc.task_id = task_task.id))"). Where("(assigned_to_id IN ? OR residence_id IN (SELECT id FROM residence_residence WHERE owner_id IN ?))", eligibleUserIDs, eligibleUserIDs). Find(&dueSoonTasks).Error if err != nil { log.Error().Err(err).Msg("Failed to query tasks due soon") return err } log.Info().Int("count", len(dueSoonTasks)).Msg("Found tasks due today/tomorrow for eligible users") // Group tasks by user (assigned_to or residence owner) userTasks := make(map[uint][]models.Task) for _, t := range dueSoonTasks { var userID uint if t.AssignedToID != nil { userID = *t.AssignedToID } else if t.Residence.ID != 0 { userID = t.Residence.OwnerID } else { continue } // Only include if user is in eligible list for _, eligibleID := range eligibleUserIDs { if userID == eligibleID { userTasks[userID] = append(userTasks[userID], t) break } } } // Step 3: Send notifications (no need to check preferences again - already filtered) for userID, taskList := range userTasks { // Send individual actionable notification for each task (up to 5) maxNotifications := 5 if len(taskList) < maxNotifications { maxNotifications = len(taskList) } for i := 0; i < maxNotifications; i++ { t := taskList[i] if err := h.notificationService.CreateAndSendTaskNotification(ctx, userID, models.NotificationTaskDueSoon, &t); err != nil { log.Error().Err(err).Uint("user_id", userID).Uint("task_id", t.ID).Msg("Failed to send task reminder notification") } } // If more than 5 tasks, send a summary notification if len(taskList) > 5 { title := "More Tasks Due Soon" body := fmt.Sprintf("You have %d more tasks due soon", len(taskList)-5) if err := h.sendPushToUser(ctx, userID, title, body, map[string]string{ "type": "task_reminder_summary", }); err != nil { log.Error().Err(err).Uint("user_id", userID).Msg("Failed to send task reminder summary") } } } log.Info().Int("users_notified", len(userTasks)).Msg("Task reminder notifications completed") return nil } // HandleOverdueReminder processes overdue task notifications with actionable buttons func (h *Handler) HandleOverdueReminder(ctx context.Context, task *asynq.Task) error { log.Info().Msg("Processing overdue task notifications...") now := time.Now().UTC() currentHour := now.Hour() systemDefaultHour := h.config.Worker.OverdueReminderHour today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC) log.Info().Int("current_hour", currentHour).Int("system_default_hour", systemDefaultHour).Msg("Overdue reminder check") // Step 1: Find users who should receive notifications THIS hour // Users with custom hour matching current hour, OR users with no custom hour when current hour is system default var eligibleUserIDs []uint // Build query based on whether current hour is system default query := h.db.Model(&models.NotificationPreference{}). Select("user_id"). Where("task_overdue = true") if currentHour == systemDefaultHour { // Current hour is the system default, so include users with custom hour OR no custom hour (NULL) query = query.Where("(task_overdue_hour = ? OR task_overdue_hour IS NULL)", currentHour) } else { // Current hour is not system default, so only include users with this specific custom hour query = query.Where("task_overdue_hour = ?", currentHour) } err := query.Pluck("user_id", &eligibleUserIDs).Error if err != nil { log.Error().Err(err).Msg("Failed to query eligible users for overdue reminders") return err } // Early exit if no users need notifications this hour if len(eligibleUserIDs) == 0 { log.Debug().Int("hour", currentHour).Msg("No users scheduled for overdue notifications this hour") return nil } log.Info().Int("eligible_users", len(eligibleUserIDs)).Msg("Found users eligible for overdue reminders this hour") // Step 2: Query overdue tasks only for eligible users // A task is considered "completed" (and should be excluded) if: // - NextDueDate IS NULL AND it has at least one completion record // This matches the kanban categorization logic var overdueTasks []models.Task err = h.db.Preload("Status").Preload("Completions").Preload("Residence"). Where("due_date < ? OR next_due_date < ?", today, today). Where("is_cancelled = false"). Where("is_archived = false"). // Exclude completed tasks: tasks with no next_due_date AND at least one completion Where("NOT (next_due_date IS NULL AND EXISTS (SELECT 1 FROM task_taskcompletion tc WHERE tc.task_id = task_task.id))"). Where("(assigned_to_id IN ? OR residence_id IN (SELECT id FROM residence_residence WHERE owner_id IN ?))", eligibleUserIDs, eligibleUserIDs). Find(&overdueTasks).Error if err != nil { log.Error().Err(err).Msg("Failed to query overdue tasks") return err } log.Info().Int("count", len(overdueTasks)).Msg("Found overdue tasks for eligible users") // Group tasks by user (assigned_to or residence owner) userTasks := make(map[uint][]models.Task) for _, t := range overdueTasks { var userID uint if t.AssignedToID != nil { userID = *t.AssignedToID } else if t.Residence.ID != 0 { userID = t.Residence.OwnerID } else { continue } // Only include if user is in eligible list for _, eligibleID := range eligibleUserIDs { if userID == eligibleID { userTasks[userID] = append(userTasks[userID], t) break } } } // Step 3: Send notifications (no need to check preferences again - already filtered) for userID, taskList := range userTasks { // Send individual actionable notification for each task (up to 5) maxNotifications := 5 if len(taskList) < maxNotifications { maxNotifications = len(taskList) } for i := 0; i < maxNotifications; i++ { t := taskList[i] if err := h.notificationService.CreateAndSendTaskNotification(ctx, userID, models.NotificationTaskOverdue, &t); err != nil { log.Error().Err(err).Uint("user_id", userID).Uint("task_id", t.ID).Msg("Failed to send overdue notification") } } // If more than 5 tasks, send a summary notification if len(taskList) > 5 { title := "More Overdue Tasks" body := fmt.Sprintf("You have %d more overdue tasks that need attention", len(taskList)-5) if err := h.sendPushToUser(ctx, userID, title, body, map[string]string{ "type": "overdue_summary", }); err != nil { log.Error().Err(err).Uint("user_id", userID).Msg("Failed to send overdue summary") } } } log.Info().Int("users_notified", len(userTasks)).Msg("Overdue task notifications completed") return nil } // HandleDailyDigest processes daily digest notifications with task statistics func (h *Handler) HandleDailyDigest(ctx context.Context, task *asynq.Task) error { log.Info().Msg("Processing daily digest notifications...") now := time.Now().UTC() today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC) nextWeek := today.AddDate(0, 0, 7) // Get all users with their task statistics var userStats []struct { UserID uint TotalTasks int OverdueTasks int DueThisWeek int } err := h.db.Raw(` SELECT u.id as user_id, COUNT(DISTINCT t.id) as total_tasks, COUNT(DISTINCT CASE WHEN t.due_date < ? AND tc.id IS NULL THEN t.id END) as overdue_tasks, COUNT(DISTINCT CASE WHEN t.due_date >= ? AND t.due_date < ? AND tc.id IS NULL THEN t.id END) as due_this_week FROM auth_user u JOIN residence_residence r ON r.owner_id = u.id OR r.id IN ( SELECT residence_id FROM residence_residence_users WHERE user_id = u.id ) JOIN task_task t ON t.residence_id = r.id AND t.is_cancelled = false AND t.is_archived = false LEFT JOIN task_taskcompletion tc ON t.id = tc.task_id WHERE u.is_active = true GROUP BY u.id HAVING COUNT(DISTINCT t.id) > 0 `, today, today, nextWeek).Scan(&userStats).Error if err != nil { log.Error().Err(err).Msg("Failed to query user task statistics") return err } log.Info().Int("users", len(userStats)).Msg("Processing daily digest for users") for _, stats := range userStats { // Skip users with no actionable items if stats.OverdueTasks == 0 && stats.DueThisWeek == 0 { continue } // Build notification message title := "Daily Task Summary" var body string if stats.OverdueTasks > 0 && stats.DueThisWeek > 0 { body = fmt.Sprintf("You have %d overdue task(s) and %d task(s) due this week", stats.OverdueTasks, stats.DueThisWeek) } else if stats.OverdueTasks > 0 { body = fmt.Sprintf("You have %d overdue task(s) that need attention", stats.OverdueTasks) } else { body = fmt.Sprintf("You have %d task(s) due this week", stats.DueThisWeek) } // Send push notification if err := h.sendPushToUser(ctx, stats.UserID, title, body, map[string]string{ "type": "daily_digest", "overdue": fmt.Sprintf("%d", stats.OverdueTasks), "due_this_week": fmt.Sprintf("%d", stats.DueThisWeek), }); err != nil { log.Error().Err(err).Uint("user_id", stats.UserID).Msg("Failed to send daily digest push") } } log.Info().Msg("Daily digest notifications completed") return nil } // EmailPayload represents the payload for email tasks type EmailPayload struct { To string `json:"to"` Subject string `json:"subject"` HTMLBody string `json:"html_body"` TextBody string `json:"text_body"` } // HandleSendEmail processes email sending tasks func (h *Handler) HandleSendEmail(ctx context.Context, task *asynq.Task) error { var payload EmailPayload if err := json.Unmarshal(task.Payload(), &payload); err != nil { return fmt.Errorf("failed to unmarshal payload: %w", err) } log.Info(). Str("to", payload.To). Str("subject", payload.Subject). Msg("Sending email...") if h.emailService == nil { log.Warn().Msg("Email service not configured, skipping email") return nil } // Use the email service to send the email if err := h.emailService.SendEmail(payload.To, payload.Subject, payload.HTMLBody, payload.TextBody); err != nil { log.Error().Err(err).Str("to", payload.To).Msg("Failed to send email") return err } log.Info().Str("to", payload.To).Msg("Email sent successfully") return nil } // PushPayload represents the payload for push notification tasks type PushPayload struct { UserID uint `json:"user_id"` Title string `json:"title"` Message string `json:"message"` Data map[string]string `json:"data,omitempty"` } // HandleSendPush processes push notification tasks func (h *Handler) HandleSendPush(ctx context.Context, task *asynq.Task) error { var payload PushPayload if err := json.Unmarshal(task.Payload(), &payload); err != nil { return fmt.Errorf("failed to unmarshal payload: %w", err) } log.Info(). Uint("user_id", payload.UserID). Str("title", payload.Title). Msg("Sending push notification...") if err := h.sendPushToUser(ctx, payload.UserID, payload.Title, payload.Message, payload.Data); err != nil { log.Error().Err(err).Uint("user_id", payload.UserID).Msg("Failed to send push notification") return err } log.Info().Uint("user_id", payload.UserID).Msg("Push notification sent successfully") return nil } // sendPushToUser sends a push notification to all of a user's devices func (h *Handler) sendPushToUser(ctx context.Context, userID uint, title, message string, data map[string]string) error { if h.pushClient == nil { log.Warn().Msg("Push client not configured, skipping notification") return nil } // Get iOS device tokens var iosTokens []string err := h.db.Model(&models.APNSDevice{}). Where("user_id = ? AND active = ?", userID, true). Pluck("registration_id", &iosTokens).Error if err != nil { log.Error().Err(err).Uint("user_id", userID).Msg("Failed to get iOS tokens") } // Get Android device tokens var androidTokens []string err = h.db.Model(&models.GCMDevice{}). Where("user_id = ? AND active = ?", userID, true). Pluck("registration_id", &androidTokens).Error if err != nil { log.Error().Err(err).Uint("user_id", userID).Msg("Failed to get Android tokens") } if len(iosTokens) == 0 && len(androidTokens) == 0 { log.Debug().Uint("user_id", userID).Msg("No device tokens found for user") return nil } log.Debug(). Uint("user_id", userID). Int("ios_tokens", len(iosTokens)). Int("android_tokens", len(androidTokens)). Msg("Sending push to user devices") // Send to all devices return h.pushClient.SendToAll(ctx, iosTokens, androidTokens, title, message, data) } // NewSendEmailTask creates a new email sending task func NewSendEmailTask(to, subject, htmlBody, textBody string) (*asynq.Task, error) { payload, err := json.Marshal(EmailPayload{ To: to, Subject: subject, HTMLBody: htmlBody, TextBody: textBody, }) if err != nil { return nil, err } return asynq.NewTask(TypeSendEmail, payload), nil } // NewSendPushTask creates a new push notification task func NewSendPushTask(userID uint, title, message string, data map[string]string) (*asynq.Task, error) { payload, err := json.Marshal(PushPayload{ UserID: userID, Title: title, Message: message, Data: data, }) if err != nil { return nil, err } return asynq.NewTask(TypeSendPush, payload), nil }