Files
honeyDueAPI/internal/worker/jobs/handler.go
Trey t 12eac24632 Remove remaining status_id references after in_progress migration
- Remove Preload("Status") from worker handler and repositories
- Update seeds to use in_progress boolean instead of status_id
- Remove task_taskstatus table creation from lookup seeds
- Update documentation to reflect in_progress boolean pattern

Fixes notification worker error:
"Status: unsupported relations for schema Task"

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-08 22:43:53 -06:00

564 lines
20 KiB
Go

package jobs
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/hibiken/asynq"
"github.com/rs/zerolog/log"
"gorm.io/gorm"
"github.com/treytartt/casera-api/internal/config"
"github.com/treytartt/casera-api/internal/models"
"github.com/treytartt/casera-api/internal/push"
"github.com/treytartt/casera-api/internal/services"
)
// Task types identify the kinds of background jobs this package deals with.
// TypeSendEmail and TypeSendPush are the type names passed to asynq.NewTask by
// NewSendEmailTask and NewSendPushTask. The remaining names presumably map to
// the other Handle* methods where the worker is wired up (not visible in this
// file) — TODO confirm against the worker registration code.
const (
// TypeTaskReminder names the "task due soon" reminder job.
TypeTaskReminder = "notification:task_reminder"
// TypeOverdueReminder names the overdue-task reminder job.
TypeOverdueReminder = "notification:overdue_reminder"
// TypeDailyDigest names the daily digest job.
TypeDailyDigest = "notification:daily_digest"
// TypeSendEmail names the job that delivers a single email (see EmailPayload).
TypeSendEmail = "email:send"
// TypeSendPush names the job that delivers a single push notification (see PushPayload).
TypeSendPush = "push:send"
// TypeOnboardingEmails names the onboarding email campaign job.
TypeOnboardingEmails = "email:onboarding"
)
// Handler handles background job processing. It bundles the dependencies the
// Handle* methods need; it is constructed with NewHandler.
type Handler struct {
// db is the GORM handle used for all queries issued by the job handlers.
db *gorm.DB
// pushClient delivers push notifications. It may be nil, in which case
// sendPushToUser logs a warning and skips delivery.
pushClient *push.Client
// emailService sends emails. It may be nil, in which case HandleSendEmail
// logs a warning and skips the email.
emailService *services.EmailService
// notificationService creates and sends per-task notifications for the
// reminder handlers.
notificationService *services.NotificationService
// onboardingService runs the onboarding email campaigns. NewHandler leaves
// it nil when no email service is supplied.
onboardingService *services.OnboardingEmailService
// config supplies the system default notification hours (Worker.*) and the
// server base URL.
config *config.Config
}
// NewHandler assembles a Handler from its dependencies.
//
// pushClient and emailService may be nil; the handlers that rely on them skip
// their work in that case. The onboarding email service is derived from
// emailService and cfg.Server.BaseURL, so it is only created when emailService
// is non-nil.
func NewHandler(db *gorm.DB, pushClient *push.Client, emailService *services.EmailService, notificationService *services.NotificationService, cfg *config.Config) *Handler {
	handler := &Handler{
		db:                  db,
		pushClient:          pushClient,
		emailService:        emailService,
		notificationService: notificationService,
		config:              cfg,
	}

	// Onboarding emails go out through the email service; without one the
	// onboarding service stays nil and HandleOnboardingEmails becomes a no-op.
	if emailService != nil {
		handler.onboardingService = services.NewOnboardingEmailService(db, emailService, cfg.Server.BaseURL)
	}

	return handler
}
// TaskReminderData represents a task due soon for reminder notifications.
// It is a flat, denormalized view pairing a task with the user to notify.
// NOTE(review): nothing in the visible part of this file references this type;
// confirm it is still used before relying on it.
type TaskReminderData struct {
// TaskID and TaskTitle identify the task being reminded about.
TaskID uint
TaskTitle string
// DueDate is the task's due date.
DueDate time.Time
// UserID, UserEmail and UserName describe the reminder recipient.
UserID uint
UserEmail string
UserName string
// ResidenceName is the name of the residence the task belongs to.
ResidenceName string
}
// HandleTaskReminder sends a "task due soon" notification for every open task
// whose effective due date falls today or tomorrow (UTC).
//
// Each invocation only serves users whose reminder hour equals the current UTC
// hour, so a user is handled at exactly one hour of the day:
//   - users with a custom task_due_soon_hour are served only at that hour;
//   - users without one (NULL) are served only at the system default hour
//     (config.Worker.TaskReminderHour).
//
// The effective due date is COALESCE(next_due_date, due_date), the same rule
// HandleDailyDigest uses. Matching on either column independently would keep
// reminding about a recurring task whose current occurrence was already
// completed and rescheduled, because its original due_date never changes.
//
// A task is attributed to its assignee when it has one, otherwise to the owner
// of its residence. The job payload in task is not read.
//
// It returns an error only when one of the database queries fails. A failure
// to deliver an individual notification is logged and does not abort the run.
func (h *Handler) HandleTaskReminder(ctx context.Context, task *asynq.Task) error {
	log.Info().Msg("Processing task reminder notifications...")

	now := time.Now().UTC()
	currentHour := now.Hour()
	systemDefaultHour := h.config.Worker.TaskReminderHour
	today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)
	dayAfterTomorrow := today.AddDate(0, 0, 2)

	log.Info().Int("current_hour", currentHour).Int("system_default_hour", systemDefaultHour).Msg("Task reminder check")

	// Step 1: find the users who should be notified during this hour.
	var eligibleUserIDs []uint
	query := h.db.Model(&models.NotificationPreference{}).
		Select("user_id").
		Where("task_due_soon = true")
	if currentHour == systemDefaultHour {
		// Default hour: users without a custom hour, plus users whose custom
		// hour happens to equal the default.
		query = query.Where("task_due_soon_hour IS NULL OR task_due_soon_hour = ?", currentHour)
	} else {
		// Any other hour: only users who explicitly chose this hour.
		query = query.Where("task_due_soon_hour = ?", currentHour)
	}
	if err := query.Pluck("user_id", &eligibleUserIDs).Error; err != nil {
		log.Error().Err(err).Msg("Failed to query eligible users for task reminders")
		return err
	}

	if len(eligibleUserIDs) == 0 {
		log.Debug().Int("hour", currentHour).Msg("No users scheduled for task reminder notifications this hour")
		return nil
	}
	log.Info().Int("eligible_users", len(eligibleUserIDs)).Msg("Found users eligible for task reminders this hour")

	// Step 2: load the open tasks due today or tomorrow that involve an
	// eligible user. A task counts as completed when next_due_date is NULL and
	// it has at least one completion row.
	var dueSoonTasks []models.Task
	err := h.db.Preload("Completions").Preload("Residence").
		Where("COALESCE(next_due_date, due_date) >= ? AND COALESCE(next_due_date, due_date) < ?",
			today, dayAfterTomorrow).
		Where("is_cancelled = false").
		Where("is_archived = false").
		Where("NOT (next_due_date IS NULL AND EXISTS (SELECT 1 FROM task_taskcompletion tc WHERE tc.task_id = task_task.id))").
		Where("(assigned_to_id IN ? OR residence_id IN (SELECT id FROM residence_residence WHERE owner_id IN ?))",
			eligibleUserIDs, eligibleUserIDs).
		Find(&dueSoonTasks).Error
	if err != nil {
		log.Error().Err(err).Msg("Failed to query tasks due soon")
		return err
	}
	log.Info().Int("count", len(dueSoonTasks)).Msg("Found tasks due today/tomorrow for eligible users")

	// Set of eligible users for constant-time membership checks.
	eligible := make(map[uint]struct{}, len(eligibleUserIDs))
	for _, id := range eligibleUserIDs {
		eligible[id] = struct{}{}
	}

	// Group tasks by recipient. The SQL filter matches a task when either the
	// assignee or the residence owner is eligible, but only one of them is the
	// recipient, so eligibility is re-checked for the chosen user.
	userTasks := make(map[uint][]*models.Task)
	for i := range dueSoonTasks {
		t := &dueSoonTasks[i]

		var userID uint
		switch {
		case t.AssignedToID != nil:
			userID = *t.AssignedToID
		case t.Residence.ID != 0:
			userID = t.Residence.OwnerID
		default:
			continue
		}

		if _, ok := eligible[userID]; ok {
			userTasks[userID] = append(userTasks[userID], t)
		}
	}

	// Step 3: one notification per task. Preferences were already applied in
	// step 1, so they are not checked again here.
	for userID, taskList := range userTasks {
		for _, t := range taskList {
			if err := h.notificationService.CreateAndSendTaskNotification(ctx, userID, models.NotificationTaskDueSoon, t); err != nil {
				log.Error().Err(err).Uint("user_id", userID).Uint("task_id", t.ID).Msg("Failed to send task reminder notification")
			}
		}
	}

	log.Info().Int("users_notified", len(userTasks)).Msg("Task reminder notifications completed")
	return nil
}
// HandleOverdueReminder sends an "overdue" notification for every open task
// whose effective due date is before the start of today (UTC).
//
// Each invocation only serves users whose overdue-reminder hour equals the
// current UTC hour, so a user is handled at exactly one hour of the day:
//   - users with a custom task_overdue_hour are served only at that hour;
//   - users without one (NULL) are served only at the system default hour
//     (config.Worker.OverdueReminderHour).
//
// The effective due date is COALESCE(next_due_date, due_date), the same rule
// HandleDailyDigest uses. Testing due_date and next_due_date independently
// would report a recurring task as overdue forever once its original due_date
// has passed, even after it was completed and rescheduled into the future.
//
// A task is attributed to its assignee when it has one, otherwise to the owner
// of its residence. The job payload in task is not read.
//
// It returns an error only when one of the database queries fails. A failure
// to deliver an individual notification is logged and does not abort the run.
func (h *Handler) HandleOverdueReminder(ctx context.Context, task *asynq.Task) error {
	log.Info().Msg("Processing overdue task notifications...")

	now := time.Now().UTC()
	currentHour := now.Hour()
	systemDefaultHour := h.config.Worker.OverdueReminderHour
	today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)

	log.Info().Int("current_hour", currentHour).Int("system_default_hour", systemDefaultHour).Msg("Overdue reminder check")

	// Step 1: find the users who should be notified during this hour.
	var eligibleUserIDs []uint
	query := h.db.Model(&models.NotificationPreference{}).
		Select("user_id").
		Where("task_overdue = true")
	if currentHour == systemDefaultHour {
		// Default hour: users without a custom hour, plus users whose custom
		// hour happens to equal the default.
		query = query.Where("task_overdue_hour IS NULL OR task_overdue_hour = ?", currentHour)
	} else {
		// Any other hour: only users who explicitly chose this hour.
		query = query.Where("task_overdue_hour = ?", currentHour)
	}
	if err := query.Pluck("user_id", &eligibleUserIDs).Error; err != nil {
		log.Error().Err(err).Msg("Failed to query eligible users for overdue reminders")
		return err
	}

	if len(eligibleUserIDs) == 0 {
		log.Debug().Int("hour", currentHour).Msg("No users scheduled for overdue notifications this hour")
		return nil
	}
	log.Info().Int("eligible_users", len(eligibleUserIDs)).Msg("Found users eligible for overdue reminders this hour")

	// Step 2: load the open overdue tasks that involve an eligible user. A
	// task counts as completed when next_due_date is NULL and it has at least
	// one completion row.
	var overdueTasks []models.Task
	err := h.db.Preload("Completions").Preload("Residence").
		Where("COALESCE(next_due_date, due_date) < ?", today).
		Where("is_cancelled = false").
		Where("is_archived = false").
		Where("NOT (next_due_date IS NULL AND EXISTS (SELECT 1 FROM task_taskcompletion tc WHERE tc.task_id = task_task.id))").
		Where("(assigned_to_id IN ? OR residence_id IN (SELECT id FROM residence_residence WHERE owner_id IN ?))",
			eligibleUserIDs, eligibleUserIDs).
		Find(&overdueTasks).Error
	if err != nil {
		log.Error().Err(err).Msg("Failed to query overdue tasks")
		return err
	}
	log.Info().Int("count", len(overdueTasks)).Msg("Found overdue tasks for eligible users")

	// Set of eligible users for constant-time membership checks.
	eligible := make(map[uint]struct{}, len(eligibleUserIDs))
	for _, id := range eligibleUserIDs {
		eligible[id] = struct{}{}
	}

	// Group tasks by recipient. The SQL filter matches a task when either the
	// assignee or the residence owner is eligible, but only one of them is the
	// recipient, so eligibility is re-checked for the chosen user.
	userTasks := make(map[uint][]*models.Task)
	for i := range overdueTasks {
		t := &overdueTasks[i]

		var userID uint
		switch {
		case t.AssignedToID != nil:
			userID = *t.AssignedToID
		case t.Residence.ID != 0:
			userID = t.Residence.OwnerID
		default:
			continue
		}

		if _, ok := eligible[userID]; ok {
			userTasks[userID] = append(userTasks[userID], t)
		}
	}

	// Step 3: one notification per task. Preferences were already applied in
	// step 1, so they are not checked again here.
	for userID, taskList := range userTasks {
		for _, t := range taskList {
			if err := h.notificationService.CreateAndSendTaskNotification(ctx, userID, models.NotificationTaskOverdue, t); err != nil {
				log.Error().Err(err).Uint("user_id", userID).Uint("task_id", t.ID).Msg("Failed to send overdue notification")
			}
		}
	}

	log.Info().Int("users_notified", len(userTasks)).Msg("Overdue task notifications completed")
	return nil
}
// HandleDailyDigest pushes a daily summary of overdue tasks and tasks due
// within the next seven days to each user who has something actionable.
//
// Each invocation only serves users whose digest hour equals the current UTC
// hour: users with a custom daily_digest_hour are served only at that hour,
// users without one (NULL) only at the system default hour
// (config.Worker.DailyNotifHour).
//
// Statistics cover tasks in residences the user owns or is a member of, skip
// cancelled and archived tasks, and use COALESCE(next_due_date, due_date) as
// the effective date. A task counts as completed (and is left out of both
// counters) when next_due_date is NULL and it has at least one completion row.
//
// The job payload in task is not read. The function returns an error only
// when a database query fails; a failed push is logged and skipped. The final
// "users_notified" log field counts pushes that returned no error, not the
// number of users that had statistics.
func (h *Handler) HandleDailyDigest(ctx context.Context, task *asynq.Task) error {
	log.Info().Msg("Processing daily digest notifications...")

	now := time.Now().UTC()
	currentHour := now.Hour()
	systemDefaultHour := h.config.Worker.DailyNotifHour
	today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)
	nextWeek := today.AddDate(0, 0, 7)

	log.Info().Int("current_hour", currentHour).Int("system_default_hour", systemDefaultHour).Msg("Daily digest check")

	// Step 1: find the users who should receive the digest during this hour.
	var eligibleUserIDs []uint
	query := h.db.Model(&models.NotificationPreference{}).
		Select("user_id").
		Where("daily_digest = true")
	if currentHour == systemDefaultHour {
		// Default hour: users without a custom hour, plus users whose custom
		// hour happens to equal the default.
		query = query.Where("daily_digest_hour IS NULL OR daily_digest_hour = ?", currentHour)
	} else {
		// Any other hour: only users who explicitly chose this hour.
		query = query.Where("daily_digest_hour = ?", currentHour)
	}
	if err := query.Pluck("user_id", &eligibleUserIDs).Error; err != nil {
		log.Error().Err(err).Msg("Failed to query eligible users for daily digest")
		return err
	}

	if len(eligibleUserIDs) == 0 {
		log.Debug().Int("hour", currentHour).Msg("No users scheduled for daily digest notifications this hour")
		return nil
	}
	log.Info().Int("eligible_users", len(eligibleUserIDs)).Msg("Found users eligible for daily digest this hour")

	// Step 2: per-user task statistics, restricted to the eligible users.
	// Placeholders, in order: today (overdue cut-off), today and nextWeek
	// (due-this-week window), eligibleUserIDs.
	var userStats []struct {
		UserID       uint
		TotalTasks   int
		OverdueTasks int
		DueThisWeek  int
	}
	err := h.db.Raw(`
SELECT
u.id as user_id,
COUNT(DISTINCT t.id) as total_tasks,
COUNT(DISTINCT CASE
WHEN COALESCE(t.next_due_date, t.due_date) < ?
AND NOT (t.next_due_date IS NULL AND EXISTS (SELECT 1 FROM task_taskcompletion tc WHERE tc.task_id = t.id))
THEN t.id
END) as overdue_tasks,
COUNT(DISTINCT CASE
WHEN COALESCE(t.next_due_date, t.due_date) >= ? AND COALESCE(t.next_due_date, t.due_date) < ?
AND NOT (t.next_due_date IS NULL AND EXISTS (SELECT 1 FROM task_taskcompletion tc WHERE tc.task_id = t.id))
THEN t.id
END) as due_this_week
FROM auth_user u
JOIN residence_residence r ON r.owner_id = u.id OR r.id IN (
SELECT residence_id FROM residence_residence_users WHERE user_id = u.id
)
JOIN task_task t ON t.residence_id = r.id
AND t.is_cancelled = false
AND t.is_archived = false
WHERE u.is_active = true AND u.id IN ?
GROUP BY u.id
HAVING COUNT(DISTINCT t.id) > 0
`, today, today, nextWeek, eligibleUserIDs).Scan(&userStats).Error
	if err != nil {
		log.Error().Err(err).Msg("Failed to query user task statistics")
		return err
	}
	log.Info().Int("users_with_tasks", len(userStats)).Msg("Processing daily digest for users")

	// Step 3: send one push per user that has something to act on.
	notified := 0
	for _, stats := range userStats {
		// Nothing overdue and nothing due this week: no digest.
		if stats.OverdueTasks == 0 && stats.DueThisWeek == 0 {
			continue
		}

		title := "Daily Task Summary"
		var body string
		switch {
		case stats.OverdueTasks > 0 && stats.DueThisWeek > 0:
			body = fmt.Sprintf("You have %d overdue task(s) and %d task(s) due this week", stats.OverdueTasks, stats.DueThisWeek)
		case stats.OverdueTasks > 0:
			body = fmt.Sprintf("You have %d overdue task(s) that need attention", stats.OverdueTasks)
		default:
			body = fmt.Sprintf("You have %d task(s) due this week", stats.DueThisWeek)
		}

		data := map[string]string{
			"type":          "daily_digest",
			"overdue":       fmt.Sprintf("%d", stats.OverdueTasks),
			"due_this_week": fmt.Sprintf("%d", stats.DueThisWeek),
		}
		if err := h.sendPushToUser(ctx, stats.UserID, title, body, data); err != nil {
			log.Error().Err(err).Uint("user_id", stats.UserID).Msg("Failed to send daily digest push")
			continue
		}
		notified++
	}

	log.Info().Int("users_notified", notified).Msg("Daily digest notifications completed")
	return nil
}
// EmailPayload represents the payload for email tasks. NewSendEmailTask
// marshals it to JSON and HandleSendEmail unmarshals it again, so the JSON
// field names below are the wire format of queued email jobs.
type EmailPayload struct {
// To is the recipient passed to the email service.
To string `json:"to"`
// Subject is the email subject line.
Subject string `json:"subject"`
// HTMLBody is the HTML version of the message body.
HTMLBody string `json:"html_body"`
// TextBody is the plain-text version of the message body.
TextBody string `json:"text_body"`
}
// HandleSendEmail delivers one queued email.
//
// The task payload must be a JSON-encoded EmailPayload; a payload that cannot
// be decoded yields a wrapped error. When no email service is configured the
// job is skipped with a warning and reported as successful. Any error from the
// email service is logged and returned to the caller.
func (h *Handler) HandleSendEmail(ctx context.Context, task *asynq.Task) error {
	var msg EmailPayload
	decodeErr := json.Unmarshal(task.Payload(), &msg)
	if decodeErr != nil {
		return fmt.Errorf("failed to unmarshal payload: %w", decodeErr)
	}

	log.Info().Str("to", msg.To).Str("subject", msg.Subject).Msg("Sending email...")

	// Without an email service there is nothing to do; treat it as a no-op.
	if h.emailService == nil {
		log.Warn().Msg("Email service not configured, skipping email")
		return nil
	}

	sendErr := h.emailService.SendEmail(msg.To, msg.Subject, msg.HTMLBody, msg.TextBody)
	if sendErr != nil {
		log.Error().Err(sendErr).Str("to", msg.To).Msg("Failed to send email")
		return sendErr
	}

	log.Info().Str("to", msg.To).Msg("Email sent successfully")
	return nil
}
// PushPayload represents the payload for push notification tasks.
// NewSendPushTask marshals it to JSON and HandleSendPush unmarshals it again,
// so the JSON field names below are the wire format of queued push jobs.
type PushPayload struct {
// UserID selects the user whose active devices receive the notification.
UserID uint `json:"user_id"`
// Title is the notification title.
Title string `json:"title"`
// Message is the notification body text.
Message string `json:"message"`
// Data is an optional set of key/value pairs forwarded to the push client;
// it is omitted from the JSON when empty.
Data map[string]string `json:"data,omitempty"`
}
// HandleSendPush delivers one queued push notification.
//
// The task payload must be a JSON-encoded PushPayload; a payload that cannot
// be decoded yields a wrapped error. Delivery is delegated to sendPushToUser,
// and any error it reports is logged and returned to the caller.
func (h *Handler) HandleSendPush(ctx context.Context, task *asynq.Task) error {
	var req PushPayload
	decodeErr := json.Unmarshal(task.Payload(), &req)
	if decodeErr != nil {
		return fmt.Errorf("failed to unmarshal payload: %w", decodeErr)
	}

	log.Info().Uint("user_id", req.UserID).Str("title", req.Title).Msg("Sending push notification...")

	sendErr := h.sendPushToUser(ctx, req.UserID, req.Title, req.Message, req.Data)
	if sendErr != nil {
		log.Error().Err(sendErr).Uint("user_id", req.UserID).Msg("Failed to send push notification")
		return sendErr
	}

	log.Info().Uint("user_id", req.UserID).Msg("Push notification sent successfully")
	return nil
}
// sendPushToUser sends a push notification to all of a user's active devices.
//
// It loads the registration IDs of the user's active APNS (iOS) and GCM
// (Android) devices and hands both lists to the push client in a single call.
//
// It returns nil without sending anything when no push client is configured
// or when the user has no active devices. Token lookups are best-effort: if
// one platform's lookup fails but any token was loaded, the notification is
// still sent to the tokens that were found. If no tokens are available and a
// lookup failed, the lookup error is returned so that a database problem is
// not mistaken for "user has no devices" and the job can be retried.
// Otherwise the result of the push client's SendToAll is returned.
func (h *Handler) sendPushToUser(ctx context.Context, userID uint, title, message string, data map[string]string) error {
	if h.pushClient == nil {
		log.Warn().Msg("Push client not configured, skipping notification")
		return nil
	}

	// lookupErr remembers the first token lookup failure, if any.
	var lookupErr error

	// Get iOS device tokens
	var iosTokens []string
	if err := h.db.Model(&models.APNSDevice{}).
		Where("user_id = ? AND active = ?", userID, true).
		Pluck("registration_id", &iosTokens).Error; err != nil {
		log.Error().Err(err).Uint("user_id", userID).Msg("Failed to get iOS tokens")
		lookupErr = fmt.Errorf("loading iOS device tokens for user %d: %w", userID, err)
	}

	// Get Android device tokens
	var androidTokens []string
	if err := h.db.Model(&models.GCMDevice{}).
		Where("user_id = ? AND active = ?", userID, true).
		Pluck("registration_id", &androidTokens).Error; err != nil {
		log.Error().Err(err).Uint("user_id", userID).Msg("Failed to get Android tokens")
		if lookupErr == nil {
			lookupErr = fmt.Errorf("loading Android device tokens for user %d: %w", userID, err)
		}
	}

	if len(iosTokens) == 0 && len(androidTokens) == 0 {
		// An empty result is only trustworthy when every lookup succeeded.
		if lookupErr != nil {
			return lookupErr
		}
		log.Debug().Uint("user_id", userID).Msg("No device tokens found for user")
		return nil
	}

	log.Debug().
		Uint("user_id", userID).
		Int("ios_tokens", len(iosTokens)).
		Int("android_tokens", len(androidTokens)).
		Msg("Sending push to user devices")

	// Send to all devices
	return h.pushClient.SendToAll(ctx, iosTokens, androidTokens, title, message, data)
}
// NewSendEmailTask builds a TypeSendEmail task whose payload is the
// JSON-encoded EmailPayload made from the given recipient, subject and bodies.
// It returns the marshalling error, if any, and a nil task in that case.
func NewSendEmailTask(to, subject, htmlBody, textBody string) (*asynq.Task, error) {
	email := EmailPayload{
		To:       to,
		Subject:  subject,
		HTMLBody: htmlBody,
		TextBody: textBody,
	}

	encoded, err := json.Marshal(email)
	if err != nil {
		return nil, err
	}

	return asynq.NewTask(TypeSendEmail, encoded), nil
}
// NewSendPushTask builds a TypeSendPush task whose payload is the JSON-encoded
// PushPayload made from the given user, title, message and optional data.
// It returns the marshalling error, if any, and a nil task in that case.
func NewSendPushTask(userID uint, title, message string, data map[string]string) (*asynq.Task, error) {
	notification := PushPayload{
		UserID:  userID,
		Title:   title,
		Message: message,
		Data:    data,
	}

	encoded, err := json.Marshal(notification)
	if err != nil {
		return nil, err
	}

	return asynq.NewTask(TypeSendPush, encoded), nil
}
// HandleOnboardingEmails runs the two onboarding email campaigns:
//  1. users who registered 2+ days ago but haven't created a residence;
//  2. users who created a residence 5+ days ago but haven't created any tasks.
//
// Each email type is only sent once per user, ever. The campaigns are
// independent: a failure in one is logged and the other still runs. The
// handler always returns nil, including when the onboarding service is not
// configured. Neither ctx nor the task payload is used.
func (h *Handler) HandleOnboardingEmails(ctx context.Context, task *asynq.Task) error {
	log.Info().Msg("Processing onboarding emails...")

	onboarding := h.onboardingService
	if onboarding == nil {
		log.Warn().Msg("Onboarding email service not configured, skipping")
		return nil
	}

	// Campaign 1: no residence after 2 days. Errors are logged only, so the
	// second campaign is still attempted.
	noResidenceSent, noResidenceErr := onboarding.CheckAndSendNoResidenceEmails()
	if noResidenceErr == nil {
		log.Info().Int("count", noResidenceSent).Msg("Sent no-residence onboarding emails")
	} else {
		log.Error().Err(noResidenceErr).Msg("Failed to process no-residence onboarding emails")
	}

	// Campaign 2: residence but no tasks after 5 days.
	noTasksSent, noTasksErr := onboarding.CheckAndSendNoTasksEmails()
	if noTasksErr == nil {
		log.Info().Int("count", noTasksSent).Msg("Sent no-tasks onboarding emails")
	} else {
		log.Error().Err(noTasksErr).Msg("Failed to process no-tasks onboarding emails")
	}

	log.Info().
		Int("no_residence_sent", noResidenceSent).
		Int("no_tasks_sent", noTasksSent).
		Msg("Onboarding email processing completed")
	return nil
}