diff --git a/admin/src/app/(dashboard)/settings/page.tsx b/admin/src/app/(dashboard)/settings/page.tsx index 4409c3c..dfaba6a 100644 --- a/admin/src/app/(dashboard)/settings/page.tsx +++ b/admin/src/app/(dashboard)/settings/page.tsx @@ -1,7 +1,7 @@ 'use client'; import { useMutation } from '@tanstack/react-query'; -import { Database, TestTube, Trash2 } from 'lucide-react'; +import { Database, TestTube, Trash2, RefreshCw } from 'lucide-react'; import { settingsApi } from '@/lib/api'; import { Button } from '@/components/ui/button'; @@ -56,6 +56,20 @@ export default function SettingsPage() { }, }); + const clearStuckJobsMutation = useMutation({ + mutationFn: settingsApi.clearStuckJobs, + onSuccess: (data) => { + if (data.keys_deleted > 0) { + toast.success(`${data.message} (${data.keys_deleted} keys cleared)`); + } else { + toast.info('No stuck jobs found'); + } + }, + onError: () => { + toast.error('Failed to clear stuck jobs'); + }, + }); + return (
@@ -163,6 +177,53 @@ export default function SettingsPage() { + {/* Clear Stuck Jobs */} + + + + + Clear Stuck Jobs + + + Clear stuck or failed background worker jobs from Redis + + + + + + + + + + Clear Stuck Jobs? + + This will clear stuck/failed worker jobs from Redis including: +
    +
  • Jobs in the retry queue
  • +
  • Archived/dead letter jobs
  • +
  • Orphaned task metadata
  • +
+ + This is safe to run and can fix issues with notifications not running at expected times. + +
+
+ + Cancel + clearStuckJobsMutation.mutate()}> + Clear Stuck Jobs + + +
+
+
+
+ {/* Clear All Data */} diff --git a/admin/src/lib/api.ts b/admin/src/lib/api.ts index 07a93a8..8557d86 100644 --- a/admin/src/lib/api.ts +++ b/admin/src/lib/api.ts @@ -684,6 +684,11 @@ export const settingsApi = { const response = await api.post<{ message: string; users_deleted: number; preserved_users: number }>('/settings/clear-all-data'); return response.data; }, + + clearStuckJobs: async (): Promise<{ message: string; keys_deleted: number; deleted_keys: string[] }> => { + const response = await api.post<{ message: string; keys_deleted: number; deleted_keys: string[] }>('/settings/clear-stuck-jobs'); + return response.data; + }, }; // Limitations types diff --git a/cmd/worker/main.go b/cmd/worker/main.go index 766125c..fe0db08 100644 --- a/cmd/worker/main.go +++ b/cmd/worker/main.go @@ -2,10 +2,10 @@ package main import ( "context" + "fmt" "os" "os/signal" "syscall" - "time" "github.com/hibiken/asynq" "github.com/rs/zerolog/log" @@ -105,21 +105,21 @@ func main() { scheduler := asynq.NewScheduler(redisOpt, nil) // Schedule task reminder notifications - reminderCron := formatCron(cfg.Worker.TaskReminderHour, cfg.Worker.TaskReminderMinute) + reminderCron := formatCron(cfg.Worker.TaskReminderHour) if _, err := scheduler.Register(reminderCron, asynq.NewTask(jobs.TypeTaskReminder, nil)); err != nil { log.Fatal().Err(err).Msg("Failed to register task reminder job") } log.Info().Str("cron", reminderCron).Msg("Registered task reminder job") - // Schedule overdue reminder at 9 AM UTC - overdueCron := formatCron(cfg.Worker.OverdueReminderHour, 0) + // Schedule overdue reminder + overdueCron := formatCron(cfg.Worker.OverdueReminderHour) if _, err := scheduler.Register(overdueCron, asynq.NewTask(jobs.TypeOverdueReminder, nil)); err != nil { log.Fatal().Err(err).Msg("Failed to register overdue reminder job") } log.Info().Str("cron", overdueCron).Msg("Registered overdue reminder job") - // Schedule daily digest at 11 AM UTC - dailyCron := formatCron(cfg.Worker.DailyNotifHour, 0) + // Schedule daily digest + dailyCron := formatCron(cfg.Worker.DailyNotifHour) if _, err := scheduler.Register(dailyCron, asynq.NewTask(jobs.TypeDailyDigest, nil)); err != nil { log.Fatal().Err(err).Msg("Failed to register daily digest job") } @@ -154,7 +154,7 @@ func main() { log.Info().Msg("Worker stopped") } -// formatCron creates a cron expression for a specific hour and minute (UTC) -func formatCron(hour, minute int) string { - return time.Date(0, 1, 1, hour, minute, 0, 0, time.UTC).Format("4 15 * * *") +// formatCron creates a cron expression for a specific hour (runs at minute 0) +func formatCron(hour int) string { + return fmt.Sprintf("0 %02d * * *", hour) } diff --git a/internal/admin/handlers/settings_handler.go b/internal/admin/handlers/settings_handler.go index 1b6e3d5..0446d37 100644 --- a/internal/admin/handlers/settings_handler.go +++ b/internal/admin/handlers/settings_handler.go @@ -467,6 +467,75 @@ type ClearAllDataResponse struct { PreservedUsers int64 `json:"preserved_users"` } +// ClearStuckJobsResponse represents the response after clearing stuck Redis jobs +type ClearStuckJobsResponse struct { + Message string `json:"message"` + KeysDeleted int `json:"keys_deleted"` + DeletedKeys []string `json:"deleted_keys"` +} + +// ClearStuckJobs handles POST /api/admin/settings/clear-stuck-jobs +// This clears stuck/failed asynq worker jobs from Redis +func (h *AdminSettingsHandler) ClearStuckJobs(c *gin.Context) { + cache := services.GetCache() + if cache == nil { + c.JSON(http.StatusServiceUnavailable, gin.H{"error": "Redis cache not available"}) + return + } + + ctx := c.Request.Context() + client := cache.Client() + + var deletedKeys []string + + // Patterns for asynq job keys that can get stuck + patterns := []string{ + "asynq:{default}:retry", // Retry queue + "asynq:{default}:archived", // Archived/dead jobs + "asynq:{default}:t:*", // Individual task metadata + } + + for _, pattern := range patterns { + if strings.Contains(pattern, "*") { + // Use SCAN for patterns with wildcards + iter := client.Scan(ctx, 0, pattern, 0).Iterator() + for iter.Next(ctx) { + key := iter.Val() + if err := client.Del(ctx, key).Err(); err != nil { + log.Warn().Err(err).Str("key", key).Msg("Failed to delete key") + continue + } + deletedKeys = append(deletedKeys, key) + } + if err := iter.Err(); err != nil { + log.Warn().Err(err).Str("pattern", pattern).Msg("Error scanning keys") + } + } else { + // Direct key deletion + exists, err := client.Exists(ctx, pattern).Result() + if err != nil { + log.Warn().Err(err).Str("key", pattern).Msg("Failed to check key existence") + continue + } + if exists > 0 { + if err := client.Del(ctx, pattern).Err(); err != nil { + log.Warn().Err(err).Str("key", pattern).Msg("Failed to delete key") + continue + } + deletedKeys = append(deletedKeys, pattern) + } + } + } + + log.Info().Int("count", len(deletedKeys)).Strs("keys", deletedKeys).Msg("Cleared stuck Redis jobs") + + c.JSON(http.StatusOK, ClearStuckJobsResponse{ + Message: "Stuck jobs cleared successfully", + KeysDeleted: len(deletedKeys), + DeletedKeys: deletedKeys, + }) +} + // ClearAllData handles POST /api/admin/settings/clear-all-data // This clears all data except super admin accounts and lookup tables func (h *AdminSettingsHandler) ClearAllData(c *gin.Context) { diff --git a/internal/admin/routes.go b/internal/admin/routes.go index 7acda48..4f774cd 100644 --- a/internal/admin/routes.go +++ b/internal/admin/routes.go @@ -342,6 +342,7 @@ func SetupRoutes(router *gin.Engine, db *gorm.DB, cfg *config.Config, deps *Depe settings.POST("/seed-test-data", settingsHandler.SeedTestData) settings.POST("/seed-task-templates", settingsHandler.SeedTaskTemplates) settings.POST("/clear-all-data", settingsHandler.ClearAllData) + settings.POST("/clear-stuck-jobs", settingsHandler.ClearStuckJobs) } // Limitations management (tier limits, upgrade triggers) diff --git a/internal/config/config.go b/internal/config/config.go index a7eaf43..35d4340 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -80,7 +80,6 @@ type AppleAuthConfig struct { type WorkerConfig struct { // Scheduled job times (UTC) TaskReminderHour int - TaskReminderMinute int OverdueReminderHour int DailyNotifHour int } @@ -173,7 +172,6 @@ func Load() (*Config, error) { }, Worker: WorkerConfig{ TaskReminderHour: viper.GetInt("TASK_REMINDER_HOUR"), - TaskReminderMinute: viper.GetInt("TASK_REMINDER_MINUTE"), OverdueReminderHour: viper.GetInt("OVERDUE_REMINDER_HOUR"), DailyNotifHour: viper.GetInt("DAILY_DIGEST_HOUR"), }, @@ -244,11 +242,10 @@ func setDefaults() { viper.SetDefault("APNS_USE_SANDBOX", true) viper.SetDefault("APNS_PRODUCTION", false) - // Worker defaults (all times in UTC) - viper.SetDefault("TASK_REMINDER_HOUR", 20) // 8:00 PM UTC - viper.SetDefault("TASK_REMINDER_MINUTE", 0) - viper.SetDefault("OVERDUE_REMINDER_HOUR", 9) // 9:00 AM UTC - viper.SetDefault("DAILY_DIGEST_HOUR", 11) // 11:00 AM UTC + // Worker defaults (all times in UTC, jobs run at minute 0) + viper.SetDefault("TASK_REMINDER_HOUR", 14) // 8:00 PM UTC + viper.SetDefault("OVERDUE_REMINDER_HOUR", 15) // 9:00 AM UTC + viper.SetDefault("DAILY_DIGEST_HOUR", 3) // 3:00 AM UTC // Storage defaults viper.SetDefault("STORAGE_UPLOAD_DIR", "./uploads")