Files
honeyDueAPI/internal/repositories/notification_repo.go
T
Trey t bc3da007db
Backend CI / Test (push) Has been cancelled
Backend CI / Contract Tests (push) Has been cancelled
Backend CI / Build (push) Has been cancelled
Backend CI / Lint (push) Has been cancelled
Backend CI / Secret Scanning (push) Has been cancelled
Wire OpenTelemetry tracing — HTTP, B2, APNs, FCM, asynq, GORM (partial)
Step 1 — OTel SDK: cmd/api and cmd/worker initialize a tracer provider
that exports OTLP/HTTP to obs.88oakapps.com (Jaeger all-in-one). Sampling
is AlwaysSample in dev (DEBUG=true) and TraceIDRatioBased(0.1) in prod,
overridable via OTEL_TRACES_SAMPLER_ARG. Service names are honeydue-api
and honeydue-worker. otelecho.Middleware opens a span per HTTP request.

Step 2 — Manual spans: storage_service.Upload now takes ctx and emits
storage.upload + b2.PutObject spans (size_bytes, key, mime_type, bucket,
result attrs). APNs Send/SendWithCategory and FCM sendOne emit per-token
spans with topic, status_code, reason. Asynq middleware emits
asynq.handle:<task_type> per job with retry/payload attrs and records
asynq_job_duration_seconds.

Step 3 — Database: otelgorm plugin registered in database.Connect, so
any SQL emitted via db.WithContext(ctx) attaches to the request span.
Every repository now exposes WithContext(ctx) *XRepository as the
migration helper. TaskService.ListTasks and GetTasksByResidence are
migrated end-to-end (ctx threaded through handler → service → repo);
remaining services adopt the same pattern incrementally — pre-migration
methods still emit untraced SQL via the unchanged db field.

OBS_TRACES_URL and OBS_INGEST_TOKEN flow from deploy/prod.env →
honeydue-secrets → api+worker Deployments via secretKeyRef (optional).
02-setup-secrets.sh sources them from prod.env on next run; manifests
mark both env vars optional so the deployment rolls without traces if
the secret is absent.

ch15 observability doc now lists what produces spans today vs the
remaining migration work, with the explicit per-method pattern.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-25 15:28:05 -05:00

302 lines
9.1 KiB
Go

package repositories
import (
"context"
"errors"
"time"
"gorm.io/gorm"
"github.com/treytartt/honeydue-api/internal/models"
)
// NotificationRepository groups the database operations for notifications,
// per-user notification preferences, and APNS/GCM push devices.
type NotificationRepository struct {
// db is the GORM handle every query in this repository is issued through.
db *gorm.DB
}
// NewNotificationRepository returns a NotificationRepository that issues all
// of its queries through db.
func NewNotificationRepository(db *gorm.DB) *NotificationRepository {
	repo := new(NotificationRepository)
	repo.db = db
	return repo
}
// === Notifications ===
// FindByID loads the notification whose primary key equals id.
// On failure it returns a nil notification together with the error reported
// by GORM's First (gorm.ErrRecordNotFound when no row matches).
func (r *NotificationRepository) FindByID(id uint) (*models.Notification, error) {
	found := new(models.Notification)
	if err := r.db.First(found, id).Error; err != nil {
		return nil, err
	}
	return found, nil
}
// FindByUser lists a user's notifications, newest first (created_at DESC).
// limit and offset are applied only when they are greater than zero; any
// other value leaves the corresponding clause off the query.
// The slice filled by Find is returned alongside the query error, if any.
func (r *NotificationRepository) FindByUser(userID uint, limit, offset int) ([]models.Notification, error) {
	tx := r.db.Where("user_id = ?", userID).Order("created_at DESC")
	if limit > 0 {
		tx = tx.Limit(limit)
	}
	if offset > 0 {
		tx = tx.Offset(offset)
	}

	var results []models.Notification
	result := tx.Find(&results)
	return results, result.Error
}
// Create inserts the given notification and returns any error reported by GORM.
func (r *NotificationRepository) Create(notification *models.Notification) error {
	result := r.db.Create(notification)
	return result.Error
}
// MarkAsRead flags the notification with the given id as read and stamps
// read_at with the current UTC time.
// Only the query error is returned; the affected-row count is not inspected,
// so an id that matches no row is not reported as an error.
func (r *NotificationRepository) MarkAsRead(id uint) error {
	changes := map[string]interface{}{
		"read":    true,
		"read_at": time.Now().UTC(),
	}
	target := r.db.Model(&models.Notification{}).Where("id = ?", id)
	return target.Updates(changes).Error
}
// MarkAllAsRead flags every currently unread notification of the user as read,
// stamping read_at with the current UTC time in a single UPDATE.
func (r *NotificationRepository) MarkAllAsRead(userID uint) error {
	changes := map[string]interface{}{
		"read":    true,
		"read_at": time.Now().UTC(),
	}
	unread := r.db.Model(&models.Notification{}).
		Where("user_id = ? AND read = ?", userID, false)
	return unread.Updates(changes).Error
}
// MarkAsSent flags the notification with the given id as sent and stamps
// sent_at with the current UTC time.
// Only the query error is returned; the affected-row count is not inspected.
func (r *NotificationRepository) MarkAsSent(id uint) error {
	changes := map[string]interface{}{
		"sent":    true,
		"sent_at": time.Now().UTC(),
	}
	target := r.db.Model(&models.Notification{}).Where("id = ?", id)
	return target.Updates(changes).Error
}
// SetError stores errorMsg in the error_message column of the notification
// with the given id.
func (r *NotificationRepository) SetError(id uint, errorMsg string) error {
	target := r.db.Model(&models.Notification{}).Where("id = ?", id)
	return target.Update("error_message", errorMsg).Error
}
// CountUnread returns how many notifications of the user have read = false.
// The count is returned together with the query error, if any.
func (r *NotificationRepository) CountUnread(userID uint) (int64, error) {
	var unread int64
	result := r.db.Model(&models.Notification{}).
		Where("user_id = ? AND read = ?", userID, false).
		Count(&unread)
	return unread, result.Error
}
// GetPendingNotifications returns notifications with sent = false, oldest
// first (created_at ASC).
// limit is handed to GORM's Limit as-is; no validation is done here.
func (r *NotificationRepository) GetPendingNotifications(limit int) ([]models.Notification, error) {
	unsent := r.db.Where("sent = ?", false).Order("created_at ASC").Limit(limit)

	var pending []models.Notification
	result := unsent.Find(&pending)
	return pending, result.Error
}
// === Notification Preferences ===
// FindPreferencesByUser loads the notification preferences row for userID.
// On failure it returns nil together with the error reported by GORM's First
// (gorm.ErrRecordNotFound when the user has no preferences row).
func (r *NotificationRepository) FindPreferencesByUser(userID uint) (*models.NotificationPreference, error) {
	prefs := new(models.NotificationPreference)
	result := r.db.Where("user_id = ?", userID).First(prefs)
	if result.Error != nil {
		return nil, result.Error
	}
	return prefs, nil
}
// CreatePreferences inserts the given preferences row and returns any error
// reported by GORM.
func (r *NotificationRepository) CreatePreferences(prefs *models.NotificationPreference) error {
	result := r.db.Create(prefs)
	return result.Error
}
// UpdatePreferences persists prefs with GORM's Save, with "User" omitted
// from the write.
func (r *NotificationRepository) UpdatePreferences(prefs *models.NotificationPreference) error {
	withoutUser := r.db.Omit("User")
	return withoutUser.Save(prefs).Error
}
// GetOrCreatePreferences returns the notification preferences for userID,
// creating a row with the default settings when none exists yet.
//
// A transaction alone does not serialize the lookup and the insert: two
// concurrent first-time requests can both observe "not found" and both try to
// insert. When the insert fails, the row is therefore looked up once more and
// the concurrently created row is returned instead of surfacing the insert
// error. This recovery presumes a uniqueness constraint on user_id makes the
// losing insert fail.
// NOTE(review): confirm that constraint exists on the model/migration;
// without it duplicates remain possible, exactly as before this change.
//
// Returns nil and the underlying error when the initial lookup fails for a
// reason other than gorm.ErrRecordNotFound, or when the insert fails and no
// row can be read back afterwards (the insert error is returned in that case).
func (r *NotificationRepository) GetOrCreatePreferences(userID uint) (*models.NotificationPreference, error) {
	var prefs models.NotificationPreference
	err := r.db.Where("user_id = ?", userID).First(&prefs).Error
	if err == nil {
		return &prefs, nil // Found existing preferences
	}
	if !errors.Is(err, gorm.ErrRecordNotFound) {
		return nil, err // Unexpected error
	}

	// Record not found -- create with defaults
	prefs = models.NotificationPreference{
		UserID:             userID,
		TaskDueSoon:        true,
		TaskOverdue:        true,
		TaskCompleted:      true,
		TaskAssigned:       true,
		ResidenceShared:    true,
		WarrantyExpiring:   true,
		EmailTaskCompleted: true,
	}

	// Run the insert in its own (possibly nested) transaction. If r.db is
	// already inside a transaction, GORM wraps this in a savepoint, so a
	// failed insert is rolled back without poisoning the outer transaction
	// and the follow-up lookup below can still run.
	createErr := r.db.Transaction(func(tx *gorm.DB) error {
		return tx.Create(&prefs).Error
	})
	if createErr == nil {
		return &prefs, nil
	}

	// The insert failed, most likely because a concurrent request created the
	// row first. Read into a fresh value so nothing from the failed insert
	// leaks into the result.
	var existing models.NotificationPreference
	if lookupErr := r.db.Where("user_id = ?", userID).First(&existing).Error; lookupErr != nil {
		// Still no row: the insert error is the one worth reporting.
		return nil, createErr
	}
	return &existing, nil
}
// === Device Registration ===
// FindAPNSDeviceByID loads the APNS device whose primary key equals id.
// On failure it returns nil together with the error reported by GORM's First.
func (r *NotificationRepository) FindAPNSDeviceByID(id uint) (*models.APNSDevice, error) {
	device := new(models.APNSDevice)
	if err := r.db.First(device, id).Error; err != nil {
		return nil, err
	}
	return device, nil
}
// FindGCMDeviceByID loads the GCM device whose primary key equals id.
// On failure it returns nil together with the error reported by GORM's First.
func (r *NotificationRepository) FindGCMDeviceByID(id uint) (*models.GCMDevice, error) {
	device := new(models.GCMDevice)
	if err := r.db.First(device, id).Error; err != nil {
		return nil, err
	}
	return device, nil
}
// FindAPNSDeviceByToken loads the first APNS device whose registration_id
// equals token. On failure it returns nil together with the error reported by
// GORM's First.
func (r *NotificationRepository) FindAPNSDeviceByToken(token string) (*models.APNSDevice, error) {
	device := new(models.APNSDevice)
	result := r.db.Where("registration_id = ?", token).First(device)
	if result.Error != nil {
		return nil, result.Error
	}
	return device, nil
}
// FindAPNSDevicesByUser lists the user's APNS devices that have active = true.
// The slice filled by Find is returned alongside the query error, if any.
func (r *NotificationRepository) FindAPNSDevicesByUser(userID uint) ([]models.APNSDevice, error) {
	var active []models.APNSDevice
	result := r.db.
		Where("user_id = ? AND active = ?", userID, true).
		Find(&active)
	return active, result.Error
}
// CreateAPNSDevice inserts the given APNS device and returns any error
// reported by GORM.
func (r *NotificationRepository) CreateAPNSDevice(device *models.APNSDevice) error {
	result := r.db.Create(device)
	return result.Error
}
// UpdateAPNSDevice persists the given APNS device with GORM's Save and
// returns any error it reports.
func (r *NotificationRepository) UpdateAPNSDevice(device *models.APNSDevice) error {
	result := r.db.Save(device)
	return result.Error
}
// DeleteAPNSDevice issues a GORM Delete for the APNS device with the given id.
// Whether the row is physically removed or soft-deleted depends on the model
// definition, which is not visible in this file.
func (r *NotificationRepository) DeleteAPNSDevice(id uint) error {
	result := r.db.Delete(&models.APNSDevice{}, id)
	return result.Error
}
// DeactivateAPNSDevice sets active = false on the APNS device with the given
// id, leaving the row in place.
func (r *NotificationRepository) DeactivateAPNSDevice(id uint) error {
	target := r.db.Model(&models.APNSDevice{}).Where("id = ?", id)
	return target.Update("active", false).Error
}
// FindGCMDeviceByToken loads the first GCM device whose registration_id
// equals token. On failure it returns nil together with the error reported by
// GORM's First.
func (r *NotificationRepository) FindGCMDeviceByToken(token string) (*models.GCMDevice, error) {
	device := new(models.GCMDevice)
	result := r.db.Where("registration_id = ?", token).First(device)
	if result.Error != nil {
		return nil, result.Error
	}
	return device, nil
}
// FindGCMDevicesByUser lists the user's GCM devices that have active = true.
// The slice filled by Find is returned alongside the query error, if any.
func (r *NotificationRepository) FindGCMDevicesByUser(userID uint) ([]models.GCMDevice, error) {
	var active []models.GCMDevice
	result := r.db.
		Where("user_id = ? AND active = ?", userID, true).
		Find(&active)
	return active, result.Error
}
// CreateGCMDevice inserts the given GCM device and returns any error reported
// by GORM.
func (r *NotificationRepository) CreateGCMDevice(device *models.GCMDevice) error {
	result := r.db.Create(device)
	return result.Error
}
// UpdateGCMDevice persists the given GCM device with GORM's Save and returns
// any error it reports.
func (r *NotificationRepository) UpdateGCMDevice(device *models.GCMDevice) error {
	result := r.db.Save(device)
	return result.Error
}
// DeleteGCMDevice issues a GORM Delete for the GCM device with the given id.
// Whether the row is physically removed or soft-deleted depends on the model
// definition, which is not visible in this file.
func (r *NotificationRepository) DeleteGCMDevice(id uint) error {
	result := r.db.Delete(&models.GCMDevice{}, id)
	return result.Error
}
// DeactivateGCMDevice sets active = false on the GCM device with the given id,
// leaving the row in place.
func (r *NotificationRepository) DeactivateGCMDevice(id uint) error {
	target := r.db.Model(&models.GCMDevice{}).Where("id = ?", id)
	return target.Update("active", false).Error
}
// GetActiveTokensForUser collects the registration IDs of the user's active
// push devices, split by platform: iosTokens from APNS devices and
// androidTokens from GCM devices.
// Both slices are non-nil (possibly empty) on success. Any lookup error other
// than gorm.ErrRecordNotFound aborts the call and yields nil slices.
func (r *NotificationRepository) GetActiveTokensForUser(userID uint) (iosTokens []string, androidTokens []string, err error) {
	iosDevices, lookupErr := r.FindAPNSDevicesByUser(userID)
	if lookupErr != nil && !errors.Is(lookupErr, gorm.ErrRecordNotFound) {
		return nil, nil, lookupErr
	}

	androidDevices, lookupErr := r.FindGCMDevicesByUser(userID)
	if lookupErr != nil && !errors.Is(lookupErr, gorm.ErrRecordNotFound) {
		return nil, nil, lookupErr
	}

	// Sizes are known up front, so fill by index instead of appending.
	iosTokens = make([]string, len(iosDevices))
	for i := range iosDevices {
		iosTokens[i] = iosDevices[i].RegistrationID
	}

	androidTokens = make([]string, len(androidDevices))
	for i := range androidDevices {
		androidTokens[i] = androidDevices[i].RegistrationID
	}

	return iosTokens, androidTokens, nil
}
// WithContext returns a new repository whose *gorm.DB is bound to ctx; the
// receiver itself is left untouched. Queries issued through the returned copy
// carry ctx, so they join ctx's trace span (when otelgorm is registered) and
// honor ctx cancellation and deadlines.
func (r *NotificationRepository) WithContext(ctx context.Context) *NotificationRepository {
	scoped := r.db.WithContext(ctx)
	return &NotificationRepository{db: scoped}
}