// Package monitoring provides Redis-backed storage for log entries and
// process statistics: a capped ring buffer with pub/sub fan-out for logs,
// and per-process keys with a short TTL for stats.
package monitoring

import (
	"context"
	"encoding/json"
	"errors"
	"time"

	"github.com/redis/go-redis/v9"
)

// Redis key constants for monitoring.
const (
	LogsKey         = "monitoring:logs"
	LogsChannel     = "monitoring:logs:channel"
	StatsKeyPrefix  = "monitoring:stats:"
	MaxLogEntries   = 1000
	LogsTTL         = 24 * time.Hour
	StatsExpiration = 30 * time.Second // Stats expire if not updated
)

// LogBuffer provides a Redis-backed ring buffer for log entries.
type LogBuffer struct {
	client *redis.Client
}

// NewLogBuffer creates a new log buffer with the given Redis client.
func NewLogBuffer(client *redis.Client) *LogBuffer {
	return &LogBuffer{client: client}
}

// Push adds a log entry to the buffer and publishes it for real-time
// streaming. The buffer is trimmed to MaxLogEntries, and its TTL is
// refreshed to LogsTTL on every push.
func (b *LogBuffer) Push(entry LogEntry) error {
	ctx := context.Background()

	data, err := json.Marshal(entry)
	if err != nil {
		return err
	}

	// Pipeline the writes so push, trim, expire, and publish travel to
	// Redis in a single round trip.
	pipe := b.client.Pipeline()
	pipe.LPush(ctx, LogsKey, data)
	pipe.LTrim(ctx, LogsKey, 0, MaxLogEntries-1)
	// Fix: LogsTTL was declared but never applied, so the log list
	// persisted indefinitely. Refresh the expiry on each push so an
	// idle buffer eventually ages out after 24h.
	pipe.Expire(ctx, LogsKey, LogsTTL)
	pipe.Publish(ctx, LogsChannel, data)
	_, err = pipe.Exec(ctx)
	return err
}

// GetRecent retrieves up to count of the most recent log entries, newest
// first. A non-positive count defaults to 100; counts above MaxLogEntries
// are clamped. Entries that fail to unmarshal are silently skipped.
func (b *LogBuffer) GetRecent(count int) ([]LogEntry, error) {
	ctx := context.Background()

	if count <= 0 {
		count = 100
	}
	if count > MaxLogEntries {
		count = MaxLogEntries
	}

	results, err := b.client.LRange(ctx, LogsKey, 0, int64(count-1)).Result()
	if err != nil {
		return nil, err
	}

	entries := make([]LogEntry, 0, len(results))
	for _, r := range results {
		var entry LogEntry
		// Skip corrupt entries rather than failing the whole read.
		if json.Unmarshal([]byte(r), &entry) == nil {
			entries = append(entries, entry)
		}
	}
	return entries, nil
}

// Subscribe returns a Redis pubsub subscription for real-time log
// streaming. The caller is responsible for closing it.
func (b *LogBuffer) Subscribe(ctx context.Context) *redis.PubSub {
	return b.client.Subscribe(ctx, LogsChannel)
}

// Clear removes all logs from the buffer.
func (b *LogBuffer) Clear() error {
	ctx := context.Background()
	return b.client.Del(ctx, LogsKey).Err()
}

// Count returns the number of logs currently in the buffer.
func (b *LogBuffer) Count() (int64, error) {
	ctx := context.Background()
	return b.client.LLen(ctx, LogsKey).Result()
}

// StatsStore provides Redis storage for system statistics.
type StatsStore struct {
	client *redis.Client
}

// NewStatsStore creates a new stats store with the given Redis client.
func NewStatsStore(client *redis.Client) *StatsStore {
	return &StatsStore{client: client}
}

// StoreStats stores system stats under a key derived from stats.Process.
// The entry expires after StatsExpiration unless refreshed by a later
// call, so stale stats disappear on their own.
func (s *StatsStore) StoreStats(stats SystemStats) error {
	ctx := context.Background()

	data, err := json.Marshal(stats)
	if err != nil {
		return err
	}

	key := StatsKeyPrefix + stats.Process
	return s.client.Set(ctx, key, data, StatsExpiration).Err()
}

// GetStats retrieves stats for a specific process. It returns (nil, nil)
// when no stats are stored (or they have expired).
func (s *StatsStore) GetStats(process string) (*SystemStats, error) {
	ctx := context.Background()

	key := StatsKeyPrefix + process
	data, err := s.client.Get(ctx, key).Bytes()
	if err != nil {
		// redis.Nil means the key is absent, not a failure. go-redis v9
		// may wrap errors, so use errors.Is instead of ==.
		if errors.Is(err, redis.Nil) {
			return nil, nil // No stats available
		}
		return nil, err
	}

	var stats SystemStats
	if err := json.Unmarshal(data, &stats); err != nil {
		return nil, err
	}
	return &stats, nil
}

// GetAllStats retrieves stats for all known processes (api and worker).
// Processes with no stored stats are omitted from the result map.
func (s *StatsStore) GetAllStats() (map[string]*SystemStats, error) {
	result := make(map[string]*SystemStats, 2)
	for _, process := range []string{"api", "worker"} {
		stats, err := s.GetStats(process)
		if err != nil {
			return nil, err
		}
		if stats != nil {
			result[process] = stats
		}
	}
	return result, nil
}