Add webhook logging, pagination, middleware, migrations, and prod hardening
- Webhook event logging repo and subscription webhook idempotency - Pagination helper (echohelpers) with cursor/offset support - Request ID and structured logging middleware - Push client improvements (FCM HTTP v1, better error handling) - Task model version column, business constraint migrations, targeted indexes - Expanded categorization chain tests - Email service and config hardening - CI workflow updates, .gitignore additions, .env.example updates Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -57,12 +57,19 @@ func (r *SubscriptionRepository) Update(sub *models.UserSubscription) error {
|
||||
return r.db.Save(sub).Error
|
||||
}
|
||||
|
||||
// UpgradeToPro upgrades a user to Pro tier
|
||||
// UpgradeToPro upgrades a user to Pro tier using a transaction with row locking
|
||||
// to prevent concurrent subscription mutations from corrupting state.
|
||||
func (r *SubscriptionRepository) UpgradeToPro(userID uint, expiresAt time.Time, platform string) error {
|
||||
now := time.Now().UTC()
|
||||
return r.db.Model(&models.UserSubscription{}).
|
||||
Where("user_id = ?", userID).
|
||||
Updates(map[string]interface{}{
|
||||
return r.db.Transaction(func(tx *gorm.DB) error {
|
||||
// Lock the row for update
|
||||
var sub models.UserSubscription
|
||||
if err := tx.Set("gorm:query_option", "FOR UPDATE").
|
||||
Where("user_id = ?", userID).First(&sub).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
now := time.Now().UTC()
|
||||
return tx.Model(&sub).Updates(map[string]interface{}{
|
||||
"tier": models.TierPro,
|
||||
"subscribed_at": now,
|
||||
"expires_at": expiresAt,
|
||||
@@ -70,18 +77,27 @@ func (r *SubscriptionRepository) UpgradeToPro(userID uint, expiresAt time.Time,
|
||||
"platform": platform,
|
||||
"auto_renew": true,
|
||||
}).Error
|
||||
})
|
||||
}
|
||||
|
||||
// DowngradeToFree downgrades a user to Free tier
|
||||
// DowngradeToFree downgrades a user to Free tier using a transaction with row locking
|
||||
// to prevent concurrent subscription mutations from corrupting state.
|
||||
func (r *SubscriptionRepository) DowngradeToFree(userID uint) error {
|
||||
now := time.Now().UTC()
|
||||
return r.db.Model(&models.UserSubscription{}).
|
||||
Where("user_id = ?", userID).
|
||||
Updates(map[string]interface{}{
|
||||
return r.db.Transaction(func(tx *gorm.DB) error {
|
||||
// Lock the row for update
|
||||
var sub models.UserSubscription
|
||||
if err := tx.Set("gorm:query_option", "FOR UPDATE").
|
||||
Where("user_id = ?", userID).First(&sub).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
now := time.Now().UTC()
|
||||
return tx.Model(&sub).Updates(map[string]interface{}{
|
||||
"tier": models.TierFree,
|
||||
"cancelled_at": now,
|
||||
"auto_renew": false,
|
||||
}).Error
|
||||
})
|
||||
}
|
||||
|
||||
// SetAutoRenew sets the auto-renew flag
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package repositories
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
@@ -11,6 +12,9 @@ import (
|
||||
"github.com/treytartt/casera-api/internal/task/categorization"
|
||||
)
|
||||
|
||||
// ErrVersionConflict indicates a concurrent modification was detected
|
||||
var ErrVersionConflict = errors.New("version conflict: task was modified by another request")
|
||||
|
||||
// TaskRepository handles database operations for tasks
|
||||
type TaskRepository struct {
|
||||
db *gorm.DB
|
||||
@@ -294,10 +298,39 @@ func (r *TaskRepository) Create(task *models.Task) error {
|
||||
return r.db.Create(task).Error
|
||||
}
|
||||
|
||||
// Update updates a task
|
||||
// Uses Omit to exclude associations that shouldn't be updated via Save
|
||||
// Update updates a task with optimistic locking.
|
||||
// The update only succeeds if the task's version in the database matches the expected version.
|
||||
// On success, the local task.Version is incremented to reflect the new version.
|
||||
func (r *TaskRepository) Update(task *models.Task) error {
|
||||
return r.db.Omit("Residence", "CreatedBy", "AssignedTo", "Category", "Priority", "Frequency", "ParentTask", "Completions").Save(task).Error
|
||||
result := r.db.Model(task).
|
||||
Where("id = ? AND version = ?", task.ID, task.Version).
|
||||
Omit("Residence", "CreatedBy", "AssignedTo", "Category", "Priority", "Frequency", "ParentTask", "Completions").
|
||||
Updates(map[string]interface{}{
|
||||
"title": task.Title,
|
||||
"description": task.Description,
|
||||
"category_id": task.CategoryID,
|
||||
"priority_id": task.PriorityID,
|
||||
"frequency_id": task.FrequencyID,
|
||||
"custom_interval_days": task.CustomIntervalDays,
|
||||
"in_progress": task.InProgress,
|
||||
"assigned_to_id": task.AssignedToID,
|
||||
"due_date": task.DueDate,
|
||||
"next_due_date": task.NextDueDate,
|
||||
"estimated_cost": task.EstimatedCost,
|
||||
"actual_cost": task.ActualCost,
|
||||
"contractor_id": task.ContractorID,
|
||||
"is_cancelled": task.IsCancelled,
|
||||
"is_archived": task.IsArchived,
|
||||
"version": gorm.Expr("version + 1"),
|
||||
})
|
||||
if result.Error != nil {
|
||||
return result.Error
|
||||
}
|
||||
if result.RowsAffected == 0 {
|
||||
return ErrVersionConflict
|
||||
}
|
||||
task.Version++ // Update local copy
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete hard-deletes a task
|
||||
@@ -307,39 +340,89 @@ func (r *TaskRepository) Delete(id uint) error {
|
||||
|
||||
// === Task State Operations ===
|
||||
|
||||
// MarkInProgress marks a task as in progress
|
||||
func (r *TaskRepository) MarkInProgress(id uint) error {
|
||||
return r.db.Model(&models.Task{}).
|
||||
Where("id = ?", id).
|
||||
Update("in_progress", true).Error
|
||||
// MarkInProgress marks a task as in progress with optimistic locking.
|
||||
func (r *TaskRepository) MarkInProgress(id uint, version int) error {
|
||||
result := r.db.Model(&models.Task{}).
|
||||
Where("id = ? AND version = ?", id, version).
|
||||
Updates(map[string]interface{}{
|
||||
"in_progress": true,
|
||||
"version": gorm.Expr("version + 1"),
|
||||
})
|
||||
if result.Error != nil {
|
||||
return result.Error
|
||||
}
|
||||
if result.RowsAffected == 0 {
|
||||
return ErrVersionConflict
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Cancel cancels a task
|
||||
func (r *TaskRepository) Cancel(id uint) error {
|
||||
return r.db.Model(&models.Task{}).
|
||||
Where("id = ?", id).
|
||||
Update("is_cancelled", true).Error
|
||||
// Cancel cancels a task with optimistic locking.
|
||||
func (r *TaskRepository) Cancel(id uint, version int) error {
|
||||
result := r.db.Model(&models.Task{}).
|
||||
Where("id = ? AND version = ?", id, version).
|
||||
Updates(map[string]interface{}{
|
||||
"is_cancelled": true,
|
||||
"version": gorm.Expr("version + 1"),
|
||||
})
|
||||
if result.Error != nil {
|
||||
return result.Error
|
||||
}
|
||||
if result.RowsAffected == 0 {
|
||||
return ErrVersionConflict
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Uncancel uncancels a task
|
||||
func (r *TaskRepository) Uncancel(id uint) error {
|
||||
return r.db.Model(&models.Task{}).
|
||||
Where("id = ?", id).
|
||||
Update("is_cancelled", false).Error
|
||||
// Uncancel uncancels a task with optimistic locking.
|
||||
func (r *TaskRepository) Uncancel(id uint, version int) error {
|
||||
result := r.db.Model(&models.Task{}).
|
||||
Where("id = ? AND version = ?", id, version).
|
||||
Updates(map[string]interface{}{
|
||||
"is_cancelled": false,
|
||||
"version": gorm.Expr("version + 1"),
|
||||
})
|
||||
if result.Error != nil {
|
||||
return result.Error
|
||||
}
|
||||
if result.RowsAffected == 0 {
|
||||
return ErrVersionConflict
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Archive archives a task
|
||||
func (r *TaskRepository) Archive(id uint) error {
|
||||
return r.db.Model(&models.Task{}).
|
||||
Where("id = ?", id).
|
||||
Update("is_archived", true).Error
|
||||
// Archive archives a task with optimistic locking.
|
||||
func (r *TaskRepository) Archive(id uint, version int) error {
|
||||
result := r.db.Model(&models.Task{}).
|
||||
Where("id = ? AND version = ?", id, version).
|
||||
Updates(map[string]interface{}{
|
||||
"is_archived": true,
|
||||
"version": gorm.Expr("version + 1"),
|
||||
})
|
||||
if result.Error != nil {
|
||||
return result.Error
|
||||
}
|
||||
if result.RowsAffected == 0 {
|
||||
return ErrVersionConflict
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Unarchive unarchives a task
|
||||
func (r *TaskRepository) Unarchive(id uint) error {
|
||||
return r.db.Model(&models.Task{}).
|
||||
Where("id = ?", id).
|
||||
Update("is_archived", false).Error
|
||||
// Unarchive unarchives a task with optimistic locking.
|
||||
func (r *TaskRepository) Unarchive(id uint, version int) error {
|
||||
result := r.db.Model(&models.Task{}).
|
||||
Where("id = ? AND version = ?", id, version).
|
||||
Updates(map[string]interface{}{
|
||||
"is_archived": false,
|
||||
"version": gorm.Expr("version + 1"),
|
||||
})
|
||||
if result.Error != nil {
|
||||
return result.Error
|
||||
}
|
||||
if result.RowsAffected == 0 {
|
||||
return ErrVersionConflict
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// === Kanban Board ===
|
||||
|
||||
@@ -113,7 +113,7 @@ func TestTaskRepository_Cancel(t *testing.T) {
|
||||
|
||||
assert.False(t, task.IsCancelled)
|
||||
|
||||
err := repo.Cancel(task.ID)
|
||||
err := repo.Cancel(task.ID, task.Version)
|
||||
require.NoError(t, err)
|
||||
|
||||
found, err := repo.FindByID(task.ID)
|
||||
@@ -129,8 +129,8 @@ func TestTaskRepository_Uncancel(t *testing.T) {
|
||||
residence := testutil.CreateTestResidence(t, db, user.ID, "Test House")
|
||||
task := testutil.CreateTestTask(t, db, residence.ID, user.ID, "Test Task")
|
||||
|
||||
repo.Cancel(task.ID)
|
||||
err := repo.Uncancel(task.ID)
|
||||
repo.Cancel(task.ID, task.Version)
|
||||
err := repo.Uncancel(task.ID, task.Version+1) // version incremented by Cancel
|
||||
require.NoError(t, err)
|
||||
|
||||
found, err := repo.FindByID(task.ID)
|
||||
@@ -146,7 +146,7 @@ func TestTaskRepository_Archive(t *testing.T) {
|
||||
residence := testutil.CreateTestResidence(t, db, user.ID, "Test House")
|
||||
task := testutil.CreateTestTask(t, db, residence.ID, user.ID, "Test Task")
|
||||
|
||||
err := repo.Archive(task.ID)
|
||||
err := repo.Archive(task.ID, task.Version)
|
||||
require.NoError(t, err)
|
||||
|
||||
found, err := repo.FindByID(task.ID)
|
||||
@@ -162,8 +162,8 @@ func TestTaskRepository_Unarchive(t *testing.T) {
|
||||
residence := testutil.CreateTestResidence(t, db, user.ID, "Test House")
|
||||
task := testutil.CreateTestTask(t, db, residence.ID, user.ID, "Test Task")
|
||||
|
||||
repo.Archive(task.ID)
|
||||
err := repo.Unarchive(task.ID)
|
||||
repo.Archive(task.ID, task.Version)
|
||||
err := repo.Unarchive(task.ID, task.Version+1) // version incremented by Archive
|
||||
require.NoError(t, err)
|
||||
|
||||
found, err := repo.FindByID(task.ID)
|
||||
@@ -316,7 +316,7 @@ func TestKanbanBoard_CancelledTasksHiddenFromKanbanBoard(t *testing.T) {
|
||||
|
||||
// Create a cancelled task
|
||||
task := testutil.CreateTestTask(t, db, residence.ID, user.ID, "Cancelled Task")
|
||||
repo.Cancel(task.ID)
|
||||
repo.Cancel(task.ID, task.Version)
|
||||
|
||||
board, err := repo.GetKanbanData(residence.ID, 30, time.Now().UTC())
|
||||
require.NoError(t, err)
|
||||
@@ -571,7 +571,7 @@ func TestKanbanBoard_ArchivedTasksHiddenFromKanbanBoard(t *testing.T) {
|
||||
// Create a regular task and an archived task
|
||||
testutil.CreateTestTask(t, db, residence.ID, user.ID, "Regular Task")
|
||||
archivedTask := testutil.CreateTestTask(t, db, residence.ID, user.ID, "Archived Task")
|
||||
repo.Archive(archivedTask.ID)
|
||||
repo.Archive(archivedTask.ID, archivedTask.Version)
|
||||
|
||||
board, err := repo.GetKanbanData(residence.ID, 30, time.Now().UTC())
|
||||
require.NoError(t, err)
|
||||
@@ -856,7 +856,7 @@ func TestKanbanBoard_MultipleResidences(t *testing.T) {
|
||||
|
||||
// Create a cancelled task in house 1
|
||||
cancelledTask := testutil.CreateTestTask(t, db, residence1.ID, user.ID, "Cancelled in House 1")
|
||||
repo.Cancel(cancelledTask.ID)
|
||||
repo.Cancel(cancelledTask.ID, cancelledTask.Version)
|
||||
|
||||
board, err := repo.GetKanbanDataForMultipleResidences([]uint{residence1.ID, residence2.ID}, 30, time.Now().UTC())
|
||||
require.NoError(t, err)
|
||||
|
||||
54
internal/repositories/webhook_event_repo.go
Normal file
54
internal/repositories/webhook_event_repo.go
Normal file
@@ -0,0 +1,54 @@
|
||||
package repositories
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
// WebhookEvent represents a processed webhook event for deduplication
|
||||
type WebhookEvent struct {
|
||||
ID uint `gorm:"primaryKey"`
|
||||
EventID string `gorm:"column:event_id;size:255;not null;uniqueIndex:idx_provider_event_id"`
|
||||
Provider string `gorm:"column:provider;size:20;not null;uniqueIndex:idx_provider_event_id"`
|
||||
EventType string `gorm:"column:event_type;size:100;not null"`
|
||||
ProcessedAt time.Time `gorm:"column:processed_at;autoCreateTime"`
|
||||
PayloadHash string `gorm:"column:payload_hash;size:64"`
|
||||
}
|
||||
|
||||
func (WebhookEvent) TableName() string {
|
||||
return "webhook_event_log"
|
||||
}
|
||||
|
||||
// WebhookEventRepository handles webhook event deduplication
|
||||
type WebhookEventRepository struct {
|
||||
db *gorm.DB
|
||||
}
|
||||
|
||||
// NewWebhookEventRepository creates a new webhook event repository
|
||||
func NewWebhookEventRepository(db *gorm.DB) *WebhookEventRepository {
|
||||
return &WebhookEventRepository{db: db}
|
||||
}
|
||||
|
||||
// HasProcessed checks if an event has already been processed
|
||||
func (r *WebhookEventRepository) HasProcessed(provider, eventID string) (bool, error) {
|
||||
var count int64
|
||||
err := r.db.Model(&WebhookEvent{}).
|
||||
Where("provider = ? AND event_id = ?", provider, eventID).
|
||||
Count(&count).Error
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return count > 0, nil
|
||||
}
|
||||
|
||||
// RecordEvent records a processed webhook event
|
||||
func (r *WebhookEventRepository) RecordEvent(provider, eventID, eventType, payloadHash string) error {
|
||||
event := &WebhookEvent{
|
||||
EventID: eventID,
|
||||
Provider: provider,
|
||||
EventType: eventType,
|
||||
PayloadHash: payloadHash,
|
||||
}
|
||||
return r.db.Create(event).Error
|
||||
}
|
||||
104
internal/repositories/webhook_event_repo_test.go
Normal file
104
internal/repositories/webhook_event_repo_test.go
Normal file
@@ -0,0 +1,104 @@
|
||||
package repositories
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"gorm.io/driver/sqlite"
|
||||
"gorm.io/gorm"
|
||||
"gorm.io/gorm/logger"
|
||||
)
|
||||
|
||||
// setupWebhookTestDB creates an in-memory SQLite database with the
|
||||
// WebhookEvent table auto-migrated. This is separate from testutil.SetupTestDB
|
||||
// because WebhookEvent lives in the repositories package (not models/) and
|
||||
// only needs its own table for testing.
|
||||
func setupWebhookTestDB(t *testing.T) *gorm.DB {
|
||||
t.Helper()
|
||||
db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{
|
||||
Logger: logger.Default.LogMode(logger.Silent),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = db.AutoMigrate(&WebhookEvent{})
|
||||
require.NoError(t, err)
|
||||
|
||||
return db
|
||||
}
|
||||
|
||||
func TestWebhookEventRepo_RecordAndCheck(t *testing.T) {
|
||||
db := setupWebhookTestDB(t)
|
||||
repo := NewWebhookEventRepository(db)
|
||||
|
||||
// Record an event
|
||||
err := repo.RecordEvent("apple", "evt_001", "INITIAL_BUY", "abc123hash")
|
||||
require.NoError(t, err)
|
||||
|
||||
// HasProcessed should return true for the same provider + event ID
|
||||
processed, err := repo.HasProcessed("apple", "evt_001")
|
||||
require.NoError(t, err)
|
||||
assert.True(t, processed, "expected HasProcessed to return true for a recorded event")
|
||||
|
||||
// HasProcessed should return false for a different event ID
|
||||
processed, err = repo.HasProcessed("apple", "evt_999")
|
||||
require.NoError(t, err)
|
||||
assert.False(t, processed, "expected HasProcessed to return false for an unrecorded event ID")
|
||||
|
||||
// HasProcessed should return false for a different provider
|
||||
processed, err = repo.HasProcessed("google", "evt_001")
|
||||
require.NoError(t, err)
|
||||
assert.False(t, processed, "expected HasProcessed to return false for a different provider")
|
||||
}
|
||||
|
||||
func TestWebhookEventRepo_DuplicateInsert(t *testing.T) {
|
||||
db := setupWebhookTestDB(t)
|
||||
repo := NewWebhookEventRepository(db)
|
||||
|
||||
// First insert should succeed
|
||||
err := repo.RecordEvent("apple", "evt_dup", "RENEWAL", "hash1")
|
||||
require.NoError(t, err)
|
||||
|
||||
// Second insert with the same provider + event ID should fail (unique constraint)
|
||||
err = repo.RecordEvent("apple", "evt_dup", "RENEWAL", "hash1")
|
||||
require.Error(t, err, "expected an error when inserting a duplicate provider + event_id")
|
||||
|
||||
// Verify only one row exists
|
||||
var count int64
|
||||
db.Model(&WebhookEvent{}).Where("provider = ? AND event_id = ?", "apple", "evt_dup").Count(&count)
|
||||
assert.Equal(t, int64(1), count, "expected exactly one row for the duplicated event")
|
||||
}
|
||||
|
||||
func TestWebhookEventRepo_DifferentProviders(t *testing.T) {
|
||||
db := setupWebhookTestDB(t)
|
||||
repo := NewWebhookEventRepository(db)
|
||||
|
||||
sharedEventID := "evt_shared_123"
|
||||
|
||||
// Record event for "apple" provider
|
||||
err := repo.RecordEvent("apple", sharedEventID, "INITIAL_BUY", "applehash")
|
||||
require.NoError(t, err)
|
||||
|
||||
// HasProcessed should return true for "apple"
|
||||
processed, err := repo.HasProcessed("apple", sharedEventID)
|
||||
require.NoError(t, err)
|
||||
assert.True(t, processed, "expected HasProcessed to return true for apple provider")
|
||||
|
||||
// HasProcessed should return false for "google" with the same event ID
|
||||
processed, err = repo.HasProcessed("google", sharedEventID)
|
||||
require.NoError(t, err)
|
||||
assert.False(t, processed, "expected HasProcessed to return false for google provider with the same event ID")
|
||||
|
||||
// Recording the same event ID under "google" should succeed (different provider)
|
||||
err = repo.RecordEvent("google", sharedEventID, "INITIAL_BUY", "googlehash")
|
||||
require.NoError(t, err)
|
||||
|
||||
// Now both providers should show as processed
|
||||
processed, err = repo.HasProcessed("apple", sharedEventID)
|
||||
require.NoError(t, err)
|
||||
assert.True(t, processed, "expected apple to still be processed")
|
||||
|
||||
processed, err = repo.HasProcessed("google", sharedEventID)
|
||||
require.NoError(t, err)
|
||||
assert.True(t, processed, "expected google to now be processed")
|
||||
}
|
||||
Reference in New Issue
Block a user