Security: - Replace all binding: tags with validate: + c.Validate() in admin handlers - Add rate limiting to auth endpoints (login, register, password reset) - Add security headers (HSTS, XSS protection, nosniff, frame options) - Wire Google Pub/Sub token verification into webhook handler - Replace ParseUnverified with proper OIDC/JWKS key verification - Verify inner Apple JWS signatures in webhook handler - Add io.LimitReader (1MB) to all webhook body reads - Add ownership verification to file deletion - Move hardcoded admin credentials to env vars - Add uniqueIndex to User.Email - Hide ConfirmationCode from JSON serialization - Mask confirmation codes in admin responses - Use http.DetectContentType for upload validation - Fix path traversal in storage service - Replace os.Getenv with Viper in stripe service - Sanitize Redis URLs before logging - Separate DEBUG_FIXED_CODES from DEBUG flag - Reject weak SECRET_KEY in production - Add host check on /_next/* proxy routes - Use explicit localhost CORS origins in debug mode - Replace err.Error() with generic messages in all admin error responses Critical fixes: - Rewrite FCM to HTTP v1 API with OAuth 2.0 service account auth - Fix user_customuser -> auth_user table names in raw SQL - Fix dashboard verified query to use UserProfile model - Add escapeLikeWildcards() to prevent SQL wildcard injection Bug fixes: - Add bounds checks for days/expiring_soon query params (1-3650) - Add receipt_data/transaction_id empty-check to RestoreSubscription - Change Active bool -> *bool in device handler - Check all unchecked GORM/FindByIDWithProfile errors - Add validation for notification hour fields (0-23) - Add max=10000 validation on task description updates Transactions & data integrity: - Wrap registration flow in transaction - Wrap QuickComplete in transaction - Move image creation inside completion transaction - Wrap SetSpecialties in transaction - Wrap GetOrCreateToken in transaction - Wrap completion+image deletion in transaction Performance: - Batch completion summaries (2 queries vs 2N) - Reuse single http.Client in IAP validation - Cache dashboard counts (30s TTL) - Batch COUNT queries in admin user list - Add Limit(500) to document queries - Add reminder_stage+due_date filters to reminder queries - Parse AllowedTypes once at init - In-memory user cache in auth middleware (30s TTL) - Timezone change detection cache - Optimize P95 with per-endpoint sorted buffers - Replace crypto/md5 with hash/fnv for ETags Code quality: - Add sync.Once to all monitoring Stop()/Close() methods - Replace 8 fmt.Printf with zerolog in auth service - Log previously discarded errors - Standardize delete response shapes - Route hardcoded English through i18n - Remove FileURL from DocumentResponse (keep MediaURL only) - Thread user timezone through kanban board responses - Initialize empty slices to prevent null JSON - Extract shared field map for task Update/UpdateTx - Delete unused SoftDeleteModel, min(), formatCron, legacy handlers Worker & jobs: - Wire Asynq email infrastructure into worker - Register HandleReminderLogCleanup with daily 3AM cron - Use per-user timezone in HandleSmartReminder - Replace direct DB queries with repository calls - Delete legacy reminder handlers (~200 lines) - Delete unused task type constants Dependencies: - Replace archived jung-kurt/gofpdf with go-pdf/fpdf - Replace unmaintained gomail.v2 with wneessen/go-mail - Add TODO for Echo jwt v3 transitive dep removal Test infrastructure: - Fix MakeRequest/SeedLookupData error handling - Replace os.Exit(0) with t.Skip() in scope/consistency tests - Add 11 new FCM v1 tests Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
645 lines
21 KiB
Go
645 lines
21 KiB
Go
package jobs
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/hibiken/asynq"
|
|
"github.com/rs/zerolog/log"
|
|
"gorm.io/gorm"
|
|
|
|
"github.com/treytartt/honeydue-api/internal/config"
|
|
"github.com/treytartt/honeydue-api/internal/models"
|
|
"github.com/treytartt/honeydue-api/internal/notifications"
|
|
"github.com/treytartt/honeydue-api/internal/push"
|
|
"github.com/treytartt/honeydue-api/internal/repositories"
|
|
"github.com/treytartt/honeydue-api/internal/services"
|
|
)
|
|
|
|
// Task types
|
|
const (
|
|
TypeSmartReminder = "notification:smart_reminder" // Frequency-aware reminders
|
|
TypeDailyDigest = "notification:daily_digest"
|
|
TypeSendEmail = "email:send"
|
|
TypeSendPush = "push:send"
|
|
TypeOnboardingEmails = "email:onboarding"
|
|
TypeReminderLogCleanup = "maintenance:reminder_log_cleanup"
|
|
)
|
|
|
|
// Handler handles background job processing
|
|
type Handler struct {
|
|
db *gorm.DB
|
|
taskRepo *repositories.TaskRepository
|
|
residenceRepo *repositories.ResidenceRepository
|
|
reminderRepo *repositories.ReminderRepository
|
|
notificationRepo *repositories.NotificationRepository
|
|
pushClient *push.Client
|
|
emailService *services.EmailService
|
|
notificationService *services.NotificationService
|
|
onboardingService *services.OnboardingEmailService
|
|
config *config.Config
|
|
}
|
|
|
|
// NewHandler creates a new job handler
|
|
func NewHandler(db *gorm.DB, pushClient *push.Client, emailService *services.EmailService, notificationService *services.NotificationService, cfg *config.Config) *Handler {
|
|
// Create onboarding email service
|
|
var onboardingService *services.OnboardingEmailService
|
|
if emailService != nil {
|
|
onboardingService = services.NewOnboardingEmailService(db, emailService, cfg.Server.BaseURL)
|
|
}
|
|
|
|
return &Handler{
|
|
db: db,
|
|
taskRepo: repositories.NewTaskRepository(db),
|
|
residenceRepo: repositories.NewResidenceRepository(db),
|
|
reminderRepo: repositories.NewReminderRepository(db),
|
|
notificationRepo: repositories.NewNotificationRepository(db),
|
|
pushClient: pushClient,
|
|
emailService: emailService,
|
|
notificationService: notificationService,
|
|
onboardingService: onboardingService,
|
|
config: cfg,
|
|
}
|
|
}
|
|
|
|
// HandleDailyDigest processes daily digest notifications with task statistics
|
|
func (h *Handler) HandleDailyDigest(ctx context.Context, task *asynq.Task) error {
|
|
log.Info().Msg("Processing daily digest notifications...")
|
|
|
|
now := time.Now().UTC()
|
|
currentHour := now.Hour()
|
|
systemDefaultHour := h.config.Worker.DailyNotifHour
|
|
|
|
log.Info().Int("current_hour", currentHour).Int("system_default_hour", systemDefaultHour).Msg("Daily digest check")
|
|
|
|
// Step 1: Find users who should receive daily digest THIS hour
|
|
// Logic: Each user gets notified ONCE per day at exactly ONE hour:
|
|
// - If user has custom hour set: notify ONLY at that custom hour
|
|
// - If user has NO custom hour (NULL): notify ONLY at system default hour
|
|
var eligibleUserIDs []uint
|
|
|
|
query := h.db.Model(&models.NotificationPreference{}).
|
|
Select("user_id").
|
|
Where("daily_digest = true")
|
|
|
|
if currentHour == systemDefaultHour {
|
|
// At system default hour: notify users who have NO custom hour (NULL) OR whose custom hour equals default
|
|
query = query.Where("daily_digest_hour IS NULL OR daily_digest_hour = ?", currentHour)
|
|
} else {
|
|
// At non-default hour: only notify users who have this specific custom hour set
|
|
query = query.Where("daily_digest_hour = ?", currentHour)
|
|
}
|
|
|
|
err := query.Pluck("user_id", &eligibleUserIDs).Error
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to query eligible users for daily digest")
|
|
return err
|
|
}
|
|
|
|
// Early exit if no users need notifications this hour
|
|
if len(eligibleUserIDs) == 0 {
|
|
log.Debug().Int("hour", currentHour).Msg("No users scheduled for daily digest notifications this hour")
|
|
return nil
|
|
}
|
|
|
|
log.Info().Int("eligible_users", len(eligibleUserIDs)).Msg("Found users eligible for daily digest this hour")
|
|
|
|
// Step 2: Get task statistics for each user using canonical repository functions
|
|
// This ensures consistency with kanban display - uses the same scopes:
|
|
// - task.ScopeOverdue (excludes in-progress tasks)
|
|
// - task.ScopeDueSoon (7 days for "due this week")
|
|
var usersNotified int
|
|
|
|
for _, userID := range eligibleUserIDs {
|
|
// Get user's timezone from notification preferences for accurate overdue calculation
|
|
// This ensures the daily digest matches what the user sees in the kanban UI
|
|
var userNow time.Time
|
|
if prefs, err := h.notificationRepo.FindPreferencesByUser(userID); err == nil && prefs.Timezone != nil {
|
|
if loc, err := time.LoadLocation(*prefs.Timezone); err == nil {
|
|
// Use start of day in user's timezone (matches kanban behavior)
|
|
userNowInTz := time.Now().In(loc)
|
|
userNow = time.Date(userNowInTz.Year(), userNowInTz.Month(), userNowInTz.Day(), 0, 0, 0, 0, loc)
|
|
log.Debug().Uint("user_id", userID).Str("timezone", *prefs.Timezone).Time("user_now", userNow).Msg("Using user timezone for daily digest")
|
|
} else {
|
|
userNow = now // Fallback to UTC
|
|
}
|
|
} else {
|
|
userNow = now // Fallback to UTC if no timezone stored
|
|
}
|
|
|
|
// Get user's residence IDs (owned + member)
|
|
residenceIDs, err := h.residenceRepo.FindResidenceIDsByUser(userID)
|
|
if err != nil {
|
|
log.Error().Err(err).Uint("user_id", userID).Msg("Failed to get user residences")
|
|
continue
|
|
}
|
|
|
|
if len(residenceIDs) == 0 {
|
|
continue // User has no residences
|
|
}
|
|
|
|
// Query overdue tasks using canonical scopes (excludes in-progress)
|
|
// Uses userNow (timezone-aware) for accurate overdue detection
|
|
opts := repositories.TaskFilterOptions{
|
|
ResidenceIDs: residenceIDs,
|
|
IncludeInProgress: false, // Match kanban: in-progress tasks not in overdue column
|
|
PreloadCompletions: true,
|
|
}
|
|
|
|
overdueTasks, err := h.taskRepo.GetOverdueTasks(userNow, opts)
|
|
if err != nil {
|
|
log.Error().Err(err).Uint("user_id", userID).Msg("Failed to get overdue tasks")
|
|
continue
|
|
}
|
|
|
|
// Query due-this-week tasks (7 days threshold)
|
|
dueSoonTasks, err := h.taskRepo.GetDueSoonTasks(userNow, 7, opts)
|
|
if err != nil {
|
|
log.Error().Err(err).Uint("user_id", userID).Msg("Failed to get due-soon tasks")
|
|
continue
|
|
}
|
|
|
|
overdueCount := len(overdueTasks)
|
|
dueThisWeekCount := len(dueSoonTasks)
|
|
|
|
// Skip users with no actionable items
|
|
if overdueCount == 0 && dueThisWeekCount == 0 {
|
|
continue
|
|
}
|
|
|
|
// Build notification message
|
|
title := "Daily Task Summary"
|
|
var body string
|
|
|
|
if overdueCount > 0 && dueThisWeekCount > 0 {
|
|
body = fmt.Sprintf("You have %d overdue task(s) and %d task(s) due this week", overdueCount, dueThisWeekCount)
|
|
} else if overdueCount > 0 {
|
|
body = fmt.Sprintf("You have %d overdue task(s) that need attention", overdueCount)
|
|
} else {
|
|
body = fmt.Sprintf("You have %d task(s) due this week", dueThisWeekCount)
|
|
}
|
|
|
|
// Send push notification
|
|
if err := h.sendPushToUser(ctx, userID, title, body, map[string]string{
|
|
"type": "daily_digest",
|
|
"overdue": fmt.Sprintf("%d", overdueCount),
|
|
"due_this_week": fmt.Sprintf("%d", dueThisWeekCount),
|
|
}); err != nil {
|
|
log.Error().Err(err).Uint("user_id", userID).Msg("Failed to send daily digest push")
|
|
} else {
|
|
usersNotified++
|
|
}
|
|
}
|
|
|
|
log.Info().Int("users_notified", usersNotified).Msg("Daily digest notifications completed")
|
|
return nil
|
|
}
|
|
|
|
// EmailPayload represents the payload for email tasks
|
|
type EmailPayload struct {
|
|
To string `json:"to"`
|
|
Subject string `json:"subject"`
|
|
HTMLBody string `json:"html_body"`
|
|
TextBody string `json:"text_body"`
|
|
}
|
|
|
|
// HandleSendEmail processes email sending tasks
|
|
func (h *Handler) HandleSendEmail(ctx context.Context, task *asynq.Task) error {
|
|
var payload EmailPayload
|
|
if err := json.Unmarshal(task.Payload(), &payload); err != nil {
|
|
return fmt.Errorf("failed to unmarshal payload: %w", err)
|
|
}
|
|
|
|
log.Info().
|
|
Str("to", payload.To).
|
|
Str("subject", payload.Subject).
|
|
Msg("Sending email...")
|
|
|
|
if h.emailService == nil {
|
|
log.Warn().Msg("Email service not configured, skipping email")
|
|
return nil
|
|
}
|
|
|
|
// Use the email service to send the email
|
|
if err := h.emailService.SendEmail(payload.To, payload.Subject, payload.HTMLBody, payload.TextBody); err != nil {
|
|
log.Error().Err(err).Str("to", payload.To).Msg("Failed to send email")
|
|
return err
|
|
}
|
|
|
|
log.Info().Str("to", payload.To).Msg("Email sent successfully")
|
|
return nil
|
|
}
|
|
|
|
// PushPayload represents the payload for push notification tasks
|
|
type PushPayload struct {
|
|
UserID uint `json:"user_id"`
|
|
Title string `json:"title"`
|
|
Message string `json:"message"`
|
|
Data map[string]string `json:"data,omitempty"`
|
|
}
|
|
|
|
// HandleSendPush processes push notification tasks
|
|
func (h *Handler) HandleSendPush(ctx context.Context, task *asynq.Task) error {
|
|
var payload PushPayload
|
|
if err := json.Unmarshal(task.Payload(), &payload); err != nil {
|
|
return fmt.Errorf("failed to unmarshal payload: %w", err)
|
|
}
|
|
|
|
log.Info().
|
|
Uint("user_id", payload.UserID).
|
|
Str("title", payload.Title).
|
|
Msg("Sending push notification...")
|
|
|
|
if err := h.sendPushToUser(ctx, payload.UserID, payload.Title, payload.Message, payload.Data); err != nil {
|
|
log.Error().Err(err).Uint("user_id", payload.UserID).Msg("Failed to send push notification")
|
|
return err
|
|
}
|
|
|
|
log.Info().Uint("user_id", payload.UserID).Msg("Push notification sent successfully")
|
|
return nil
|
|
}
|
|
|
|
// sendPushToUser sends a push notification to all of a user's devices
|
|
func (h *Handler) sendPushToUser(ctx context.Context, userID uint, title, message string, data map[string]string) error {
|
|
if h.pushClient == nil {
|
|
log.Warn().Msg("Push client not configured, skipping notification")
|
|
return nil
|
|
}
|
|
|
|
// Get active device tokens via repository
|
|
iosTokens, androidTokens, err := h.notificationRepo.GetActiveTokensForUser(userID)
|
|
if err != nil {
|
|
log.Error().Err(err).Uint("user_id", userID).Msg("Failed to get device tokens")
|
|
return err
|
|
}
|
|
|
|
if len(iosTokens) == 0 && len(androidTokens) == 0 {
|
|
log.Debug().Uint("user_id", userID).Msg("No device tokens found for user")
|
|
return nil
|
|
}
|
|
|
|
log.Debug().
|
|
Uint("user_id", userID).
|
|
Int("ios_tokens", len(iosTokens)).
|
|
Int("android_tokens", len(androidTokens)).
|
|
Msg("Sending push to user devices")
|
|
|
|
// Send to all devices
|
|
return h.pushClient.SendToAll(ctx, iosTokens, androidTokens, title, message, data)
|
|
}
|
|
|
|
// NewSendEmailTask creates a new email sending task
|
|
func NewSendEmailTask(to, subject, htmlBody, textBody string) (*asynq.Task, error) {
|
|
payload, err := json.Marshal(EmailPayload{
|
|
To: to,
|
|
Subject: subject,
|
|
HTMLBody: htmlBody,
|
|
TextBody: textBody,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return asynq.NewTask(TypeSendEmail, payload), nil
|
|
}
|
|
|
|
// NewSendPushTask creates a new push notification task
|
|
func NewSendPushTask(userID uint, title, message string, data map[string]string) (*asynq.Task, error) {
|
|
payload, err := json.Marshal(PushPayload{
|
|
UserID: userID,
|
|
Title: title,
|
|
Message: message,
|
|
Data: data,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return asynq.NewTask(TypeSendPush, payload), nil
|
|
}
|
|
|
|
// HandleOnboardingEmails processes onboarding email campaigns
|
|
// Sends emails to:
|
|
// 1. Users who registered 2+ days ago but haven't created a residence
|
|
// 2. Users who created a residence 5+ days ago but haven't created any tasks
|
|
// Each email type is only sent once per user, ever.
|
|
func (h *Handler) HandleOnboardingEmails(ctx context.Context, task *asynq.Task) error {
|
|
if !h.config.Features.OnboardingEmailsEnabled {
|
|
log.Debug().Msg("Onboarding emails disabled by feature flag, skipping")
|
|
return nil
|
|
}
|
|
|
|
log.Info().Msg("Processing onboarding emails...")
|
|
|
|
if h.onboardingService == nil {
|
|
log.Warn().Msg("Onboarding email service not configured, skipping")
|
|
return nil
|
|
}
|
|
|
|
// Send no-residence emails (users without any residences after 2 days)
|
|
noResCount, noResErr := h.onboardingService.CheckAndSendNoResidenceEmails()
|
|
if noResErr != nil {
|
|
log.Error().Err(noResErr).Msg("Failed to process no-residence onboarding emails")
|
|
} else {
|
|
log.Info().Int("count", noResCount).Msg("Sent no-residence onboarding emails")
|
|
}
|
|
|
|
// Send no-tasks emails (users with residence but no tasks after 5 days)
|
|
noTasksCount, noTasksErr := h.onboardingService.CheckAndSendNoTasksEmails()
|
|
if noTasksErr != nil {
|
|
log.Error().Err(noTasksErr).Msg("Failed to process no-tasks onboarding emails")
|
|
} else {
|
|
log.Info().Int("count", noTasksCount).Msg("Sent no-tasks onboarding emails")
|
|
}
|
|
|
|
log.Info().
|
|
Int("no_residence_sent", noResCount).
|
|
Int("no_tasks_sent", noTasksCount).
|
|
Msg("Onboarding email processing completed")
|
|
|
|
// If all sub-tasks failed, return an error so Asynq retries
|
|
if noResErr != nil && noTasksErr != nil {
|
|
return fmt.Errorf("all onboarding email sub-tasks failed: no-residence: %w, no-tasks: %v", noResErr, noTasksErr)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// userReminderPrefs holds a user's notification preferences for smart reminders
|
|
type userReminderPrefs struct {
|
|
UserID uint `gorm:"column:user_id"`
|
|
WantsDueSoon bool `gorm:"column:wants_due_soon"`
|
|
WantsOverdue bool `gorm:"column:wants_overdue"`
|
|
}
|
|
|
|
// HandleSmartReminder processes frequency-aware task reminders.
|
|
// Features:
|
|
// 1. Single query to get users who want either notification type at current hour
|
|
// 2. Single query to get both due-soon AND overdue tasks for those users
|
|
// 3. Uses frequency-based schedules (weekly = day-of only, annual = 30d, 14d, 7d, day-of)
|
|
// 4. Tracks sent reminders to prevent duplicates
|
|
// 5. Tapers off overdue reminders (daily for 3 days, then every 3 days, stop after 14)
|
|
func (h *Handler) HandleSmartReminder(ctx context.Context, task *asynq.Task) error {
|
|
log.Info().Msg("Processing smart task reminders...")
|
|
|
|
now := time.Now().UTC()
|
|
currentHour := now.Hour()
|
|
|
|
dueSoonDefault := h.config.Worker.TaskReminderHour
|
|
overdueDefault := h.config.Worker.OverdueReminderHour
|
|
|
|
log.Info().
|
|
Int("current_hour", currentHour).
|
|
Int("due_soon_default", dueSoonDefault).
|
|
Int("overdue_default", overdueDefault).
|
|
Msg("Smart reminder check")
|
|
|
|
// Step 1: Single query to get all users who want ANY notification type at this hour
|
|
// Each user gets flags for which types they want
|
|
var userPrefs []userReminderPrefs
|
|
|
|
// Build hour matching conditions (reused in SELECT and WHERE)
|
|
// User matches if: they set this hour explicitly, OR they have no preference and current hour is the default
|
|
dueSoonHourMatch := fmt.Sprintf(
|
|
"(task_due_soon_hour IS NULL AND %d = %d) OR task_due_soon_hour = %d",
|
|
currentHour, dueSoonDefault, currentHour,
|
|
)
|
|
overdueHourMatch := fmt.Sprintf(
|
|
"(task_overdue_hour IS NULL AND %d = %d) OR task_overdue_hour = %d",
|
|
currentHour, overdueDefault, currentHour,
|
|
)
|
|
|
|
query := fmt.Sprintf(`
|
|
SELECT
|
|
user_id,
|
|
(task_due_soon = true AND (%s)) as wants_due_soon,
|
|
(task_overdue = true AND (%s)) as wants_overdue
|
|
FROM notifications_notificationpreference
|
|
WHERE (task_due_soon = true AND (%s))
|
|
OR (task_overdue = true AND (%s))
|
|
`, dueSoonHourMatch, overdueHourMatch, dueSoonHourMatch, overdueHourMatch)
|
|
|
|
err := h.db.Raw(query).Scan(&userPrefs).Error
|
|
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to query user notification preferences")
|
|
return err
|
|
}
|
|
|
|
if len(userPrefs) == 0 {
|
|
log.Debug().Int("hour", currentHour).Msg("No users scheduled for any reminder type this hour")
|
|
return nil
|
|
}
|
|
|
|
// Build lookup maps for quick access
|
|
userWantsDueSoon := make(map[uint]bool)
|
|
userWantsOverdue := make(map[uint]bool)
|
|
var allUserIDs []uint
|
|
|
|
for _, pref := range userPrefs {
|
|
allUserIDs = append(allUserIDs, pref.UserID)
|
|
if pref.WantsDueSoon {
|
|
userWantsDueSoon[pref.UserID] = true
|
|
}
|
|
if pref.WantsOverdue {
|
|
userWantsOverdue[pref.UserID] = true
|
|
}
|
|
}
|
|
|
|
log.Info().
|
|
Int("total_users", len(allUserIDs)).
|
|
Int("want_due_soon", len(userWantsDueSoon)).
|
|
Int("want_overdue", len(userWantsOverdue)).
|
|
Msg("Found users eligible for reminders")
|
|
|
|
// Build per-user "today" using their timezone preference
|
|
// This ensures reminder stage calculations (overdue, due-soon) match the user's local date
|
|
userToday := make(map[uint]time.Time, len(allUserIDs))
|
|
utcToday := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)
|
|
for _, uid := range allUserIDs {
|
|
prefs, err := h.notificationRepo.FindPreferencesByUser(uid)
|
|
if err == nil && prefs.Timezone != nil {
|
|
if loc, locErr := time.LoadLocation(*prefs.Timezone); locErr == nil {
|
|
userNowInTz := time.Now().In(loc)
|
|
userToday[uid] = time.Date(userNowInTz.Year(), userNowInTz.Month(), userNowInTz.Day(), 0, 0, 0, 0, loc)
|
|
continue
|
|
}
|
|
}
|
|
userToday[uid] = utcToday // Fallback to UTC
|
|
}
|
|
|
|
// Step 2: Single query to get ALL active tasks (both due-soon and overdue) for these users
|
|
opts := repositories.TaskFilterOptions{
|
|
UserIDs: allUserIDs,
|
|
IncludeInProgress: true,
|
|
PreloadResidence: true,
|
|
PreloadCompletions: true,
|
|
PreloadFrequency: true,
|
|
}
|
|
|
|
activeTasks, err := h.taskRepo.GetActiveTasksForUsers(now, opts)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to query active tasks")
|
|
return err
|
|
}
|
|
|
|
log.Info().Int("count", len(activeTasks)).Msg("Found active tasks for eligible users")
|
|
|
|
// Step 3: Pre-process tasks to determine stages and build batch reminder check
|
|
type candidateReminder struct {
|
|
taskIndex int
|
|
userID uint
|
|
effectiveDate time.Time
|
|
stage string
|
|
isOverdue bool
|
|
reminderStage models.ReminderStage
|
|
}
|
|
|
|
var candidates []candidateReminder
|
|
var reminderKeys []repositories.ReminderKey
|
|
|
|
for i, t := range activeTasks {
|
|
// Determine which user to notify
|
|
var userID uint
|
|
if t.AssignedToID != nil {
|
|
userID = *t.AssignedToID
|
|
} else if t.Residence.ID != 0 {
|
|
userID = t.Residence.OwnerID
|
|
} else {
|
|
continue
|
|
}
|
|
|
|
// Get the effective due date
|
|
var effectiveDate time.Time
|
|
if t.NextDueDate != nil {
|
|
effectiveDate = *t.NextDueDate
|
|
} else if t.DueDate != nil {
|
|
effectiveDate = *t.DueDate
|
|
} else {
|
|
continue
|
|
}
|
|
|
|
// Get frequency interval days
|
|
var frequencyDays *int
|
|
if t.Frequency != nil && t.Frequency.Days != nil {
|
|
days := int(*t.Frequency.Days)
|
|
frequencyDays = &days
|
|
} else if t.CustomIntervalDays != nil {
|
|
days := int(*t.CustomIntervalDays)
|
|
frequencyDays = &days
|
|
}
|
|
|
|
// Determine which reminder stage applies today using the user's local date
|
|
today := userToday[userID]
|
|
stage := notifications.GetReminderStageForToday(effectiveDate, frequencyDays, today)
|
|
if stage == "" {
|
|
continue
|
|
}
|
|
|
|
// Determine if this is an overdue or due-soon stage
|
|
isOverdueStage := len(stage) >= 7 && stage[:7] == "overdue"
|
|
|
|
// Check if user wants this notification type
|
|
if isOverdueStage && !userWantsOverdue[userID] {
|
|
continue
|
|
}
|
|
if !isOverdueStage && !userWantsDueSoon[userID] {
|
|
continue
|
|
}
|
|
|
|
reminderStage := models.ReminderStage(stage)
|
|
|
|
candidates = append(candidates, candidateReminder{
|
|
taskIndex: i,
|
|
userID: userID,
|
|
effectiveDate: effectiveDate,
|
|
stage: stage,
|
|
isOverdue: isOverdueStage,
|
|
reminderStage: reminderStage,
|
|
})
|
|
|
|
reminderKeys = append(reminderKeys, repositories.ReminderKey{
|
|
TaskID: t.ID,
|
|
UserID: userID,
|
|
DueDate: effectiveDate,
|
|
Stage: reminderStage,
|
|
})
|
|
}
|
|
|
|
// Batch check which reminders have already been sent (single query)
|
|
alreadySentMap, err := h.reminderRepo.HasSentReminderBatch(reminderKeys)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to batch check reminder logs")
|
|
return err
|
|
}
|
|
|
|
// Step 4: Send notifications for candidates that haven't been sent yet
|
|
var dueSoonSent, dueSoonSkipped, overdueSent, overdueSkipped int
|
|
|
|
for i, c := range candidates {
|
|
if alreadySentMap[i] {
|
|
if c.isOverdue {
|
|
overdueSkipped++
|
|
} else {
|
|
dueSoonSkipped++
|
|
}
|
|
continue
|
|
}
|
|
|
|
t := activeTasks[c.taskIndex]
|
|
|
|
// Determine notification type
|
|
var notificationType models.NotificationType
|
|
if c.isOverdue {
|
|
notificationType = models.NotificationTaskOverdue
|
|
} else {
|
|
notificationType = models.NotificationTaskDueSoon
|
|
}
|
|
|
|
// Log the reminder BEFORE sending so we have a record even if the send crashes
|
|
if _, err := h.reminderRepo.LogReminder(t.ID, c.userID, c.effectiveDate, c.reminderStage, nil); err != nil {
|
|
log.Error().Err(err).Uint("task_id", t.ID).Str("stage", c.stage).Msg("Failed to log reminder")
|
|
}
|
|
|
|
// Send notification
|
|
if err := h.notificationService.CreateAndSendTaskNotification(ctx, c.userID, notificationType, &t); err != nil {
|
|
log.Error().Err(err).
|
|
Uint("user_id", c.userID).
|
|
Uint("task_id", t.ID).
|
|
Str("stage", c.stage).
|
|
Msg("Failed to send smart reminder")
|
|
continue
|
|
}
|
|
|
|
if c.isOverdue {
|
|
overdueSent++
|
|
} else {
|
|
dueSoonSent++
|
|
}
|
|
}
|
|
|
|
log.Info().
|
|
Int("due_soon_sent", dueSoonSent).
|
|
Int("due_soon_skipped", dueSoonSkipped).
|
|
Int("overdue_sent", overdueSent).
|
|
Int("overdue_skipped", overdueSkipped).
|
|
Msg("Smart reminder notifications completed")
|
|
|
|
return nil
|
|
}
|
|
|
|
// HandleReminderLogCleanup cleans up old reminder logs to prevent table bloat
|
|
func (h *Handler) HandleReminderLogCleanup(ctx context.Context, task *asynq.Task) error {
|
|
log.Info().Msg("Processing reminder log cleanup...")
|
|
|
|
// Clean up logs older than 90 days
|
|
deleted, err := h.reminderRepo.CleanupOldLogs(90)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to cleanup old reminder logs")
|
|
return err
|
|
}
|
|
|
|
log.Info().Int64("deleted", deleted).Msg("Reminder log cleanup completed")
|
|
return nil
|
|
}
|