52bf1ff3c7
POST /api/task-completions/ was spending ~1.5-1.75s synchronously on
APNs push + SMTP email + B2 image fetches inside sendTaskCompletedNotification.
Per-user loop made it scale linearly with residence membership; one image
attached + one residence user is the 1.75s baseline observed in the live
honeydue-eli5-overview Grafana panel.
Replace the inline call (and the fire-and-forget goroutine in QuickComplete,
which violated the project's "no goroutines in handlers" rule) with an
Asynq job:
- new task type notification:task_completed (worker/scheduler.go)
- new payload {task_id, completion_id} — IDs only, worker re-reads
canonical state from Postgres so concurrent edits between enqueue
and dequeue are reflected
- new HandleTaskCompletedNotification on jobs.Handler delegates to
TaskService.SendTaskCompletedNotificationByID
- new dispatchTaskCompletedNotification in task_service.go enqueues the
job when possible (preferred) and falls back to sending inline when
Redis is unreachable or the enqueuer isn't wired (tests / local dev)
Other changes required to wire it up:
- widen worker.NewTaskClient signature to accept asynq.RedisClientOpt
so the file-mounted Redis password (audit HIGH-1) can be supplied;
no prior callers, no breakage
- extend worker.Enqueuer interface with EnqueueTaskCompletedNotification
- add TaskEnqueuer field to router.Dependencies; wire from cmd/api/main.go
with the standard typed-nil interface guard
- wire a worker-side TaskService in cmd/worker/main.go so the handler
can use the shared SendTaskCompletedNotificationByID implementation
(storage service shared with the existing upload-cleanup wiring)
Expected impact on POST /api/task-completions/ p50:
~1.75s -> ~120-170ms (DB + tx + Asynq enqueue only)
Notifications still deliver; they just go via the worker instead of in
the request path. MaxRetry=3; "row not found" returns nil so a deleted
task/completion doesn't churn the retry loop.
All 31 test packages pass. No DB migrations.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
289 lines
9.7 KiB
Go
289 lines
9.7 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net/http"
|
|
"os"
|
|
"os/signal"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/hibiken/asynq"
|
|
"github.com/rs/zerolog/log"
|
|
"gorm.io/gorm"
|
|
|
|
"github.com/treytartt/honeydue-api/internal/config"
|
|
"github.com/treytartt/honeydue-api/internal/database"
|
|
"github.com/treytartt/honeydue-api/internal/i18n"
|
|
"github.com/treytartt/honeydue-api/internal/monitoring"
|
|
"github.com/treytartt/honeydue-api/internal/push"
|
|
"github.com/treytartt/honeydue-api/internal/router"
|
|
"github.com/treytartt/honeydue-api/internal/services"
|
|
"github.com/treytartt/honeydue-api/internal/tracing"
|
|
"github.com/treytartt/honeydue-api/internal/worker"
|
|
"github.com/treytartt/honeydue-api/pkg/utils"
|
|
)
|
|
|
|
func main() {
|
|
// Load configuration
|
|
cfg, err := config.Load()
|
|
if err != nil {
|
|
fmt.Printf("Failed to load configuration: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
// Initialize basic logger first (will be enhanced after Redis connects)
|
|
utils.InitLogger(cfg.Server.Debug)
|
|
|
|
// Initialize i18n
|
|
if err := i18n.Init(); err != nil {
|
|
log.Warn().Err(err).Msg("Failed to initialize i18n - using English only")
|
|
} else {
|
|
log.Info().Strs("languages", i18n.SupportedLanguages).Msg("i18n initialized")
|
|
}
|
|
|
|
log.Info().
|
|
Bool("debug", cfg.Server.Debug).
|
|
Int("port", cfg.Server.Port).
|
|
Str("db_host", cfg.Database.Host).
|
|
Int("db_port", cfg.Database.Port).
|
|
Str("db_name", cfg.Database.Database).
|
|
Str("db_user", cfg.Database.User).
|
|
Str("redis_url", config.MaskURLCredentials(cfg.Redis.URL)).
|
|
Msg("Starting HoneyDue API server")
|
|
|
|
// Initialize OpenTelemetry tracing — exports to obs.88oakapps.com
|
|
// (Jaeger via OTLP/HTTP) when OBS_TRACES_URL is set; otherwise installs
|
|
// a no-op tracer so call sites can use otel.Tracer() unconditionally.
|
|
// config.SecretValue (not os.Getenv) so file-mounted secrets resolve
|
|
// after audit F8 removed these from the process environment.
|
|
tracingShutdown, err := tracing.Init(context.Background(), tracing.Config{
|
|
ServiceName: "honeydue-api",
|
|
Environment: deploymentEnvironment(cfg.Server.Debug),
|
|
EndpointURL: config.SecretValue("OBS_TRACES_URL"),
|
|
BearerToken: config.SecretValue("OBS_INGEST_TOKEN"),
|
|
SampleRatio: tracing.SampleRatioFromEnv(),
|
|
})
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("tracing init failed — continuing without traces")
|
|
}
|
|
defer func() {
|
|
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
if err := tracingShutdown(shutdownCtx); err != nil {
|
|
log.Warn().Err(err).Msg("tracing shutdown error")
|
|
}
|
|
}()
|
|
|
|
// Connect to database (retry with backoff)
|
|
var db *gorm.DB
|
|
var dbErr error
|
|
for i := 0; i < 3; i++ {
|
|
db, dbErr = database.Connect(&cfg.Database, cfg.Server.Debug)
|
|
if dbErr == nil {
|
|
break
|
|
}
|
|
log.Warn().Err(dbErr).Int("attempt", i+1).Msg("Failed to connect to database, retrying...")
|
|
time.Sleep(time.Duration(i+1) * time.Second)
|
|
}
|
|
if dbErr != nil {
|
|
log.Error().Err(dbErr).Msg("Failed to connect to database - API will start but database operations will fail")
|
|
} else {
|
|
defer database.Close()
|
|
// Migrations are managed out-of-band by golang-migrate (see
|
|
// cmd/migrate and deploy-k3s/manifests/migrate/job.yaml) so the api
|
|
// no longer runs AutoMigrate at startup. Instead we verify the
|
|
// schema is at the expected version and refuse to start if not —
|
|
// this catches the "operator forgot to run migrate" footgun loudly,
|
|
// at boot, instead of with mysterious runtime errors.
|
|
if err := database.RequireSchemaApplied(); err != nil {
|
|
log.Fatal().Err(err).Msg("Schema precondition failed — run `kubectl -n honeydue create job --from=cronjob/honeydue-migrate` (or `make migrate-up` locally) and retry")
|
|
}
|
|
}
|
|
|
|
// Connect to Redis (optional - don't fail if unavailable)
|
|
var cache *services.CacheService
|
|
cache, err = services.NewCacheService(&cfg.Redis)
|
|
if err != nil {
|
|
log.Warn().Err(err).Msg("Failed to connect to Redis - caching disabled")
|
|
cache = nil
|
|
} else {
|
|
defer cache.Close()
|
|
if database.SeedInitialDataApplied {
|
|
if err := cache.InvalidateSeededData(context.Background()); err != nil {
|
|
log.Warn().Err(err).Msg("Failed to invalidate seeded data cache after initial seed")
|
|
} else {
|
|
log.Info().Msg("Invalidated seeded_data cache after initial seed migration")
|
|
}
|
|
}
|
|
}
|
|
|
|
// Initialize monitoring service (if Redis is available)
|
|
var monitoringService *monitoring.Service
|
|
if cache != nil {
|
|
monitoringService = monitoring.NewService(monitoring.Config{
|
|
Process: "api",
|
|
RedisClient: cache.Client(),
|
|
DB: db, // Pass database for enable_monitoring setting sync
|
|
})
|
|
|
|
// Reinitialize logger with monitoring writer
|
|
utils.InitLoggerWithWriter(cfg.Server.Debug, monitoringService.LogWriter())
|
|
|
|
// Start stats collection
|
|
monitoringService.Start()
|
|
defer monitoringService.Stop()
|
|
|
|
log.Info().
|
|
Bool("log_capture_enabled", monitoringService.IsEnabled()).
|
|
Msg("Monitoring service initialized")
|
|
}
|
|
|
|
// Initialize email service
|
|
var emailService *services.EmailService
|
|
log.Info().
|
|
Str("email_host", cfg.Email.Host).
|
|
Str("email_user", cfg.Email.User).
|
|
Str("email_from", cfg.Email.From).
|
|
Int("email_port", cfg.Email.Port).
|
|
Msg("Email config loaded")
|
|
if cfg.Email.Host != "" && cfg.Email.User != "" {
|
|
emailService = services.NewEmailService(&cfg.Email, cfg.Features.EmailEnabled)
|
|
log.Info().
|
|
Str("host", cfg.Email.Host).
|
|
Msg("Email service initialized")
|
|
} else {
|
|
log.Warn().
|
|
Str("host", cfg.Email.Host).
|
|
Str("user", cfg.Email.User).
|
|
Msg("Email service not configured - emails will not be sent")
|
|
}
|
|
|
|
// Initialize storage service for file uploads (local filesystem or S3-compatible)
|
|
var storageService *services.StorageService
|
|
if cfg.Storage.UploadDir != "" || cfg.Storage.IsS3() {
|
|
storageService, err = services.NewStorageService(&cfg.Storage)
|
|
if err != nil {
|
|
log.Warn().Err(err).Msg("Failed to initialize storage service - uploads disabled")
|
|
} else {
|
|
// Initialize file encryption at rest if configured
|
|
if cfg.Storage.EncryptionKey != "" {
|
|
encSvc, encErr := services.NewEncryptionService(cfg.Storage.EncryptionKey)
|
|
if encErr != nil {
|
|
log.Error().Err(encErr).Msg("Failed to initialize encryption service - files will NOT be encrypted")
|
|
} else {
|
|
storageService.SetEncryptionService(encSvc)
|
|
log.Info().Msg("File encryption at rest enabled")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Initialize PDF service for report generation
|
|
pdfService := services.NewPDFService()
|
|
log.Info().Msg("PDF service initialized")
|
|
|
|
// Initialize push notification client (APNs + FCM)
|
|
var pushClient *push.Client
|
|
pushClient, err = push.NewClient(&cfg.Push, cfg.Features.PushEnabled)
|
|
if err != nil {
|
|
log.Warn().Err(err).Msg("Failed to initialize push client - push notifications disabled")
|
|
} else {
|
|
log.Info().
|
|
Bool("ios_enabled", pushClient.IsIOSEnabled()).
|
|
Bool("android_enabled", pushClient.IsAndroidEnabled()).
|
|
Msg("Push notification client initialized")
|
|
}
|
|
|
|
// Initialize Asynq enqueuer (api-side). Used by services that move
|
|
// long-running work off the request path (currently: task-completion
|
|
// notification fan-out). Same Redis as cmd/worker — file-mounted password
|
|
// applied separately because cfg.Redis.URL does not embed it (audit HIGH-1).
|
|
var taskEnqueuer *worker.TaskClient
|
|
if redisOpt, parseErr := asynq.ParseRedisURI(cfg.Redis.URL); parseErr != nil {
|
|
log.Warn().Err(parseErr).Msg("Failed to parse Redis URL for Asynq enqueuer — completion notifications will run inline")
|
|
} else if clientOpt, ok := redisOpt.(asynq.RedisClientOpt); ok {
|
|
if cfg.Redis.Password != "" {
|
|
clientOpt.Password = cfg.Redis.Password
|
|
}
|
|
taskEnqueuer = worker.NewTaskClient(clientOpt)
|
|
defer func() {
|
|
if cerr := taskEnqueuer.Close(); cerr != nil {
|
|
log.Warn().Err(cerr).Msg("Failed to close Asynq enqueuer on shutdown")
|
|
}
|
|
}()
|
|
log.Info().Msg("Asynq enqueuer initialized")
|
|
} else {
|
|
log.Warn().Msg("Redis opt is not RedisClientOpt — Asynq enqueuer skipped; completion notifications will run inline")
|
|
}
|
|
|
|
// Setup router with dependencies (includes admin panel at /admin)
|
|
deps := &router.Dependencies{
|
|
DB: db,
|
|
Cache: cache,
|
|
Config: cfg,
|
|
EmailService: emailService,
|
|
PDFService: pdfService,
|
|
PushClient: pushClient,
|
|
StorageService: storageService,
|
|
MonitoringService: monitoringService,
|
|
}
|
|
// Only assign the enqueuer when we actually constructed one. Assigning a
|
|
// nil *worker.TaskClient directly would create a typed-nil interface that
|
|
// fails the `if deps.TaskEnqueuer != nil` check in router.SetupRouter.
|
|
if taskEnqueuer != nil {
|
|
deps.TaskEnqueuer = taskEnqueuer
|
|
}
|
|
e := router.SetupRouter(deps)
|
|
|
|
// Create HTTP server
|
|
srv := &http.Server{
|
|
Addr: fmt.Sprintf(":%d", cfg.Server.Port),
|
|
Handler: e,
|
|
ReadTimeout: 30 * time.Second,
|
|
WriteTimeout: 30 * time.Second,
|
|
IdleTimeout: 60 * time.Second,
|
|
}
|
|
|
|
// Start server in goroutine
|
|
go func() {
|
|
log.Info().
|
|
Str("addr", srv.Addr).
|
|
Msg("HTTP server listening")
|
|
|
|
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
|
log.Fatal().Err(err).Msg("Failed to start HTTP server")
|
|
}
|
|
}()
|
|
|
|
// Wait for interrupt signal for graceful shutdown
|
|
quit := make(chan os.Signal, 1)
|
|
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
|
|
<-quit
|
|
|
|
log.Info().Msg("Shutting down server...")
|
|
|
|
// Graceful shutdown with timeout
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
|
|
if err := srv.Shutdown(ctx); err != nil {
|
|
log.Fatal().Err(err).Msg("Server forced to shutdown")
|
|
}
|
|
|
|
log.Info().Msg("Server exited")
|
|
}
|
|
|
|
// deploymentEnvironment returns the environment label that spans are tagged
// with. A non-empty DEPLOYMENT_ENVIRONMENT variable always takes precedence;
// when it is unset or empty the label is derived from the debug flag: "dev"
// when debug is true, "prod" otherwise.
func deploymentEnvironment(debug bool) string {
	label := os.Getenv("DEPLOYMENT_ENVIRONMENT")
	switch {
	case label != "":
		return label
	case debug:
		return "dev"
	default:
		return "prod"
	}
}
|