package monitoring

import (
	"encoding/json"
	"sync/atomic"
	"time"

	"github.com/google/uuid"
)

const (
	// writerChannelSize is the buffer size for the async log write channel.
	// Entries beyond this limit are dropped to prevent unbounded memory growth.
	writerChannelSize = 256
)

// RedisLogWriter implements io.Writer to capture zerolog output to Redis.
// It uses a single background goroutine with a buffered channel instead of
// spawning a new goroutine per log line, preventing unbounded goroutine growth.
//
// A RedisLogWriter must be created with NewRedisLogWriter; the zero value has
// no channels and no drain goroutine.
type RedisLogWriter struct {
	// buffer receives every entry taken off ch.
	buffer *LogBuffer
	// process is copied into the Process field of each captured entry.
	process string
	// enabled gates Write; when false, Write discards its input.
	enabled atomic.Bool
	// closed is flipped by the first Close call so later calls are no-ops.
	closed atomic.Bool
	// ch carries entries from Write to drainLoop. It is intentionally never
	// closed: Write may still be called by a logger during or after shutdown,
	// and a send on a closed channel panics.
	ch chan LogEntry
	// quit is closed by Close to ask drainLoop to flush and exit.
	quit chan struct{}
	// done is closed by drainLoop when it has exited.
	done chan struct{}
}

// NewRedisLogWriter creates a new writer that captures logs to Redis.
// It starts a single background goroutine that drains the buffered channel;
// call Close to stop that goroutine. Capture is enabled by default.
func NewRedisLogWriter(buffer *LogBuffer, process string) *RedisLogWriter {
	w := &RedisLogWriter{
		buffer:  buffer,
		process: process,
		ch:      make(chan LogEntry, writerChannelSize),
		quit:    make(chan struct{}),
		done:    make(chan struct{}),
	}
	w.enabled.Store(true) // enabled by default

	// Single background goroutine drains the channel.
	go w.drainLoop()

	return w
}

// drainLoop reads entries from the buffered channel and pushes them to the
// LogBuffer. It runs in a single goroutine until Close is called, at which
// point it flushes the entries that are already queued and then exits.
func (w *RedisLogWriter) drainLoop() {
	defer close(w.done)

	for {
		select {
		case entry := <-w.ch:
			_ = w.buffer.Push(entry) // Ignore errors to avoid blocking log output
		case <-w.quit:
			// Shutdown requested: flush what is queued right now without
			// waiting for new entries.
			for {
				select {
				case entry := <-w.ch:
					_ = w.buffer.Push(entry) // Best-effort flush; errors are ignored as above
				default:
					return
				}
			}
		}
	}
}

// Close shuts down the background goroutine. It should be called during
// graceful shutdown to ensure all buffered entries are flushed.
//
// Close disables capture, signals the drain goroutine, and waits for it to
// finish flushing. It is safe to call Close more than once and from multiple
// goroutines; every call returns only after the drain goroutine has exited.
// Calling Write after Close does not panic: entries written after shutdown are
// never flushed.
func (w *RedisLogWriter) Close() {
	if w.closed.CompareAndSwap(false, true) {
		// Stop accepting new entries before asking the drain loop to exit so
		// that as few late writes as possible are left unflushed.
		w.enabled.Store(false)
		close(w.quit)
	}
	<-w.done // Wait for drain to finish
}

// SetEnabled enables or disables log capture to Redis.
func (w *RedisLogWriter) SetEnabled(enabled bool) {
	w.enabled.Store(enabled)
}

// IsEnabled returns whether log capture is enabled.
func (w *RedisLogWriter) IsEnabled() bool {
	return w.enabled.Load()
}

// Write implements io.Writer interface.
// It parses zerolog JSON output and sends it to the buffered channel for // async Redis writes. If the channel is full, the entry is dropped to // avoid blocking the caller (back-pressure shedding). func (w *RedisLogWriter) Write(p []byte) (n int, err error) { // Skip if monitoring is disabled if !w.enabled.Load() { return len(p), nil } // Parse zerolog JSON output var raw map[string]any if err := json.Unmarshal(p, &raw); err != nil { // Not valid JSON, skip (could be console writer output) return len(p), nil } // Build log entry entry := LogEntry{ ID: uuid.NewString(), Timestamp: time.Now().UTC(), Process: w.process, Fields: make(map[string]any), } // Extract standard zerolog fields if lvl, ok := raw["level"].(string); ok { entry.Level = lvl } if msg, ok := raw["message"].(string); ok { entry.Message = msg } if caller, ok := raw["caller"].(string); ok { entry.Caller = caller } // Extract timestamp if present (zerolog may include it) if ts, ok := raw["time"].(string); ok { if parsed, err := time.Parse(time.RFC3339, ts); err == nil { entry.Timestamp = parsed } } // Copy additional fields (excluding standard ones) for k, v := range raw { switch k { case "level", "message", "caller", "time": // Skip standard fields default: entry.Fields[k] = v } } // Non-blocking send: drop entries if channel is full rather than // spawning unbounded goroutines or blocking the logger select { case w.ch <- entry: // Sent successfully default: // Channel full — drop this entry to avoid back-pressure on the logger } return len(p), nil }