package monitoring

import (
	"runtime"
	"sync"
	"time"

	"github.com/hibiken/asynq"
	"github.com/rs/zerolog/log"
	"github.com/shirou/gopsutil/v3/cpu"
	"github.com/shirou/gopsutil/v3/disk"
	"github.com/shirou/gopsutil/v3/load"
	"github.com/shirou/gopsutil/v3/mem"
)

// Collector gathers system and Go runtime statistics for a single process
// and, once StartPublishing is called, periodically writes them to its
// StatsStore.
//
// The HTTP collector and the Asynq inspector are optional; each one only
// contributes to the collected stats when it has been set.
type Collector struct {
	process       string
	startTime     time.Time
	statsStore    *StatsStore
	httpCollector *HTTPStatsCollector // nil for worker
	asynqClient   *asynq.Inspector    // nil for api
	stopChan      chan struct{}
	// stopOnce guards close(stopChan) so that Stop may be called repeatedly.
	stopOnce sync.Once
}

// NewCollector creates a stats collector for the named process that
// publishes into statsStore. The creation time is recorded and later used
// to report uptime.
func NewCollector(process string, statsStore *StatsStore) *Collector {
	return &Collector{
		process:    process,
		startTime:  time.Now(),
		statsStore: statsStore,
		stopChan:   make(chan struct{}),
	}
}

// SetHTTPCollector sets the HTTP stats collector (for API server).
func (c *Collector) SetHTTPCollector(httpCollector *HTTPStatsCollector) {
	c.httpCollector = httpCollector
}

// SetAsynqInspector sets the Asynq inspector (for Worker).
func (c *Collector) SetAsynqInspector(inspector *asynq.Inspector) {
	c.asynqClient = inspector
}

// Collect gathers one snapshot of all statistics: CPU, memory, disk and Go
// runtime always, plus HTTP and Asynq stats when the corresponding source
// has been set. The snapshot is stamped with the current UTC time.
//
// Collect blocks while the CPU usage sample is taken (see collectCPU).
func (c *Collector) Collect() SystemStats {
	stats := SystemStats{
		Timestamp: time.Now().UTC(),
		Process:   c.process,
	}

	// CPU stats
	c.collectCPU(&stats)

	// Read Go runtime memory stats once; the same snapshot feeds both the
	// memory and the runtime collectors.
	var memStats runtime.MemStats
	runtime.ReadMemStats(&memStats)

	// Memory stats (system + Go runtime)
	c.collectMemory(&stats, &memStats)

	// Disk stats
	c.collectDisk(&stats)

	// Go runtime stats
	c.collectRuntime(&stats, &memStats)

	// HTTP stats (API only)
	if c.httpCollector != nil {
		httpStats := c.httpCollector.GetStats()
		stats.HTTP = &httpStats
	}

	// Asynq stats (Worker only)
	if c.asynqClient != nil {
		asynqStats := c.collectAsynq()
		stats.Asynq = &asynqStats
	}

	return stats
}

// collectCPU fills stats.CPU with the usage percentage, the CPU count and
// the load averages. Values whose lookup fails are left at their zero value.
func (c *Collector) collectCPU(stats *SystemStats) {
	// Get CPU usage percentage (blocks for 200ms to sample).
	// This is called periodically, so a shorter window is acceptable.
	if cpuPercent, err := cpu.Percent(200*time.Millisecond, false); err == nil && len(cpuPercent) > 0 {
		stats.CPU.UsagePercent = cpuPercent[0]
	}

	stats.CPU.NumCPU = runtime.NumCPU()

	// Load averages (Unix only, returns 0 on Windows)
	if avg, err := load.Avg(); err == nil {
		stats.CPU.LoadAvg1 = avg.Load1
		stats.CPU.LoadAvg5 = avg.Load5
		stats.CPU.LoadAvg15 = avg.Load15
	}
}

// collectMemory fills stats.Memory with system-wide memory usage and with
// Go heap figures taken from the already-read MemStats snapshot m. System
// values are left at zero when the lookup fails.
func (c *Collector) collectMemory(stats *SystemStats, m *runtime.MemStats) {
	// System memory
	if vmem, err := mem.VirtualMemory(); err == nil {
		stats.Memory.UsedBytes = vmem.Used
		stats.Memory.TotalBytes = vmem.Total
		stats.Memory.UsagePercent = vmem.UsedPercent
	}

	// Go runtime memory (reuses pre-read MemStats)
	stats.Memory.HeapAlloc = m.HeapAlloc
	stats.Memory.HeapSys = m.HeapSys
	stats.Memory.HeapInuse = m.HeapInuse
}

// collectDisk fills stats.Disk with usage of the filesystem mounted at "/".
// The values are left at zero when the lookup fails.
func (c *Collector) collectDisk(stats *SystemStats) {
	// Root filesystem stats
	if diskStat, err := disk.Usage("/"); err == nil {
		stats.Disk.UsedBytes = diskStat.Used
		stats.Disk.TotalBytes = diskStat.Total
		stats.Disk.FreeBytes = diskStat.Free
		stats.Disk.UsagePercent = diskStat.UsedPercent
	}
}

// collectRuntime fills stats.Runtime with the goroutine count, GC counters
// from the MemStats snapshot m, and the uptime in whole seconds since the
// Collector was created.
func (c *Collector) collectRuntime(stats *SystemStats, m *runtime.MemStats) {
	stats.Runtime.Goroutines = runtime.NumGoroutine()
	stats.Runtime.NumGC = m.NumGC
	if m.NumGC > 0 {
		// PauseNs is a 256-entry circular buffer; this index selects the
		// most recent pause.
		stats.Runtime.LastGCPause = m.PauseNs[(m.NumGC+255)%256]
	}
	stats.Runtime.Uptime = int64(time.Since(c.startTime).Seconds())
}

// collectAsynq returns per-queue task counts obtained from the Asynq
// inspector. The returned Queues map is always non-nil. When the queue list
// cannot be fetched the map is empty, and a queue whose info cannot be
// fetched is skipped; both failures are logged at debug level.
func (c *Collector) collectAsynq() AsynqStats {
	stats := AsynqStats{
		Queues: make(map[string]QueueStats),
	}

	if c.asynqClient == nil {
		return stats
	}

	queues, err := c.asynqClient.Queues()
	if err != nil {
		log.Debug().Err(err).Msg("Failed to get asynq queues")
		return stats
	}

	for _, qName := range queues {
		info, err := c.asynqClient.GetQueueInfo(qName)
		if err != nil {
			log.Debug().Err(err).Str("queue", qName).Msg("Failed to get queue info")
			continue
		}

		stats.Queues[qName] = QueueStats{
			Pending:   info.Pending,
			Active:    info.Active,
			Scheduled: info.Scheduled,
			Retry:     info.Retry,
			Archived:  info.Archived,
			Completed: info.Completed,
			Failed:    info.Failed,
		}
	}

	return stats
}

// StartPublishing begins periodic stats collection and publishing in a
// background goroutine. One snapshot is published immediately, then one per
// interval until Stop is called.
//
// A non-positive interval is replaced by a 10 second fallback and a warning
// is logged, because time.NewTicker panics on such a value and a panic in
// the background goroutine could not be recovered by the caller.
func (c *Collector) StartPublishing(interval time.Duration) {
	const fallbackInterval = 10 * time.Second
	if interval <= 0 {
		log.Warn().
			Dur("interval", interval).
			Dur("fallback", fallbackInterval).
			Str("process", c.process).
			Msg("Invalid stats publishing interval, using fallback")
		interval = fallbackInterval
	}

	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()

		// Collect immediately on start
		c.publishStats()

		for {
			select {
			case <-ticker.C:
				c.publishStats()
			case <-c.stopChan:
				return
			}
		}
	}()
}

// publishStats collects one snapshot and stores it. A storage failure is
// logged at debug level and otherwise ignored, so publishing is best-effort.
func (c *Collector) publishStats() {
	stats := c.Collect()
	if err := c.statsStore.StoreStats(stats); err != nil {
		log.Debug().Err(err).Str("process", c.process).Msg("Failed to publish stats to Redis")
	}
}

// Stop signals the publishing goroutine to exit. It does not wait for the
// goroutine to finish. Stop is safe to call more than once; only the first
// call closes the stop channel.
func (c *Collector) Stop() {
	c.stopOnce.Do(func() {
		close(c.stopChan)
	})
}