package monitoring

import (
	"errors"
	"io"
	"sync"
	"time"

	"github.com/hibiken/asynq"
	"github.com/labstack/echo/v4"
	"github.com/redis/go-redis/v9"
	"github.com/rs/zerolog/log"
	"gorm.io/gorm"

	"github.com/treytartt/honeydue-api/internal/models"
)

const (
	// DefaultStatsInterval is the default interval for collecting/publishing stats.
	DefaultStatsInterval = 5 * time.Second

	// SettingsSyncInterval is how often to check the database for the
	// enable_monitoring setting.
	SettingsSyncInterval = 30 * time.Second
)

// Service orchestrates all monitoring components: the log buffer and its
// writer, the stats store and collector, the optional HTTP stats collector,
// and the HTTP handler that exposes them.
type Service struct {
	process       string
	logBuffer     *LogBuffer
	statsStore    *StatsStore
	collector     *Collector
	httpCollector *HTTPStatsCollector
	handler       *Handler
	logWriter     *RedisLogWriter
	db            *gorm.DB

	// settingsStopCh is closed by Stop to end the settings sync loop.
	settingsStopCh chan struct{}
	// stopOnce ensures the shutdown sequence in Stop runs at most once.
	stopOnce sync.Once

	statsInterval time.Duration
}

// Config holds configuration for the monitoring service.
type Config struct {
	// Process names the running process: "api" or "worker". Only "api" gets
	// an HTTP stats collector.
	Process string
	// RedisClient is the Redis client handed to the log buffer and stats store.
	RedisClient *redis.Client
	// StatsInterval is the interval for stats collection. Zero means
	// DefaultStatsInterval.
	StatsInterval time.Duration
	// DB is the database used to check the enable_monitoring setting (optional).
	DB *gorm.DB
}

// NewService creates a new monitoring service from cfg.
//
// A zero cfg.StatsInterval is replaced with DefaultStatsInterval. When
// cfg.Process is "api" an HTTP stats collector is created and attached to the
// collector. When cfg.DB is non-nil the enable_monitoring setting is read once
// before the service is returned.
func NewService(cfg Config) *Service {
	if cfg.StatsInterval == 0 {
		cfg.StatsInterval = DefaultStatsInterval
	}

	// Create components.
	logBuffer := NewLogBuffer(cfg.RedisClient)
	statsStore := NewStatsStore(cfg.RedisClient)
	collector := NewCollector(cfg.Process, statsStore)
	handler := NewHandler(logBuffer, statsStore)
	logWriter := NewRedisLogWriter(logBuffer, cfg.Process)

	// For the API server, create an HTTP stats collector.
	var httpCollector *HTTPStatsCollector
	if cfg.Process == "api" {
		httpCollector = NewHTTPStatsCollector()
		collector.SetHTTPCollector(httpCollector)
	}

	svc := &Service{
		process:        cfg.Process,
		logBuffer:      logBuffer,
		statsStore:     statsStore,
		collector:      collector,
		httpCollector:  httpCollector,
		handler:        handler,
		logWriter:      logWriter,
		db:             cfg.DB,
		settingsStopCh: make(chan struct{}),
		statsInterval:  cfg.StatsInterval,
	}

	// Check the initial setting from the database.
	if cfg.DB != nil {
		svc.syncSettingsFromDB()
	}

	return svc
}

// SetAsynqInspector sets the Asynq inspector for worker stats.
func (s *Service) SetAsynqInspector(inspector *asynq.Inspector) {
	s.collector.SetAsynqInspector(inspector)
}

// Start begins collecting and publishing stats at the configured interval.
//
// The periodic settings sync goroutine is launched only if a database is
// already attached at the time Start is called.
func (s *Service) Start() {
	log.Info().
		Str("process", s.process).
		Dur("interval", s.statsInterval).
		Bool("enabled", s.logWriter.IsEnabled()).
		Msg("Starting monitoring service")

	s.collector.StartPublishing(s.statsInterval)

	// Start settings sync if database is available.
	if s.db != nil {
		go s.startSettingsSync()
	}
}

// Stop stops the monitoring service. It is safe to call multiple times; only
// the first call has any effect.
func (s *Service) Stop() {
	s.stopOnce.Do(func() {
		// Signal the settings sync loop to exit.
		close(s.settingsStopCh)

		s.collector.Stop()

		// Flush and close the log writer's background goroutine.
		s.logWriter.Close()

		log.Info().Str("process", s.process).Msg("Monitoring service stopped")
	})
}

// syncSettingsFromDB checks the database for the enable_monitoring setting
// and applies it to the log writer.
//
// It is a no-op when no database is attached. If the settings record does not
// exist, log capture defaults to enabled. On any other query error the current
// state is left untouched.
func (s *Service) syncSettingsFromDB() {
	if s.db == nil {
		return
	}

	var settings models.SubscriptionSettings
	if err := s.db.First(&settings, 1).Error; err != nil {
		// errors.Is (rather than ==) also matches ErrRecordNotFound when it
		// has been wrapped somewhere along the call chain.
		if errors.Is(err, gorm.ErrRecordNotFound) {
			// No settings record, default to enabled.
			s.logWriter.SetEnabled(true)
		}
		// On other errors, keep current state.
		return
	}

	wasEnabled := s.logWriter.IsEnabled()
	s.logWriter.SetEnabled(settings.EnableMonitoring)

	// Only log when the value actually flipped, so the periodic sync stays quiet.
	if wasEnabled != settings.EnableMonitoring {
		log.Info().
			Str("process", s.process).
			Bool("enabled", settings.EnableMonitoring).
			Msg("Monitoring log capture setting changed")
	}
}

// startSettingsSync periodically checks the database for settings changes.
// It blocks until settingsStopCh is closed, re-reading the setting every
// SettingsSyncInterval.
func (s *Service) startSettingsSync() {
	ticker := time.NewTicker(SettingsSyncInterval)
	defer ticker.Stop()

	for {
		select {
		case <-s.settingsStopCh:
			return
		case <-ticker.C:
			s.syncSettingsFromDB()
		}
	}
}

// SetEnabled manually enables or disables log capture.
func (s *Service) SetEnabled(enabled bool) {
	s.logWriter.SetEnabled(enabled)
}

// IsEnabled returns whether log capture is enabled.
func (s *Service) IsEnabled() bool {
	return s.logWriter.IsEnabled()
}

// SetDB sets the database connection for settings sync and immediately reads
// the enable_monitoring setting once. This can be called after NewService if
// the DB wasn't available during initialization.
//
// NOTE(review): SetDB does not launch the periodic sync itself; that happens
// only in Start, and only if a DB is attached by then. The db field is also
// written here without synchronization, so this looks intended to be called
// before Start — confirm against callers.
func (s *Service) SetDB(db *gorm.DB) {
	s.db = db
	s.syncSettingsFromDB()
}

// LogWriter returns an io.Writer for zerolog that captures logs to Redis.
func (s *Service) LogWriter() io.Writer {
	return s.logWriter
}

// Handler returns the HTTP handler for monitoring endpoints.
func (s *Service) Handler() *Handler {
	return s.handler
}

// HTTPCollector returns the HTTP stats collector (nil for worker).
func (s *Service) HTTPCollector() *HTTPStatsCollector {
	return s.httpCollector
}

// MetricsMiddleware returns the Echo middleware for HTTP metrics (API server
// only). It returns nil when the service has no HTTP stats collector, so
// callers must check the result before registering it.
func (s *Service) MetricsMiddleware() echo.MiddlewareFunc {
	if s.httpCollector == nil {
		return nil
	}
	return MetricsMiddleware(s.httpCollector)
}