Files
honeyDueAPI/internal/monitoring/handler.go
Trey t 7690f07a2b Harden API security: input validation, safe auth extraction, new tests, and deploy config
Comprehensive security hardening from audit findings:
- Add validation tags to all DTO request structs (max lengths, ranges, enums)
- Replace unsafe type assertions with MustGetAuthUser helper across all handlers
- Remove query-param token auth from admin middleware (prevents URL token leakage)
- Add request validation calls in handlers that were missing c.Validate()
- Remove goroutines in handlers (timezone update now synchronous)
- Add sanitize middleware and path traversal protection (path_utils)
- Stop resetting admin passwords on migration restart
- Warn on well-known default SECRET_KEY
- Add ~30 new test files covering security regressions, auth safety, repos, and services
- Add deploy/ config, audit digests, and AUDIT_FINDINGS documentation

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-02 09:48:01 -06:00

202 lines
4.7 KiB
Go

package monitoring
import (
"context"
"encoding/json"
"net/http"
"strings"
"sync"
"time"
"github.com/labstack/echo/v4"
"github.com/gorilla/websocket"
"github.com/rs/zerolog/log"
)
// upgrader promotes HTTP requests to WebSocket connections using 1 KiB read
// and write buffers.
//
// CheckOrigin accepts the handshake when the Origin header is absent, or when
// it is exactly "http://<Host>" or "https://<Host>" for the request's Host
// header (compared case-insensitively). The comparison is deliberately exact:
// a prefix match would also accept look-alike origins such as
// "https://<Host>.attacker.example" or the same host on a different port.
var upgrader = websocket.Upgrader{
	ReadBufferSize:  1024,
	WriteBufferSize: 1024,
	CheckOrigin: func(r *http.Request) bool {
		origin := r.Header.Get("Origin")
		if origin == "" {
			// Same-origin requests may omit the Origin header.
			return true
		}
		// A serialized origin is scheme://host[:port] with no path, so the
		// whole header value can be compared against the request host.
		return strings.EqualFold(origin, "https://"+r.Host) ||
			strings.EqualFold(origin, "http://"+r.Host)
	},
}
// Handler provides HTTP handlers for monitoring endpoints.
// It holds the two data sources the handlers read from; construct it with
// NewHandler.
type Handler struct {
// logBuffer supplies recent log entries (GetLogs), can be cleared
// (ClearLogs), and offers a subscription for live streaming (WebSocket).
logBuffer *LogBuffer
// statsStore supplies the statistics returned by GetStats and pushed
// periodically over the WebSocket connection.
statsStore *StatsStore
}
// NewHandler builds a monitoring Handler that reads logs from logBuffer and
// statistics from statsStore. Both pointers are stored as given.
func NewHandler(logBuffer *LogBuffer, statsStore *StatsStore) *Handler {
	h := new(Handler)
	h.logBuffer = logBuffer
	h.statsStore = statsStore
	return h
}
// GetLogs returns filtered log entries.
// GET /api/admin/monitoring/logs
//
// The request is bound into LogFilters. Entries are kept when they match the
// optional Level and Process filters exactly and, if Search is set, contain
// the search text in their message (case-insensitive). At most
// filters.GetLimit() entries are returned.
//
// Responses:
//   - 200 with {"logs": [...], "total": n}, where n is the number of entries
//     returned in this response
//   - 400 when the filters cannot be bound
//   - 500 when the log buffer cannot be read
func (h *Handler) GetLogs(c echo.Context) error {
	var filters LogFilters
	if err := c.Bind(&filters); err != nil {
		return c.JSON(http.StatusBadRequest, map[string]interface{}{"error": "Invalid filters"})
	}

	limit := filters.GetLimit()

	// Get more entries than requested so that filtering still has a chance
	// of filling the page.
	entries, err := h.logBuffer.GetRecent(limit * 2)
	if err != nil {
		log.Error().Err(err).Msg("Failed to get logs from buffer")
		return c.JSON(http.StatusInternalServerError, map[string]interface{}{"error": "Failed to retrieve logs"})
	}

	// The search term does not change per entry; lowercase it once up front
	// instead of on every loop iteration.
	searchLower := strings.ToLower(filters.Search)

	filtered := make([]LogEntry, 0, len(entries))
	for _, e := range entries {
		// Level filter
		if filters.Level != "" && e.Level != filters.Level {
			continue
		}
		// Process filter
		if filters.Process != "" && e.Process != filters.Process {
			continue
		}
		// Search filter (case-insensitive)
		if filters.Search != "" && !strings.Contains(strings.ToLower(e.Message), searchLower) {
			continue
		}

		filtered = append(filtered, e)
		if len(filtered) >= limit {
			break
		}
	}

	return c.JSON(http.StatusOK, map[string]interface{}{
		"logs":  filtered,
		"total": len(filtered),
	})
}
// GetStats returns system statistics for all processes.
// GET /api/admin/monitoring/stats
//
// Replies 200 with whatever the stats store returns, or 500 with an error
// body (after logging the cause) when the store lookup fails.
func (h *Handler) GetStats(c echo.Context) error {
	stats, lookupErr := h.statsStore.GetAllStats()
	if lookupErr != nil {
		log.Error().Err(lookupErr).Msg("Failed to get stats from store")
		failure := map[string]interface{}{"error": "Failed to retrieve stats"}
		return c.JSON(http.StatusInternalServerError, failure)
	}

	return c.JSON(http.StatusOK, stats)
}
// ClearLogs clears all logs from the buffer.
// DELETE /api/admin/monitoring/logs
//
// Replies 200 with a confirmation message, or 500 with an error body (after
// logging the cause) when the buffer cannot be cleared.
func (h *Handler) ClearLogs(c echo.Context) error {
	status := http.StatusOK
	body := map[string]interface{}{"message": "Logs cleared"}

	if clearErr := h.logBuffer.Clear(); clearErr != nil {
		log.Error().Err(clearErr).Msg("Failed to clear logs")
		status = http.StatusInternalServerError
		body = map[string]interface{}{"error": "Failed to clear logs"}
	}

	return c.JSON(status, body)
}
// WebSocket handles real-time log streaming.
// GET /api/admin/monitoring/ws
//
// After upgrading the request it subscribes to the log buffer's pubsub feed
// and forwards every entry that decodes as a LogEntry to the client as a
// WSMessageTypeLog message. A stats snapshot is sent immediately and then
// every 5 seconds. The handler returns when the client disconnects, the
// request context is cancelled, the subscription channel closes, or a write
// to the client fails.
//
// It returns the upgrade error if the handshake fails and nil otherwise.
func (h *Handler) WebSocket(c echo.Context) error {
	conn, err := upgrader.Upgrade(c.Response().Writer, c.Request(), nil)
	if err != nil {
		log.Error().Err(err).Msg("Failed to upgrade WebSocket connection")
		return err
	}
	defer conn.Close()

	// Create context that cancels when connection closes
	ctx, cancel := context.WithCancel(c.Request().Context())
	defer cancel()

	// Subscribe to Redis pubsub for logs
	pubsub := h.logBuffer.Subscribe(ctx)
	defer pubsub.Close()

	// Serializes writes to conn (log messages and stats share the connection).
	var wsMu sync.Mutex

	// Drain incoming messages (filter changes, ping, etc.) so that a client
	// disconnect is noticed. The loop must stop on the first read error:
	// the connection is unusable from then on, and gorilla/websocket panics
	// if a failed connection keeps being read. The goroutine also ends when
	// the handler returns, because the deferred conn.Close makes ReadMessage
	// fail.
	go func() {
		defer cancel()
		for {
			if _, _, err := conn.ReadMessage(); err != nil {
				return
			}
		}
	}()

	// Stream logs from pubsub
	ch := pubsub.Channel()
	statsTicker := time.NewTicker(5 * time.Second)
	defer statsTicker.Stop()

	// Send initial stats
	h.sendStats(conn, &wsMu)

	for {
		select {
		case msg, ok := <-ch:
			if !ok {
				// Subscription closed: there is nothing more to stream, and
				// the received value is not a real message.
				return nil
			}

			// Parse log entry; skip payloads that are not valid entries.
			var entry LogEntry
			if err := json.Unmarshal([]byte(msg.Payload), &entry); err != nil {
				continue
			}

			// Send log message
			wsMsg := WSMessage{
				Type: WSMessageTypeLog,
				Data: entry,
			}
			wsMu.Lock()
			err := conn.WriteJSON(wsMsg)
			wsMu.Unlock()
			if err != nil {
				// A failed write means the connection is gone; stop
				// streaming instead of writing to it again.
				log.Debug().Err(err).Msg("WebSocket write error")
				return nil
			}
		case <-statsTicker.C:
			// Send periodic stats update
			h.sendStats(conn, &wsMu)
		case <-ctx.Done():
			return nil
		}
	}
}
// sendStats fetches the current statistics from the stats store and writes
// them to conn as a WSMessageTypeStats message.
//
// mu is held for the duration of the write so it does not interleave with
// other writers on the same connection. Failures are logged rather than
// returned: if the store lookup fails no message is sent (the client keeps
// its previous snapshot instead of receiving empty data), and a write error
// is only recorded here.
func (h *Handler) sendStats(conn *websocket.Conn, mu *sync.Mutex) {
	allStats, err := h.statsStore.GetAllStats()
	if err != nil {
		log.Error().Err(err).Msg("Failed to get stats from store")
		return
	}

	wsMsg := WSMessage{
		Type: WSMessageTypeStats,
		Data: allStats,
	}

	mu.Lock()
	err = conn.WriteJSON(wsMsg)
	mu.Unlock()
	if err != nil {
		log.Debug().Err(err).Msg("WebSocket write error")
	}
}