package monitoring

import (
	"context"
	"encoding/json"
	"net/http"
	"strings"
	"sync"
	"time"

	"github.com/gin-gonic/gin"
	"github.com/gorilla/websocket"
	"github.com/rs/zerolog/log"
)

// upgrader turns an HTTP request into a WebSocket connection using 1 KiB
// read and write buffers.
var upgrader = websocket.Upgrader{
	ReadBufferSize:  1024,
	WriteBufferSize: 1024,
	CheckOrigin: func(r *http.Request) bool {
		// NOTE(review): this accepts every Origin, not only the admin panel.
		// Browsers do not apply same-origin rules to WebSocket handshakes, so
		// any site can open this stream with the visitor's cookies. Restrict
		// to the admin panel origin(s) once they are known.
		return true
	},
}

// Handler provides HTTP handlers for monitoring endpoints.
type Handler struct {
	logBuffer  *LogBuffer
	statsStore *StatsStore
}

// NewHandler creates a new monitoring handler backed by the given log buffer
// and stats store.
func NewHandler(logBuffer *LogBuffer, statsStore *StatsStore) *Handler {
	return &Handler{
		logBuffer:  logBuffer,
		statsStore: statsStore,
	}
}

// GetLogs returns log entries matching the level, process and search filters
// bound from the query string.
//
// GET /api/admin/monitoring/logs
//
// It responds 400 when the filters cannot be bound, 500 when the buffer cannot
// be read, and otherwise 200 with {"logs": [...], "total": n}, where total is
// the number of entries returned (not the number stored).
func (h *Handler) GetLogs(c *gin.Context) {
	var filters LogFilters
	if err := c.ShouldBindQuery(&filters); err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid filters"})
		return
	}

	limit := filters.GetLimit()

	// Fetch more entries than requested so that filtering still has a chance
	// of filling the page.
	entries, err := h.logBuffer.GetRecent(limit * 2)
	if err != nil {
		log.Error().Err(err).Msg("Failed to get logs from buffer")
		c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to retrieve logs"})
		return
	}

	// The search term is loop-invariant: lower-case it once, not per entry.
	searchLower := strings.ToLower(filters.Search)

	// Non-nil slice so an empty result encodes as [] rather than null.
	filtered := make([]LogEntry, 0, len(entries))
	for _, e := range entries {
		// Level filter
		if filters.Level != "" && e.Level != filters.Level {
			continue
		}

		// Process filter
		if filters.Process != "" && e.Process != filters.Process {
			continue
		}

		// Search filter (case-insensitive substring match on the message)
		if filters.Search != "" && !strings.Contains(strings.ToLower(e.Message), searchLower) {
			continue
		}

		filtered = append(filtered, e)
		if len(filtered) >= limit {
			break
		}
	}

	c.JSON(http.StatusOK, gin.H{
		"logs":  filtered,
		"total": len(filtered),
	})
}

// GetStats returns system statistics for all processes.
//
// GET /api/admin/monitoring/stats
//
// It responds 500 when the stats store cannot be read, otherwise 200 with the
// store's result encoded as JSON.
func (h *Handler) GetStats(c *gin.Context) {
	allStats, err := h.statsStore.GetAllStats()
	if err != nil {
		log.Error().Err(err).Msg("Failed to get stats from store")
		c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to retrieve stats"})
		return
	}

	c.JSON(http.StatusOK, allStats)
}

// ClearLogs clears all logs from the buffer.
//
// DELETE /api/admin/monitoring/logs
//
// It responds 500 when clearing fails, otherwise 200 with a confirmation
// message.
func (h *Handler) ClearLogs(c *gin.Context) {
	if err := h.logBuffer.Clear(); err != nil {
		log.Error().Err(err).Msg("Failed to clear logs")
		c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to clear logs"})
		return
	}

	c.JSON(http.StatusOK, gin.H{"message": "Logs cleared"})
}

// WebSocket handles real-time log streaming.
//
// GET /api/admin/monitoring/ws
//
// After upgrading the connection it sends one stats message, then forwards
// every log entry published on the log buffer's subscription and a fresh
// stats message every 5 seconds. It returns when the request context is
// cancelled, a read or write on the connection fails, or the subscription
// channel is closed.
func (h *Handler) WebSocket(c *gin.Context) {
	conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
	if err != nil {
		log.Error().Err(err).Msg("Failed to upgrade WebSocket connection")
		return
	}
	defer conn.Close()

	// Context that is cancelled when the request ends, when the reader
	// goroutine below sees a read error, or when this handler returns.
	ctx, cancel := context.WithCancel(c.Request.Context())
	defer cancel()

	// Subscribe to the log pubsub
	pubsub := h.logBuffer.Subscribe(ctx)
	defer pubsub.Close()

	// wsMu serializes writes to conn.
	var wsMu sync.Mutex

	// Drain incoming messages; their payloads are discarded. A read error
	// (which includes the peer closing the connection) cancels ctx so the
	// streaming loop below exits. The goroutine itself ends on that error,
	// at the latest when the deferred conn.Close makes ReadMessage fail.
	go func() {
		for {
			if _, _, err := conn.ReadMessage(); err != nil {
				cancel()
				return
			}
		}
	}()

	// Stream logs from pubsub
	ch := pubsub.Channel()
	statsTicker := time.NewTicker(5 * time.Second)
	defer statsTicker.Stop()

	// Send initial stats
	if err := h.sendStats(conn, &wsMu); err != nil {
		log.Debug().Err(err).Msg("WebSocket write error")
		return
	}

	for {
		select {
		case msg, ok := <-ch:
			if !ok {
				// Subscription channel closed: nothing more will arrive, and
				// receiving again would only yield zero values.
				return
			}

			// Parse log entry; malformed payloads are skipped.
			var entry LogEntry
			if err := json.Unmarshal([]byte(msg.Payload), &entry); err != nil {
				continue
			}

			// Send log message
			wsMsg := WSMessage{
				Type: WSMessageTypeLog,
				Data: entry,
			}

			wsMu.Lock()
			err := conn.WriteJSON(wsMsg)
			wsMu.Unlock()

			if err != nil {
				log.Debug().Err(err).Msg("WebSocket write error")
				return
			}

		case <-statsTicker.C:
			// Send periodic stats update
			if err := h.sendStats(conn, &wsMu); err != nil {
				log.Debug().Err(err).Msg("WebSocket write error")
				return
			}

		case <-ctx.Done():
			return
		}
	}
}

// sendStats writes the current stats for all processes to conn as a stats
// message, holding mu for the duration of the write.
//
// A failure to read the stats store is logged and the update is skipped (nil
// is returned) so that a temporary store problem does not end the stream. A
// failure to write to the connection is returned to the caller.
func (h *Handler) sendStats(conn *websocket.Conn, mu *sync.Mutex) error {
	allStats, err := h.statsStore.GetAllStats()
	if err != nil {
		log.Warn().Err(err).Msg("Failed to get stats for WebSocket client")
		return nil
	}

	wsMsg := WSMessage{
		Type: WSMessageTypeStats,
		Data: allStats,
	}

	mu.Lock()
	defer mu.Unlock()
	return conn.WriteJSON(wsMsg)
}