Wire OpenTelemetry tracing — HTTP, B2, APNs, FCM, asynq, GORM (partial)
Step 1 — OTel SDK: cmd/api and cmd/worker initialize a tracer provider that exports OTLP/HTTP to obs.88oakapps.com (Jaeger all-in-one). Sampling is AlwaysSample in dev (DEBUG=true) and TraceIDRatioBased(0.1) in prod, overridable via OTEL_TRACES_SAMPLER_ARG. Service names are honeydue-api and honeydue-worker. otelecho.Middleware opens a span per HTTP request. Step 2 — Manual spans: storage_service.Upload now takes ctx and emits storage.upload + b2.PutObject spans (size_bytes, key, mime_type, bucket, result attrs). APNs Send/SendWithCategory and FCM sendOne emit per-token spans with topic, status_code, reason. Asynq middleware emits asynq.handle:<task_type> per job with retry/payload attrs and records asynq_job_duration_seconds. Step 3 — Database: otelgorm plugin registered in database.Connect, so any SQL emitted via db.WithContext(ctx) attaches to the request span. Every repository now exposes WithContext(ctx) *XRepository as the migration helper. TaskService.ListTasks and GetTasksByResidence are migrated end-to-end (ctx threaded through handler → service → repo); remaining services adopt the same pattern incrementally — pre-migration methods still emit untraced SQL via the unchanged db field. OBS_TRACES_URL and OBS_INGEST_TOKEN flow from deploy/prod.env → honeydue-secrets → api+worker Deployments via secretKeyRef (optional). 02-setup-secrets.sh sources them from prod.env on next run; manifests mark both env vars optional so the deployment rolls without traces if the secret is absent. ch15 observability doc now lists what produces spans today vs the remaining migration work, with the explicit per-method pattern. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -15,6 +15,8 @@ import (
|
||||
"github.com/treytartt/honeydue-api/internal/config"
|
||||
"github.com/treytartt/honeydue-api/internal/models"
|
||||
"github.com/treytartt/honeydue-api/internal/prom"
|
||||
|
||||
"github.com/uptrace/opentelemetry-go-extra/otelgorm"
|
||||
)
|
||||
|
||||
// migrationAdvisoryLockKey is the pg_advisory_lock key that serializes
|
||||
@@ -92,6 +94,15 @@ func Connect(cfg *config.DatabaseConfig, debug bool) (*gorm.DB, error) {
|
||||
log.Warn().Err(err).Msg("failed to register prometheus GORM callbacks; metrics will be partial")
|
||||
}
|
||||
|
||||
// Register otelgorm plugin — emits a span per SQL statement, attached to
|
||||
// whatever trace context is set via db.WithContext(ctx). Repositories that
|
||||
// have been migrated to use WithContext (see internal/repositories/*.go)
|
||||
// will produce nested SQL spans inside the request trace; pre-migration
|
||||
// repositories silently emit untraced queries.
|
||||
if err := db.Use(otelgorm.NewPlugin(otelgorm.WithDBName(cfg.Database))); err != nil {
|
||||
log.Warn().Err(err).Msg("failed to register otelgorm plugin; SQL spans disabled")
|
||||
}
|
||||
|
||||
return db, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -201,7 +201,7 @@ func (h *DocumentHandler) CreateDocument(c echo.Context) error {
|
||||
if h.storageService == nil {
|
||||
return apperrors.Internal(nil)
|
||||
}
|
||||
result, err := h.storageService.Upload(uploadedFile, "documents")
|
||||
result, err := h.storageService.Upload(c.Request().Context(), uploadedFile, "documents")
|
||||
if err != nil {
|
||||
return apperrors.BadRequest("error.failed_to_upload_file")
|
||||
}
|
||||
@@ -342,7 +342,7 @@ func (h *DocumentHandler) UploadDocumentImage(c echo.Context) error {
|
||||
return apperrors.Internal(nil)
|
||||
}
|
||||
|
||||
result, err := h.storageService.Upload(uploadedFile, "images")
|
||||
result, err := h.storageService.Upload(c.Request().Context(), uploadedFile, "images")
|
||||
if err != nil {
|
||||
return apperrors.BadRequest("error.failed_to_upload_file")
|
||||
}
|
||||
|
||||
@@ -65,7 +65,7 @@ func (h *TaskHandler) ListTasks(c echo.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
response, err := h.taskService.ListTasks(user.ID, daysThreshold, userNow)
|
||||
response, err := h.taskService.ListTasks(c.Request().Context(), user.ID, daysThreshold, userNow)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -121,7 +121,7 @@ func (h *TaskHandler) GetTasksByResidence(c echo.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
response, err := h.taskService.GetTasksByResidence(uint(residenceID), user.ID, daysThreshold, userNow)
|
||||
response, err := h.taskService.GetTasksByResidence(c.Request().Context(), uint(residenceID), user.ID, daysThreshold, userNow)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -446,7 +446,7 @@ func (h *TaskHandler) CreateCompletion(c echo.Context) error {
|
||||
for _, fieldName := range []string{"images", "image", "photo", "files"} {
|
||||
files := c.Request().MultipartForm.File[fieldName]
|
||||
for _, file := range files {
|
||||
result, err := h.storageService.Upload(file, "completions")
|
||||
result, err := h.storageService.Upload(c.Request().Context(), file, "completions")
|
||||
if err != nil {
|
||||
return apperrors.BadRequest("error.failed_to_upload_image")
|
||||
}
|
||||
|
||||
@@ -48,7 +48,7 @@ func (h *UploadHandler) UploadImage(c echo.Context) error {
|
||||
category = "images"
|
||||
}
|
||||
|
||||
result, err := h.storageService.Upload(file, category)
|
||||
result, err := h.storageService.Upload(c.Request().Context(), file, category)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -64,7 +64,7 @@ func (h *UploadHandler) UploadDocument(c echo.Context) error {
|
||||
return apperrors.BadRequest("error.no_file_provided")
|
||||
}
|
||||
|
||||
result, err := h.storageService.Upload(file, "documents")
|
||||
result, err := h.storageService.Upload(c.Request().Context(), file, "documents")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -80,7 +80,7 @@ func (h *UploadHandler) UploadCompletion(c echo.Context) error {
|
||||
return apperrors.BadRequest("error.no_file_provided")
|
||||
}
|
||||
|
||||
result, err := h.storageService.Upload(file, "completions")
|
||||
result, err := h.storageService.Upload(c.Request().Context(), file, "completions")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
+44
-2
@@ -9,11 +9,17 @@ import (
|
||||
"github.com/sideshow/apns2"
|
||||
"github.com/sideshow/apns2/payload"
|
||||
"github.com/sideshow/apns2/token"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
||||
"github.com/treytartt/honeydue-api/internal/config"
|
||||
"github.com/treytartt/honeydue-api/internal/prom"
|
||||
"github.com/treytartt/honeydue-api/internal/tracing"
|
||||
)
|
||||
|
||||
var apnsTracer = tracing.Tracer("honeydue/push/apns")
|
||||
|
||||
// APNsClient handles direct communication with Apple Push Notification service
|
||||
type APNsClient struct {
|
||||
client *apns2.Client
|
||||
@@ -86,10 +92,20 @@ func (c *APNsClient) Send(ctx context.Context, tokens []string, title, message s
|
||||
Priority: apns2.PriorityHigh,
|
||||
}
|
||||
|
||||
sendCtx, span := apnsTracer.Start(ctx, "apns.send",
|
||||
trace.WithAttributes(
|
||||
attribute.String("apns.topic", c.topic),
|
||||
attribute.String("apns.token", truncateToken(deviceToken)),
|
||||
attribute.String("apns.priority", "high"),
|
||||
),
|
||||
)
|
||||
sendStart := time.Now()
|
||||
res, err := c.client.PushWithContext(ctx, notification)
|
||||
res, err := c.client.PushWithContext(sendCtx, notification)
|
||||
if err != nil {
|
||||
prom.ObserveAPNsSend("error", time.Since(sendStart))
|
||||
span.SetStatus(codes.Error, "push failed")
|
||||
span.RecordError(err)
|
||||
span.End()
|
||||
log.Error().
|
||||
Err(err).
|
||||
Str("token", truncateToken(deviceToken)).
|
||||
@@ -100,6 +116,12 @@ func (c *APNsClient) Send(ctx context.Context, tokens []string, title, message s
|
||||
|
||||
if !res.Sent() {
|
||||
prom.ObserveAPNsSend("bad_token", time.Since(sendStart))
|
||||
span.SetAttributes(
|
||||
attribute.Int("apns.status_code", res.StatusCode),
|
||||
attribute.String("apns.reason", res.Reason),
|
||||
)
|
||||
span.SetStatus(codes.Error, "bad token")
|
||||
span.End()
|
||||
log.Error().
|
||||
Str("token", truncateToken(deviceToken)).
|
||||
Str("reason", res.Reason).
|
||||
@@ -110,6 +132,8 @@ func (c *APNsClient) Send(ctx context.Context, tokens []string, title, message s
|
||||
}
|
||||
|
||||
prom.ObserveAPNsSend("ok", time.Since(sendStart))
|
||||
span.SetAttributes(attribute.String("apns.id", res.ApnsID))
|
||||
span.End()
|
||||
successCount++
|
||||
log.Debug().
|
||||
Str("token", truncateToken(deviceToken)).
|
||||
@@ -160,10 +184,20 @@ func (c *APNsClient) SendWithCategory(ctx context.Context, tokens []string, titl
|
||||
Priority: apns2.PriorityHigh,
|
||||
}
|
||||
|
||||
sendCtx, span := apnsTracer.Start(ctx, "apns.send.category",
|
||||
trace.WithAttributes(
|
||||
attribute.String("apns.topic", c.topic),
|
||||
attribute.String("apns.token", truncateToken(deviceToken)),
|
||||
attribute.String("apns.category_id", categoryID),
|
||||
),
|
||||
)
|
||||
sendStart := time.Now()
|
||||
res, err := c.client.PushWithContext(ctx, notification)
|
||||
res, err := c.client.PushWithContext(sendCtx, notification)
|
||||
if err != nil {
|
||||
prom.ObserveAPNsSend("error", time.Since(sendStart))
|
||||
span.SetStatus(codes.Error, "push failed")
|
||||
span.RecordError(err)
|
||||
span.End()
|
||||
log.Error().
|
||||
Err(err).
|
||||
Str("token", truncateToken(deviceToken)).
|
||||
@@ -175,6 +209,12 @@ func (c *APNsClient) SendWithCategory(ctx context.Context, tokens []string, titl
|
||||
|
||||
if !res.Sent() {
|
||||
prom.ObserveAPNsSend("bad_token", time.Since(sendStart))
|
||||
span.SetAttributes(
|
||||
attribute.Int("apns.status_code", res.StatusCode),
|
||||
attribute.String("apns.reason", res.Reason),
|
||||
)
|
||||
span.SetStatus(codes.Error, "bad token")
|
||||
span.End()
|
||||
log.Error().
|
||||
Str("token", truncateToken(deviceToken)).
|
||||
Str("reason", res.Reason).
|
||||
@@ -186,6 +226,8 @@ func (c *APNsClient) SendWithCategory(ctx context.Context, tokens []string, titl
|
||||
}
|
||||
|
||||
prom.ObserveAPNsSend("ok", time.Since(sendStart))
|
||||
span.SetAttributes(attribute.String("apns.id", res.ApnsID))
|
||||
span.End()
|
||||
successCount++
|
||||
log.Debug().
|
||||
Str("token", truncateToken(deviceToken)).
|
||||
|
||||
+22
-4
@@ -12,12 +12,18 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"golang.org/x/oauth2/google"
|
||||
|
||||
"github.com/treytartt/honeydue-api/internal/config"
|
||||
"github.com/treytartt/honeydue-api/internal/prom"
|
||||
"github.com/treytartt/honeydue-api/internal/tracing"
|
||||
)
|
||||
|
||||
var fcmTracer = tracing.Tracer("honeydue/push/fcm")
|
||||
|
||||
const (
|
||||
// fcmV1EndpointFmt is the FCM HTTP v1 API endpoint template.
|
||||
fcmV1EndpointFmt = "https://fcm.googleapis.com/v1/projects/%s/messages:send"
|
||||
@@ -214,9 +220,15 @@ func (c *FCMClient) Send(ctx context.Context, tokens []string, title, message st
|
||||
var sendErrors []error
|
||||
successCount := 0
|
||||
|
||||
for _, token := range tokens {
|
||||
for _, tokenStr := range tokens {
|
||||
sendCtx, span := fcmTracer.Start(ctx, "fcm.send",
|
||||
trace.WithAttributes(
|
||||
attribute.String("fcm.token", truncateToken(tokenStr)),
|
||||
attribute.String("fcm.priority", "HIGH"),
|
||||
),
|
||||
)
|
||||
sendStart := time.Now()
|
||||
err := c.sendOne(ctx, token, title, message, data)
|
||||
err := c.sendOne(sendCtx, tokenStr, title, message, data)
|
||||
if err != nil {
|
||||
result := "error"
|
||||
var fcmErr *FCMSendError
|
||||
@@ -224,18 +236,24 @@ func (c *FCMClient) Send(ctx context.Context, tokens []string, title, message st
|
||||
result = "bad_token"
|
||||
}
|
||||
prom.ObserveFCMSend(result, time.Since(sendStart))
|
||||
span.SetAttributes(attribute.String("fcm.result", result))
|
||||
span.SetStatus(codes.Error, result)
|
||||
span.RecordError(err)
|
||||
span.End()
|
||||
log.Error().
|
||||
Err(err).
|
||||
Str("token", truncateToken(token)).
|
||||
Str("token", truncateToken(tokenStr)).
|
||||
Msg("FCM v1 notification failed")
|
||||
sendErrors = append(sendErrors, err)
|
||||
continue
|
||||
}
|
||||
|
||||
prom.ObserveFCMSend("ok", time.Since(sendStart))
|
||||
span.SetAttributes(attribute.String("fcm.result", "ok"))
|
||||
span.End()
|
||||
successCount++
|
||||
log.Debug().
|
||||
Str("token", truncateToken(token)).
|
||||
Str("token", truncateToken(tokenStr)).
|
||||
Msg("FCM v1 notification sent successfully")
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package repositories
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
@@ -105,3 +106,10 @@ func (r *AdminRepository) ExistsByEmail(email string) (bool, error) {
|
||||
}
|
||||
return count > 0, nil
|
||||
}
|
||||
|
||||
// WithContext returns a copy of the repository whose underlying *gorm.DB carries
|
||||
// the supplied context. SQL emitted via this copy gets attached to ctx's trace span
|
||||
// (when otelgorm is registered) and respects ctx cancellation/deadlines.
|
||||
func (r *AdminRepository) WithContext(ctx context.Context) *AdminRepository {
|
||||
return &AdminRepository{db: r.db.WithContext(ctx)}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package repositories
|
||||
|
||||
import (
|
||||
"context"
|
||||
"gorm.io/gorm"
|
||||
|
||||
"github.com/treytartt/honeydue-api/internal/models"
|
||||
@@ -193,3 +194,10 @@ func (r *ContractorRepository) FindSpecialtyByID(id uint) (*models.ContractorSpe
|
||||
}
|
||||
return &specialty, nil
|
||||
}
|
||||
|
||||
// WithContext returns a copy of the repository whose underlying *gorm.DB carries
|
||||
// the supplied context. SQL emitted via this copy gets attached to ctx's trace span
|
||||
// (when otelgorm is registered) and respects ctx cancellation/deadlines.
|
||||
func (r *ContractorRepository) WithContext(ctx context.Context) *ContractorRepository {
|
||||
return &ContractorRepository{db: r.db.WithContext(ctx)}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package repositories
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"gorm.io/gorm"
|
||||
@@ -214,3 +215,10 @@ func (r *DocumentRepository) FindImageByID(id uint) (*models.DocumentImage, erro
|
||||
}
|
||||
return &image, nil
|
||||
}
|
||||
|
||||
// WithContext returns a copy of the repository whose underlying *gorm.DB carries
|
||||
// the supplied context. SQL emitted via this copy gets attached to ctx's trace span
|
||||
// (when otelgorm is registered) and respects ctx cancellation/deadlines.
|
||||
func (r *DocumentRepository) WithContext(ctx context.Context) *DocumentRepository {
|
||||
return &DocumentRepository{db: r.db.WithContext(ctx)}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package repositories
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
@@ -291,3 +292,10 @@ func (r *NotificationRepository) GetActiveTokensForUser(userID uint) (iosTokens
|
||||
|
||||
return iosTokens, androidTokens, nil
|
||||
}
|
||||
|
||||
// WithContext returns a copy of the repository whose underlying *gorm.DB carries
|
||||
// the supplied context. SQL emitted via this copy gets attached to ctx's trace span
|
||||
// (when otelgorm is registered) and respects ctx cancellation/deadlines.
|
||||
func (r *NotificationRepository) WithContext(ctx context.Context) *NotificationRepository {
|
||||
return &NotificationRepository{db: r.db.WithContext(ctx)}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package repositories
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"gorm.io/gorm"
|
||||
@@ -224,3 +225,10 @@ func (r *ReminderRepository) GetRecentReminderStats(sinceHours int) (map[string]
|
||||
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
// WithContext returns a copy of the repository whose underlying *gorm.DB carries
|
||||
// the supplied context. SQL emitted via this copy gets attached to ctx's trace span
|
||||
// (when otelgorm is registered) and respects ctx cancellation/deadlines.
|
||||
func (r *ReminderRepository) WithContext(ctx context.Context) *ReminderRepository {
|
||||
return &ReminderRepository{db: r.db.WithContext(ctx)}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package repositories
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"errors"
|
||||
"math/big"
|
||||
@@ -360,3 +361,10 @@ func (r *ResidenceRepository) GetTasksForReport(residenceID uint) ([]models.Task
|
||||
Find(&tasks).Error
|
||||
return tasks, err
|
||||
}
|
||||
|
||||
// WithContext returns a copy of the repository whose underlying *gorm.DB carries
|
||||
// the supplied context. SQL emitted via this copy gets attached to ctx's trace span
|
||||
// (when otelgorm is registered) and respects ctx cancellation/deadlines.
|
||||
func (r *ResidenceRepository) WithContext(ctx context.Context) *ResidenceRepository {
|
||||
return &ResidenceRepository{db: r.db.WithContext(ctx)}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package repositories
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
@@ -327,3 +328,10 @@ func (r *SubscriptionRepository) UpdateExpiresAt(userID uint, expiresAt time.Tim
|
||||
return r.db.Model(&models.UserSubscription{}).Where("user_id = ?", userID).
|
||||
Update("expires_at", expiresAt).Error
|
||||
}
|
||||
|
||||
// WithContext returns a copy of the repository whose underlying *gorm.DB carries
|
||||
// the supplied context. SQL emitted via this copy gets attached to ctx's trace span
|
||||
// (when otelgorm is registered) and respects ctx cancellation/deadlines.
|
||||
func (r *SubscriptionRepository) WithContext(ctx context.Context) *SubscriptionRepository {
|
||||
return &SubscriptionRepository{db: r.db.WithContext(ctx)}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package repositories
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
@@ -1057,3 +1058,10 @@ func (r *TaskRepository) GetBatchCompletionSummaries(residenceIDs []uint, now ti
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// WithContext returns a copy of the repository whose underlying *gorm.DB carries
|
||||
// the supplied context. SQL emitted via this copy gets attached to ctx's trace span
|
||||
// (when otelgorm is registered) and respects ctx cancellation/deadlines.
|
||||
func (r *TaskRepository) WithContext(ctx context.Context) *TaskRepository {
|
||||
return &TaskRepository{db: r.db.WithContext(ctx)}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package repositories
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
|
||||
"gorm.io/gorm"
|
||||
@@ -122,3 +123,10 @@ func (r *TaskTemplateRepository) GetGroupedByCategory() (map[string][]models.Tas
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// WithContext returns a copy of the repository whose underlying *gorm.DB carries
|
||||
// the supplied context. SQL emitted via this copy gets attached to ctx's trace span
|
||||
// (when otelgorm is registered) and respects ctx cancellation/deadlines.
|
||||
func (r *TaskTemplateRepository) WithContext(ctx context.Context) *TaskTemplateRepository {
|
||||
return &TaskTemplateRepository{db: r.db.WithContext(ctx)}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package repositories
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strings"
|
||||
"time"
|
||||
@@ -772,3 +773,10 @@ func (r *UserRepository) CreateGoogleSocialAuth(auth *models.GoogleSocialAuth) e
|
||||
func (r *UserRepository) UpdateGoogleSocialAuth(auth *models.GoogleSocialAuth) error {
|
||||
return r.db.Save(auth).Error
|
||||
}
|
||||
|
||||
// WithContext returns a copy of the repository whose underlying *gorm.DB carries
|
||||
// the supplied context. SQL emitted via this copy gets attached to ctx's trace span
|
||||
// (when otelgorm is registered) and respects ctx cancellation/deadlines.
|
||||
func (r *UserRepository) WithContext(ctx context.Context) *UserRepository {
|
||||
return &UserRepository{db: r.db.WithContext(ctx)}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package repositories
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"gorm.io/gorm"
|
||||
@@ -52,3 +53,10 @@ func (r *WebhookEventRepository) RecordEvent(provider, eventID, eventType, paylo
|
||||
}
|
||||
return r.db.Create(event).Error
|
||||
}
|
||||
|
||||
// WithContext returns a copy of the repository whose underlying *gorm.DB carries
|
||||
// the supplied context. SQL emitted via this copy gets attached to ctx's trace span
|
||||
// (when otelgorm is registered) and respects ctx cancellation/deadlines.
|
||||
func (r *WebhookEventRepository) WithContext(ctx context.Context) *WebhookEventRepository {
|
||||
return &WebhookEventRepository{db: r.db.WithContext(ctx)}
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"github.com/labstack/echo/v4"
|
||||
"github.com/labstack/echo/v4/middleware"
|
||||
"github.com/rs/zerolog/log"
|
||||
"go.opentelemetry.io/contrib/instrumentation/github.com/labstack/echo/otelecho"
|
||||
"gorm.io/gorm"
|
||||
|
||||
"github.com/treytartt/honeydue-api/internal/admin"
|
||||
@@ -62,6 +63,11 @@ func SetupRouter(deps *Dependencies) *echo.Echo {
|
||||
e.Use(utils.EchoRecovery())
|
||||
e.Use(custommiddleware.StructuredLogger())
|
||||
|
||||
// OpenTelemetry HTTP middleware — opens a span per request, attaches the
|
||||
// route pattern, method, status, and request_id. Sits early so subsequent
|
||||
// middleware + handlers run inside the request span.
|
||||
e.Use(otelecho.Middleware("honeydue-api"))
|
||||
|
||||
// Security headers (X-Frame-Options, X-Content-Type-Options, X-XSS-Protection, etc.)
|
||||
//
|
||||
// CSP is permissive enough to serve the marketing landing page at / (which
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package services
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"mime/multipart"
|
||||
@@ -14,8 +15,14 @@ import (
|
||||
|
||||
"github.com/treytartt/honeydue-api/internal/config"
|
||||
"github.com/treytartt/honeydue-api/internal/prom"
|
||||
"github.com/treytartt/honeydue-api/internal/tracing"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
var storageTracer = tracing.Tracer("honeydue/services/storage")
|
||||
|
||||
// StorageService handles file uploads, validation, encryption, and URL generation.
|
||||
// It delegates raw I/O to a StorageBackend (local filesystem or S3-compatible).
|
||||
type StorageService struct {
|
||||
@@ -66,8 +73,17 @@ func NewStorageService(cfg *config.StorageConfig) (*StorageService, error) {
|
||||
return &StorageService{cfg: cfg, backend: backend, allowedTypes: allowedTypes}, nil
|
||||
}
|
||||
|
||||
// Upload saves a file to storage (local or S3)
|
||||
func (s *StorageService) Upload(file *multipart.FileHeader, category string) (*UploadResult, error) {
|
||||
// Upload saves a file to storage (local or S3). The ctx is used to attach
|
||||
// the underlying B2/S3 PutObject span to the request trace.
|
||||
func (s *StorageService) Upload(ctx context.Context, file *multipart.FileHeader, category string) (*UploadResult, error) {
|
||||
ctx, span := storageTracer.Start(ctx, "storage.upload",
|
||||
trace.WithAttributes(
|
||||
attribute.String("file.name", file.Filename),
|
||||
attribute.Int64("file.size_bytes", file.Size),
|
||||
attribute.String("upload.category", category),
|
||||
),
|
||||
)
|
||||
defer span.End()
|
||||
// Validate file size
|
||||
if file.Size > s.cfg.MaxFileSize {
|
||||
return nil, fmt.Errorf("file size %d exceeds maximum allowed %d bytes", file.Size, s.cfg.MaxFileSize)
|
||||
@@ -150,18 +166,31 @@ func (s *StorageService) Upload(file *multipart.FileHeader, category string) (*U
|
||||
}
|
||||
}
|
||||
|
||||
// Write to backend (B2/S3 round trip — instrumented for Prometheus)
|
||||
// Write to backend (B2/S3 round trip — instrumented for Prometheus + traces)
|
||||
bucket := s.cfg.S3Bucket
|
||||
if bucket == "" {
|
||||
bucket = "local"
|
||||
}
|
||||
_, putSpan := storageTracer.Start(ctx, "b2.PutObject",
|
||||
trace.WithAttributes(
|
||||
attribute.String("b2.bucket", bucket),
|
||||
attribute.String("b2.key", key),
|
||||
attribute.Int64("b2.size_bytes", int64(len(fileData))),
|
||||
attribute.String("b2.mime_type", mimeType),
|
||||
),
|
||||
)
|
||||
uploadStart := time.Now()
|
||||
if err := s.backend.Write(key, fileData); err != nil {
|
||||
prom.ObserveB2Upload(bucket, "error", time.Since(uploadStart), 0)
|
||||
putSpan.SetStatus(codes.Error, "write failed")
|
||||
putSpan.RecordError(err)
|
||||
putSpan.End()
|
||||
return nil, fmt.Errorf("failed to save file: %w", err)
|
||||
}
|
||||
written := int64(len(fileData))
|
||||
prom.ObserveB2Upload(bucket, "ok", time.Since(uploadStart), written)
|
||||
putSpan.SetAttributes(attribute.Int64("b2.bytes_written", written))
|
||||
putSpan.End()
|
||||
|
||||
// Generate URL (always uses the original filename without .enc suffix)
|
||||
url := fmt.Sprintf("%s/%s/%s", s.cfg.BaseURL, subdir, newFilename)
|
||||
|
||||
@@ -103,13 +103,13 @@ func (s *TaskService) GetTask(taskID, userID uint) (*responses.TaskResponse, err
|
||||
|
||||
// ListTasks lists all tasks accessible to a user as a kanban board.
|
||||
// The `now` parameter should be the start of day in the user's timezone for accurate overdue detection.
|
||||
func (s *TaskService) ListTasks(userID uint, daysThreshold int, now time.Time) (*responses.KanbanBoardResponse, error) {
|
||||
func (s *TaskService) ListTasks(ctx context.Context, userID uint, daysThreshold int, now time.Time) (*responses.KanbanBoardResponse, error) {
|
||||
if daysThreshold <= 0 {
|
||||
daysThreshold = 30 // Default
|
||||
}
|
||||
|
||||
// Get all residence IDs accessible to user (lightweight - no preloads)
|
||||
residenceIDs, err := s.residenceRepo.FindResidenceIDsByUser(userID)
|
||||
residenceIDs, err := s.residenceRepo.WithContext(ctx).FindResidenceIDsByUser(userID)
|
||||
if err != nil {
|
||||
return nil, apperrors.Internal(err)
|
||||
}
|
||||
@@ -124,7 +124,7 @@ func (s *TaskService) ListTasks(userID uint, daysThreshold int, now time.Time) (
|
||||
}
|
||||
|
||||
// Get kanban data aggregated across all residences using user's timezone-aware time
|
||||
board, err := s.taskRepo.GetKanbanDataForMultipleResidences(residenceIDs, daysThreshold, now)
|
||||
board, err := s.taskRepo.WithContext(ctx).GetKanbanDataForMultipleResidences(residenceIDs, daysThreshold, now)
|
||||
if err != nil {
|
||||
return nil, apperrors.Internal(err)
|
||||
}
|
||||
@@ -136,9 +136,10 @@ func (s *TaskService) ListTasks(userID uint, daysThreshold int, now time.Time) (
|
||||
|
||||
// GetTasksByResidence gets tasks for a specific residence (kanban board).
|
||||
// The `now` parameter should be the start of day in the user's timezone for accurate overdue detection.
|
||||
func (s *TaskService) GetTasksByResidence(residenceID, userID uint, daysThreshold int, now time.Time) (*responses.KanbanBoardResponse, error) {
|
||||
// Check access
|
||||
hasAccess, err := s.residenceRepo.HasAccess(residenceID, userID)
|
||||
func (s *TaskService) GetTasksByResidence(ctx context.Context, residenceID, userID uint, daysThreshold int, now time.Time) (*responses.KanbanBoardResponse, error) {
|
||||
// Check access — uses repo.WithContext(ctx) so the SQL span is attached
|
||||
// to the inbound HTTP request's trace via otelgorm.
|
||||
hasAccess, err := s.residenceRepo.WithContext(ctx).HasAccess(residenceID, userID)
|
||||
if err != nil {
|
||||
return nil, apperrors.Internal(err)
|
||||
}
|
||||
@@ -151,7 +152,7 @@ func (s *TaskService) GetTasksByResidence(residenceID, userID uint, daysThreshol
|
||||
}
|
||||
|
||||
// Get kanban data using user's timezone-aware time
|
||||
board, err := s.taskRepo.GetKanbanData(residenceID, daysThreshold, now)
|
||||
board, err := s.taskRepo.WithContext(ctx).GetKanbanData(residenceID, daysThreshold, now)
|
||||
if err != nil {
|
||||
return nil, apperrors.Internal(err)
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package services
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -301,7 +302,7 @@ func TestTaskService_ListTasks(t *testing.T) {
|
||||
testutil.CreateTestTask(t, db, residence.ID, user.ID, "Task 2")
|
||||
testutil.CreateTestTask(t, db, residence.ID, user.ID, "Task 3")
|
||||
|
||||
resp, err := service.ListTasks(user.ID, 30, time.Now().UTC())
|
||||
resp, err := service.ListTasks(context.Background(), user.ID, 30, time.Now().UTC())
|
||||
require.NoError(t, err)
|
||||
// ListTasks returns a KanbanBoardResponse with columns
|
||||
// Count total tasks across all columns
|
||||
|
||||
@@ -0,0 +1,161 @@
|
||||
// Package tracing wires the OpenTelemetry SDK with an OTLP/HTTP exporter
|
||||
// targeting obs.88oakapps.com (Jaeger all-in-one behind nginx + bearer auth).
|
||||
//
|
||||
// The package owns the global TracerProvider for the calling process (api or worker); everything
|
||||
// else acquires a tracer via tracing.Tracer(name).
|
||||
//
|
||||
// Sampling defaults to AlwaysSample in DEBUG mode and ParentBased(TraceIDRatioBased(0.1))
|
||||
// otherwise, controllable via OTEL_TRACES_SAMPLER_ARG.
|
||||
package tracing
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
|
||||
"go.opentelemetry.io/otel/propagation"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
sdktrace "go.opentelemetry.io/otel/sdk/trace"
|
||||
semconv "go.opentelemetry.io/otel/semconv/v1.27.0"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"go.opentelemetry.io/otel/trace/noop"
|
||||
)
|
||||
|
||||
// Config describes the tracer provider that Init installs as the process-wide
// global.
type Config struct {
	// ServiceName is recorded as the service.name resource attribute on every
	// span. Required.
	ServiceName string

	// Environment is recorded as the deployment environment resource
	// attribute on every span. Conventionally "prod", "dev", or "local".
	Environment string

	// EndpointURL is the complete OTLP/HTTP traces URL, for example
	// https://obs.88oakapps.com/v1/traces. When it is empty tracing is
	// disabled and Init installs a no-op provider.
	EndpointURL string

	// BearerToken is sent as "Authorization: Bearer <token>" on every export
	// request when non-empty; when empty no Authorization header is added.
	BearerToken string

	// SampleRatio is the fraction of root traces to sample: 1.0 keeps all of
	// them, 0.1 keeps 10%. Exactly 0 disables sampling. Any negative value
	// (conventionally -1, used for debug) and any value >= 1 select
	// AlwaysSample.
	SampleRatio float64

	// Insecure forces plain HTTP for the exporter. Only useful for local
	// testing; Init also enables plain HTTP when EndpointURL uses the http
	// scheme.
	Insecure bool
}
|
||||
|
||||
// Init configures the global TracerProvider and returns a shutdown function.
|
||||
// Call shutdown on graceful exit so spans in flight get flushed.
|
||||
//
|
||||
// Init is safe to call when EndpointURL is empty: it installs a no-op
|
||||
// provider and returns a no-op shutdown.
|
||||
func Init(ctx context.Context, cfg Config) (shutdown func(context.Context) error, err error) {
|
||||
if cfg.EndpointURL == "" {
|
||||
log.Info().Msg("tracing: no OBS_TRACES_URL configured, installing no-op tracer")
|
||||
otel.SetTracerProvider(noop.NewTracerProvider())
|
||||
return func(context.Context) error { return nil }, nil
|
||||
}
|
||||
|
||||
parsed, err := url.Parse(cfg.EndpointURL)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid OBS_TRACES_URL %q: %w", cfg.EndpointURL, err)
|
||||
}
|
||||
|
||||
opts := []otlptracehttp.Option{
|
||||
otlptracehttp.WithEndpoint(parsed.Host),
|
||||
otlptracehttp.WithURLPath(parsed.Path),
|
||||
otlptracehttp.WithCompression(otlptracehttp.GzipCompression),
|
||||
otlptracehttp.WithTimeout(10 * time.Second),
|
||||
}
|
||||
if cfg.Insecure || parsed.Scheme == "http" {
|
||||
opts = append(opts, otlptracehttp.WithInsecure())
|
||||
}
|
||||
if cfg.BearerToken != "" {
|
||||
opts = append(opts, otlptracehttp.WithHeaders(map[string]string{
|
||||
"Authorization": "Bearer " + cfg.BearerToken,
|
||||
}))
|
||||
}
|
||||
|
||||
exporter, err := otlptrace.New(ctx, otlptracehttp.NewClient(opts...))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create OTLP exporter: %w", err)
|
||||
}
|
||||
|
||||
res, err := resource.Merge(resource.Default(), resource.NewWithAttributes(
|
||||
semconv.SchemaURL,
|
||||
semconv.ServiceName(cfg.ServiceName),
|
||||
semconv.DeploymentEnvironmentName(cfg.Environment),
|
||||
))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("build resource: %w", err)
|
||||
}
|
||||
|
||||
var sampler sdktrace.Sampler
|
||||
switch {
|
||||
case cfg.SampleRatio < 0:
|
||||
sampler = sdktrace.AlwaysSample()
|
||||
case cfg.SampleRatio == 0:
|
||||
sampler = sdktrace.NeverSample()
|
||||
case cfg.SampleRatio >= 1:
|
||||
sampler = sdktrace.AlwaysSample()
|
||||
default:
|
||||
// ParentBased so the inbound parent's sampling decision wins;
|
||||
// otherwise root-span ratio applies.
|
||||
sampler = sdktrace.ParentBased(sdktrace.TraceIDRatioBased(cfg.SampleRatio))
|
||||
}
|
||||
|
||||
tp := sdktrace.NewTracerProvider(
|
||||
sdktrace.WithBatcher(exporter,
|
||||
sdktrace.WithBatchTimeout(5*time.Second),
|
||||
sdktrace.WithMaxExportBatchSize(512),
|
||||
),
|
||||
sdktrace.WithResource(res),
|
||||
sdktrace.WithSampler(sampler),
|
||||
)
|
||||
|
||||
otel.SetTracerProvider(tp)
|
||||
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
|
||||
propagation.TraceContext{},
|
||||
propagation.Baggage{},
|
||||
))
|
||||
|
||||
log.Info().
|
||||
Str("endpoint", cfg.EndpointURL).
|
||||
Str("service", cfg.ServiceName).
|
||||
Str("env", cfg.Environment).
|
||||
Float64("sample_ratio", cfg.SampleRatio).
|
||||
Bool("auth", cfg.BearerToken != "").
|
||||
Msg("tracing: OTLP exporter initialized")
|
||||
|
||||
return tp.Shutdown, nil
|
||||
}
|
||||
|
||||
// Tracer returns a named tracer from the global provider. Safe to call before
|
||||
// Init (returns a no-op tracer in that case).
|
||||
func Tracer(name string) trace.Tracer {
|
||||
return otel.Tracer(name)
|
||||
}
|
||||
|
||||
// SampleRatioFromEnv resolves the trace sample ratio from the environment.
//
// Precedence:
//  1. OTEL_TRACES_SAMPLER_ARG, when it parses as a float after trimming
//     surrounding whitespace. The parsed value is returned unchanged.
//  2. -1 ("always") when DEBUG equals "true", compared case-insensitively.
//  3. 0.1 ("10%") otherwise.
//
// An OTEL_TRACES_SAMPLER_ARG that is empty, unparsable, or NaN is ignored and
// the DEBUG-based default applies. NaN is rejected because it satisfies none
// of the `< 0`, `== 0`, `>= 1` comparisons a caller would use to classify the
// ratio, so it would otherwise be passed to the ratio sampler as-is.
func SampleRatioFromEnv() float64 {
	if raw := strings.TrimSpace(os.Getenv("OTEL_TRACES_SAMPLER_ARG")); raw != "" {
		// ratio == ratio is false only for NaN, which ParseFloat accepts
		// ("NaN", "nan") without reporting an error.
		if ratio, err := strconv.ParseFloat(raw, 64); err == nil && ratio == ratio {
			return ratio
		}
	}
	if strings.EqualFold(os.Getenv("DEBUG"), "true") {
		return -1
	}
	return 0.1
}
|
||||
Reference in New Issue
Block a user