The WHERE clause with `? = ?` comparing two bound parameters caused PostgreSQL to fail with "unable to encode into text format" error. Fixed by moving the comparison to Go code and building the query conditionally. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
490 lines
16 KiB
Go
490 lines
16 KiB
Go
package jobs
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/hibiken/asynq"
|
|
"github.com/rs/zerolog/log"
|
|
"gorm.io/gorm"
|
|
|
|
"github.com/treytartt/casera-api/internal/config"
|
|
"github.com/treytartt/casera-api/internal/models"
|
|
"github.com/treytartt/casera-api/internal/push"
|
|
"github.com/treytartt/casera-api/internal/services"
|
|
)
|
|
|
|
// Type names used to identify the background jobs defined in this package.
const (
	// TypeTaskReminder names the "task due soon" reminder job.
	TypeTaskReminder = "notification:task_reminder"
	// TypeOverdueReminder names the overdue-task reminder job.
	TypeOverdueReminder = "notification:overdue_reminder"
	// TypeDailyDigest names the daily digest job.
	TypeDailyDigest = "notification:daily_digest"
	// TypeSendEmail names the job that delivers a single email.
	TypeSendEmail = "email:send"
	// TypeSendPush names the job that delivers a single push notification.
	TypeSendPush = "push:send"
)
|
|
|
|
// Handler handles background job processing
|
|
type Handler struct {
|
|
db *gorm.DB
|
|
pushClient *push.Client
|
|
emailService *services.EmailService
|
|
notificationService *services.NotificationService
|
|
config *config.Config
|
|
}
|
|
|
|
// NewHandler creates a new job handler
|
|
func NewHandler(db *gorm.DB, pushClient *push.Client, emailService *services.EmailService, notificationService *services.NotificationService, cfg *config.Config) *Handler {
|
|
return &Handler{
|
|
db: db,
|
|
pushClient: pushClient,
|
|
emailService: emailService,
|
|
notificationService: notificationService,
|
|
config: cfg,
|
|
}
|
|
}
|
|
|
|
// TaskReminderData describes one soon-to-be-due task together with the user
// who should be reminded about it.
type TaskReminderData struct {
	// TaskID and TaskTitle identify the task.
	TaskID    uint
	TaskTitle string
	// DueDate is when the task is due.
	DueDate time.Time
	// UserID, UserEmail and UserName identify the reminder recipient.
	UserID    uint
	UserEmail string
	UserName  string
	// ResidenceName is the name of the residence the task belongs to.
	ResidenceName string
}
|
|
|
|
// HandleTaskReminder processes task reminder notifications for tasks due today or tomorrow with actionable buttons
|
|
func (h *Handler) HandleTaskReminder(ctx context.Context, task *asynq.Task) error {
|
|
log.Info().Msg("Processing task reminder notifications...")
|
|
|
|
now := time.Now().UTC()
|
|
currentHour := now.Hour()
|
|
systemDefaultHour := h.config.Worker.TaskReminderHour
|
|
today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)
|
|
dayAfterTomorrow := today.AddDate(0, 0, 2)
|
|
|
|
log.Info().Int("current_hour", currentHour).Int("system_default_hour", systemDefaultHour).Msg("Task reminder check")
|
|
|
|
// Step 1: Find users who should receive notifications THIS hour
|
|
// Users with custom hour matching current hour, OR users with no custom hour when current hour is system default
|
|
var eligibleUserIDs []uint
|
|
|
|
// Build query based on whether current hour is system default
|
|
query := h.db.Model(&models.NotificationPreference{}).
|
|
Select("user_id").
|
|
Where("task_due_soon = true")
|
|
|
|
if currentHour == systemDefaultHour {
|
|
// Current hour is the system default, so include users with custom hour OR no custom hour (NULL)
|
|
query = query.Where("(task_due_soon_hour = ? OR task_due_soon_hour IS NULL)", currentHour)
|
|
} else {
|
|
// Current hour is not system default, so only include users with this specific custom hour
|
|
query = query.Where("task_due_soon_hour = ?", currentHour)
|
|
}
|
|
|
|
err := query.Pluck("user_id", &eligibleUserIDs).Error
|
|
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to query eligible users for task reminders")
|
|
return err
|
|
}
|
|
|
|
// Early exit if no users need notifications this hour
|
|
if len(eligibleUserIDs) == 0 {
|
|
log.Debug().Int("hour", currentHour).Msg("No users scheduled for task reminder notifications this hour")
|
|
return nil
|
|
}
|
|
|
|
log.Info().Int("eligible_users", len(eligibleUserIDs)).Msg("Found users eligible for task reminders this hour")
|
|
|
|
// Step 2: Query tasks due today or tomorrow only for eligible users
|
|
var dueSoonTasks []models.Task
|
|
err = h.db.Preload("Status").Preload("Completions").Preload("Residence").
|
|
Where("(due_date >= ? AND due_date < ?) OR (next_due_date >= ? AND next_due_date < ?)",
|
|
today, dayAfterTomorrow, today, dayAfterTomorrow).
|
|
Where("is_cancelled = false").
|
|
Where("is_archived = false").
|
|
Where("NOT EXISTS (SELECT 1 FROM task_taskcompletion tc WHERE tc.task_id = task_task.id AND tc.completed_at >= task_task.due_date)").
|
|
Where("(assigned_to_id IN ? OR residence_id IN (SELECT id FROM residence_residence WHERE owner_id IN ?))",
|
|
eligibleUserIDs, eligibleUserIDs).
|
|
Find(&dueSoonTasks).Error
|
|
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to query tasks due soon")
|
|
return err
|
|
}
|
|
|
|
log.Info().Int("count", len(dueSoonTasks)).Msg("Found tasks due today/tomorrow for eligible users")
|
|
|
|
// Group tasks by user (assigned_to or residence owner)
|
|
userTasks := make(map[uint][]models.Task)
|
|
for _, t := range dueSoonTasks {
|
|
var userID uint
|
|
if t.AssignedToID != nil {
|
|
userID = *t.AssignedToID
|
|
} else if t.Residence.ID != 0 {
|
|
userID = t.Residence.OwnerID
|
|
} else {
|
|
continue
|
|
}
|
|
// Only include if user is in eligible list
|
|
for _, eligibleID := range eligibleUserIDs {
|
|
if userID == eligibleID {
|
|
userTasks[userID] = append(userTasks[userID], t)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// Step 3: Send notifications (no need to check preferences again - already filtered)
|
|
for userID, taskList := range userTasks {
|
|
// Send individual actionable notification for each task (up to 5)
|
|
maxNotifications := 5
|
|
if len(taskList) < maxNotifications {
|
|
maxNotifications = len(taskList)
|
|
}
|
|
|
|
for i := 0; i < maxNotifications; i++ {
|
|
t := taskList[i]
|
|
if err := h.notificationService.CreateAndSendTaskNotification(ctx, userID, models.NotificationTaskDueSoon, &t); err != nil {
|
|
log.Error().Err(err).Uint("user_id", userID).Uint("task_id", t.ID).Msg("Failed to send task reminder notification")
|
|
}
|
|
}
|
|
|
|
// If more than 5 tasks, send a summary notification
|
|
if len(taskList) > 5 {
|
|
title := "More Tasks Due Soon"
|
|
body := fmt.Sprintf("You have %d more tasks due soon", len(taskList)-5)
|
|
if err := h.sendPushToUser(ctx, userID, title, body, map[string]string{
|
|
"type": "task_reminder_summary",
|
|
}); err != nil {
|
|
log.Error().Err(err).Uint("user_id", userID).Msg("Failed to send task reminder summary")
|
|
}
|
|
}
|
|
}
|
|
|
|
log.Info().Int("users_notified", len(userTasks)).Msg("Task reminder notifications completed")
|
|
return nil
|
|
}
|
|
|
|
// HandleOverdueReminder processes overdue task notifications with actionable buttons
|
|
func (h *Handler) HandleOverdueReminder(ctx context.Context, task *asynq.Task) error {
|
|
log.Info().Msg("Processing overdue task notifications...")
|
|
|
|
now := time.Now().UTC()
|
|
currentHour := now.Hour()
|
|
systemDefaultHour := h.config.Worker.OverdueReminderHour
|
|
today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)
|
|
|
|
log.Info().Int("current_hour", currentHour).Int("system_default_hour", systemDefaultHour).Msg("Overdue reminder check")
|
|
|
|
// Step 1: Find users who should receive notifications THIS hour
|
|
// Users with custom hour matching current hour, OR users with no custom hour when current hour is system default
|
|
var eligibleUserIDs []uint
|
|
|
|
// Build query based on whether current hour is system default
|
|
query := h.db.Model(&models.NotificationPreference{}).
|
|
Select("user_id").
|
|
Where("task_overdue = true")
|
|
|
|
if currentHour == systemDefaultHour {
|
|
// Current hour is the system default, so include users with custom hour OR no custom hour (NULL)
|
|
query = query.Where("(task_overdue_hour = ? OR task_overdue_hour IS NULL)", currentHour)
|
|
} else {
|
|
// Current hour is not system default, so only include users with this specific custom hour
|
|
query = query.Where("task_overdue_hour = ?", currentHour)
|
|
}
|
|
|
|
err := query.Pluck("user_id", &eligibleUserIDs).Error
|
|
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to query eligible users for overdue reminders")
|
|
return err
|
|
}
|
|
|
|
// Early exit if no users need notifications this hour
|
|
if len(eligibleUserIDs) == 0 {
|
|
log.Debug().Int("hour", currentHour).Msg("No users scheduled for overdue notifications this hour")
|
|
return nil
|
|
}
|
|
|
|
log.Info().Int("eligible_users", len(eligibleUserIDs)).Msg("Found users eligible for overdue reminders this hour")
|
|
|
|
// Step 2: Query overdue tasks only for eligible users
|
|
var overdueTasks []models.Task
|
|
err = h.db.Preload("Status").Preload("Completions").Preload("Residence").
|
|
Where("due_date < ? OR next_due_date < ?", today, today).
|
|
Where("is_cancelled = false").
|
|
Where("is_archived = false").
|
|
Where("NOT EXISTS (SELECT 1 FROM task_taskcompletion tc WHERE tc.task_id = task_task.id AND tc.completed_at >= task_task.due_date)").
|
|
Where("(assigned_to_id IN ? OR residence_id IN (SELECT id FROM residence_residence WHERE owner_id IN ?))",
|
|
eligibleUserIDs, eligibleUserIDs).
|
|
Find(&overdueTasks).Error
|
|
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to query overdue tasks")
|
|
return err
|
|
}
|
|
|
|
log.Info().Int("count", len(overdueTasks)).Msg("Found overdue tasks for eligible users")
|
|
|
|
// Group tasks by user (assigned_to or residence owner)
|
|
userTasks := make(map[uint][]models.Task)
|
|
for _, t := range overdueTasks {
|
|
var userID uint
|
|
if t.AssignedToID != nil {
|
|
userID = *t.AssignedToID
|
|
} else if t.Residence.ID != 0 {
|
|
userID = t.Residence.OwnerID
|
|
} else {
|
|
continue
|
|
}
|
|
// Only include if user is in eligible list
|
|
for _, eligibleID := range eligibleUserIDs {
|
|
if userID == eligibleID {
|
|
userTasks[userID] = append(userTasks[userID], t)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// Step 3: Send notifications (no need to check preferences again - already filtered)
|
|
for userID, taskList := range userTasks {
|
|
// Send individual actionable notification for each task (up to 5)
|
|
maxNotifications := 5
|
|
if len(taskList) < maxNotifications {
|
|
maxNotifications = len(taskList)
|
|
}
|
|
|
|
for i := 0; i < maxNotifications; i++ {
|
|
t := taskList[i]
|
|
if err := h.notificationService.CreateAndSendTaskNotification(ctx, userID, models.NotificationTaskOverdue, &t); err != nil {
|
|
log.Error().Err(err).Uint("user_id", userID).Uint("task_id", t.ID).Msg("Failed to send overdue notification")
|
|
}
|
|
}
|
|
|
|
// If more than 5 tasks, send a summary notification
|
|
if len(taskList) > 5 {
|
|
title := "More Overdue Tasks"
|
|
body := fmt.Sprintf("You have %d more overdue tasks that need attention", len(taskList)-5)
|
|
if err := h.sendPushToUser(ctx, userID, title, body, map[string]string{
|
|
"type": "overdue_summary",
|
|
}); err != nil {
|
|
log.Error().Err(err).Uint("user_id", userID).Msg("Failed to send overdue summary")
|
|
}
|
|
}
|
|
}
|
|
|
|
log.Info().Int("users_notified", len(userTasks)).Msg("Overdue task notifications completed")
|
|
return nil
|
|
}
|
|
|
|
// HandleDailyDigest processes daily digest notifications with task statistics
|
|
func (h *Handler) HandleDailyDigest(ctx context.Context, task *asynq.Task) error {
|
|
log.Info().Msg("Processing daily digest notifications...")
|
|
|
|
now := time.Now().UTC()
|
|
today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)
|
|
nextWeek := today.AddDate(0, 0, 7)
|
|
|
|
// Get all users with their task statistics
|
|
var userStats []struct {
|
|
UserID uint
|
|
TotalTasks int
|
|
OverdueTasks int
|
|
DueThisWeek int
|
|
}
|
|
|
|
err := h.db.Raw(`
|
|
SELECT
|
|
u.id as user_id,
|
|
COUNT(DISTINCT t.id) as total_tasks,
|
|
COUNT(DISTINCT CASE WHEN t.due_date < ? AND tc.id IS NULL THEN t.id END) as overdue_tasks,
|
|
COUNT(DISTINCT CASE WHEN t.due_date >= ? AND t.due_date < ? AND tc.id IS NULL THEN t.id END) as due_this_week
|
|
FROM auth_user u
|
|
JOIN residence_residence r ON r.owner_id = u.id OR r.id IN (
|
|
SELECT residence_id FROM residence_residence_users WHERE user_id = u.id
|
|
)
|
|
JOIN task_task t ON t.residence_id = r.id
|
|
AND t.is_cancelled = false
|
|
AND t.is_archived = false
|
|
LEFT JOIN task_taskcompletion tc ON t.id = tc.task_id
|
|
WHERE u.is_active = true
|
|
GROUP BY u.id
|
|
HAVING COUNT(DISTINCT t.id) > 0
|
|
`, today, today, nextWeek).Scan(&userStats).Error
|
|
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to query user task statistics")
|
|
return err
|
|
}
|
|
|
|
log.Info().Int("users", len(userStats)).Msg("Processing daily digest for users")
|
|
|
|
for _, stats := range userStats {
|
|
// Skip users with no actionable items
|
|
if stats.OverdueTasks == 0 && stats.DueThisWeek == 0 {
|
|
continue
|
|
}
|
|
|
|
// Build notification message
|
|
title := "Daily Task Summary"
|
|
var body string
|
|
|
|
if stats.OverdueTasks > 0 && stats.DueThisWeek > 0 {
|
|
body = fmt.Sprintf("You have %d overdue task(s) and %d task(s) due this week", stats.OverdueTasks, stats.DueThisWeek)
|
|
} else if stats.OverdueTasks > 0 {
|
|
body = fmt.Sprintf("You have %d overdue task(s) that need attention", stats.OverdueTasks)
|
|
} else {
|
|
body = fmt.Sprintf("You have %d task(s) due this week", stats.DueThisWeek)
|
|
}
|
|
|
|
// Send push notification
|
|
if err := h.sendPushToUser(ctx, stats.UserID, title, body, map[string]string{
|
|
"type": "daily_digest",
|
|
"overdue": fmt.Sprintf("%d", stats.OverdueTasks),
|
|
"due_this_week": fmt.Sprintf("%d", stats.DueThisWeek),
|
|
}); err != nil {
|
|
log.Error().Err(err).Uint("user_id", stats.UserID).Msg("Failed to send daily digest push")
|
|
}
|
|
}
|
|
|
|
log.Info().Msg("Daily digest notifications completed")
|
|
return nil
|
|
}
|
|
|
|
// EmailPayload is the JSON body carried by an email:send job.
type EmailPayload struct {
	// To is the recipient address.
	To string `json:"to"`
	// Subject is the email subject line.
	Subject string `json:"subject"`
	// HTMLBody and TextBody are the HTML and plain-text renderings of the
	// message; both are handed to the email service.
	HTMLBody string `json:"html_body"`
	TextBody string `json:"text_body"`
}
|
|
|
|
// HandleSendEmail processes email sending tasks
|
|
func (h *Handler) HandleSendEmail(ctx context.Context, task *asynq.Task) error {
|
|
var payload EmailPayload
|
|
if err := json.Unmarshal(task.Payload(), &payload); err != nil {
|
|
return fmt.Errorf("failed to unmarshal payload: %w", err)
|
|
}
|
|
|
|
log.Info().
|
|
Str("to", payload.To).
|
|
Str("subject", payload.Subject).
|
|
Msg("Sending email...")
|
|
|
|
if h.emailService == nil {
|
|
log.Warn().Msg("Email service not configured, skipping email")
|
|
return nil
|
|
}
|
|
|
|
// Use the email service to send the email
|
|
if err := h.emailService.SendEmail(payload.To, payload.Subject, payload.HTMLBody, payload.TextBody); err != nil {
|
|
log.Error().Err(err).Str("to", payload.To).Msg("Failed to send email")
|
|
return err
|
|
}
|
|
|
|
log.Info().Str("to", payload.To).Msg("Email sent successfully")
|
|
return nil
|
|
}
|
|
|
|
// PushPayload is the JSON body carried by a push:send job.
type PushPayload struct {
	// UserID is the user whose devices receive the notification.
	UserID uint `json:"user_id"`
	// Title and Message are the notification's heading and body text.
	Title   string `json:"title"`
	Message string `json:"message"`
	// Data holds optional key/value extras; omitted from JSON when empty.
	Data map[string]string `json:"data,omitempty"`
}
|
|
|
|
// HandleSendPush processes push notification tasks
|
|
func (h *Handler) HandleSendPush(ctx context.Context, task *asynq.Task) error {
|
|
var payload PushPayload
|
|
if err := json.Unmarshal(task.Payload(), &payload); err != nil {
|
|
return fmt.Errorf("failed to unmarshal payload: %w", err)
|
|
}
|
|
|
|
log.Info().
|
|
Uint("user_id", payload.UserID).
|
|
Str("title", payload.Title).
|
|
Msg("Sending push notification...")
|
|
|
|
if err := h.sendPushToUser(ctx, payload.UserID, payload.Title, payload.Message, payload.Data); err != nil {
|
|
log.Error().Err(err).Uint("user_id", payload.UserID).Msg("Failed to send push notification")
|
|
return err
|
|
}
|
|
|
|
log.Info().Uint("user_id", payload.UserID).Msg("Push notification sent successfully")
|
|
return nil
|
|
}
|
|
|
|
// sendPushToUser sends a push notification to all of a user's devices
|
|
func (h *Handler) sendPushToUser(ctx context.Context, userID uint, title, message string, data map[string]string) error {
|
|
if h.pushClient == nil {
|
|
log.Warn().Msg("Push client not configured, skipping notification")
|
|
return nil
|
|
}
|
|
|
|
// Get iOS device tokens
|
|
var iosTokens []string
|
|
err := h.db.Model(&models.APNSDevice{}).
|
|
Where("user_id = ? AND active = ?", userID, true).
|
|
Pluck("registration_id", &iosTokens).Error
|
|
if err != nil {
|
|
log.Error().Err(err).Uint("user_id", userID).Msg("Failed to get iOS tokens")
|
|
}
|
|
|
|
// Get Android device tokens
|
|
var androidTokens []string
|
|
err = h.db.Model(&models.GCMDevice{}).
|
|
Where("user_id = ? AND active = ?", userID, true).
|
|
Pluck("registration_id", &androidTokens).Error
|
|
if err != nil {
|
|
log.Error().Err(err).Uint("user_id", userID).Msg("Failed to get Android tokens")
|
|
}
|
|
|
|
if len(iosTokens) == 0 && len(androidTokens) == 0 {
|
|
log.Debug().Uint("user_id", userID).Msg("No device tokens found for user")
|
|
return nil
|
|
}
|
|
|
|
log.Debug().
|
|
Uint("user_id", userID).
|
|
Int("ios_tokens", len(iosTokens)).
|
|
Int("android_tokens", len(androidTokens)).
|
|
Msg("Sending push to user devices")
|
|
|
|
// Send to all devices
|
|
return h.pushClient.SendToAll(ctx, iosTokens, androidTokens, title, message, data)
|
|
}
|
|
|
|
// NewSendEmailTask creates a new email sending task
|
|
func NewSendEmailTask(to, subject, htmlBody, textBody string) (*asynq.Task, error) {
|
|
payload, err := json.Marshal(EmailPayload{
|
|
To: to,
|
|
Subject: subject,
|
|
HTMLBody: htmlBody,
|
|
TextBody: textBody,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return asynq.NewTask(TypeSendEmail, payload), nil
|
|
}
|
|
|
|
// NewSendPushTask creates a new push notification task
|
|
func NewSendPushTask(userID uint, title, message string, data map[string]string) (*asynq.Task, error) {
|
|
payload, err := json.Marshal(PushPayload{
|
|
UserID: userID,
|
|
Title: title,
|
|
Message: message,
|
|
Data: data,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return asynq.NewTask(TypeSendPush, payload), nil
|
|
}
|