Security:
- Replace all binding: tags with validate: + c.Validate() in admin handlers
- Add rate limiting to auth endpoints (login, register, password reset)
- Add security headers (HSTS, XSS protection, nosniff, frame options)
- Wire Google Pub/Sub token verification into webhook handler
- Replace ParseUnverified with proper OIDC/JWKS key verification
- Verify inner Apple JWS signatures in webhook handler
- Add io.LimitReader (1MB) to all webhook body reads
- Add ownership verification to file deletion
- Move hardcoded admin credentials to env vars
- Add uniqueIndex to User.Email
- Hide ConfirmationCode from JSON serialization
- Mask confirmation codes in admin responses
- Use http.DetectContentType for upload validation
- Fix path traversal in storage service
- Replace os.Getenv with Viper in stripe service
- Sanitize Redis URLs before logging
- Separate DEBUG_FIXED_CODES from DEBUG flag
- Reject weak SECRET_KEY in production
- Add host check on /_next/* proxy routes
- Use explicit localhost CORS origins in debug mode
- Replace err.Error() with generic messages in all admin error responses

Critical fixes:
- Rewrite FCM to HTTP v1 API with OAuth 2.0 service account auth
- Fix user_customuser -> auth_user table names in raw SQL
- Fix dashboard verified query to use UserProfile model
- Add escapeLikeWildcards() to prevent SQL wildcard injection (sketched below)

Bug fixes:
- Add bounds checks for days/expiring_soon query params (1-3650)
- Add receipt_data/transaction_id empty-check to RestoreSubscription
- Change Active bool -> *bool in device handler
- Check all unchecked GORM/FindByIDWithProfile errors
- Add validation for notification hour fields (0-23)
- Add max=10000 validation on task description updates

Transactions & data integrity:
- Wrap registration flow in transaction
- Wrap QuickComplete in transaction
- Move image creation inside completion transaction
- Wrap SetSpecialties in transaction
- Wrap GetOrCreateToken in transaction
- Wrap completion+image deletion in transaction

Performance:
- Batch completion summaries (2 queries vs 2N)
- Reuse single http.Client in IAP validation
- Cache dashboard counts (30s TTL)
- Batch COUNT queries in admin user list
- Add Limit(500) to document queries
- Add reminder_stage+due_date filters to reminder queries
- Parse AllowedTypes once at init
- In-memory user cache in auth middleware (30s TTL)
- Timezone change detection cache
- Optimize P95 with per-endpoint sorted buffers
- Replace crypto/md5 with hash/fnv for ETags

Code quality:
- Add sync.Once to all monitoring Stop()/Close() methods
- Replace 8 fmt.Printf with zerolog in auth service
- Log previously discarded errors
- Standardize delete response shapes
- Route hardcoded English through i18n
- Remove FileURL from DocumentResponse (keep MediaURL only)
- Thread user timezone through kanban board responses
- Initialize empty slices to prevent null JSON
- Extract shared field map for task Update/UpdateTx
- Delete unused SoftDeleteModel, min(), formatCron, legacy handlers

Worker & jobs:
- Wire Asynq email infrastructure into worker
- Register HandleReminderLogCleanup with daily 3AM cron
- Use per-user timezone in HandleSmartReminder
- Replace direct DB queries with repository calls
- Delete legacy reminder handlers (~200 lines)
- Delete unused task type constants

Dependencies:
- Replace archived jung-kurt/gofpdf with go-pdf/fpdf
- Replace unmaintained gomail.v2 with wneessen/go-mail
- Add TODO for Echo jwt v3 transitive dep removal

Test infrastructure:
- Fix MakeRequest/SeedLookupData error handling
- Replace os.Exit(0) with t.Skip() in scope/consistency tests
- Add 11 new FCM v1 tests

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
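A minimal sketch of the LIKE-escaping helper named above (illustrative only: the function name comes from the commit message, while the package name and the assumption that the query uses a matching ESCAPE '\' clause are editorial):

package repository

import "strings"

// escapeLikeWildcards escapes the LIKE metacharacters % and _ and the
// escape character itself, so user-supplied search terms match literally.
// The SQL query must use a matching ESCAPE '\' clause.
func escapeLikeWildcards(s string) string {
	return strings.NewReplacer(`\`, `\\`, `%`, `\%`, `_`, `\_`).Replace(s)
}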
204 lines · 4.9 KiB · Go
package monitoring

import (
	"runtime"
	"sync"
	"time"

	"github.com/hibiken/asynq"
	"github.com/rs/zerolog/log"
	"github.com/shirou/gopsutil/v3/cpu"
	"github.com/shirou/gopsutil/v3/disk"
	"github.com/shirou/gopsutil/v3/load"
	"github.com/shirou/gopsutil/v3/mem"
)

// Collector gathers system and runtime statistics
type Collector struct {
	process       string
	startTime     time.Time
	statsStore    *StatsStore
	httpCollector *HTTPStatsCollector // nil for worker
	asynqClient   *asynq.Inspector    // nil for api
	stopChan      chan struct{}
	stopOnce      sync.Once
}

// NewCollector creates a new stats collector
func NewCollector(process string, statsStore *StatsStore) *Collector {
	return &Collector{
		process:    process,
		startTime:  time.Now(),
		statsStore: statsStore,
		stopChan:   make(chan struct{}),
	}
}

// SetHTTPCollector sets the HTTP stats collector (for API server)
func (c *Collector) SetHTTPCollector(httpCollector *HTTPStatsCollector) {
	c.httpCollector = httpCollector
}

// SetAsynqInspector sets the Asynq inspector (for Worker)
func (c *Collector) SetAsynqInspector(inspector *asynq.Inspector) {
	c.asynqClient = inspector
}

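// Example wiring (editorial sketch, not part of this file): an API process
// might construct a Collector like this; statsStore and httpStats are
// assumed to be built elsewhere.
//
//	collector := monitoring.NewCollector("api", statsStore)
//	collector.SetHTTPCollector(httpStats)
//	collector.StartPublishing(15 * time.Second)
//	defer collector.Stop()
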
// Collect gathers all system statistics
func (c *Collector) Collect() SystemStats {
	stats := SystemStats{
		Timestamp: time.Now().UTC(),
		Process:   c.process,
	}

	// CPU stats
	c.collectCPU(&stats)

	// Read Go runtime memory stats once (used by both memory and runtime collectors)
	var memStats runtime.MemStats
	runtime.ReadMemStats(&memStats)

	// Memory stats (system + Go runtime)
	c.collectMemory(&stats, &memStats)

	// Disk stats
	c.collectDisk(&stats)

	// Go runtime stats
	c.collectRuntime(&stats, &memStats)

	// HTTP stats (API only)
	if c.httpCollector != nil {
		httpStats := c.httpCollector.GetStats()
		stats.HTTP = &httpStats
	}

	// Asynq stats (Worker only)
	if c.asynqClient != nil {
		asynqStats := c.collectAsynq()
		stats.Asynq = &asynqStats
	}

	return stats
}

func (c *Collector) collectCPU(stats *SystemStats) {
	// Get CPU usage percentage (blocks for 200ms to sample)
	// This is called periodically, so a shorter window is acceptable
	if cpuPercent, err := cpu.Percent(200*time.Millisecond, false); err == nil && len(cpuPercent) > 0 {
		stats.CPU.UsagePercent = cpuPercent[0]
	}

	stats.CPU.NumCPU = runtime.NumCPU()

	// Load averages (Unix only, returns 0 on Windows)
	if avg, err := load.Avg(); err == nil {
		stats.CPU.LoadAvg1 = avg.Load1
		stats.CPU.LoadAvg5 = avg.Load5
		stats.CPU.LoadAvg15 = avg.Load15
	}
}

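// Note (editorial): cpu.Percent(0, false) would instead report usage since
// the previous call without blocking; the 200ms window above trades a short
// block for a reading that needs no prior state.
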
func (c *Collector) collectMemory(stats *SystemStats, m *runtime.MemStats) {
	// System memory
	if vmem, err := mem.VirtualMemory(); err == nil {
		stats.Memory.UsedBytes = vmem.Used
		stats.Memory.TotalBytes = vmem.Total
		stats.Memory.UsagePercent = vmem.UsedPercent
	}

	// Go runtime memory (reuses pre-read MemStats)
	stats.Memory.HeapAlloc = m.HeapAlloc
	stats.Memory.HeapSys = m.HeapSys
	stats.Memory.HeapInuse = m.HeapInuse
}

func (c *Collector) collectDisk(stats *SystemStats) {
	// Root filesystem stats
	if diskStat, err := disk.Usage("/"); err == nil {
		stats.Disk.UsedBytes = diskStat.Used
		stats.Disk.TotalBytes = diskStat.Total
		stats.Disk.FreeBytes = diskStat.Free
		stats.Disk.UsagePercent = diskStat.UsedPercent
	}
}

func (c *Collector) collectRuntime(stats *SystemStats, m *runtime.MemStats) {
	stats.Runtime.Goroutines = runtime.NumGoroutine()
	stats.Runtime.NumGC = m.NumGC
	if m.NumGC > 0 {
		// PauseNs is a circular buffer of the last 256 GC pause times;
		// the most recent entry lives at index (NumGC+255)%256.
		stats.Runtime.LastGCPause = m.PauseNs[(m.NumGC+255)%256]
	}
	stats.Runtime.Uptime = int64(time.Since(c.startTime).Seconds())
}

func (c *Collector) collectAsynq() AsynqStats {
	stats := AsynqStats{
		Queues: make(map[string]QueueStats),
	}

	if c.asynqClient == nil {
		return stats
	}

	queues, err := c.asynqClient.Queues()
	if err != nil {
		log.Debug().Err(err).Msg("Failed to get asynq queues")
		return stats
	}

	for _, qName := range queues {
		info, err := c.asynqClient.GetQueueInfo(qName)
		if err != nil {
			log.Debug().Err(err).Str("queue", qName).Msg("Failed to get queue info")
			continue
		}

		stats.Queues[qName] = QueueStats{
			Pending:   info.Pending,
			Active:    info.Active,
			Scheduled: info.Scheduled,
			Retry:     info.Retry,
			Archived:  info.Archived,
			Completed: info.Completed,
			Failed:    info.Failed,
		}
	}

	return stats
}

// StartPublishing begins periodic stats collection and publishing to Redis
func (c *Collector) StartPublishing(interval time.Duration) {
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()

		// Collect immediately on start
		c.publishStats()

		for {
			select {
			case <-ticker.C:
				c.publishStats()
			case <-c.stopChan:
				return
			}
		}
	}()
}

func (c *Collector) publishStats() {
	stats := c.Collect()
	if err := c.statsStore.StoreStats(stats); err != nil {
		log.Debug().Err(err).Str("process", c.process).Msg("Failed to publish stats to Redis")
	}
}

// Stop stops the stats publishing. It is safe to call multiple times.
func (c *Collector) Stop() {
	c.stopOnce.Do(func() {
		close(c.stopChan)
	})
}

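// Illustrative only: because close(stopChan) runs inside stopOnce.Do, calling
// Stop twice is safe and the publishing goroutine exits exactly once:
//
//	c := NewCollector("api", store)
//	c.StartPublishing(time.Second)
//	c.Stop()
//	c.Stop() // no panic; the channel is closed at most once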