Add real-time log monitoring and system stats dashboard
Implements a comprehensive monitoring system for the admin interface: Backend: - New monitoring package with Redis ring buffer for log storage - Zerolog MultiWriter to capture logs to Redis - System stats collection (CPU, memory, disk, goroutines, GC) - HTTP metrics middleware (request counts, latency, error rates) - Asynq queue stats for worker process - WebSocket endpoint for real-time log streaming - Admin auth middleware now accepts token in query params (for WebSocket) Frontend: - New monitoring page with tabs (Overview, Logs, API Stats, Worker Stats) - Real-time log viewer with level filtering and search - System stats cards showing CPU, memory, goroutines, uptime - HTTP endpoint statistics table - Asynq queue depth visualization - Enable/disable monitoring toggle in settings Memory safeguards: - Max 200 unique endpoints tracked - Hourly stats reset to prevent unbounded growth - Max 1000 log entries in ring buffer - Max 1000 latency samples for P95 calculation 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -29,6 +29,7 @@ func NewAdminSettingsHandler(db *gorm.DB) *AdminSettingsHandler {
|
||||
// SettingsResponse represents the settings response
|
||||
type SettingsResponse struct {
|
||||
EnableLimitations bool `json:"enable_limitations"`
|
||||
EnableMonitoring bool `json:"enable_monitoring"`
|
||||
}
|
||||
|
||||
// GetSettings handles GET /api/admin/settings
|
||||
@@ -37,7 +38,7 @@ func (h *AdminSettingsHandler) GetSettings(c *gin.Context) {
|
||||
if err := h.db.First(&settings, 1).Error; err != nil {
|
||||
if err == gorm.ErrRecordNotFound {
|
||||
// Create default settings
|
||||
settings = models.SubscriptionSettings{ID: 1, EnableLimitations: false}
|
||||
settings = models.SubscriptionSettings{ID: 1, EnableLimitations: false, EnableMonitoring: true}
|
||||
h.db.Create(&settings)
|
||||
} else {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to fetch settings"})
|
||||
@@ -47,12 +48,14 @@ func (h *AdminSettingsHandler) GetSettings(c *gin.Context) {
|
||||
|
||||
c.JSON(http.StatusOK, SettingsResponse{
|
||||
EnableLimitations: settings.EnableLimitations,
|
||||
EnableMonitoring: settings.EnableMonitoring,
|
||||
})
|
||||
}
|
||||
|
||||
// UpdateSettingsRequest represents the update request
|
||||
type UpdateSettingsRequest struct {
|
||||
EnableLimitations *bool `json:"enable_limitations"`
|
||||
EnableMonitoring *bool `json:"enable_monitoring"`
|
||||
}
|
||||
|
||||
// UpdateSettings handles PUT /api/admin/settings
|
||||
@@ -66,7 +69,7 @@ func (h *AdminSettingsHandler) UpdateSettings(c *gin.Context) {
|
||||
var settings models.SubscriptionSettings
|
||||
if err := h.db.First(&settings, 1).Error; err != nil {
|
||||
if err == gorm.ErrRecordNotFound {
|
||||
settings = models.SubscriptionSettings{ID: 1}
|
||||
settings = models.SubscriptionSettings{ID: 1, EnableMonitoring: true}
|
||||
} else {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to fetch settings"})
|
||||
return
|
||||
@@ -77,6 +80,10 @@ func (h *AdminSettingsHandler) UpdateSettings(c *gin.Context) {
|
||||
settings.EnableLimitations = *req.EnableLimitations
|
||||
}
|
||||
|
||||
if req.EnableMonitoring != nil {
|
||||
settings.EnableMonitoring = *req.EnableMonitoring
|
||||
}
|
||||
|
||||
if err := h.db.Save(&settings).Error; err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to update settings"})
|
||||
return
|
||||
@@ -84,6 +91,7 @@ func (h *AdminSettingsHandler) UpdateSettings(c *gin.Context) {
|
||||
|
||||
c.JSON(http.StatusOK, SettingsResponse{
|
||||
EnableLimitations: settings.EnableLimitations,
|
||||
EnableMonitoring: settings.EnableMonitoring,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
"github.com/treytartt/casera-api/internal/admin/handlers"
|
||||
"github.com/treytartt/casera-api/internal/config"
|
||||
"github.com/treytartt/casera-api/internal/middleware"
|
||||
"github.com/treytartt/casera-api/internal/monitoring"
|
||||
"github.com/treytartt/casera-api/internal/push"
|
||||
"github.com/treytartt/casera-api/internal/repositories"
|
||||
"github.com/treytartt/casera-api/internal/services"
|
||||
@@ -22,6 +23,7 @@ type Dependencies struct {
|
||||
EmailService *services.EmailService
|
||||
PushClient *push.Client
|
||||
OnboardingService *services.OnboardingEmailService
|
||||
MonitoringHandler *monitoring.Handler
|
||||
}
|
||||
|
||||
// SetupRoutes configures all admin routes
|
||||
@@ -424,6 +426,17 @@ func SetupRoutes(router *gin.Engine, db *gorm.DB, cfg *config.Config, deps *Depe
|
||||
onboardingEmails.GET("/:id", onboardingHandler.Get)
|
||||
onboardingEmails.DELETE("/:id", onboardingHandler.Delete)
|
||||
}
|
||||
|
||||
// System monitoring (logs, stats, websocket)
|
||||
if deps != nil && deps.MonitoringHandler != nil {
|
||||
monitoringGroup := protected.Group("/monitoring")
|
||||
{
|
||||
monitoringGroup.GET("/logs", deps.MonitoringHandler.GetLogs)
|
||||
monitoringGroup.GET("/stats", deps.MonitoringHandler.GetStats)
|
||||
monitoringGroup.DELETE("/logs", deps.MonitoringHandler.ClearLogs)
|
||||
monitoringGroup.GET("/ws", deps.MonitoringHandler.WebSocket)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -32,21 +32,27 @@ type AdminClaims struct {
|
||||
// AdminAuthMiddleware creates a middleware that validates admin JWT tokens
|
||||
func AdminAuthMiddleware(cfg *config.Config, adminRepo *repositories.AdminRepository) gin.HandlerFunc {
|
||||
return func(c *gin.Context) {
|
||||
var tokenString string
|
||||
|
||||
// Get token from Authorization header
|
||||
authHeader := c.GetHeader("Authorization")
|
||||
if authHeader == "" {
|
||||
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "Authorization header required"})
|
||||
return
|
||||
if authHeader != "" {
|
||||
// Check Bearer prefix
|
||||
parts := strings.SplitN(authHeader, " ", 2)
|
||||
if len(parts) == 2 && strings.ToLower(parts[0]) == "bearer" {
|
||||
tokenString = parts[1]
|
||||
}
|
||||
}
|
||||
|
||||
// Check Bearer prefix
|
||||
parts := strings.SplitN(authHeader, " ", 2)
|
||||
if len(parts) != 2 || strings.ToLower(parts[0]) != "bearer" {
|
||||
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "Invalid authorization header format"})
|
||||
return
|
||||
// If no header token, check query parameter (for WebSocket connections)
|
||||
if tokenString == "" {
|
||||
tokenString = c.Query("token")
|
||||
}
|
||||
|
||||
tokenString := parts[1]
|
||||
if tokenString == "" {
|
||||
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "Authorization required"})
|
||||
return
|
||||
}
|
||||
|
||||
// Parse and validate token
|
||||
claims := &AdminClaims{}
|
||||
|
||||
@@ -16,6 +16,7 @@ const (
|
||||
type SubscriptionSettings struct {
|
||||
ID uint `gorm:"primaryKey" json:"id"`
|
||||
EnableLimitations bool `gorm:"column:enable_limitations;default:false" json:"enable_limitations"`
|
||||
EnableMonitoring bool `gorm:"column:enable_monitoring;default:true" json:"enable_monitoring"`
|
||||
}
|
||||
|
||||
// TableName returns the table name for GORM
|
||||
|
||||
165
internal/monitoring/buffer.go
Normal file
165
internal/monitoring/buffer.go
Normal file
@@ -0,0 +1,165 @@
|
||||
package monitoring
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
// Redis key constants for monitoring
|
||||
const (
|
||||
LogsKey = "monitoring:logs"
|
||||
LogsChannel = "monitoring:logs:channel"
|
||||
StatsKeyPrefix = "monitoring:stats:"
|
||||
MaxLogEntries = 1000
|
||||
LogsTTL = 24 * time.Hour
|
||||
StatsExpiration = 30 * time.Second // Stats expire if not updated
|
||||
)
|
||||
|
||||
// LogBuffer provides Redis-backed ring buffer for log entries
|
||||
type LogBuffer struct {
|
||||
client *redis.Client
|
||||
}
|
||||
|
||||
// NewLogBuffer creates a new log buffer with the given Redis client
|
||||
func NewLogBuffer(client *redis.Client) *LogBuffer {
|
||||
return &LogBuffer{client: client}
|
||||
}
|
||||
|
||||
// Push adds a log entry to the buffer and publishes it for real-time streaming
|
||||
func (b *LogBuffer) Push(entry LogEntry) error {
|
||||
ctx := context.Background()
|
||||
|
||||
data, err := json.Marshal(entry)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Use pipeline for atomic operations
|
||||
pipe := b.client.Pipeline()
|
||||
|
||||
// Push to list (ring buffer)
|
||||
pipe.LPush(ctx, LogsKey, data)
|
||||
|
||||
// Trim to max entries
|
||||
pipe.LTrim(ctx, LogsKey, 0, MaxLogEntries-1)
|
||||
|
||||
// Publish for real-time subscribers
|
||||
pipe.Publish(ctx, LogsChannel, data)
|
||||
|
||||
_, err = pipe.Exec(ctx)
|
||||
return err
|
||||
}
|
||||
|
||||
// GetRecent retrieves the most recent log entries
|
||||
func (b *LogBuffer) GetRecent(count int) ([]LogEntry, error) {
|
||||
ctx := context.Background()
|
||||
|
||||
if count <= 0 {
|
||||
count = 100
|
||||
}
|
||||
if count > MaxLogEntries {
|
||||
count = MaxLogEntries
|
||||
}
|
||||
|
||||
results, err := b.client.LRange(ctx, LogsKey, 0, int64(count-1)).Result()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
entries := make([]LogEntry, 0, len(results))
|
||||
for _, r := range results {
|
||||
var entry LogEntry
|
||||
if json.Unmarshal([]byte(r), &entry) == nil {
|
||||
entries = append(entries, entry)
|
||||
}
|
||||
}
|
||||
|
||||
return entries, nil
|
||||
}
|
||||
|
||||
// Subscribe returns a Redis pubsub subscription on the log broadcast
// channel. Callers receive entries published by Push in real time and
// must Close the returned PubSub when done.
func (b *LogBuffer) Subscribe(ctx context.Context) *redis.PubSub {
	return b.client.Subscribe(ctx, LogsChannel)
}

// Clear removes all logs from the buffer by deleting the backing list.
func (b *LogBuffer) Clear() error {
	ctx := context.Background()
	return b.client.Del(ctx, LogsKey).Err()
}

// Count returns the number of log entries currently in the buffer.
func (b *LogBuffer) Count() (int64, error) {
	ctx := context.Background()
	return b.client.LLen(ctx, LogsKey).Result()
}
|
||||
|
||||
// StatsStore provides Redis storage for system statistics
|
||||
type StatsStore struct {
|
||||
client *redis.Client
|
||||
}
|
||||
|
||||
// NewStatsStore creates a new stats store with the given Redis client
|
||||
func NewStatsStore(client *redis.Client) *StatsStore {
|
||||
return &StatsStore{client: client}
|
||||
}
|
||||
|
||||
// StoreStats stores system stats for a process
|
||||
func (s *StatsStore) StoreStats(stats SystemStats) error {
|
||||
ctx := context.Background()
|
||||
|
||||
data, err := json.Marshal(stats)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
key := StatsKeyPrefix + stats.Process
|
||||
return s.client.Set(ctx, key, data, StatsExpiration).Err()
|
||||
}
|
||||
|
||||
// GetStats retrieves stats for a specific process
|
||||
func (s *StatsStore) GetStats(process string) (*SystemStats, error) {
|
||||
ctx := context.Background()
|
||||
|
||||
key := StatsKeyPrefix + process
|
||||
data, err := s.client.Get(ctx, key).Bytes()
|
||||
if err != nil {
|
||||
if err == redis.Nil {
|
||||
return nil, nil // No stats available
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var stats SystemStats
|
||||
if err := json.Unmarshal(data, &stats); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &stats, nil
|
||||
}
|
||||
|
||||
// GetAllStats retrieves stats for all processes (api and worker)
|
||||
func (s *StatsStore) GetAllStats() (map[string]*SystemStats, error) {
|
||||
result := make(map[string]*SystemStats)
|
||||
|
||||
apiStats, err := s.GetStats("api")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if apiStats != nil {
|
||||
result["api"] = apiStats
|
||||
}
|
||||
|
||||
workerStats, err := s.GetStats("worker")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if workerStats != nil {
|
||||
result["worker"] = workerStats
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
199
internal/monitoring/collector.go
Normal file
199
internal/monitoring/collector.go
Normal file
@@ -0,0 +1,199 @@
|
||||
package monitoring
|
||||
|
||||
import (
|
||||
"runtime"
|
||||
"time"
|
||||
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/shirou/gopsutil/v3/cpu"
|
||||
"github.com/shirou/gopsutil/v3/disk"
|
||||
"github.com/shirou/gopsutil/v3/load"
|
||||
"github.com/shirou/gopsutil/v3/mem"
|
||||
)
|
||||
|
||||
// Collector gathers system and runtime statistics for a single process
// ("api" or "worker") and periodically publishes them to Redis via a
// StatsStore.
type Collector struct {
	process       string              // process label stored with each snapshot
	startTime     time.Time           // used to compute uptime
	statsStore    *StatsStore         // destination for published snapshots
	httpCollector *HTTPStatsCollector // nil for worker
	asynqClient   *asynq.Inspector    // nil for api
	stopChan      chan struct{}       // closed by Stop to end publishing
}

// NewCollector creates a new stats collector for the named process.
// Wire in SetHTTPCollector or SetAsynqInspector before StartPublishing
// to include process-specific metrics.
func NewCollector(process string, statsStore *StatsStore) *Collector {
	return &Collector{
		process:    process,
		startTime:  time.Now(),
		statsStore: statsStore,
		stopChan:   make(chan struct{}),
	}
}

// SetHTTPCollector sets the HTTP stats collector (for the API server).
func (c *Collector) SetHTTPCollector(httpCollector *HTTPStatsCollector) {
	c.httpCollector = httpCollector
}

// SetAsynqInspector sets the Asynq inspector (for the worker).
func (c *Collector) SetAsynqInspector(inspector *asynq.Inspector) {
	c.asynqClient = inspector
}
|
||||
|
||||
// Collect gathers all system statistics
|
||||
func (c *Collector) Collect() SystemStats {
|
||||
stats := SystemStats{
|
||||
Timestamp: time.Now().UTC(),
|
||||
Process: c.process,
|
||||
}
|
||||
|
||||
// CPU stats
|
||||
c.collectCPU(&stats)
|
||||
|
||||
// Memory stats (system + Go runtime)
|
||||
c.collectMemory(&stats)
|
||||
|
||||
// Disk stats
|
||||
c.collectDisk(&stats)
|
||||
|
||||
// Go runtime stats
|
||||
c.collectRuntime(&stats)
|
||||
|
||||
// HTTP stats (API only)
|
||||
if c.httpCollector != nil {
|
||||
httpStats := c.httpCollector.GetStats()
|
||||
stats.HTTP = &httpStats
|
||||
}
|
||||
|
||||
// Asynq stats (Worker only)
|
||||
if c.asynqClient != nil {
|
||||
asynqStats := c.collectAsynq()
|
||||
stats.Asynq = &asynqStats
|
||||
}
|
||||
|
||||
return stats
|
||||
}
|
||||
|
||||
// collectCPU fills in CPU usage, core count, and load averages.
func (c *Collector) collectCPU(stats *SystemStats) {
	// Get CPU usage percentage (this blocks for ~100ms to sample)
	if cpuPercent, err := cpu.Percent(100*time.Millisecond, false); err == nil && len(cpuPercent) > 0 {
		stats.CPU.UsagePercent = cpuPercent[0]
	}

	stats.CPU.NumCPU = runtime.NumCPU()

	// Load averages (Unix only, returns 0 on Windows)
	if avg, err := load.Avg(); err == nil {
		stats.CPU.LoadAvg1 = avg.Load1
		stats.CPU.LoadAvg5 = avg.Load5
		stats.CPU.LoadAvg15 = avg.Load15
	}
}

// collectMemory fills in system memory usage (via gopsutil) plus Go
// heap figures from runtime.ReadMemStats.
func (c *Collector) collectMemory(stats *SystemStats) {
	// System memory; on error the fields stay zero.
	if vmem, err := mem.VirtualMemory(); err == nil {
		stats.Memory.UsedBytes = vmem.Used
		stats.Memory.TotalBytes = vmem.Total
		stats.Memory.UsagePercent = vmem.UsedPercent
	}

	// Go runtime memory
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	stats.Memory.HeapAlloc = m.HeapAlloc
	stats.Memory.HeapSys = m.HeapSys
	stats.Memory.HeapInuse = m.HeapInuse
}

// collectDisk fills in usage for the root filesystem.
// NOTE(review): the path "/" is hard-coded — on Windows this errors and
// leaves disk stats zeroed; confirm deployment targets are Unix-like.
func (c *Collector) collectDisk(stats *SystemStats) {
	// Root filesystem stats
	if diskStat, err := disk.Usage("/"); err == nil {
		stats.Disk.UsedBytes = diskStat.Used
		stats.Disk.TotalBytes = diskStat.Total
		stats.Disk.FreeBytes = diskStat.Free
		stats.Disk.UsagePercent = diskStat.UsedPercent
	}
}

// collectRuntime fills in goroutine count, GC stats, and process uptime.
func (c *Collector) collectRuntime(stats *SystemStats) {
	var m runtime.MemStats
	runtime.ReadMemStats(&m)

	stats.Runtime.Goroutines = runtime.NumGoroutine()
	stats.Runtime.NumGC = m.NumGC
	if m.NumGC > 0 {
		// PauseNs is a circular buffer of 256 entries; (NumGC+255)%256
		// indexes the most recent pause.
		stats.Runtime.LastGCPause = m.PauseNs[(m.NumGC+255)%256]
	}
	stats.Runtime.Uptime = int64(time.Since(c.startTime).Seconds())
}
|
||||
|
||||
func (c *Collector) collectAsynq() AsynqStats {
|
||||
stats := AsynqStats{
|
||||
Queues: make(map[string]QueueStats),
|
||||
}
|
||||
|
||||
if c.asynqClient == nil {
|
||||
return stats
|
||||
}
|
||||
|
||||
queues, err := c.asynqClient.Queues()
|
||||
if err != nil {
|
||||
log.Debug().Err(err).Msg("Failed to get asynq queues")
|
||||
return stats
|
||||
}
|
||||
|
||||
for _, qName := range queues {
|
||||
info, err := c.asynqClient.GetQueueInfo(qName)
|
||||
if err != nil {
|
||||
log.Debug().Err(err).Str("queue", qName).Msg("Failed to get queue info")
|
||||
continue
|
||||
}
|
||||
|
||||
stats.Queues[qName] = QueueStats{
|
||||
Pending: info.Pending,
|
||||
Active: info.Active,
|
||||
Scheduled: info.Scheduled,
|
||||
Retry: info.Retry,
|
||||
Archived: info.Archived,
|
||||
Completed: info.Completed,
|
||||
Failed: info.Failed,
|
||||
}
|
||||
}
|
||||
|
||||
return stats
|
||||
}
|
||||
|
||||
// StartPublishing begins periodic stats collection and publishing to Redis
|
||||
func (c *Collector) StartPublishing(interval time.Duration) {
|
||||
go func() {
|
||||
ticker := time.NewTicker(interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
// Collect immediately on start
|
||||
c.publishStats()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
c.publishStats()
|
||||
case <-c.stopChan:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// publishStats collects one snapshot and writes it to Redis. Failures
// are logged at debug level only — monitoring must never take down the
// host process.
func (c *Collector) publishStats() {
	stats := c.Collect()
	if err := c.statsStore.StoreStats(stats); err != nil {
		log.Debug().Err(err).Str("process", c.process).Msg("Failed to publish stats to Redis")
	}
}

// Stop stops the stats publishing goroutine started by StartPublishing.
// NOTE(review): not idempotent — a second call panics on the double
// channel close; confirm callers invoke it exactly once.
func (c *Collector) Stop() {
	close(c.stopChan)
}
|
||||
203
internal/monitoring/handler.go
Normal file
203
internal/monitoring/handler.go
Normal file
@@ -0,0 +1,203 @@
|
||||
package monitoring
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
var upgrader = websocket.Upgrader{
	ReadBufferSize:  1024,
	WriteBufferSize: 1024,
	CheckOrigin: func(r *http.Request) bool {
		// Allow connections from admin panel.
		// NOTE(review): accepting every Origin disables cross-site
		// WebSocket hijacking protection. The route sits behind admin
		// auth, but consider an Origin allowlist.
		return true
	},
}

// Handler provides HTTP handlers for the admin monitoring endpoints.
type Handler struct {
	logBuffer  *LogBuffer  // Redis-backed log ring buffer
	statsStore *StatsStore // Redis-backed per-process stats snapshots
}

// NewHandler creates a new monitoring handler.
func NewHandler(logBuffer *LogBuffer, statsStore *StatsStore) *Handler {
	return &Handler{
		logBuffer:  logBuffer,
		statsStore: statsStore,
	}
}
|
||||
|
||||
// GetLogs returns filtered log entries
|
||||
// GET /api/admin/monitoring/logs
|
||||
func (h *Handler) GetLogs(c *gin.Context) {
|
||||
var filters LogFilters
|
||||
if err := c.ShouldBindQuery(&filters); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid filters"})
|
||||
return
|
||||
}
|
||||
|
||||
limit := filters.GetLimit()
|
||||
|
||||
// Get more entries than requested for filtering
|
||||
entries, err := h.logBuffer.GetRecent(limit * 2)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Failed to get logs from buffer")
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to retrieve logs"})
|
||||
return
|
||||
}
|
||||
|
||||
// Apply filters
|
||||
filtered := make([]LogEntry, 0, len(entries))
|
||||
for _, e := range entries {
|
||||
// Level filter
|
||||
if filters.Level != "" && e.Level != filters.Level {
|
||||
continue
|
||||
}
|
||||
|
||||
// Process filter
|
||||
if filters.Process != "" && e.Process != filters.Process {
|
||||
continue
|
||||
}
|
||||
|
||||
// Search filter (case-insensitive)
|
||||
if filters.Search != "" {
|
||||
searchLower := strings.ToLower(filters.Search)
|
||||
messageLower := strings.ToLower(e.Message)
|
||||
if !strings.Contains(messageLower, searchLower) {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
filtered = append(filtered, e)
|
||||
if len(filtered) >= limit {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"logs": filtered,
|
||||
"total": len(filtered),
|
||||
})
|
||||
}
|
||||
|
||||
// GetStats returns system statistics for all processes
|
||||
// GET /api/admin/monitoring/stats
|
||||
func (h *Handler) GetStats(c *gin.Context) {
|
||||
allStats, err := h.statsStore.GetAllStats()
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Failed to get stats from store")
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to retrieve stats"})
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, allStats)
|
||||
}
|
||||
|
||||
// ClearLogs clears all logs from the buffer
|
||||
// DELETE /api/admin/monitoring/logs
|
||||
func (h *Handler) ClearLogs(c *gin.Context) {
|
||||
if err := h.logBuffer.Clear(); err != nil {
|
||||
log.Error().Err(err).Msg("Failed to clear logs")
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to clear logs"})
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{"message": "Logs cleared"})
|
||||
}
|
||||
|
||||
// WebSocket handles real-time log streaming
|
||||
// GET /api/admin/monitoring/ws
|
||||
func (h *Handler) WebSocket(c *gin.Context) {
|
||||
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Failed to upgrade WebSocket connection")
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
// Create context that cancels when connection closes
|
||||
ctx, cancel := context.WithCancel(c.Request.Context())
|
||||
defer cancel()
|
||||
|
||||
// Subscribe to Redis pubsub for logs
|
||||
pubsub := h.logBuffer.Subscribe(ctx)
|
||||
defer pubsub.Close()
|
||||
|
||||
// Handle incoming messages (for filter changes, ping, etc.)
|
||||
var wsMu sync.Mutex
|
||||
go func() {
|
||||
for {
|
||||
_, _, err := conn.ReadMessage()
|
||||
if err != nil {
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Stream logs from pubsub
|
||||
ch := pubsub.Channel()
|
||||
statsTicker := time.NewTicker(5 * time.Second)
|
||||
defer statsTicker.Stop()
|
||||
|
||||
// Send initial stats
|
||||
h.sendStats(conn, &wsMu)
|
||||
|
||||
for {
|
||||
select {
|
||||
case msg := <-ch:
|
||||
// Parse log entry
|
||||
var entry LogEntry
|
||||
if err := json.Unmarshal([]byte(msg.Payload), &entry); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// Send log message
|
||||
wsMsg := WSMessage{
|
||||
Type: WSMessageTypeLog,
|
||||
Data: entry,
|
||||
}
|
||||
|
||||
wsMu.Lock()
|
||||
err := conn.WriteJSON(wsMsg)
|
||||
wsMu.Unlock()
|
||||
|
||||
if err != nil {
|
||||
log.Debug().Err(err).Msg("WebSocket write error")
|
||||
return
|
||||
}
|
||||
|
||||
case <-statsTicker.C:
|
||||
// Send periodic stats update
|
||||
h.sendStats(conn, &wsMu)
|
||||
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *Handler) sendStats(conn *websocket.Conn, mu *sync.Mutex) {
|
||||
allStats, err := h.statsStore.GetAllStats()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
wsMsg := WSMessage{
|
||||
Type: WSMessageTypeStats,
|
||||
Data: allStats,
|
||||
}
|
||||
|
||||
mu.Lock()
|
||||
conn.WriteJSON(wsMsg)
|
||||
mu.Unlock()
|
||||
}
|
||||
215
internal/monitoring/middleware.go
Normal file
215
internal/monitoring/middleware.go
Normal file
@@ -0,0 +1,215 @@
|
||||
package monitoring
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// HTTPStatsCollector collects HTTP request metrics in memory. All
// fields are guarded by mu; growth is bounded by maxEndpoints,
// maxLatencySamples, and the periodic reset performed in Record.
type HTTPStatsCollector struct {
	mu           sync.RWMutex
	requests     map[string]int64         // endpoint -> count
	totalLatency map[string]time.Duration // endpoint -> total latency
	errors       map[string]int64         // endpoint -> error count (status >= 400)
	byStatus     map[int]int64            // status code -> count
	latencies    []latencySample          // recent latency samples for P95
	startTime    time.Time                // collector start, for requests-per-minute
	lastReset    time.Time                // last periodic reset; see statsResetPeriod
}

// latencySample is one request's latency, tagged with its endpoint so
// per-endpoint percentiles can be computed from the shared sample list.
type latencySample struct {
	endpoint  string
	latency   time.Duration
	timestamp time.Time
}

// Memory safeguards for the in-process metrics store.
const (
	maxLatencySamples = 1000          // recent samples retained for P95
	maxEndpoints      = 200           // cap unique endpoints tracked; overflow goes to "OTHER"
	statsResetPeriod  = 1 * time.Hour // reset stats periodically to prevent unbounded growth
)

// NewHTTPStatsCollector creates a new HTTP stats collector.
func NewHTTPStatsCollector() *HTTPStatsCollector {
	now := time.Now()
	return &HTTPStatsCollector{
		requests:     make(map[string]int64),
		totalLatency: make(map[string]time.Duration),
		errors:       make(map[string]int64),
		byStatus:     make(map[int]int64),
		latencies:    make([]latencySample, 0, maxLatencySamples),
		startTime:    now,
		lastReset:    now,
	}
}
|
||||
|
||||
// Record records a single HTTP request
|
||||
func (c *HTTPStatsCollector) Record(endpoint string, latency time.Duration, status int) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
// Periodically reset to prevent unbounded memory growth
|
||||
if time.Since(c.lastReset) > statsResetPeriod {
|
||||
c.resetLocked()
|
||||
}
|
||||
|
||||
// Check if we've hit the endpoint limit and this is a new endpoint
|
||||
if _, exists := c.requests[endpoint]; !exists && len(c.requests) >= maxEndpoints {
|
||||
// Use a catch-all bucket for overflow endpoints
|
||||
endpoint = "OTHER"
|
||||
}
|
||||
|
||||
c.requests[endpoint]++
|
||||
c.totalLatency[endpoint] += latency
|
||||
c.byStatus[status]++
|
||||
|
||||
if status >= 400 {
|
||||
c.errors[endpoint]++
|
||||
}
|
||||
|
||||
// Store latency sample
|
||||
c.latencies = append(c.latencies, latencySample{
|
||||
endpoint: endpoint,
|
||||
latency: latency,
|
||||
timestamp: time.Now(),
|
||||
})
|
||||
|
||||
// Keep only recent samples
|
||||
if len(c.latencies) > maxLatencySamples {
|
||||
c.latencies = c.latencies[len(c.latencies)-maxLatencySamples:]
|
||||
}
|
||||
}
|
||||
|
||||
// resetLocked clears all counters and samples and restarts the
// periodic-reset clock. Caller must hold c.mu (write lock).
// startTime is intentionally preserved so uptime keeps accumulating
// across periodic resets.
func (c *HTTPStatsCollector) resetLocked() {
	c.requests = make(map[string]int64)
	c.totalLatency = make(map[string]time.Duration)
	c.errors = make(map[string]int64)
	c.byStatus = make(map[int]int64)
	c.latencies = make([]latencySample, 0, maxLatencySamples)
	c.lastReset = time.Now()
	// Keep startTime for uptime calculation
}
|
||||
|
||||
// GetStats returns the current HTTP statistics
|
||||
func (c *HTTPStatsCollector) GetStats() HTTPStats {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
|
||||
stats := HTTPStats{
|
||||
ByEndpoint: make(map[string]EndpointStats),
|
||||
ByStatusCode: make(map[int]int64),
|
||||
}
|
||||
|
||||
var totalRequests int64
|
||||
var totalErrors int64
|
||||
var totalLatency time.Duration
|
||||
|
||||
for endpoint, count := range c.requests {
|
||||
avgLatency := c.totalLatency[endpoint] / time.Duration(count)
|
||||
errCount := c.errors[endpoint]
|
||||
errRate := float64(0)
|
||||
if count > 0 {
|
||||
errRate = float64(errCount) / float64(count)
|
||||
}
|
||||
|
||||
stats.ByEndpoint[endpoint] = EndpointStats{
|
||||
Count: count,
|
||||
AvgLatencyMs: float64(avgLatency.Milliseconds()),
|
||||
ErrorRate: errRate,
|
||||
P95LatencyMs: c.calculateP95(endpoint),
|
||||
}
|
||||
|
||||
totalRequests += count
|
||||
totalErrors += errCount
|
||||
totalLatency += c.totalLatency[endpoint]
|
||||
}
|
||||
|
||||
// Copy status code counts
|
||||
for status, count := range c.byStatus {
|
||||
stats.ByStatusCode[status] = count
|
||||
}
|
||||
|
||||
stats.RequestsTotal = totalRequests
|
||||
if totalRequests > 0 {
|
||||
stats.AvgLatencyMs = float64(totalLatency.Milliseconds()) / float64(totalRequests)
|
||||
stats.ErrorRate = float64(totalErrors) / float64(totalRequests)
|
||||
}
|
||||
|
||||
uptime := time.Since(c.startTime).Minutes()
|
||||
if uptime > 0 {
|
||||
stats.RequestsPerMinute = float64(totalRequests) / uptime
|
||||
}
|
||||
|
||||
return stats
|
||||
}
|
||||
|
||||
// calculateP95 calculates the 95th percentile latency for an endpoint
|
||||
// Must be called with read lock held
|
||||
func (c *HTTPStatsCollector) calculateP95(endpoint string) float64 {
|
||||
var endpointLatencies []time.Duration
|
||||
|
||||
for _, sample := range c.latencies {
|
||||
if sample.endpoint == endpoint {
|
||||
endpointLatencies = append(endpointLatencies, sample.latency)
|
||||
}
|
||||
}
|
||||
|
||||
if len(endpointLatencies) == 0 {
|
||||
return 0
|
||||
}
|
||||
|
||||
// Sort latencies
|
||||
sort.Slice(endpointLatencies, func(i, j int) bool {
|
||||
return endpointLatencies[i] < endpointLatencies[j]
|
||||
})
|
||||
|
||||
// Calculate P95 index
|
||||
p95Index := int(float64(len(endpointLatencies)) * 0.95)
|
||||
if p95Index >= len(endpointLatencies) {
|
||||
p95Index = len(endpointLatencies) - 1
|
||||
}
|
||||
|
||||
return float64(endpointLatencies[p95Index].Milliseconds())
|
||||
}
|
||||
|
||||
// Reset clears all collected stats
|
||||
func (c *HTTPStatsCollector) Reset() {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
c.requests = make(map[string]int64)
|
||||
c.totalLatency = make(map[string]time.Duration)
|
||||
c.errors = make(map[string]int64)
|
||||
c.byStatus = make(map[int]int64)
|
||||
c.latencies = make([]latencySample, 0, maxLatencySamples)
|
||||
c.startTime = time.Now()
|
||||
}
|
||||
|
||||
// MetricsMiddleware returns a Gin middleware that collects request metrics
|
||||
func MetricsMiddleware(collector *HTTPStatsCollector) gin.HandlerFunc {
|
||||
return func(c *gin.Context) {
|
||||
start := time.Now()
|
||||
|
||||
// Process request
|
||||
c.Next()
|
||||
|
||||
// Calculate latency
|
||||
latency := time.Since(start)
|
||||
|
||||
// Get endpoint pattern (use route path, fallback to actual path)
|
||||
endpoint := c.FullPath()
|
||||
if endpoint == "" {
|
||||
endpoint = c.Request.URL.Path
|
||||
}
|
||||
|
||||
// Combine method with path for unique endpoint identification
|
||||
endpoint = c.Request.Method + " " + endpoint
|
||||
|
||||
// Record metrics
|
||||
collector.Record(endpoint, latency, c.Writer.Status())
|
||||
}
|
||||
}
|
||||
128
internal/monitoring/models.go
Normal file
128
internal/monitoring/models.go
Normal file
@@ -0,0 +1,128 @@
|
||||
package monitoring
|
||||
|
||||
import "time"
|
||||
|
||||
// LogEntry represents a single log entry captured from zerolog
|
||||
type LogEntry struct {
|
||||
ID string `json:"id"`
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
Level string `json:"level"` // debug, info, warn, error, fatal
|
||||
Message string `json:"message"`
|
||||
Caller string `json:"caller"` // file:line
|
||||
Process string `json:"process"` // "api" or "worker"
|
||||
Fields map[string]any `json:"fields"` // Additional structured fields
|
||||
}
|
||||
|
||||
// SystemStats contains all system and runtime statistics published by a
// single process. HTTP and Asynq sections are optional depending on the
// process type.
type SystemStats struct {
	Timestamp time.Time    `json:"timestamp"` // when this snapshot was taken
	Process   string       `json:"process"`   // "api" or "worker"
	CPU       CPUStats     `json:"cpu"`
	Memory    MemoryStats  `json:"memory"`
	Disk      DiskStats    `json:"disk"`
	Runtime   RuntimeStats `json:"runtime"`
	HTTP      *HTTPStats   `json:"http,omitempty"`  // API only
	Asynq     *AsynqStats  `json:"asynq,omitempty"` // Worker only
}

// CPUStats contains CPU usage information.
type CPUStats struct {
	UsagePercent float64 `json:"usage_percent"`
	NumCPU       int     `json:"num_cpu"`
	LoadAvg1     float64 `json:"load_avg_1"`
	LoadAvg5     float64 `json:"load_avg_5"`
	LoadAvg15    float64 `json:"load_avg_15"`
}

// MemoryStats contains both system and Go runtime memory info.
type MemoryStats struct {
	// System memory
	UsedBytes    uint64  `json:"used_bytes"`
	TotalBytes   uint64  `json:"total_bytes"`
	UsagePercent float64 `json:"usage_percent"`
	// Go heap (runtime.MemStats-style counters, in bytes)
	HeapAlloc uint64 `json:"heap_alloc"`
	HeapSys   uint64 `json:"heap_sys"`
	HeapInuse uint64 `json:"heap_inuse"`
}

// DiskStats contains disk usage information.
type DiskStats struct {
	UsedBytes    uint64  `json:"used_bytes"`
	TotalBytes   uint64  `json:"total_bytes"`
	FreeBytes    uint64  `json:"free_bytes"`
	UsagePercent float64 `json:"usage_percent"`
}

// RuntimeStats contains Go runtime information.
type RuntimeStats struct {
	Goroutines  int    `json:"goroutines"`
	NumGC       uint32 `json:"num_gc"`
	LastGCPause uint64 `json:"last_gc_pause_ns"`
	Uptime      int64  `json:"uptime_seconds"`
}
|
||||
|
||||
// HTTPStats contains HTTP request metrics (API server only), aggregated
// by the HTTPStatsCollector fed from the metrics middleware.
type HTTPStats struct {
	RequestsTotal     int64                    `json:"requests_total"`
	RequestsPerMinute float64                  `json:"requests_per_minute"`
	AvgLatencyMs      float64                  `json:"avg_latency_ms"`
	ErrorRate         float64                  `json:"error_rate"`
	ByEndpoint        map[string]EndpointStats `json:"by_endpoint"`    // keyed by "METHOD path"
	ByStatusCode      map[int]int64            `json:"by_status_code"` // request count per HTTP status
}

// EndpointStats contains per-endpoint HTTP metrics.
type EndpointStats struct {
	Count        int64   `json:"count"`
	AvgLatencyMs float64 `json:"avg_latency_ms"`
	P95LatencyMs float64 `json:"p95_latency_ms"`
	ErrorRate    float64 `json:"error_rate"`
}
|
||||
|
||||
// AsynqStats contains Asynq job queue metrics (Worker only).
type AsynqStats struct {
	Queues map[string]QueueStats `json:"queues"` // keyed by queue name
}

// QueueStats contains stats for a single Asynq queue.
// Field names mirror Asynq's task states.
type QueueStats struct {
	Pending   int `json:"pending"`
	Active    int `json:"active"`
	Scheduled int `json:"scheduled"`
	Retry     int `json:"retry"`
	Archived  int `json:"archived"`
	Completed int `json:"completed"`
	Failed    int `json:"failed"`
}
|
||||
|
||||
// LogFilters describes the query parameters accepted by the log-listing
// endpoint. Fields bind from the request query string via the `form` tags.
type LogFilters struct {
	Level   string `form:"level"`
	Process string `form:"process"`
	Search  string `form:"search"`
	Limit   int    `form:"limit,default=100"`
}

// GetLimit returns the requested limit clamped to a sane range:
// non-positive values fall back to 100, and values above 1000 are
// capped at 1000 (the ring buffer's maximum size).
func (f *LogFilters) GetLimit() int {
	switch {
	case f.Limit <= 0:
		return 100
	case f.Limit > 1000:
		return 1000
	default:
		return f.Limit
	}
}
|
||||
|
||||
// WebSocket message types: discriminator values for WSMessage.Type.
const (
	WSMessageTypeLog   = "log"   // Data carries a LogEntry
	WSMessageTypeStats = "stats" // Data carries a SystemStats snapshot
)

// WSMessage wraps messages sent over the monitoring WebSocket so clients
// can dispatch on Type before decoding Data.
type WSMessage struct {
	Type string `json:"type"` // "log" or "stats"
	Data any    `json:"data"`
}
|
||||
194
internal/monitoring/service.go
Normal file
194
internal/monitoring/service.go
Normal file
@@ -0,0 +1,194 @@
|
||||
package monitoring
|
||||
|
||||
import (
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/redis/go-redis/v9"
|
||||
"github.com/rs/zerolog/log"
|
||||
"gorm.io/gorm"
|
||||
|
||||
"github.com/treytartt/casera-api/internal/models"
|
||||
)
|
||||
|
||||
// Timing constants for the monitoring service's background loops.
const (
	// DefaultStatsInterval is the default interval for collecting/publishing stats.
	DefaultStatsInterval = 5 * time.Second
	// SettingsSyncInterval is how often to check the database for the
	// enable_monitoring setting, so admin toggles take effect without a restart.
	SettingsSyncInterval = 30 * time.Second
)
|
||||
|
||||
// Service orchestrates all monitoring components: the Redis log buffer,
// stats collection/publishing, HTTP metrics (API process only), and the
// periodic sync of the enable_monitoring setting from the database.
type Service struct {
	process        string              // "api" or "worker"
	logBuffer      *LogBuffer          // Redis ring buffer for captured logs
	statsStore     *StatsStore         // Redis-backed store for published stats
	collector      *Collector          // collects and publishes system stats
	httpCollector  *HTTPStatsCollector // nil for the worker process
	handler        *Handler            // HTTP handler for monitoring endpoints
	logWriter      *RedisLogWriter     // io.Writer wired into zerolog output
	db             *gorm.DB            // optional; enables settings sync when non-nil
	settingsStopCh chan struct{}       // closed by Stop to end the settings sync loop
}
|
||||
|
||||
// Config holds configuration for the monitoring service.
type Config struct {
	Process     string        // "api" or "worker"
	RedisClient *redis.Client // Redis client for log buffer
	// StatsInterval is the interval for stats collection (default 5s).
	// NOTE(review): NewService defaults this field, but Service does not
	// retain it and Start publishes at DefaultStatsInterval regardless —
	// confirm whether a custom interval should be honored.
	StatsInterval time.Duration
	DB            *gorm.DB // Database for checking enable_monitoring setting (optional)
}
|
||||
|
||||
// NewService creates a new monitoring service and wires together its
// components: log buffer, stats store, collector, HTTP handler, and the
// Redis log writer. For the "api" process it additionally attaches an
// HTTP stats collector. If cfg.DB is provided, the enable_monitoring
// setting is read once immediately; periodic re-sync starts in Start.
//
// NOTE(review): cfg.StatsInterval is defaulted here but the value is not
// stored on the Service — Start currently uses DefaultStatsInterval.
func NewService(cfg Config) *Service {
	if cfg.StatsInterval == 0 {
		cfg.StatsInterval = DefaultStatsInterval
	}

	// Create components
	logBuffer := NewLogBuffer(cfg.RedisClient)
	statsStore := NewStatsStore(cfg.RedisClient)
	collector := NewCollector(cfg.Process, statsStore)
	handler := NewHandler(logBuffer, statsStore)
	logWriter := NewRedisLogWriter(logBuffer, cfg.Process)

	// For API server, create HTTP stats collector
	var httpCollector *HTTPStatsCollector
	if cfg.Process == "api" {
		httpCollector = NewHTTPStatsCollector()
		collector.SetHTTPCollector(httpCollector)
	}

	svc := &Service{
		process:        cfg.Process,
		logBuffer:      logBuffer,
		statsStore:     statsStore,
		collector:      collector,
		httpCollector:  httpCollector,
		handler:        handler,
		logWriter:      logWriter,
		db:             cfg.DB,
		settingsStopCh: make(chan struct{}),
	}

	// Check initial setting from database so log capture starts in the
	// correct enabled/disabled state.
	if cfg.DB != nil {
		svc.syncSettingsFromDB()
	}

	return svc
}
|
||||
|
||||
// SetAsynqInspector sets the Asynq inspector used by the collector to
// gather queue stats. Only relevant for the worker process; it simply
// delegates to the underlying collector.
func (s *Service) SetAsynqInspector(inspector *asynq.Inspector) {
	s.collector.SetAsynqInspector(inspector)
}
|
||||
|
||||
// Start begins collecting and publishing stats, and — when a database is
// configured — launches the background loop that keeps log capture in
// sync with the enable_monitoring setting.
//
// NOTE(review): publishing always uses DefaultStatsInterval; a custom
// Config.StatsInterval is ignored here because the Service never stores
// it — confirm intended behavior.
func (s *Service) Start() {
	log.Info().
		Str("process", s.process).
		Dur("interval", DefaultStatsInterval).
		Bool("enabled", s.logWriter.IsEnabled()).
		Msg("Starting monitoring service")

	s.collector.StartPublishing(DefaultStatsInterval)

	// Start settings sync if database is available
	if s.db != nil {
		go s.startSettingsSync()
	}
}
|
||||
|
||||
// Stop stops the monitoring service: it terminates the settings sync
// loop and stops the stats collector.
//
// Stop must be called at most once — a second call would close
// settingsStopCh again and panic.
func (s *Service) Stop() {
	// Stop settings sync
	close(s.settingsStopCh)

	s.collector.Stop()
	log.Info().Str("process", s.process).Msg("Monitoring service stopped")
}
|
||||
|
||||
// syncSettingsFromDB reads the singleton settings row (ID 1) and applies
// its EnableMonitoring flag to the log writer. A missing row defaults to
// enabled; any other database error leaves the current state untouched.
// A state transition is logged once per change.
func (s *Service) syncSettingsFromDB() {
	if s.db == nil {
		return
	}

	var settings models.SubscriptionSettings
	err := s.db.First(&settings, 1).Error
	if err != nil {
		if err == gorm.ErrRecordNotFound {
			// No settings record, default to enabled
			s.logWriter.SetEnabled(true)
		}
		// On other errors, keep current state
		return
	}

	wasEnabled := s.logWriter.IsEnabled()
	s.logWriter.SetEnabled(settings.EnableMonitoring)

	// Only log when the value actually flipped, to avoid noise every sync.
	if wasEnabled != settings.EnableMonitoring {
		log.Info().
			Str("process", s.process).
			Bool("enabled", settings.EnableMonitoring).
			Msg("Monitoring log capture setting changed")
	}
}
|
||||
|
||||
// startSettingsSync periodically re-reads the enable_monitoring setting
// every SettingsSyncInterval until settingsStopCh is closed (by Stop).
// Intended to run in its own goroutine.
func (s *Service) startSettingsSync() {
	ticker := time.NewTicker(SettingsSyncInterval)
	defer ticker.Stop()

	for {
		select {
		case <-s.settingsStopCh:
			return
		case <-ticker.C:
			s.syncSettingsFromDB()
		}
	}
}
|
||||
|
||||
// SetEnabled manually enables or disables log capture, bypassing the
// database-driven setting until the next sync overwrites it.
func (s *Service) SetEnabled(enabled bool) {
	s.logWriter.SetEnabled(enabled)
}

// IsEnabled returns whether log capture is currently enabled.
func (s *Service) IsEnabled() bool {
	return s.logWriter.IsEnabled()
}
|
||||
|
||||
// SetDB sets the database connection for settings sync and applies the
// current setting immediately.
// This can be called after NewService if DB wasn't available during initialization.
//
// NOTE(review): s.db is written without synchronization; if Start has
// already launched startSettingsSync, that goroutine reads s.db
// concurrently — potential data race, confirm call ordering.
func (s *Service) SetDB(db *gorm.DB) {
	s.db = db
	s.syncSettingsFromDB()
}
|
||||
|
||||
// LogWriter returns an io.Writer for zerolog that captures logs to Redis.
func (s *Service) LogWriter() io.Writer {
	return s.logWriter
}

// Handler returns the HTTP handler for monitoring endpoints.
func (s *Service) Handler() *Handler {
	return s.handler
}

// HTTPCollector returns the HTTP stats collector (nil for the worker
// process, which collects no HTTP metrics).
func (s *Service) HTTPCollector() *HTTPStatsCollector {
	return s.httpCollector
}
|
||||
|
||||
// MetricsMiddleware returns the Gin middleware for HTTP metrics (API server only)
|
||||
func (s *Service) MetricsMiddleware() interface{} {
|
||||
if s.httpCollector == nil {
|
||||
return nil
|
||||
}
|
||||
return MetricsMiddleware(s.httpCollector)
|
||||
}
|
||||
95
internal/monitoring/writer.go
Normal file
95
internal/monitoring/writer.go
Normal file
@@ -0,0 +1,95 @@
|
||||
package monitoring
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// RedisLogWriter implements io.Writer to capture zerolog output to Redis.
// Each Write call is expected to carry one JSON-encoded zerolog event.
type RedisLogWriter struct {
	buffer  *LogBuffer  // Redis ring buffer entries are pushed into
	process string      // "api" or "worker"; stamped on every entry
	enabled atomic.Bool // toggles capture at runtime without re-wiring zerolog
}
|
||||
|
||||
// NewRedisLogWriter creates a new writer that captures logs to Redis,
// tagging every entry with the given process name. Capture starts
// enabled; use SetEnabled to toggle it.
func NewRedisLogWriter(buffer *LogBuffer, process string) *RedisLogWriter {
	w := &RedisLogWriter{
		buffer:  buffer,
		process: process,
	}
	w.enabled.Store(true) // enabled by default
	return w
}
|
||||
|
||||
// SetEnabled enables or disables log capture to Redis. Safe to call
// concurrently with Write (atomic flag).
func (w *RedisLogWriter) SetEnabled(enabled bool) {
	w.enabled.Store(enabled)
}

// IsEnabled returns whether log capture is enabled.
func (w *RedisLogWriter) IsEnabled() bool {
	return w.enabled.Load()
}
|
||||
|
||||
// Write implements io.Writer interface
|
||||
// It parses zerolog JSON output and writes to Redis asynchronously
|
||||
func (w *RedisLogWriter) Write(p []byte) (n int, err error) {
|
||||
// Skip if monitoring is disabled
|
||||
if !w.enabled.Load() {
|
||||
return len(p), nil
|
||||
}
|
||||
|
||||
// Parse zerolog JSON output
|
||||
var raw map[string]any
|
||||
if err := json.Unmarshal(p, &raw); err != nil {
|
||||
// Not valid JSON, skip (could be console writer output)
|
||||
return len(p), nil
|
||||
}
|
||||
|
||||
// Build log entry
|
||||
entry := LogEntry{
|
||||
ID: uuid.NewString(),
|
||||
Timestamp: time.Now().UTC(),
|
||||
Process: w.process,
|
||||
Fields: make(map[string]any),
|
||||
}
|
||||
|
||||
// Extract standard zerolog fields
|
||||
if lvl, ok := raw["level"].(string); ok {
|
||||
entry.Level = lvl
|
||||
}
|
||||
if msg, ok := raw["message"].(string); ok {
|
||||
entry.Message = msg
|
||||
}
|
||||
if caller, ok := raw["caller"].(string); ok {
|
||||
entry.Caller = caller
|
||||
}
|
||||
|
||||
// Extract timestamp if present (zerolog may include it)
|
||||
if ts, ok := raw["time"].(string); ok {
|
||||
if parsed, err := time.Parse(time.RFC3339, ts); err == nil {
|
||||
entry.Timestamp = parsed
|
||||
}
|
||||
}
|
||||
|
||||
// Copy additional fields (excluding standard ones)
|
||||
for k, v := range raw {
|
||||
switch k {
|
||||
case "level", "message", "caller", "time":
|
||||
// Skip standard fields
|
||||
default:
|
||||
entry.Fields[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
// Write to Redis asynchronously to avoid blocking
|
||||
go func() {
|
||||
_ = w.buffer.Push(entry) // Ignore errors to avoid blocking log output
|
||||
}()
|
||||
|
||||
return len(p), nil
|
||||
}
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"github.com/treytartt/casera-api/internal/handlers"
|
||||
"github.com/treytartt/casera-api/internal/i18n"
|
||||
"github.com/treytartt/casera-api/internal/middleware"
|
||||
"github.com/treytartt/casera-api/internal/monitoring"
|
||||
"github.com/treytartt/casera-api/internal/push"
|
||||
"github.com/treytartt/casera-api/internal/repositories"
|
||||
"github.com/treytartt/casera-api/internal/services"
|
||||
@@ -23,13 +24,14 @@ const Version = "2.0.0"
|
||||
|
||||
// Dependencies holds all dependencies needed by the router
|
||||
type Dependencies struct {
|
||||
DB *gorm.DB
|
||||
Cache *services.CacheService
|
||||
Config *config.Config
|
||||
EmailService *services.EmailService
|
||||
PDFService *services.PDFService
|
||||
PushClient *push.Client // Direct APNs/FCM client
|
||||
StorageService *services.StorageService
|
||||
DB *gorm.DB
|
||||
Cache *services.CacheService
|
||||
Config *config.Config
|
||||
EmailService *services.EmailService
|
||||
PDFService *services.PDFService
|
||||
PushClient *push.Client // Direct APNs/FCM client
|
||||
StorageService *services.StorageService
|
||||
MonitoringService *monitoring.Service
|
||||
}
|
||||
|
||||
// SetupRouter creates and configures the Gin router
|
||||
@@ -51,6 +53,13 @@ func SetupRouter(deps *Dependencies) *gin.Engine {
|
||||
r.Use(corsMiddleware(cfg))
|
||||
r.Use(i18n.Middleware())
|
||||
|
||||
// Monitoring metrics middleware (if monitoring is enabled)
|
||||
if deps.MonitoringService != nil {
|
||||
if metricsMiddleware := deps.MonitoringService.MetricsMiddleware(); metricsMiddleware != nil {
|
||||
r.Use(metricsMiddleware.(gin.HandlerFunc))
|
||||
}
|
||||
}
|
||||
|
||||
// Serve landing page static files (if static directory is configured)
|
||||
staticDir := cfg.Server.StaticDir
|
||||
if staticDir != "" {
|
||||
@@ -137,11 +146,17 @@ func SetupRouter(deps *Dependencies) *gin.Engine {
|
||||
mediaHandler = handlers.NewMediaHandler(documentRepo, taskRepo, residenceRepo, deps.StorageService)
|
||||
}
|
||||
|
||||
// Set up admin routes (separate auth system)
|
||||
// Set up admin routes with monitoring handler (if available)
|
||||
var monitoringHandler *monitoring.Handler
|
||||
if deps.MonitoringService != nil {
|
||||
monitoringHandler = deps.MonitoringService.Handler()
|
||||
}
|
||||
|
||||
adminDeps := &admin.Dependencies{
|
||||
EmailService: deps.EmailService,
|
||||
PushClient: deps.PushClient,
|
||||
OnboardingService: onboardingService,
|
||||
MonitoringHandler: monitoringHandler,
|
||||
}
|
||||
admin.SetupRoutes(r, deps.DB, cfg, adminDeps)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user