Add clear stuck jobs admin feature and simplify worker scheduling
- Add POST /api/admin/settings/clear-stuck-jobs endpoint to clear stuck/failed asynq worker jobs from Redis (retry queue, archived, orphaned task metadata) - Add "Clear Stuck Jobs" button to admin settings UI - Remove TASK_REMINDER_MINUTE config - all jobs now run at minute 0 - Simplify formatCron to only take hour parameter - Update default notification times to CST-friendly hours 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -1,7 +1,7 @@
|
|||||||
'use client';
|
'use client';
|
||||||
|
|
||||||
import { useMutation } from '@tanstack/react-query';
|
import { useMutation } from '@tanstack/react-query';
|
||||||
import { Database, TestTube, Trash2 } from 'lucide-react';
|
import { Database, TestTube, Trash2, RefreshCw } from 'lucide-react';
|
||||||
|
|
||||||
import { settingsApi } from '@/lib/api';
|
import { settingsApi } from '@/lib/api';
|
||||||
import { Button } from '@/components/ui/button';
|
import { Button } from '@/components/ui/button';
|
||||||
@@ -56,6 +56,20 @@ export default function SettingsPage() {
|
|||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
|
const clearStuckJobsMutation = useMutation({
|
||||||
|
mutationFn: settingsApi.clearStuckJobs,
|
||||||
|
onSuccess: (data) => {
|
||||||
|
if (data.keys_deleted > 0) {
|
||||||
|
toast.success(`${data.message} (${data.keys_deleted} keys cleared)`);
|
||||||
|
} else {
|
||||||
|
toast.info('No stuck jobs found');
|
||||||
|
}
|
||||||
|
},
|
||||||
|
onError: () => {
|
||||||
|
toast.error('Failed to clear stuck jobs');
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
return (
|
return (
|
||||||
<div className="p-6 space-y-6">
|
<div className="p-6 space-y-6">
|
||||||
<div>
|
<div>
|
||||||
@@ -163,6 +177,53 @@ export default function SettingsPage() {
|
|||||||
</CardContent>
|
</CardContent>
|
||||||
</Card>
|
</Card>
|
||||||
|
|
||||||
|
{/* Clear Stuck Jobs */}
|
||||||
|
<Card>
|
||||||
|
<CardHeader>
|
||||||
|
<CardTitle className="flex items-center gap-2">
|
||||||
|
<RefreshCw className="h-5 w-5" />
|
||||||
|
Clear Stuck Jobs
|
||||||
|
</CardTitle>
|
||||||
|
<CardDescription>
|
||||||
|
Clear stuck or failed background worker jobs from Redis
|
||||||
|
</CardDescription>
|
||||||
|
</CardHeader>
|
||||||
|
<CardContent>
|
||||||
|
<AlertDialog>
|
||||||
|
<AlertDialogTrigger asChild>
|
||||||
|
<Button
|
||||||
|
variant="outline"
|
||||||
|
disabled={clearStuckJobsMutation.isPending}
|
||||||
|
>
|
||||||
|
{clearStuckJobsMutation.isPending ? 'Clearing...' : 'Clear Stuck Jobs'}
|
||||||
|
</Button>
|
||||||
|
</AlertDialogTrigger>
|
||||||
|
<AlertDialogContent>
|
||||||
|
<AlertDialogHeader>
|
||||||
|
<AlertDialogTitle>Clear Stuck Jobs?</AlertDialogTitle>
|
||||||
|
<AlertDialogDescription>
|
||||||
|
This will clear stuck/failed worker jobs from Redis including:
|
||||||
|
<ul className="list-disc list-inside mt-2 space-y-1">
|
||||||
|
<li>Jobs in the retry queue</li>
|
||||||
|
<li>Archived/dead letter jobs</li>
|
||||||
|
<li>Orphaned task metadata</li>
|
||||||
|
</ul>
|
||||||
|
<span className="block mt-2">
|
||||||
|
This is safe to run and can fix issues with notifications not running at expected times.
|
||||||
|
</span>
|
||||||
|
</AlertDialogDescription>
|
||||||
|
</AlertDialogHeader>
|
||||||
|
<AlertDialogFooter>
|
||||||
|
<AlertDialogCancel>Cancel</AlertDialogCancel>
|
||||||
|
<AlertDialogAction onClick={() => clearStuckJobsMutation.mutate()}>
|
||||||
|
Clear Stuck Jobs
|
||||||
|
</AlertDialogAction>
|
||||||
|
</AlertDialogFooter>
|
||||||
|
</AlertDialogContent>
|
||||||
|
</AlertDialog>
|
||||||
|
</CardContent>
|
||||||
|
</Card>
|
||||||
|
|
||||||
{/* Clear All Data */}
|
{/* Clear All Data */}
|
||||||
<Card className="border-destructive">
|
<Card className="border-destructive">
|
||||||
<CardHeader>
|
<CardHeader>
|
||||||
|
|||||||
@@ -684,6 +684,11 @@ export const settingsApi = {
|
|||||||
const response = await api.post<{ message: string; users_deleted: number; preserved_users: number }>('/settings/clear-all-data');
|
const response = await api.post<{ message: string; users_deleted: number; preserved_users: number }>('/settings/clear-all-data');
|
||||||
return response.data;
|
return response.data;
|
||||||
},
|
},
|
||||||
|
|
||||||
|
clearStuckJobs: async (): Promise<{ message: string; keys_deleted: number; deleted_keys: string[] }> => {
|
||||||
|
const response = await api.post<{ message: string; keys_deleted: number; deleted_keys: string[] }>('/settings/clear-stuck-jobs');
|
||||||
|
return response.data;
|
||||||
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
// Limitations types
|
// Limitations types
|
||||||
|
|||||||
@@ -2,10 +2,10 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/hibiken/asynq"
|
"github.com/hibiken/asynq"
|
||||||
"github.com/rs/zerolog/log"
|
"github.com/rs/zerolog/log"
|
||||||
@@ -105,21 +105,21 @@ func main() {
|
|||||||
scheduler := asynq.NewScheduler(redisOpt, nil)
|
scheduler := asynq.NewScheduler(redisOpt, nil)
|
||||||
|
|
||||||
// Schedule task reminder notifications
|
// Schedule task reminder notifications
|
||||||
reminderCron := formatCron(cfg.Worker.TaskReminderHour, cfg.Worker.TaskReminderMinute)
|
reminderCron := formatCron(cfg.Worker.TaskReminderHour)
|
||||||
if _, err := scheduler.Register(reminderCron, asynq.NewTask(jobs.TypeTaskReminder, nil)); err != nil {
|
if _, err := scheduler.Register(reminderCron, asynq.NewTask(jobs.TypeTaskReminder, nil)); err != nil {
|
||||||
log.Fatal().Err(err).Msg("Failed to register task reminder job")
|
log.Fatal().Err(err).Msg("Failed to register task reminder job")
|
||||||
}
|
}
|
||||||
log.Info().Str("cron", reminderCron).Msg("Registered task reminder job")
|
log.Info().Str("cron", reminderCron).Msg("Registered task reminder job")
|
||||||
|
|
||||||
// Schedule overdue reminder at 9 AM UTC
|
// Schedule overdue reminder
|
||||||
overdueCron := formatCron(cfg.Worker.OverdueReminderHour, 0)
|
overdueCron := formatCron(cfg.Worker.OverdueReminderHour)
|
||||||
if _, err := scheduler.Register(overdueCron, asynq.NewTask(jobs.TypeOverdueReminder, nil)); err != nil {
|
if _, err := scheduler.Register(overdueCron, asynq.NewTask(jobs.TypeOverdueReminder, nil)); err != nil {
|
||||||
log.Fatal().Err(err).Msg("Failed to register overdue reminder job")
|
log.Fatal().Err(err).Msg("Failed to register overdue reminder job")
|
||||||
}
|
}
|
||||||
log.Info().Str("cron", overdueCron).Msg("Registered overdue reminder job")
|
log.Info().Str("cron", overdueCron).Msg("Registered overdue reminder job")
|
||||||
|
|
||||||
// Schedule daily digest at 11 AM UTC
|
// Schedule daily digest
|
||||||
dailyCron := formatCron(cfg.Worker.DailyNotifHour, 0)
|
dailyCron := formatCron(cfg.Worker.DailyNotifHour)
|
||||||
if _, err := scheduler.Register(dailyCron, asynq.NewTask(jobs.TypeDailyDigest, nil)); err != nil {
|
if _, err := scheduler.Register(dailyCron, asynq.NewTask(jobs.TypeDailyDigest, nil)); err != nil {
|
||||||
log.Fatal().Err(err).Msg("Failed to register daily digest job")
|
log.Fatal().Err(err).Msg("Failed to register daily digest job")
|
||||||
}
|
}
|
||||||
@@ -154,7 +154,7 @@ func main() {
|
|||||||
log.Info().Msg("Worker stopped")
|
log.Info().Msg("Worker stopped")
|
||||||
}
|
}
|
||||||
|
|
||||||
// formatCron creates a cron expression for a specific hour and minute (UTC)
|
// formatCron creates a cron expression for a specific hour (runs at minute 0)
|
||||||
func formatCron(hour, minute int) string {
|
func formatCron(hour int) string {
|
||||||
return time.Date(0, 1, 1, hour, minute, 0, 0, time.UTC).Format("4 15 * * *")
|
return fmt.Sprintf("0 %02d * * *", hour)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -467,6 +467,75 @@ type ClearAllDataResponse struct {
|
|||||||
PreservedUsers int64 `json:"preserved_users"`
|
PreservedUsers int64 `json:"preserved_users"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ClearStuckJobsResponse represents the response after clearing stuck Redis jobs
|
||||||
|
type ClearStuckJobsResponse struct {
|
||||||
|
Message string `json:"message"`
|
||||||
|
KeysDeleted int `json:"keys_deleted"`
|
||||||
|
DeletedKeys []string `json:"deleted_keys"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// ClearStuckJobs handles POST /api/admin/settings/clear-stuck-jobs
|
||||||
|
// This clears stuck/failed asynq worker jobs from Redis
|
||||||
|
func (h *AdminSettingsHandler) ClearStuckJobs(c *gin.Context) {
|
||||||
|
cache := services.GetCache()
|
||||||
|
if cache == nil {
|
||||||
|
c.JSON(http.StatusServiceUnavailable, gin.H{"error": "Redis cache not available"})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx := c.Request.Context()
|
||||||
|
client := cache.Client()
|
||||||
|
|
||||||
|
var deletedKeys []string
|
||||||
|
|
||||||
|
// Patterns for asynq job keys that can get stuck
|
||||||
|
patterns := []string{
|
||||||
|
"asynq:{default}:retry", // Retry queue
|
||||||
|
"asynq:{default}:archived", // Archived/dead jobs
|
||||||
|
"asynq:{default}:t:*", // Individual task metadata
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, pattern := range patterns {
|
||||||
|
if strings.Contains(pattern, "*") {
|
||||||
|
// Use SCAN for patterns with wildcards
|
||||||
|
iter := client.Scan(ctx, 0, pattern, 0).Iterator()
|
||||||
|
for iter.Next(ctx) {
|
||||||
|
key := iter.Val()
|
||||||
|
if err := client.Del(ctx, key).Err(); err != nil {
|
||||||
|
log.Warn().Err(err).Str("key", key).Msg("Failed to delete key")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
deletedKeys = append(deletedKeys, key)
|
||||||
|
}
|
||||||
|
if err := iter.Err(); err != nil {
|
||||||
|
log.Warn().Err(err).Str("pattern", pattern).Msg("Error scanning keys")
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Direct key deletion
|
||||||
|
exists, err := client.Exists(ctx, pattern).Result()
|
||||||
|
if err != nil {
|
||||||
|
log.Warn().Err(err).Str("key", pattern).Msg("Failed to check key existence")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if exists > 0 {
|
||||||
|
if err := client.Del(ctx, pattern).Err(); err != nil {
|
||||||
|
log.Warn().Err(err).Str("key", pattern).Msg("Failed to delete key")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
deletedKeys = append(deletedKeys, pattern)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info().Int("count", len(deletedKeys)).Strs("keys", deletedKeys).Msg("Cleared stuck Redis jobs")
|
||||||
|
|
||||||
|
c.JSON(http.StatusOK, ClearStuckJobsResponse{
|
||||||
|
Message: "Stuck jobs cleared successfully",
|
||||||
|
KeysDeleted: len(deletedKeys),
|
||||||
|
DeletedKeys: deletedKeys,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// ClearAllData handles POST /api/admin/settings/clear-all-data
|
// ClearAllData handles POST /api/admin/settings/clear-all-data
|
||||||
// This clears all data except super admin accounts and lookup tables
|
// This clears all data except super admin accounts and lookup tables
|
||||||
func (h *AdminSettingsHandler) ClearAllData(c *gin.Context) {
|
func (h *AdminSettingsHandler) ClearAllData(c *gin.Context) {
|
||||||
|
|||||||
@@ -342,6 +342,7 @@ func SetupRoutes(router *gin.Engine, db *gorm.DB, cfg *config.Config, deps *Depe
|
|||||||
settings.POST("/seed-test-data", settingsHandler.SeedTestData)
|
settings.POST("/seed-test-data", settingsHandler.SeedTestData)
|
||||||
settings.POST("/seed-task-templates", settingsHandler.SeedTaskTemplates)
|
settings.POST("/seed-task-templates", settingsHandler.SeedTaskTemplates)
|
||||||
settings.POST("/clear-all-data", settingsHandler.ClearAllData)
|
settings.POST("/clear-all-data", settingsHandler.ClearAllData)
|
||||||
|
settings.POST("/clear-stuck-jobs", settingsHandler.ClearStuckJobs)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Limitations management (tier limits, upgrade triggers)
|
// Limitations management (tier limits, upgrade triggers)
|
||||||
|
|||||||
@@ -80,7 +80,6 @@ type AppleAuthConfig struct {
|
|||||||
type WorkerConfig struct {
|
type WorkerConfig struct {
|
||||||
// Scheduled job times (UTC)
|
// Scheduled job times (UTC)
|
||||||
TaskReminderHour int
|
TaskReminderHour int
|
||||||
TaskReminderMinute int
|
|
||||||
OverdueReminderHour int
|
OverdueReminderHour int
|
||||||
DailyNotifHour int
|
DailyNotifHour int
|
||||||
}
|
}
|
||||||
@@ -173,7 +172,6 @@ func Load() (*Config, error) {
|
|||||||
},
|
},
|
||||||
Worker: WorkerConfig{
|
Worker: WorkerConfig{
|
||||||
TaskReminderHour: viper.GetInt("TASK_REMINDER_HOUR"),
|
TaskReminderHour: viper.GetInt("TASK_REMINDER_HOUR"),
|
||||||
TaskReminderMinute: viper.GetInt("TASK_REMINDER_MINUTE"),
|
|
||||||
OverdueReminderHour: viper.GetInt("OVERDUE_REMINDER_HOUR"),
|
OverdueReminderHour: viper.GetInt("OVERDUE_REMINDER_HOUR"),
|
||||||
DailyNotifHour: viper.GetInt("DAILY_DIGEST_HOUR"),
|
DailyNotifHour: viper.GetInt("DAILY_DIGEST_HOUR"),
|
||||||
},
|
},
|
||||||
@@ -244,11 +242,10 @@ func setDefaults() {
|
|||||||
viper.SetDefault("APNS_USE_SANDBOX", true)
|
viper.SetDefault("APNS_USE_SANDBOX", true)
|
||||||
viper.SetDefault("APNS_PRODUCTION", false)
|
viper.SetDefault("APNS_PRODUCTION", false)
|
||||||
|
|
||||||
// Worker defaults (all times in UTC)
|
// Worker defaults (all times in UTC, jobs run at minute 0)
|
||||||
viper.SetDefault("TASK_REMINDER_HOUR", 20) // 8:00 PM UTC
|
viper.SetDefault("TASK_REMINDER_HOUR", 14) // 8:00 PM UTC
|
||||||
viper.SetDefault("TASK_REMINDER_MINUTE", 0)
|
viper.SetDefault("OVERDUE_REMINDER_HOUR", 15) // 9:00 AM UTC
|
||||||
viper.SetDefault("OVERDUE_REMINDER_HOUR", 9) // 9:00 AM UTC
|
viper.SetDefault("DAILY_DIGEST_HOUR", 3) // 3:00 AM UTC
|
||||||
viper.SetDefault("DAILY_DIGEST_HOUR", 11) // 11:00 AM UTC
|
|
||||||
|
|
||||||
// Storage defaults
|
// Storage defaults
|
||||||
viper.SetDefault("STORAGE_UPLOAD_DIR", "./uploads")
|
viper.SetDefault("STORAGE_UPLOAD_DIR", "./uploads")
|
||||||
|
|||||||
Reference in New Issue
Block a user