Add Prometheus metrics + vmagent push to obs.88oakapps.com
Adds internal/prom package with histograms for HTTP, GORM, B2, APNs, and FCM, wired into the Echo router (HTTPMiddleware + /metrics) and GORM via statement-level callbacks (no ctx plumbing needed). Storage and push clients call ObserveB2Upload / ObserveAPNsSend / ObserveFCMSend at the network round-trip points. Existing internal/monitoring metrics move to /metrics/legacy so the canonical /metrics emits proper histogram buckets for p50/p95/p99 rollups. deploy-k3s/manifests/observability/vmagent.yaml deploys a single-replica vmagent in the honeydue namespace that scrapes api Pods on :8000/metrics every 15s and remote-writes to https://obs.88oakapps.com/api/v1/write with a bearer token (substituted at deploy time from OBS_INGEST_TOKEN in deploy/prod.env). NetworkPolicies allow vmagent egress to api Pods and to the public obs endpoint over :443; the obs side runs VictoriaMetrics + Jaeger + Grafana on 88oakappsUpdate. docs/observability-plan.md captures the full plan including resource budget, instrumentation table, 4-step rollout, and migration triggers. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -14,6 +14,7 @@ import (
|
||||
|
||||
"github.com/treytartt/honeydue-api/internal/config"
|
||||
"github.com/treytartt/honeydue-api/internal/models"
|
||||
"github.com/treytartt/honeydue-api/internal/prom"
|
||||
)
|
||||
|
||||
// migrationAdvisoryLockKey is the pg_advisory_lock key that serializes
|
||||
@@ -84,6 +85,13 @@ func Connect(cfg *config.DatabaseConfig, debug bool) (*gorm.DB, error) {
|
||||
Str("database", cfg.Database).
|
||||
Msg("Connected to PostgreSQL database")
|
||||
|
||||
// Register Prometheus GORM callbacks — emits gorm_query_duration_seconds
|
||||
// for every SQL operation. Operates at the statement level, so does not
|
||||
// require ctx to be threaded through repositories.
|
||||
if err := prom.RegisterGORMCallbacks(db); err != nil {
|
||||
log.Warn().Err(err).Msg("failed to register prometheus GORM callbacks; metrics will be partial")
|
||||
}
|
||||
|
||||
return db, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,199 @@
|
||||
package prom
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/labstack/echo/v4"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/collectors"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
var (
|
||||
Registry = prometheus.NewRegistry()
|
||||
|
||||
httpRequestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
|
||||
Name: "http_request_duration_seconds",
|
||||
Help: "Duration of HTTP requests in seconds.",
|
||||
Buckets: []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10},
|
||||
}, []string{"route", "method", "status"})
|
||||
|
||||
gormQueryDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
|
||||
Name: "gorm_query_duration_seconds",
|
||||
Help: "Duration of GORM database queries in seconds.",
|
||||
Buckets: []float64{0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5},
|
||||
}, []string{"table", "operation"})
|
||||
|
||||
b2UploadDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
|
||||
Name: "b2_upload_duration_seconds",
|
||||
Help: "Duration of B2/S3 upload operations in seconds.",
|
||||
Buckets: []float64{0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30, 60},
|
||||
}, []string{"bucket", "result"})
|
||||
|
||||
b2UploadBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Name: "b2_upload_bytes_total",
|
||||
Help: "Total bytes uploaded to B2/S3.",
|
||||
}, []string{"bucket", "result"})
|
||||
|
||||
apnsSendDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
|
||||
Name: "apns_send_duration_seconds",
|
||||
Help: "Duration of APNs push notification sends in seconds.",
|
||||
Buckets: []float64{0.01, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5},
|
||||
}, []string{"result"})
|
||||
|
||||
fcmSendDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
|
||||
Name: "fcm_send_duration_seconds",
|
||||
Help: "Duration of FCM push notification sends in seconds.",
|
||||
Buckets: []float64{0.01, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5},
|
||||
}, []string{"result"})
|
||||
|
||||
asynqJobDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
|
||||
Name: "asynq_job_duration_seconds",
|
||||
Help: "Duration of asynq background job execution in seconds.",
|
||||
Buckets: []float64{0.01, 0.05, 0.1, 0.5, 1, 5, 10, 30, 60, 300},
|
||||
}, []string{"task_type", "result"})
|
||||
)
|
||||
|
||||
func init() {
|
||||
Registry.MustRegister(
|
||||
collectors.NewGoCollector(),
|
||||
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
|
||||
httpRequestDuration,
|
||||
gormQueryDuration,
|
||||
b2UploadDuration,
|
||||
b2UploadBytes,
|
||||
apnsSendDuration,
|
||||
fcmSendDuration,
|
||||
asynqJobDuration,
|
||||
)
|
||||
}
|
||||
|
||||
// Handler returns a promhttp Handler bound to the package Registry, suitable for
|
||||
// mounting at GET /metrics on Echo.
|
||||
func Handler() echo.HandlerFunc {
|
||||
h := promhttp.HandlerFor(Registry, promhttp.HandlerOpts{Registry: Registry})
|
||||
return echo.WrapHandler(h)
|
||||
}
|
||||
|
||||
// HTTPMiddleware records http_request_duration_seconds for every request,
|
||||
// labeled by Echo route pattern, method, and status code.
|
||||
func HTTPMiddleware() echo.MiddlewareFunc {
|
||||
return func(next echo.HandlerFunc) echo.HandlerFunc {
|
||||
return func(c echo.Context) error {
|
||||
start := time.Now()
|
||||
err := next(c)
|
||||
route := c.Path()
|
||||
if route == "" {
|
||||
route = "unknown"
|
||||
}
|
||||
httpRequestDuration.WithLabelValues(
|
||||
route,
|
||||
c.Request().Method,
|
||||
strconv.Itoa(c.Response().Status),
|
||||
).Observe(time.Since(start).Seconds())
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// RegisterGORMCallbacks attaches before/after callbacks on a *gorm.DB so every
|
||||
// SQL operation records gorm_query_duration_seconds{table,operation}.
|
||||
//
|
||||
// Operates at the SQL/statement level — does NOT require ctx to be threaded
|
||||
// through repositories (that comes later when otelgorm lands).
|
||||
func RegisterGORMCallbacks(db *gorm.DB) error {
|
||||
const startKey = "honeydue:prom_start"
|
||||
|
||||
registerBefore := func(name string) error {
|
||||
cb := db.Callback().Create().Before("gorm:create")
|
||||
switch name {
|
||||
case "create":
|
||||
cb = db.Callback().Create().Before("gorm:create")
|
||||
case "query":
|
||||
cb = db.Callback().Query().Before("gorm:query")
|
||||
case "update":
|
||||
cb = db.Callback().Update().Before("gorm:update")
|
||||
case "delete":
|
||||
cb = db.Callback().Delete().Before("gorm:delete")
|
||||
case "row":
|
||||
cb = db.Callback().Row().Before("gorm:row")
|
||||
case "raw":
|
||||
cb = db.Callback().Raw().Before("gorm:raw")
|
||||
}
|
||||
return cb.Register("prom:before_"+name, func(tx *gorm.DB) {
|
||||
tx.InstanceSet(startKey, time.Now())
|
||||
})
|
||||
}
|
||||
|
||||
registerAfter := func(name string) error {
|
||||
cb := db.Callback().Create().After("gorm:create")
|
||||
switch name {
|
||||
case "create":
|
||||
cb = db.Callback().Create().After("gorm:create")
|
||||
case "query":
|
||||
cb = db.Callback().Query().After("gorm:query")
|
||||
case "update":
|
||||
cb = db.Callback().Update().After("gorm:update")
|
||||
case "delete":
|
||||
cb = db.Callback().Delete().After("gorm:delete")
|
||||
case "row":
|
||||
cb = db.Callback().Row().After("gorm:row")
|
||||
case "raw":
|
||||
cb = db.Callback().Raw().After("gorm:raw")
|
||||
}
|
||||
return cb.Register("prom:after_"+name, func(tx *gorm.DB) {
|
||||
startVal, ok := tx.InstanceGet(startKey)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
start, ok := startVal.(time.Time)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
table := tx.Statement.Table
|
||||
if table == "" {
|
||||
table = "unknown"
|
||||
}
|
||||
gormQueryDuration.WithLabelValues(table, name).Observe(time.Since(start).Seconds())
|
||||
})
|
||||
}
|
||||
|
||||
for _, name := range []string{"create", "query", "update", "delete", "row", "raw"} {
|
||||
if err := registerBefore(name); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := registerAfter(name); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ObserveB2Upload records duration + bytes for a B2/S3 upload. result is "ok"
|
||||
// or "error".
|
||||
func ObserveB2Upload(bucket, result string, dur time.Duration, bytes int64) {
|
||||
b2UploadDuration.WithLabelValues(bucket, result).Observe(dur.Seconds())
|
||||
if bytes > 0 {
|
||||
b2UploadBytes.WithLabelValues(bucket, result).Add(float64(bytes))
|
||||
}
|
||||
}
|
||||
|
||||
// ObserveAPNsSend records duration of a single APNs send. result is "ok",
|
||||
// "bad_token", or "error".
|
||||
func ObserveAPNsSend(result string, dur time.Duration) {
|
||||
apnsSendDuration.WithLabelValues(result).Observe(dur.Seconds())
|
||||
}
|
||||
|
||||
// ObserveFCMSend records duration of a single FCM send. result is "ok",
|
||||
// "bad_token", or "error".
|
||||
func ObserveFCMSend(result string, dur time.Duration) {
|
||||
fcmSendDuration.WithLabelValues(result).Observe(dur.Seconds())
|
||||
}
|
||||
|
||||
// ObserveAsynqJob records duration of a single asynq job execution. result is
|
||||
// "ok", "retry", or "error".
|
||||
func ObserveAsynqJob(taskType, result string, dur time.Duration) {
|
||||
asynqJobDuration.WithLabelValues(taskType, result).Observe(dur.Seconds())
|
||||
}
|
||||
@@ -3,6 +3,7 @@ package push
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/sideshow/apns2"
|
||||
@@ -10,6 +11,7 @@ import (
|
||||
"github.com/sideshow/apns2/token"
|
||||
|
||||
"github.com/treytartt/honeydue-api/internal/config"
|
||||
"github.com/treytartt/honeydue-api/internal/prom"
|
||||
)
|
||||
|
||||
// APNsClient handles direct communication with Apple Push Notification service
|
||||
@@ -84,8 +86,10 @@ func (c *APNsClient) Send(ctx context.Context, tokens []string, title, message s
|
||||
Priority: apns2.PriorityHigh,
|
||||
}
|
||||
|
||||
sendStart := time.Now()
|
||||
res, err := c.client.PushWithContext(ctx, notification)
|
||||
if err != nil {
|
||||
prom.ObserveAPNsSend("error", time.Since(sendStart))
|
||||
log.Error().
|
||||
Err(err).
|
||||
Str("token", truncateToken(deviceToken)).
|
||||
@@ -95,6 +99,7 @@ func (c *APNsClient) Send(ctx context.Context, tokens []string, title, message s
|
||||
}
|
||||
|
||||
if !res.Sent() {
|
||||
prom.ObserveAPNsSend("bad_token", time.Since(sendStart))
|
||||
log.Error().
|
||||
Str("token", truncateToken(deviceToken)).
|
||||
Str("reason", res.Reason).
|
||||
@@ -104,6 +109,7 @@ func (c *APNsClient) Send(ctx context.Context, tokens []string, title, message s
|
||||
continue
|
||||
}
|
||||
|
||||
prom.ObserveAPNsSend("ok", time.Since(sendStart))
|
||||
successCount++
|
||||
log.Debug().
|
||||
Str("token", truncateToken(deviceToken)).
|
||||
@@ -154,8 +160,10 @@ func (c *APNsClient) SendWithCategory(ctx context.Context, tokens []string, titl
|
||||
Priority: apns2.PriorityHigh,
|
||||
}
|
||||
|
||||
sendStart := time.Now()
|
||||
res, err := c.client.PushWithContext(ctx, notification)
|
||||
if err != nil {
|
||||
prom.ObserveAPNsSend("error", time.Since(sendStart))
|
||||
log.Error().
|
||||
Err(err).
|
||||
Str("token", truncateToken(deviceToken)).
|
||||
@@ -166,6 +174,7 @@ func (c *APNsClient) SendWithCategory(ctx context.Context, tokens []string, titl
|
||||
}
|
||||
|
||||
if !res.Sent() {
|
||||
prom.ObserveAPNsSend("bad_token", time.Since(sendStart))
|
||||
log.Error().
|
||||
Str("token", truncateToken(deviceToken)).
|
||||
Str("reason", res.Reason).
|
||||
@@ -176,6 +185,7 @@ func (c *APNsClient) SendWithCategory(ctx context.Context, tokens []string, titl
|
||||
continue
|
||||
}
|
||||
|
||||
prom.ObserveAPNsSend("ok", time.Since(sendStart))
|
||||
successCount++
|
||||
log.Debug().
|
||||
Str("token", truncateToken(deviceToken)).
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
@@ -14,6 +15,7 @@ import (
|
||||
"golang.org/x/oauth2/google"
|
||||
|
||||
"github.com/treytartt/honeydue-api/internal/config"
|
||||
"github.com/treytartt/honeydue-api/internal/prom"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -213,8 +215,15 @@ func (c *FCMClient) Send(ctx context.Context, tokens []string, title, message st
|
||||
successCount := 0
|
||||
|
||||
for _, token := range tokens {
|
||||
sendStart := time.Now()
|
||||
err := c.sendOne(ctx, token, title, message, data)
|
||||
if err != nil {
|
||||
result := "error"
|
||||
var fcmErr *FCMSendError
|
||||
if errors.As(err, &fcmErr) && fcmErr.IsUnregistered() {
|
||||
result = "bad_token"
|
||||
}
|
||||
prom.ObserveFCMSend(result, time.Since(sendStart))
|
||||
log.Error().
|
||||
Err(err).
|
||||
Str("token", truncateToken(token)).
|
||||
@@ -223,6 +232,7 @@ func (c *FCMClient) Send(ctx context.Context, tokens []string, title, message st
|
||||
continue
|
||||
}
|
||||
|
||||
prom.ObserveFCMSend("ok", time.Since(sendStart))
|
||||
successCount++
|
||||
log.Debug().
|
||||
Str("token", truncateToken(token)).
|
||||
|
||||
@@ -23,6 +23,7 @@ import (
|
||||
"github.com/treytartt/honeydue-api/internal/i18n"
|
||||
custommiddleware "github.com/treytartt/honeydue-api/internal/middleware"
|
||||
"github.com/treytartt/honeydue-api/internal/monitoring"
|
||||
"github.com/treytartt/honeydue-api/internal/prom"
|
||||
"github.com/treytartt/honeydue-api/internal/push"
|
||||
"github.com/treytartt/honeydue-api/internal/repositories"
|
||||
"github.com/treytartt/honeydue-api/internal/services"
|
||||
@@ -121,6 +122,15 @@ func SetupRouter(deps *Dependencies) *echo.Echo {
|
||||
}
|
||||
}
|
||||
|
||||
// Prometheus metrics middleware — feeds VictoriaMetrics on
|
||||
// obs.88oakapps.com via vmagent. Records http_request_duration_seconds
|
||||
// labeled by route pattern, method, and status code.
|
||||
e.Use(prom.HTTPMiddleware())
|
||||
|
||||
// /metrics endpoint exposed for vmagent scrape. No auth — bound to
|
||||
// the cluster network only; not exposed via Cloudflare.
|
||||
e.GET("/metrics", prom.Handler())
|
||||
|
||||
// Serve landing page static files (if static directory is configured)
|
||||
staticDir := cfg.Server.StaticDir
|
||||
if staticDir != "" {
|
||||
@@ -229,9 +239,11 @@ func SetupRouter(deps *Dependencies) *echo.Echo {
|
||||
mediaHandler = handlers.NewMediaHandler(documentRepo, taskRepo, residenceRepo, deps.StorageService)
|
||||
}
|
||||
|
||||
// Prometheus metrics endpoint (no auth required, for scraping)
|
||||
// Legacy Prometheus-shaped metrics from internal/monitoring (consumed by
|
||||
// GoAdmin dashboard). Now lives at /metrics/legacy so the canonical /metrics
|
||||
// route (registered above) emits proper Prometheus histograms with labels.
|
||||
if deps.MonitoringService != nil {
|
||||
e.GET("/metrics", prometheusMetrics(deps.MonitoringService))
|
||||
e.GET("/metrics/legacy", prometheusMetrics(deps.MonitoringService))
|
||||
}
|
||||
|
||||
// Set up admin routes with monitoring handler (if available)
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"github.com/rs/zerolog/log"
|
||||
|
||||
"github.com/treytartt/honeydue-api/internal/config"
|
||||
"github.com/treytartt/honeydue-api/internal/prom"
|
||||
)
|
||||
|
||||
// StorageService handles file uploads, validation, encryption, and URL generation.
|
||||
@@ -149,11 +150,18 @@ func (s *StorageService) Upload(file *multipart.FileHeader, category string) (*U
|
||||
}
|
||||
}
|
||||
|
||||
// Write to backend
|
||||
// Write to backend (B2/S3 round trip — instrumented for Prometheus)
|
||||
bucket := s.cfg.S3Bucket
|
||||
if bucket == "" {
|
||||
bucket = "local"
|
||||
}
|
||||
uploadStart := time.Now()
|
||||
if err := s.backend.Write(key, fileData); err != nil {
|
||||
prom.ObserveB2Upload(bucket, "error", time.Since(uploadStart), 0)
|
||||
return nil, fmt.Errorf("failed to save file: %w", err)
|
||||
}
|
||||
written := int64(len(fileData))
|
||||
prom.ObserveB2Upload(bucket, "ok", time.Since(uploadStart), written)
|
||||
|
||||
// Generate URL (always uses the original filename without .enc suffix)
|
||||
url := fmt.Sprintf("%s/%s/%s", s.cfg.BaseURL, subdir, newFilename)
|
||||
|
||||
Reference in New Issue
Block a user