bc3da007db
Step 1 — OTel SDK: cmd/api and cmd/worker initialize a tracer provider that exports OTLP/HTTP to obs.88oakapps.com (Jaeger all-in-one). Sampling is AlwaysSample in dev (DEBUG=true) and TraceIDRatioBased(0.1) in prod, overridable via OTEL_TRACES_SAMPLER_ARG. Service names are honeydue-api and honeydue-worker. otelecho.Middleware opens a span per HTTP request. Step 2 — Manual spans: storage_service.Upload now takes ctx and emits storage.upload + b2.PutObject spans (size_bytes, key, mime_type, bucket, result attrs). APNs Send/SendWithCategory and FCM sendOne emit per-token spans with topic, status_code, reason. Asynq middleware emits asynq.handle:<task_type> per job with retry/payload attrs and records asynq_job_duration_seconds. Step 3 — Database: otelgorm plugin registered in database.Connect, so any SQL emitted via db.WithContext(ctx) attaches to the request span. Every repository now exposes WithContext(ctx) *XRepository as the migration helper. TaskService.ListTasks and GetTasksByResidence are migrated end-to-end (ctx threaded through handler → service → repo); remaining services adopt the same pattern incrementally — pre-migration methods still emit untraced SQL via the unchanged db field. OBS_TRACES_URL and OBS_INGEST_TOKEN flow from deploy/prod.env → honeydue-secrets → api+worker Deployments via secretKeyRef (optional). 02-setup-secrets.sh sources them from prod.env on next run; manifests mark both env vars optional so the deployment rolls without traces if the secret is absent. ch15 observability doc now lists what produces spans today vs the remaining migration work, with the explicit per-method pattern. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
313 lines
11 KiB
Go
313 lines
11 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"net/http"
|
|
"os"
|
|
"os/signal"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/hibiken/asynq"
|
|
"github.com/redis/go-redis/v9"
|
|
"github.com/rs/zerolog/log"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/codes"
|
|
"go.opentelemetry.io/otel/trace"
|
|
|
|
"github.com/treytartt/honeydue-api/internal/config"
|
|
"github.com/treytartt/honeydue-api/internal/database"
|
|
"github.com/treytartt/honeydue-api/internal/monitoring"
|
|
"github.com/treytartt/honeydue-api/internal/prom"
|
|
"github.com/treytartt/honeydue-api/internal/push"
|
|
"github.com/treytartt/honeydue-api/internal/repositories"
|
|
"github.com/treytartt/honeydue-api/internal/services"
|
|
"github.com/treytartt/honeydue-api/internal/tracing"
|
|
"github.com/treytartt/honeydue-api/internal/worker/jobs"
|
|
"github.com/treytartt/honeydue-api/pkg/utils"
|
|
)
|
|
|
|
// workerHealthAddr is the listen address of the worker's HTTP health server
// started in main, which serves /health for container healthchecks.
const workerHealthAddr = ":6060"
|
|
|
|
func main() {
|
|
// Initialize logger
|
|
utils.InitLogger(true)
|
|
|
|
// Load configuration
|
|
cfg, err := config.Load()
|
|
if err != nil {
|
|
log.Fatal().Err(err).Msg("Failed to load configuration")
|
|
}
|
|
|
|
// Check worker kill switch
|
|
if !cfg.Features.WorkerEnabled {
|
|
log.Warn().Msg("Worker disabled by FEATURE_WORKER_ENABLED=false — exiting")
|
|
os.Exit(0)
|
|
}
|
|
|
|
// Initialize OpenTelemetry tracing for the worker process. Same OTLP
|
|
// destination as the api; service.name distinguishes them in Jaeger.
|
|
tracingShutdown, err := tracing.Init(context.Background(), tracing.Config{
|
|
ServiceName: "honeydue-worker",
|
|
Environment: workerDeploymentEnv(cfg.Server.Debug),
|
|
EndpointURL: os.Getenv("OBS_TRACES_URL"),
|
|
BearerToken: os.Getenv("OBS_INGEST_TOKEN"),
|
|
SampleRatio: tracing.SampleRatioFromEnv(),
|
|
})
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("worker tracing init failed — continuing without traces")
|
|
}
|
|
defer func() {
|
|
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
if err := tracingShutdown(shutdownCtx); err != nil {
|
|
log.Warn().Err(err).Msg("worker tracing shutdown error")
|
|
}
|
|
}()
|
|
asynqTracer := tracing.Tracer("honeydue/worker/asynq")
|
|
|
|
// Initialize database
|
|
db, err := database.Connect(&cfg.Database, cfg.Server.Debug)
|
|
if err != nil {
|
|
log.Fatal().Err(err).Msg("Failed to connect to database")
|
|
}
|
|
log.Info().Msg("Connected to database")
|
|
|
|
// Get underlying *sql.DB for cleanup
|
|
sqlDB, _ := db.DB()
|
|
defer sqlDB.Close()
|
|
|
|
// Initialize push client (APNs + FCM)
|
|
var pushClient *push.Client
|
|
pushClient, err = push.NewClient(&cfg.Push, cfg.Features.PushEnabled)
|
|
if err != nil {
|
|
log.Warn().Err(err).Msg("Failed to initialize push client - push notifications disabled")
|
|
} else {
|
|
log.Info().
|
|
Bool("ios_enabled", pushClient.IsIOSEnabled()).
|
|
Bool("android_enabled", pushClient.IsAndroidEnabled()).
|
|
Msg("Push notification client initialized")
|
|
}
|
|
|
|
// Initialize email service (optional)
|
|
var emailService *services.EmailService
|
|
if cfg.Email.Host != "" {
|
|
emailService = services.NewEmailService(&cfg.Email, cfg.Features.EmailEnabled)
|
|
log.Info().Str("host", cfg.Email.Host).Msg("Email service initialized")
|
|
}
|
|
|
|
// Initialize notification service for actionable push notifications
|
|
notificationRepo := repositories.NewNotificationRepository(db)
|
|
notificationService := services.NewNotificationService(notificationRepo, pushClient)
|
|
log.Info().Msg("Notification service initialized")
|
|
|
|
// Parse Redis URL for Asynq
|
|
redisOpt, err := asynq.ParseRedisURI(cfg.Redis.URL)
|
|
if err != nil {
|
|
log.Fatal().Err(err).Msg("Failed to parse Redis URL")
|
|
}
|
|
|
|
// Initialize monitoring service (if Redis is available)
|
|
var monitoringService *monitoring.Service
|
|
redisClientOpt, ok := redisOpt.(asynq.RedisClientOpt)
|
|
if ok {
|
|
redisClient := redis.NewClient(&redis.Options{
|
|
Addr: redisClientOpt.Addr,
|
|
Password: redisClientOpt.Password,
|
|
DB: redisClientOpt.DB,
|
|
})
|
|
|
|
// Verify Redis connection
|
|
if err := redisClient.Ping(context.Background()).Err(); err != nil {
|
|
log.Warn().Err(err).Msg("Failed to connect to Redis for monitoring - monitoring disabled")
|
|
} else {
|
|
monitoringService = monitoring.NewService(monitoring.Config{
|
|
Process: "worker",
|
|
RedisClient: redisClient,
|
|
DB: db, // Pass database for enable_monitoring setting sync
|
|
})
|
|
|
|
// Reinitialize logger with monitoring writer
|
|
utils.InitLoggerWithWriter(cfg.Server.Debug, monitoringService.LogWriter())
|
|
|
|
// Create Asynq inspector for queue statistics
|
|
inspector := asynq.NewInspector(redisOpt)
|
|
monitoringService.SetAsynqInspector(inspector)
|
|
|
|
// Start stats collection
|
|
monitoringService.Start()
|
|
defer monitoringService.Stop()
|
|
|
|
log.Info().
|
|
Bool("log_capture_enabled", monitoringService.IsEnabled()).
|
|
Msg("Monitoring service initialized")
|
|
}
|
|
}
|
|
|
|
// Create Asynq server
|
|
srv := asynq.NewServer(
|
|
redisOpt,
|
|
asynq.Config{
|
|
Concurrency: 10,
|
|
Queues: map[string]int{
|
|
"critical": 6,
|
|
"default": 3,
|
|
"low": 1,
|
|
},
|
|
ErrorHandler: asynq.ErrorHandlerFunc(func(ctx context.Context, task *asynq.Task, err error) {
|
|
log.Error().
|
|
Err(err).
|
|
Str("type", task.Type()).
|
|
Bytes("payload", task.Payload()).
|
|
Msg("Task processing failed")
|
|
}),
|
|
},
|
|
)
|
|
|
|
// Create job handler
|
|
jobHandler := jobs.NewHandler(db, pushClient, emailService, notificationService, cfg)
|
|
|
|
// Create Asynq mux and register handlers
|
|
mux := asynq.NewServeMux()
|
|
|
|
// Tracing + metrics middleware: every job runs inside a span and emits
|
|
// asynq_job_duration_seconds{task_type,result}.
|
|
mux.Use(asynqTracingMiddleware(asynqTracer))
|
|
|
|
mux.HandleFunc(jobs.TypeSmartReminder, jobHandler.HandleSmartReminder)
|
|
mux.HandleFunc(jobs.TypeDailyDigest, jobHandler.HandleDailyDigest)
|
|
mux.HandleFunc(jobs.TypeSendEmail, jobHandler.HandleSendEmail)
|
|
mux.HandleFunc(jobs.TypeSendPush, jobHandler.HandleSendPush)
|
|
mux.HandleFunc(jobs.TypeOnboardingEmails, jobHandler.HandleOnboardingEmails)
|
|
mux.HandleFunc(jobs.TypeReminderLogCleanup, jobHandler.HandleReminderLogCleanup)
|
|
|
|
// Register email job handlers (welcome, verification, password reset, password changed)
|
|
if emailService != nil {
|
|
emailJobHandler := jobs.NewEmailJobHandler(emailService)
|
|
emailJobHandler.RegisterHandlers(mux)
|
|
}
|
|
|
|
// Start scheduler for periodic tasks
|
|
scheduler := asynq.NewScheduler(redisOpt, nil)
|
|
|
|
// Schedule smart reminder notifications (runs every hour to support per-user custom times)
|
|
// Replaces old task reminder and overdue reminder with frequency-aware system
|
|
// Uses TaskReminderLog to prevent duplicate notifications
|
|
if _, err := scheduler.Register("0 * * * *", asynq.NewTask(jobs.TypeSmartReminder, nil)); err != nil {
|
|
log.Fatal().Err(err).Msg("Failed to register smart reminder job")
|
|
}
|
|
log.Info().Str("cron", "0 * * * *").Int("default_hour", cfg.Worker.TaskReminderHour).Msg("Registered smart reminder job (runs hourly for per-user times)")
|
|
|
|
// Schedule daily digest (runs every hour to support per-user custom times)
|
|
// The job handler filters users based on their preferred notification hour
|
|
if _, err := scheduler.Register("0 * * * *", asynq.NewTask(jobs.TypeDailyDigest, nil)); err != nil {
|
|
log.Fatal().Err(err).Msg("Failed to register daily digest job")
|
|
}
|
|
log.Info().Str("cron", "0 * * * *").Int("default_hour", cfg.Worker.DailyNotifHour).Msg("Registered daily digest job (runs hourly for per-user times)")
|
|
|
|
// Schedule onboarding emails (runs daily at 10:00 AM UTC)
|
|
// Sends emails to users who haven't created residences or tasks after registration
|
|
if _, err := scheduler.Register("0 10 * * *", asynq.NewTask(jobs.TypeOnboardingEmails, nil)); err != nil {
|
|
log.Fatal().Err(err).Msg("Failed to register onboarding emails job")
|
|
}
|
|
log.Info().Str("cron", "0 10 * * *").Msg("Registered onboarding emails job (runs daily at 10:00 AM UTC)")
|
|
|
|
// Schedule reminder log cleanup (runs daily at 3:00 AM UTC)
|
|
// Removes reminder logs older than 90 days to prevent table bloat
|
|
if _, err := scheduler.Register("0 3 * * *", asynq.NewTask(jobs.TypeReminderLogCleanup, nil)); err != nil {
|
|
log.Fatal().Err(err).Msg("Failed to register reminder log cleanup job")
|
|
}
|
|
log.Info().Str("cron", "0 3 * * *").Msg("Registered reminder log cleanup job (runs daily at 3:00 AM UTC)")
|
|
|
|
// Handle graceful shutdown
|
|
quit := make(chan os.Signal, 1)
|
|
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
|
|
|
|
// Health server (for container healthchecks; not externally published)
|
|
healthMux := http.NewServeMux()
|
|
healthMux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusOK)
|
|
_, _ = w.Write([]byte(`{"status":"ok"}`))
|
|
})
|
|
healthSrv := &http.Server{
|
|
Addr: workerHealthAddr,
|
|
Handler: healthMux,
|
|
ReadHeaderTimeout: 5 * time.Second,
|
|
}
|
|
go func() {
|
|
log.Info().Str("addr", workerHealthAddr).Msg("Health server listening")
|
|
if err := healthSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
|
log.Warn().Err(err).Msg("Health server terminated")
|
|
}
|
|
}()
|
|
|
|
// Start scheduler in goroutine
|
|
go func() {
|
|
if err := scheduler.Run(); err != nil {
|
|
log.Fatal().Err(err).Msg("Failed to start scheduler")
|
|
}
|
|
}()
|
|
|
|
// Start worker server in goroutine
|
|
go func() {
|
|
log.Info().Msg("Starting worker server...")
|
|
if err := srv.Run(mux); err != nil {
|
|
log.Fatal().Err(err).Msg("Failed to start worker server")
|
|
}
|
|
}()
|
|
|
|
<-quit
|
|
log.Info().Msg("Shutting down worker...")
|
|
|
|
// Graceful shutdown
|
|
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer shutdownCancel()
|
|
_ = healthSrv.Shutdown(shutdownCtx)
|
|
srv.Shutdown()
|
|
scheduler.Shutdown()
|
|
|
|
log.Info().Msg("Worker stopped")
|
|
}
|
|
|
|
// asynqTracingMiddleware returns an asynq.MiddlewareFunc that opens a span
|
|
// per task execution and records asynq_job_duration_seconds. Span attrs
|
|
// include task type, queue, retry count, and the result outcome.
|
|
func asynqTracingMiddleware(tracer trace.Tracer) asynq.MiddlewareFunc {
|
|
return func(next asynq.Handler) asynq.Handler {
|
|
return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
|
|
ctx, span := tracer.Start(ctx, "asynq.handle:"+t.Type(),
|
|
trace.WithAttributes(
|
|
attribute.String("asynq.task_type", t.Type()),
|
|
attribute.Int("asynq.payload_bytes", len(t.Payload())),
|
|
),
|
|
)
|
|
defer span.End()
|
|
|
|
start := time.Now()
|
|
err := next.ProcessTask(ctx, t)
|
|
dur := time.Since(start)
|
|
result := "ok"
|
|
if err != nil {
|
|
result = "error"
|
|
span.SetStatus(codes.Error, err.Error())
|
|
span.RecordError(err)
|
|
}
|
|
span.SetAttributes(attribute.String("asynq.result", result))
|
|
prom.ObserveAsynqJob(t.Type(), result, dur)
|
|
return err
|
|
})
|
|
}
|
|
}
|
|
|
|
// workerDeploymentEnv picks the deployment environment label for the worker,
// mirroring deploymentEnvironment in cmd/api/main.go. A non-empty
// DEPLOYMENT_ENVIRONMENT variable always wins; otherwise the debug flag
// selects "dev" and anything else is "prod".
func workerDeploymentEnv(debug bool) string {
	override := os.Getenv("DEPLOYMENT_ENVIRONMENT")
	switch {
	case override != "":
		return override
	case debug:
		return "dev"
	default:
		return "prod"
	}
}
|