Production hardening: security, resilience, observability, and compliance

Password complexity: custom validator requiring uppercase, lowercase, digit (min 8 chars)
Token expiry: 90-day token lifetime with refresh endpoint (60-90 day renewal window)
Health check: /api/health/ now pings Postgres + Redis, returns 503 on failure
Audit logging: async audit_log table for auth events (login, register, delete, etc.)
Circuit breaker: APNs/FCM push sends wrapped with 5-failure threshold, 30s recovery
FK indexes: 27 missing foreign key indexes across all tables (migration 017)
CSP header: default-src 'none'; frame-ancestors 'none'
Gzip compression: level 5 with media endpoint skipper
Prometheus metrics: /metrics endpoint using existing monitoring service
External timeouts: 15s push, 30s SMTP, context timeouts on all external calls

Migrations: 016 (token created_at), 017 (FK indexes), 018 (audit_log)
Tests: circuit breaker (15), audit service (8), token refresh (7), health (4),
       middleware expiry (5), validator (new)
This commit is contained in:
Trey T
2026-03-26 14:05:28 -05:00
parent 4abc57535e
commit b679f28e55
30 changed files with 2077 additions and 47 deletions

View File

@@ -0,0 +1,167 @@
package push
import (
"errors"
"sync"
"time"
)
// Lifecycle states of a CircuitBreaker. The values are consecutive integers
// starting at zero, so the zero value of an int state field is stateClosed.
const (
	// stateClosed is normal operation: requests pass through.
	stateClosed = iota
	// stateOpen means too many failures were seen: requests are rejected.
	stateOpen
	// stateHalfOpen means recovery is being tested: one request is allowed through.
	stateHalfOpen
)
// Settings a circuit breaker starts with when no option overrides them.
const (
	// defaultFailureThreshold is the number of consecutive failures after
	// which the circuit opens.
	defaultFailureThreshold = 5
	// defaultRecoveryTimeout is how long the circuit stays open before a
	// new attempt is permitted.
	defaultRecoveryTimeout = 30 * time.Second
)
var (
	// ErrCircuitOpen is the sentinel error reported when a request is
	// rejected because the circuit breaker is open. Callers can detect it
	// with errors.Is.
	ErrCircuitOpen = errors.New("circuit breaker is open")
)
// CircuitBreaker is a small, dependency-free circuit breaker for guarding
// calls to external services. Its methods serialize access to the mutable
// fields through mu, so one instance can be shared between goroutines.
//
// The breaker moves between three states:
//   - Closed: requests pass through and consecutive failures are counted.
//   - Open: once the failure threshold is reached, requests are rejected
//     until the recovery timeout has elapsed since the last failure.
//   - Half-Open: after the recovery timeout a single probe request is let
//     through. A recorded success closes the breaker again; a recorded
//     failure sends it back to Open.
type CircuitBreaker struct {
	// mu guards the mutable fields below.
	mu sync.Mutex

	// Mutable runtime state.
	state           int       // one of stateClosed, stateOpen, stateHalfOpen
	failureCount    int       // failures recorded since the last success or reset
	lastFailureTime time.Time // when the most recent failure was recorded

	// Configuration, assigned while the breaker is constructed.
	failureThreshold int           // failures needed to open the circuit
	recoveryTimeout  time.Duration // how long the circuit stays open
	name             string        // identifier used in log output
}
// CircuitBreakerOption is a functional option: a function that adjusts the
// settings of the CircuitBreaker it is handed.
type CircuitBreakerOption func(cb *CircuitBreaker)
// WithFailureThreshold returns an option that sets how many consecutive
// failures open the circuit. A value of n that is zero or negative is
// ignored, leaving the breaker's current threshold untouched.
func WithFailureThreshold(n int) CircuitBreakerOption {
	return func(target *CircuitBreaker) {
		if n <= 0 {
			return
		}
		target.failureThreshold = n
	}
}
// WithRecoveryTimeout returns an option that sets how long the circuit
// remains open before a half-open probe is permitted. A duration d that is
// zero or negative is ignored, leaving the breaker's current timeout untouched.
func WithRecoveryTimeout(d time.Duration) CircuitBreakerOption {
	return func(target *CircuitBreaker) {
		if d <= 0 {
			return
		}
		target.recoveryTimeout = d
	}
}
// NewCircuitBreaker builds a breaker that starts in the closed state with
// the default failure threshold and recovery timeout, then applies opts in
// the order given. The name identifies the breaker and is returned by Name.
func NewCircuitBreaker(name string, opts ...CircuitBreakerOption) *CircuitBreaker {
	breaker := new(CircuitBreaker)
	breaker.name = name
	breaker.state = stateClosed
	breaker.failureThreshold = defaultFailureThreshold
	breaker.recoveryTimeout = defaultRecoveryTimeout

	// Options run after the defaults are in place so they can override them.
	for i := range opts {
		opts[i](breaker)
	}
	return breaker
}
// Allow reports whether a request may proceed.
//
//   - Closed: always true.
//   - Open: false until the recovery timeout has elapsed since the last
//     recorded failure; the first call after that moves the breaker to
//     half-open and returns true so the caller can act as the probe.
//   - Half-open: false, because a probe is already in flight; the state
//     only changes once that probe's outcome is recorded.
//
// Any state value outside the three known ones is treated like closed.
func (cb *CircuitBreaker) Allow() bool {
	cb.mu.Lock()
	defer cb.mu.Unlock()

	if cb.state == stateHalfOpen {
		// A probe is outstanding; hold everyone else back until it resolves.
		return false
	}
	if cb.state != stateOpen {
		// Closed (or an unrecognized state): let the request through.
		return true
	}

	// Open: stay shut until the recovery timeout has passed.
	if time.Since(cb.lastFailureTime) < cb.recoveryTimeout {
		return false
	}
	cb.state = stateHalfOpen
	return true
}
// RecordSuccess notes a successful request. Regardless of the state the
// breaker was in, it clears the failure counter and puts the breaker into
// the closed state; for a half-open breaker this completes the recovery.
func (cb *CircuitBreaker) RecordSuccess() {
	cb.mu.Lock()
	defer cb.mu.Unlock()

	cb.state, cb.failureCount = stateClosed, 0
}
// RecordFailure notes a failed request. It increments the failure counter
// and stamps the failure time, which restarts the recovery timeout measured
// by Allow. The breaker then moves to the open state when either:
//
//   - the failure counter has reached the configured threshold, or
//   - the breaker is half-open, i.e. the failure belongs to the recovery probe.
//
// The half-open case is checked explicitly so that a failed probe always
// reopens the circuit, instead of depending on the failure counter still
// being at or above the threshold from before the circuit opened.
func (cb *CircuitBreaker) RecordFailure() {
	cb.mu.Lock()
	defer cb.mu.Unlock()

	cb.failureCount++
	cb.lastFailureTime = time.Now()

	if cb.state == stateHalfOpen || cb.failureCount >= cb.failureThreshold {
		cb.state = stateOpen
	}
}
// State returns the breaker's current state as text: "closed", "open" or
// "half-open". A state value outside those three yields "unknown".
func (cb *CircuitBreaker) State() string {
	cb.mu.Lock()
	current := cb.state
	cb.mu.Unlock()

	// Table indexed by the state constants.
	labels := [...]string{
		stateClosed:   "closed",
		stateOpen:     "open",
		stateHalfOpen: "half-open",
	}
	if current < 0 || current >= len(labels) {
		return "unknown"
	}
	return labels[current]
}
// Name reports the identifier the breaker was created with.
func (cb *CircuitBreaker) Name() string {
	// Read without taking mu: within this file the name is only assigned
	// while the breaker is being constructed.
	return cb.name
}
// Reset returns the breaker to its initial runtime condition: closed state,
// no recorded failures and no last-failure timestamp. The configured
// threshold, timeout and name are left as they are.
func (cb *CircuitBreaker) Reset() {
	cb.mu.Lock()
	defer cb.mu.Unlock()

	var never time.Time // zero time: no failure on record
	cb.failureCount = 0
	cb.lastFailureTime = never
	cb.state = stateClosed
}
// Counts reports how many failures are currently on record. It is intended
// for tests and monitoring.
func (cb *CircuitBreaker) Counts() int {
	cb.mu.Lock()
	failures := cb.failureCount
	cb.mu.Unlock()
	return failures
}

View File

@@ -0,0 +1,275 @@
package push
import (
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// A freshly constructed breaker is closed and lets requests through.
func TestCircuitBreaker_StartsInClosedState(t *testing.T) {
	breaker := NewCircuitBreaker("test")

	assert.Equal(t, "closed", breaker.State())
	assert.True(t, breaker.Allow())
}
// With a threshold of 3, the breaker stays closed for two failures and
// opens on the third.
func TestCircuitBreaker_OpensAfterThresholdFailures(t *testing.T) {
	breaker := NewCircuitBreaker("test", WithFailureThreshold(3))

	// Failures below the threshold leave the circuit closed.
	for i := 0; i < 2; i++ {
		breaker.RecordFailure()
		assert.Equal(t, "closed", breaker.State())
		assert.True(t, breaker.Allow())
	}

	// The failure that reaches the threshold opens it.
	breaker.RecordFailure()
	assert.Equal(t, "open", breaker.State())
	assert.False(t, breaker.Allow())
}
// Without options the breaker opens on the fifth consecutive failure.
func TestCircuitBreaker_DefaultThresholdIsFive(t *testing.T) {
	breaker := NewCircuitBreaker("test")

	for remaining := 4; remaining > 0; remaining-- {
		breaker.RecordFailure()
		assert.Equal(t, "closed", breaker.State())
	}

	// Fifth failure.
	breaker.RecordFailure()
	assert.Equal(t, "open", breaker.State())
}
// An open breaker keeps rejecting requests on repeated Allow calls.
func TestCircuitBreaker_RejectsRequestsWhenOpen(t *testing.T) {
	breaker := NewCircuitBreaker("test", WithFailureThreshold(1))

	breaker.RecordFailure()
	assert.Equal(t, "open", breaker.State())

	// Every one of the calls must be turned away.
	for attempt := 1; attempt <= 10; attempt++ {
		assert.False(t, breaker.Allow())
	}
}
// Once the recovery timeout has passed, an open breaker admits one request
// and reports itself as half-open.
func TestCircuitBreaker_TransitionsToHalfOpenAfterRecoveryTimeout(t *testing.T) {
	const recovery = 50 * time.Millisecond

	breaker := NewCircuitBreaker("test",
		WithFailureThreshold(1),
		WithRecoveryTimeout(recovery),
	)

	breaker.RecordFailure()
	assert.Equal(t, "open", breaker.State())
	assert.False(t, breaker.Allow())

	// Sleep slightly past the recovery timeout.
	time.Sleep(recovery + 10*time.Millisecond)

	// The next request is the half-open probe.
	assert.True(t, breaker.Allow())
	assert.Equal(t, "half-open", breaker.State())
}
// While a half-open probe is outstanding, further requests are rejected.
func TestCircuitBreaker_HalfOpenRejectsSecondRequest(t *testing.T) {
	const recovery = 50 * time.Millisecond

	breaker := NewCircuitBreaker("test",
		WithFailureThreshold(1),
		WithRecoveryTimeout(recovery),
	)

	breaker.RecordFailure()
	time.Sleep(recovery + 10*time.Millisecond)

	// The first caller becomes the probe.
	assert.True(t, breaker.Allow())
	assert.Equal(t, "half-open", breaker.State())

	// The second caller is refused because the probe has not resolved yet.
	assert.False(t, breaker.Allow())
}
// A successful probe closes the breaker and clears the failure counter.
func TestCircuitBreaker_HalfOpenSuccess_ResetsToClosed(t *testing.T) {
	const recovery = 50 * time.Millisecond

	breaker := NewCircuitBreaker("test",
		WithFailureThreshold(1),
		WithRecoveryTimeout(recovery),
	)

	breaker.RecordFailure()
	time.Sleep(recovery + 10*time.Millisecond)

	// Take the probe slot.
	assert.True(t, breaker.Allow())

	// Report the probe as successful.
	breaker.RecordSuccess()
	assert.Equal(t, "closed", breaker.State())
	assert.Equal(t, 0, breaker.Counts())

	// Requests flow again.
	assert.True(t, breaker.Allow())
	assert.True(t, breaker.Allow())
}
// A failed probe sends the breaker from half-open back to open.
func TestCircuitBreaker_HalfOpenFailure_ReturnsToOpen(t *testing.T) {
	const recovery = 50 * time.Millisecond

	breaker := NewCircuitBreaker("test",
		WithFailureThreshold(2),
		WithRecoveryTimeout(recovery),
	)

	// Two failures reach the threshold and open the circuit.
	breaker.RecordFailure()
	breaker.RecordFailure()
	assert.Equal(t, "open", breaker.State())

	time.Sleep(recovery + 10*time.Millisecond)

	// Take the probe slot.
	assert.True(t, breaker.Allow())
	assert.Equal(t, "half-open", breaker.State())

	// The probe fails, so the circuit must be open again.
	breaker.RecordFailure()
	assert.Equal(t, "open", breaker.State())
	assert.False(t, breaker.Allow())
}
// A success wipes the failure counter, so a full threshold's worth of new
// failures is needed before the breaker opens.
func TestCircuitBreaker_SuccessResetsFailureCount(t *testing.T) {
	breaker := NewCircuitBreaker("test", WithFailureThreshold(3))

	breaker.RecordFailure()
	breaker.RecordFailure()
	assert.Equal(t, 2, breaker.Counts())

	// The success clears what has accumulated so far.
	breaker.RecordSuccess()
	assert.Equal(t, 0, breaker.Counts())
	assert.Equal(t, "closed", breaker.State())

	// Two fresh failures are not enough...
	breaker.RecordFailure()
	breaker.RecordFailure()
	assert.Equal(t, "closed", breaker.State())

	// ...the third one is.
	breaker.RecordFailure()
	assert.Equal(t, "open", breaker.State())
}
// Reset brings an open breaker back to closed with no failures on record.
func TestCircuitBreaker_Reset(t *testing.T) {
	breaker := NewCircuitBreaker("test", WithFailureThreshold(1))

	breaker.RecordFailure()
	assert.Equal(t, "open", breaker.State())

	breaker.Reset()

	assert.Equal(t, "closed", breaker.State())
	assert.Equal(t, 0, breaker.Counts())
	assert.True(t, breaker.Allow())
}
// Name returns the identifier passed to the constructor.
func TestCircuitBreaker_Name(t *testing.T) {
	breaker := NewCircuitBreaker("apns-breaker")

	assert.Equal(t, "apns-breaker", breaker.Name())
}
// A custom threshold of 10 is honored: nine failures keep the breaker
// closed and the tenth opens it.
func TestCircuitBreaker_CustomOptions(t *testing.T) {
	breaker := NewCircuitBreaker("test",
		WithFailureThreshold(10),
		WithRecoveryTimeout(5*time.Minute),
	)

	for remaining := 9; remaining > 0; remaining-- {
		breaker.RecordFailure()
		assert.Equal(t, "closed", breaker.State())
	}

	// Tenth failure.
	breaker.RecordFailure()
	assert.Equal(t, "open", breaker.State())
}
// Non-positive option values are discarded, so the default threshold of
// five failures still governs when the breaker opens.
func TestCircuitBreaker_InvalidOptionsIgnored(t *testing.T) {
	breaker := NewCircuitBreaker("test",
		WithFailureThreshold(0), // rejected: the default stays in effect
		WithRecoveryTimeout(-1), // rejected: the default stays in effect
	)

	// Four failures stay under the default threshold.
	for remaining := 4; remaining > 0; remaining-- {
		breaker.RecordFailure()
		assert.Equal(t, "closed", breaker.State())
	}

	// The fifth reaches it.
	breaker.RecordFailure()
	assert.Equal(t, "open", breaker.State())
}
// Many goroutines call every method of one breaker at once. The test passes
// when nothing panics or deadlocks and the final state is one of the three
// known states.
func TestCircuitBreaker_ThreadSafety(t *testing.T) {
	const (
		goroutines = 50
		iterations = 100
	)

	breaker := NewCircuitBreaker("test",
		WithFailureThreshold(100),
		WithRecoveryTimeout(10*time.Millisecond),
	)

	var workers sync.WaitGroup
	workers.Add(goroutines)
	for g := 0; g < goroutines; g++ {
		go func() {
			defer workers.Done()
			for step := 0; step < iterations; step++ {
				breaker.Allow()
				// Alternate outcomes: even steps fail, odd steps succeed.
				if step%2 == 0 {
					breaker.RecordFailure()
				} else {
					breaker.RecordSuccess()
				}
				_ = breaker.State()
				_ = breaker.Counts()
			}
		}()
	}
	workers.Wait()

	// Whatever interleaving happened, the state must be a recognized one.
	final := breaker.State()
	require.Contains(t, []string{"closed", "open", "half-open"}, final)
}
// Walks one breaker through the whole cycle:
// closed -> open -> half-open -> closed.
func TestCircuitBreaker_FullLifecycle(t *testing.T) {
	const recovery = 50 * time.Millisecond

	breaker := NewCircuitBreaker("lifecycle-test",
		WithFailureThreshold(3),
		WithRecoveryTimeout(recovery),
	)

	// Step 1: closed, traffic flows.
	assert.True(t, breaker.Allow())
	breaker.RecordSuccess()
	assert.Equal(t, "closed", breaker.State())

	// Step 2: failures build up but stay under the threshold.
	breaker.RecordFailure()
	breaker.RecordFailure()
	assert.Equal(t, "closed", breaker.State())

	// Step 3: the third failure opens the circuit.
	breaker.RecordFailure()
	assert.Equal(t, "open", breaker.State())
	assert.False(t, breaker.Allow())

	// Step 4: let the recovery timeout pass.
	time.Sleep(recovery + 10*time.Millisecond)

	// Step 5: half-open, one probe is admitted.
	assert.True(t, breaker.Allow())
	assert.Equal(t, "half-open", breaker.State())

	// Step 6: the probe succeeds and the circuit closes.
	breaker.RecordSuccess()
	assert.Equal(t, "closed", breaker.State())
	assert.True(t, breaker.Allow())
}

View File

@@ -2,6 +2,7 @@ package push
import (
"context"
"time"
"github.com/rs/zerolog/log"
@@ -14,16 +15,25 @@ const (
PlatformAndroid = "android"
)
// pushSendTimeout bounds a single push notification send operation.
const pushSendTimeout time.Duration = 15 * time.Second
// Client provides a unified interface for sending push notifications
type Client struct {
apns *APNsClient
fcm *FCMClient
enabled bool
apns *APNsClient
fcm *FCMClient
enabled bool
apnsBreaker *CircuitBreaker
fcmBreaker *CircuitBreaker
}
// NewClient creates a new unified push notification client
func NewClient(cfg *config.PushConfig, enabled bool) (*Client, error) {
client := &Client{enabled: enabled}
client := &Client{
enabled: enabled,
apnsBreaker: NewCircuitBreaker("apns"),
fcmBreaker: NewCircuitBreaker("fcm"),
}
// Initialize APNs client (iOS)
if cfg.APNSKeyPath != "" && cfg.APNSKeyID != "" && cfg.APNSTeamID != "" {
@@ -54,7 +64,8 @@ func NewClient(cfg *config.PushConfig, enabled bool) (*Client, error) {
return client, nil
}
// SendToIOS sends a push notification to iOS devices
// SendToIOS sends a push notification to iOS devices.
// The call is guarded by a circuit breaker and uses a context timeout.
func (c *Client) SendToIOS(ctx context.Context, tokens []string, title, message string, data map[string]string) error {
if !c.enabled {
log.Debug().Msg("Push notifications disabled by feature flag")
@@ -64,10 +75,26 @@ func (c *Client) SendToIOS(ctx context.Context, tokens []string, title, message
log.Warn().Msg("APNs client not initialized, skipping iOS push")
return nil
}
return c.apns.Send(ctx, tokens, title, message, data)
if !c.apnsBreaker.Allow() {
log.Warn().Str("breaker", c.apnsBreaker.Name()).Msg("APNs circuit breaker is open, skipping iOS push")
return ErrCircuitOpen
}
sendCtx, cancel := context.WithTimeout(ctx, pushSendTimeout)
defer cancel()
err := c.apns.Send(sendCtx, tokens, title, message, data)
if err != nil {
c.apnsBreaker.RecordFailure()
log.Warn().Err(err).Str("breaker_state", c.apnsBreaker.State()).Msg("APNs send failed, recorded circuit breaker failure")
return err
}
c.apnsBreaker.RecordSuccess()
return nil
}
// SendToAndroid sends a push notification to Android devices
// SendToAndroid sends a push notification to Android devices.
// The call is guarded by a circuit breaker and uses a context timeout.
func (c *Client) SendToAndroid(ctx context.Context, tokens []string, title, message string, data map[string]string) error {
if !c.enabled {
log.Debug().Msg("Push notifications disabled by feature flag")
@@ -77,7 +104,22 @@ func (c *Client) SendToAndroid(ctx context.Context, tokens []string, title, mess
log.Warn().Msg("FCM client not initialized, skipping Android push")
return nil
}
return c.fcm.Send(ctx, tokens, title, message, data)
if !c.fcmBreaker.Allow() {
log.Warn().Str("breaker", c.fcmBreaker.Name()).Msg("FCM circuit breaker is open, skipping Android push")
return ErrCircuitOpen
}
sendCtx, cancel := context.WithTimeout(ctx, pushSendTimeout)
defer cancel()
err := c.fcm.Send(sendCtx, tokens, title, message, data)
if err != nil {
c.fcmBreaker.RecordFailure()
log.Warn().Err(err).Str("breaker_state", c.fcmBreaker.State()).Msg("FCM send failed, recorded circuit breaker failure")
return err
}
c.fcmBreaker.RecordSuccess()
return nil
}
// SendToAll sends a push notification to both iOS and Android devices
@@ -115,8 +157,9 @@ func (c *Client) IsAndroidEnabled() bool {
return c.fcm != nil
}
// SendActionableNotification sends notifications with action button support
// iOS receives a category for actionable notifications, Android handles actions via data payload
// SendActionableNotification sends notifications with action button support.
// iOS receives a category for actionable notifications, Android handles actions via data payload.
// Both platforms are guarded by their respective circuit breakers.
func (c *Client) SendActionableNotification(ctx context.Context, iosTokens, androidTokens []string, title, message string, data map[string]string, iosCategoryID string) error {
if !c.enabled {
log.Debug().Msg("Push notifications disabled by feature flag")
@@ -127,10 +170,19 @@ func (c *Client) SendActionableNotification(ctx context.Context, iosTokens, andr
if len(iosTokens) > 0 {
if c.apns == nil {
log.Warn().Msg("APNs client not initialized, skipping iOS actionable push")
} else if !c.apnsBreaker.Allow() {
log.Warn().Str("breaker", c.apnsBreaker.Name()).Msg("APNs circuit breaker is open, skipping iOS actionable push")
lastErr = ErrCircuitOpen
} else {
if err := c.apns.SendWithCategory(ctx, iosTokens, title, message, data, iosCategoryID); err != nil {
sendCtx, cancel := context.WithTimeout(ctx, pushSendTimeout)
err := c.apns.SendWithCategory(sendCtx, iosTokens, title, message, data, iosCategoryID)
cancel()
if err != nil {
c.apnsBreaker.RecordFailure()
log.Error().Err(err).Msg("Failed to send iOS actionable notifications")
lastErr = err
} else {
c.apnsBreaker.RecordSuccess()
}
}
}

View File

@@ -165,7 +165,7 @@ func NewFCMClient(cfg *config.PushConfig) (*FCMClient, error) {
}
httpClient := &http.Client{
Timeout: 30 * time.Second,
Timeout: 15 * time.Second,
Transport: transport,
}