Major changes: - Migrate all handlers from Gin to Echo framework - Add new apperrors, echohelpers, and validator packages - Update middleware for Echo compatibility - Add ArchivedHandler to task categorization chain (archived tasks go to cancelled_tasks column) - Add 6 new integration tests: - RecurringTaskLifecycle: NextDueDate advancement for weekly/monthly tasks - MultiUserSharing: Complex sharing with user removal - TaskStateTransitions: All state transitions and kanban column changes - DateBoundaryEdgeCases: Threshold boundary testing - CascadeOperations: Residence deletion cascade effects - MultiUserOperations: Shared residence collaboration - Add single-purpose repository functions for kanban columns (GetOverdueTasks, GetDueSoonTasks, etc.) - Fix RemoveUser route param mismatch (userId -> user_id) - Fix determineExpectedColumn helper to correctly prioritize in_progress over overdue 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
195 lines
4.4 KiB
Go
195 lines
4.4 KiB
Go
package monitoring
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"net/http"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/labstack/echo/v4"
|
|
"github.com/gorilla/websocket"
|
|
"github.com/rs/zerolog/log"
|
|
)
|
|
|
|
var upgrader = websocket.Upgrader{
|
|
ReadBufferSize: 1024,
|
|
WriteBufferSize: 1024,
|
|
CheckOrigin: func(r *http.Request) bool {
|
|
// Allow connections from admin panel
|
|
return true
|
|
},
|
|
}
|
|
|
|
// Handler provides HTTP handlers for monitoring endpoints
|
|
type Handler struct {
|
|
logBuffer *LogBuffer
|
|
statsStore *StatsStore
|
|
}
|
|
|
|
// NewHandler creates a new monitoring handler
|
|
func NewHandler(logBuffer *LogBuffer, statsStore *StatsStore) *Handler {
|
|
return &Handler{
|
|
logBuffer: logBuffer,
|
|
statsStore: statsStore,
|
|
}
|
|
}
|
|
|
|
// GetLogs returns filtered log entries
|
|
// GET /api/admin/monitoring/logs
|
|
func (h *Handler) GetLogs(c echo.Context) error {
|
|
var filters LogFilters
|
|
if err := c.Bind(&filters); err != nil {
|
|
return c.JSON(http.StatusBadRequest, map[string]interface{}{"error": "Invalid filters"})
|
|
}
|
|
|
|
limit := filters.GetLimit()
|
|
|
|
// Get more entries than requested for filtering
|
|
entries, err := h.logBuffer.GetRecent(limit * 2)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to get logs from buffer")
|
|
return c.JSON(http.StatusInternalServerError, map[string]interface{}{"error": "Failed to retrieve logs"})
|
|
}
|
|
|
|
// Apply filters
|
|
filtered := make([]LogEntry, 0, len(entries))
|
|
for _, e := range entries {
|
|
// Level filter
|
|
if filters.Level != "" && e.Level != filters.Level {
|
|
continue
|
|
}
|
|
|
|
// Process filter
|
|
if filters.Process != "" && e.Process != filters.Process {
|
|
continue
|
|
}
|
|
|
|
// Search filter (case-insensitive)
|
|
if filters.Search != "" {
|
|
searchLower := strings.ToLower(filters.Search)
|
|
messageLower := strings.ToLower(e.Message)
|
|
if !strings.Contains(messageLower, searchLower) {
|
|
continue
|
|
}
|
|
}
|
|
|
|
filtered = append(filtered, e)
|
|
if len(filtered) >= limit {
|
|
break
|
|
}
|
|
}
|
|
|
|
return c.JSON(http.StatusOK, map[string]interface{}{
|
|
"logs": filtered,
|
|
"total": len(filtered),
|
|
})
|
|
}
|
|
|
|
// GetStats returns system statistics for all processes
|
|
// GET /api/admin/monitoring/stats
|
|
func (h *Handler) GetStats(c echo.Context) error {
|
|
allStats, err := h.statsStore.GetAllStats()
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to get stats from store")
|
|
return c.JSON(http.StatusInternalServerError, map[string]interface{}{"error": "Failed to retrieve stats"})
|
|
}
|
|
|
|
return c.JSON(http.StatusOK, allStats)
|
|
}
|
|
|
|
// ClearLogs clears all logs from the buffer
|
|
// DELETE /api/admin/monitoring/logs
|
|
func (h *Handler) ClearLogs(c echo.Context) error {
|
|
if err := h.logBuffer.Clear(); err != nil {
|
|
log.Error().Err(err).Msg("Failed to clear logs")
|
|
return c.JSON(http.StatusInternalServerError, map[string]interface{}{"error": "Failed to clear logs"})
|
|
}
|
|
|
|
return c.JSON(http.StatusOK, map[string]interface{}{"message": "Logs cleared"})
|
|
}
|
|
|
|
// WebSocket handles real-time log streaming
|
|
// GET /api/admin/monitoring/ws
|
|
func (h *Handler) WebSocket(c echo.Context) error {
|
|
conn, err := upgrader.Upgrade(c.Response().Writer, c.Request(), nil)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to upgrade WebSocket connection")
|
|
}
|
|
defer conn.Close()
|
|
|
|
// Create context that cancels when connection closes
|
|
ctx, cancel := context.WithCancel(c.Request().Context())
|
|
defer cancel()
|
|
|
|
// Subscribe to Redis pubsub for logs
|
|
pubsub := h.logBuffer.Subscribe(ctx)
|
|
defer pubsub.Close()
|
|
|
|
// Handle incoming messages (for filter changes, ping, etc.)
|
|
var wsMu sync.Mutex
|
|
go func() {
|
|
for {
|
|
_, _, err := conn.ReadMessage()
|
|
if err != nil {
|
|
cancel()
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Stream logs from pubsub
|
|
ch := pubsub.Channel()
|
|
statsTicker := time.NewTicker(5 * time.Second)
|
|
defer statsTicker.Stop()
|
|
|
|
// Send initial stats
|
|
h.sendStats(conn, &wsMu)
|
|
|
|
for {
|
|
select {
|
|
case msg := <-ch:
|
|
// Parse log entry
|
|
var entry LogEntry
|
|
if err := json.Unmarshal([]byte(msg.Payload), &entry); err != nil {
|
|
continue
|
|
}
|
|
|
|
// Send log message
|
|
wsMsg := WSMessage{
|
|
Type: WSMessageTypeLog,
|
|
Data: entry,
|
|
}
|
|
|
|
wsMu.Lock()
|
|
err := conn.WriteJSON(wsMsg)
|
|
wsMu.Unlock()
|
|
|
|
if err != nil {
|
|
log.Debug().Err(err).Msg("WebSocket write error")
|
|
}
|
|
|
|
case <-statsTicker.C:
|
|
// Send periodic stats update
|
|
h.sendStats(conn, &wsMu)
|
|
|
|
case <-ctx.Done():
|
|
}
|
|
}
|
|
}
|
|
|
|
func (h *Handler) sendStats(conn *websocket.Conn, mu *sync.Mutex) {
|
|
allStats, err := h.statsStore.GetAllStats()
|
|
if err != nil {
|
|
}
|
|
|
|
wsMsg := WSMessage{
|
|
Type: WSMessageTypeStats,
|
|
Data: allStats,
|
|
}
|
|
|
|
mu.Lock()
|
|
conn.WriteJSON(wsMsg)
|
|
mu.Unlock()
|
|
}
|