Files
honeyDueAPI/internal/services/upload_service.go
T
Trey t 29c9014a33
Backend CI / Test (push) Has been cancelled
Backend CI / Contract Tests (push) Has been cancelled
Backend CI / Build (push) Has been cancelled
Backend CI / Lint (push) Has been cancelled
Backend CI / Secret Scanning (push) Has been cancelled
feat(uploads): direct-to-B2 presigned uploads with content-length-range policy
Replaces the multipart-via-API path for image uploads with a three-step
direct-to-storage flow:

  1. Client POSTs /api/uploads/presign with content_length + content_type;
     server validates size (10 MB cap), mime allow-list per category, rate
     limit (50/hour/user via a fixed clock-hour Redis counter), and concurrent unclaimed
     cap (10 in-flight per user). On success it persists a pending_uploads
     row, signs an S3 POST policy with content-length-range bound to the
     claimed length ±256 bytes, and returns the URL+fields.
  2. Client POSTs the bytes directly to B2 using the signed policy. B2
     enforces size, content-type, and key match before accepting.
  3. Client passes upload_ids[] to /api/task-completions/ or /api/documents/.
     Service HEADs each B2 object, verifies size matches expected_bytes
     within slack, marks pending_uploads claimed_at, and creates the
     associated TaskCompletionImage / DocumentImage rows.

Bytes never traverse our API server. The 1 MB Echo BodyLimit middleware
that was rejecting all task-completion image uploads becomes irrelevant
for this path. Existing multipart endpoints stay functional alongside,
soak-testing the new path before legacy removal.

Cleanup:
  - cmd/worker registers a new hourly cron (TypeUploadCleanup, "30 * * * *")
    that reaps pending_uploads where claimed_at IS NULL AND expires_at < NOW().
    Reaps both the B2 object and the row.
  - B2 bucket lifecycle rule on `uploads/` prefix (7 days hide → 1 day delete)
    documented in deploy-k3s/manifests/b2-lifecycle.md as a backstop.

Schema:
  - migrations/000002_pending_uploads.sql adds the table + partial index for
    cleanup + nullable pending_upload_id FKs on task_taskcompletionimage and
    task_documentimage.

Policy (single tier, no free/pro split):
  - 10 MB cap per upload
  - 50 presigns/hour/user
  - 10 concurrent unclaimed uploads/user
  - allow-list: jpeg/png/heic/heif/webp for image categories;
    + pdf for document_file

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 14:36:42 -07:00

367 lines
12 KiB
Go

package services
import (
"context"
"fmt"
"strings"
"time"
"github.com/google/uuid"
"github.com/redis/go-redis/v9"
"github.com/rs/zerolog/log"
"github.com/treytartt/honeydue-api/internal/apperrors"
"github.com/treytartt/honeydue-api/internal/config"
"github.com/treytartt/honeydue-api/internal/dto/responses"
"github.com/treytartt/honeydue-api/internal/models"
"github.com/treytartt/honeydue-api/internal/repositories"
)
// Upload policy constants. One set of limits applies to every authenticated
// user; they are intentionally not split by free/pro tier. Tune them here. If
// per-tier limits are ever required, source them from
// config/SubscriptionSettings rather than forking these constants.
const (
	// UploadMaxBytes is the largest object a single upload may be (10 MiB).
	UploadMaxBytes = 10 << 20

	// UploadPresignSlackBytes is the ± tolerance around the client-declared
	// content length used for the signed content-length-range.
	UploadPresignSlackBytes = 256

	// UploadPresignTTL is how long a signed POST policy stays valid.
	UploadPresignTTL = 15 * time.Minute

	// UploadCleanupTTL is the reap window for unclaimed rows.
	UploadCleanupTTL = 24 * time.Hour

	// UploadPresignsPerHour caps presign requests per user per hour.
	UploadPresignsPerHour = 50

	// UploadConcurrentUnclaimed caps how many unclaimed, unexpired uploads a
	// user may have in flight at once.
	UploadConcurrentUnclaimed = 10

	// uploadRateLimitRedisPrefix prefixes the per-user hourly presign
	// counter keys stored in Redis.
	uploadRateLimitRedisPrefix = "upload:presign:"
)
// allowedContentTypes is the per-category MIME allow-list consulted before a
// POST policy is signed. Keys are lower-case MIME types; any type not present
// (or mapped to false) is rejected. Image categories accept common photo
// formats only; document files additionally accept PDF.
var allowedContentTypes = map[models.UploadCategory]map[string]bool{
	models.UploadCategoryCompletion: {
		"image/jpeg": true,
		"image/png":  true,
		"image/heic": true,
		"image/heif": true,
		"image/webp": true,
	},
	models.UploadCategoryDocumentImage: {
		"image/jpeg": true,
		"image/png":  true,
		"image/heic": true,
		"image/heif": true,
		"image/webp": true,
	},
	models.UploadCategoryDocumentFile: {
		"image/jpeg":      true,
		"image/png":       true,
		"image/heic":      true,
		"image/heif":      true,
		"image/webp":      true,
		"application/pdf": true,
	},
}
// uploadCategoryToSubdir gives the bucket sub-directory (under "uploads/")
// for each upload category. Both document categories share one directory.
// The layout mirrors the one used by storage_service.go.
var uploadCategoryToSubdir = map[models.UploadCategory]string{
	models.UploadCategoryCompletion:    "completions",
	models.UploadCategoryDocumentImage: "documents",
	models.UploadCategoryDocumentFile:  "documents",
}
// UploadService runs presigned-URL upload sessions end to end:
//
//   - validating each request (size, MIME type, category);
//   - enforcing the per-user in-flight cap (DB count) and hourly presign rate
//     limit (Redis counter when available, DB count otherwise);
//   - signing the B2 POST policy through the shared S3Backend;
//   - recording each session in pending_uploads so the attach path can
//     verify and claim the stored object later.
type UploadService struct {
	// repo persists and queries pending_uploads rows.
	repo *repositories.PendingUploadRepository
	// s3 signs policies and stats/deletes objects; nil when storage is not
	// S3-backed, in which case operations return an error.
	s3 *S3Backend
	// cfg is the storage configuration supplied at construction.
	cfg *config.StorageConfig
	// cache supplies the Redis client for rate limiting; may be nil.
	cache *CacheService
	// redisEnabled is set by NewUploadService to (cache != nil).
	redisEnabled bool
}
// NewUploadService builds an UploadService from its dependencies.
//
// s3 may be nil when storage is not S3-backed (e.g. local-disk dev mode).
// Presign and VerifyAndClaim then fail with an internal error instead of
// panicking. cache may be nil, which routes rate limiting through the DB.
func NewUploadService(
	repo *repositories.PendingUploadRepository,
	s3 *S3Backend,
	cfg *config.StorageConfig,
	cache *CacheService,
) *UploadService {
	svc := &UploadService{
		repo:  repo,
		s3:    s3,
		cfg:   cfg,
		cache: cache,
	}
	// The Redis rate-limit path is usable only when a cache was provided.
	svc.redisEnabled = cache != nil
	return svc
}
// Presign validates an upload request, enforces the in-flight cap and hourly
// rate limit, and signs a B2 POST policy. It then persists the
// pending_uploads row and returns what the client needs to POST the bytes
// directly to storage.
//
// The content type is normalized once (whitespace-trimmed, lower-cased).
// That single value drives the allow-list check, the object-key extension,
// the signed policy, and the stored row, so all four always agree.
//
// Errors are apperrors so the HTTP layer can choose the status code:
//
//	BadRequest (400)      — content_length outside (0, UploadMaxBytes], unknown
//	                        category, or content_type not allowed for the
//	                        category. 413/422 would be more precise, but
//	                        BadRequest is the codebase pattern for DTO-validation
//	                        rejections.
//	TooManyRequests (429) — over the hourly rate limit or the in-flight cap
//	Internal (500)        — storage not configured, signing failure, or a
//	                        repository error
func (s *UploadService) Presign(
	ctx context.Context,
	userID uint,
	category models.UploadCategory,
	contentType string,
	contentLength int64,
) (*responses.PresignUploadResponse, error) {
	if s.s3 == nil {
		return nil, apperrors.Internal(fmt.Errorf("upload: S3 backend not configured"))
	}

	// Size cap. Non-positive lengths share the "too large" key because no
	// dedicated i18n key exists for them.
	if contentLength <= 0 || contentLength > UploadMaxBytes {
		return nil, apperrors.BadRequest("error.upload_too_large")
	}

	// Normalize once. Previously the allow-list and stored row used the
	// lower-cased value while the signed policy used the raw client string.
	ct := strings.ToLower(strings.TrimSpace(contentType))

	// MIME check is per category — completion photos can't be PDFs.
	allowed, ok := allowedContentTypes[category]
	if !ok {
		return nil, apperrors.BadRequest("error.upload_invalid_category")
	}
	if !allowed[ct] {
		return nil, apperrors.BadRequest("error.upload_unsupported_content_type")
	}

	now := time.Now().UTC()

	// Concurrency cap: unclaimed, not-yet-expired sessions for this user.
	active, err := s.repo.WithContext(ctx).CountUnclaimedActiveForUser(userID, now)
	if err != nil {
		return nil, apperrors.Internal(err)
	}
	if active >= UploadConcurrentUnclaimed {
		return nil, apperrors.TooManyRequests("error.upload_too_many_in_flight")
	}

	// Rate limit: presigns issued in the last hour (Redis when enabled, DB
	// otherwise).
	if err := s.checkRateLimit(ctx, userID, now); err != nil {
		return nil, err
	}

	// Storage key: category subdir + user + random UUID + extension. No
	// timestamp prefix is needed because objects are found via
	// pending_uploads.id, never by listing.
	subdir := uploadCategoryToSubdir[category]
	if subdir == "" {
		subdir = "uploads"
	}
	ext := extensionForContentType(ct)
	key := fmt.Sprintf("uploads/%s/%d/%s%s", subdir, userID, uuid.New().String(), ext)

	// content-length-range = declared length ± slack, clamped to
	// [0, UploadMaxBytes]. The slack tolerates small encoding differences.
	// The bound still rejects anything materially different from the claim.
	minB := contentLength - UploadPresignSlackBytes
	if minB < 0 {
		minB = 0
	}
	maxB := contentLength + UploadPresignSlackBytes
	if maxB > UploadMaxBytes {
		maxB = UploadMaxBytes
	}
	post, err := s.s3.PresignedPost(ctx, key, ct, minB, maxB, UploadPresignTTL)
	if err != nil {
		return nil, apperrors.Internal(fmt.Errorf("presign upload: %w", err))
	}

	// NOTE(review): the row's ExpiresAt equals the signed-URL lifetime
	// (UploadPresignTTL), not UploadCleanupTTL. VerifyAndClaim rejects
	// expired rows, so an attach arriving later than that fails even if the
	// bytes landed. Confirm this is intended.
	expiresAt := now.Add(UploadPresignTTL)
	row := &models.PendingUpload{
		UserID:        userID,
		Category:      category,
		B2Key:         key,
		ContentType:   ct,
		ExpectedBytes: contentLength,
		ExpiresAt:     expiresAt,
	}
	if err := s.repo.WithContext(ctx).Create(row); err != nil {
		return nil, apperrors.Internal(err)
	}

	return &responses.PresignUploadResponse{
		ID:        row.ID,
		URL:       post.URL,
		Fields:    post.Fields,
		Key:       key,
		ExpiresAt: expiresAt.Format(time.RFC3339),
	}, nil
}
// VerifyAndClaim is called from CreateCompletion / CreateDocument. It turns
// client-supplied upload IDs into claimed pending_uploads rows that the
// caller then references from task_completion_image / document_image rows.
//
// It works in two phases so one bad upload cannot half-attach a batch:
//
//  1. Verify. Load the caller's unclaimed rows. For each row: reject it if
//     expired, Stat its B2 object, and require the stored size to fall in the
//     same window Presign signed (expected ± UploadPresignSlackBytes, capped
//     at UploadMaxBytes).
//  2. Claim. Only after every row has passed, mark each one claimed with its
//     actual size.
//
// A failure in phase 1 returns before any row is claimed. Phase 2 is not
// wrapped in a DB transaction here, so a repository error part-way through
// can still leave earlier rows claimed. NOTE(review): use a transaction if
// strict atomicity is required. Whether concurrent calls for the same ID are
// serialized depends on FindUnclaimedForUser/MarkClaimed (row locks or a
// claimed_at guard), which is not visible here.
//
// Only the object size is checked. The content type is bound by the signed
// POST policy at upload time and is not re-verified here.
//
// Duplicate IDs are collapsed, so each upload is returned at most once. An
// empty uploadIDs yields (nil, nil).
func (s *UploadService) VerifyAndClaim(
	ctx context.Context,
	userID uint,
	uploadIDs []uint,
) ([]models.PendingUpload, error) {
	if len(uploadIDs) == 0 {
		return nil, nil
	}
	if s.s3 == nil {
		return nil, apperrors.Internal(fmt.Errorf("upload: S3 backend not configured"))
	}

	// Collapse duplicates, keeping first-seen order. A repeated ID would
	// otherwise either trip the count check below or be claimed twice,
	// depending on how the repository query treats it.
	ids := make([]uint, 0, len(uploadIDs))
	seen := make(map[uint]struct{}, len(uploadIDs))
	for _, id := range uploadIDs {
		if _, dup := seen[id]; dup {
			continue
		}
		seen[id] = struct{}{}
		ids = append(ids, id)
	}

	rows, err := s.repo.WithContext(ctx).FindUnclaimedForUser(userID, ids)
	if err != nil {
		return nil, apperrors.Internal(err)
	}
	if len(rows) != len(ids) {
		// Some IDs don't exist, belong to another user, or are already
		// claimed. We don't differentiate — same status either way.
		return nil, apperrors.NotFound("error.upload_not_found")
	}

	now := time.Now().UTC()

	// Phase 1: verify every row. Only in-memory state is touched.
	for i := range rows {
		r := &rows[i]
		if r.IsExpired(now) {
			return nil, apperrors.BadRequest("error.upload_expired")
		}
		info, err := s.s3.Stat(ctx, r.B2Key)
		if err != nil {
			log.Warn().Err(err).Uint("upload_id", r.ID).Str("key", r.B2Key).Msg("upload claim: stat failed")
			return nil, apperrors.BadRequest("error.upload_not_uploaded")
		}
		// Same window Presign signed. Anything outside it means the client
		// lied or B2 misreported.
		minB := r.ExpectedBytes - UploadPresignSlackBytes
		maxB := r.ExpectedBytes + UploadPresignSlackBytes
		if maxB > UploadMaxBytes {
			maxB = UploadMaxBytes
		}
		if info.Size < minB || info.Size > maxB {
			return nil, apperrors.BadRequest("error.upload_size_mismatch")
		}
		size := info.Size
		r.ActualBytes = &size
	}

	// Phase 2: every row verified — now persist the claims.
	for i := range rows {
		if err := s.repo.WithContext(ctx).MarkClaimed(rows[i].ID, *rows[i].ActualBytes, now); err != nil {
			return nil, apperrors.Internal(err)
		}
		claimedAt := now
		rows[i].ClaimedAt = &claimedAt
	}
	return rows, nil
}
// CleanupExpired reaps up to batchLimit unclaimed pending_uploads rows whose
// expires_at has passed. For each row it deletes the B2 object, then the row.
// Called from the Asynq hourly cron in cmd/worker.
//
// It returns the number of rows removed. Per-row failures are logged and the
// loop continues, so one stuck object doesn't block the rest:
//   - if the B2 delete fails, the row is still removed and the bucket
//     lifecycle rule backstops the orphaned object;
//   - if the row delete fails, the row is not counted.
//
// If ctx is cancelled mid-batch (worker shutdown, task deadline), the loop
// stops and returns the count so far with a wrapped ctx error. The remaining
// rows stay expired and are picked up by the next run.
func (s *UploadService) CleanupExpired(ctx context.Context, batchLimit int) (int, error) {
	if s.s3 == nil {
		return 0, fmt.Errorf("upload: S3 backend not configured")
	}

	now := time.Now().UTC()
	rows, err := s.repo.WithContext(ctx).FindExpiredUnclaimed(now, batchLimit)
	if err != nil {
		return 0, fmt.Errorf("find expired: %w", err)
	}

	reaped := 0
	for _, r := range rows {
		// Stop issuing B2/DB calls once the task context is done.
		if err := ctx.Err(); err != nil {
			return reaped, fmt.Errorf("upload cleanup interrupted after %d of %d rows: %w", reaped, len(rows), err)
		}
		if err := s.s3.Delete(r.B2Key); err != nil {
			log.Warn().Err(err).Uint("upload_id", r.ID).Str("key", r.B2Key).Msg("upload cleanup: B2 delete failed (continuing)")
			// Continue to the row delete anyway. The bucket lifecycle
			// backstop picks up the orphaned B2 object after 7 days.
		}
		if err := s.repo.WithContext(ctx).DeleteByID(r.ID); err != nil {
			log.Warn().Err(err).Uint("upload_id", r.ID).Msg("upload cleanup: row delete failed")
			continue
		}
		reaped++
	}
	return reaped, nil
}
// checkRateLimit caps presign requests at UploadPresignsPerHour per user.
//
// With Redis enabled it INCRs a counter keyed by user and clock-hour bucket.
// The counter includes the current request, so it is allowed while the
// counter is <= the limit. If Redis is disabled or the INCR fails, it instead
// counts pending_uploads rows the user created in the trailing hour (the
// current request is not yet persisted, so it is allowed while the count is
// < the limit). Over the limit it returns a TooManyRequests apperror; a DB
// failure returns an Internal apperror.
func (s *UploadService) checkRateLimit(ctx context.Context, userID uint, now time.Time) error {
	if s.redisEnabled {
		hourBucket := now.Truncate(time.Hour).Unix()
		counterKey := fmt.Sprintf("%s%d:%d", uploadRateLimitRedisPrefix, userID, hourBucket)
		rdb := s.cache.Client()

		hits, incrErr := rdb.Incr(ctx, counterKey).Result()
		if incrErr == nil {
			if hits == 1 {
				// First hit in this bucket: attach a TTL so it ages out.
				// Best-effort — the bucket key is never reused after its
				// hour, so a failure only leaves one stale key behind.
				_ = rdb.Expire(ctx, counterKey, 2*time.Hour).Err()
			}
			if hits <= UploadPresignsPerHour {
				return nil
			}
			return apperrors.TooManyRequests("error.upload_rate_limit")
		}

		// Transient Redis failure: neither fail open nor fail the request —
		// fall through to the DB count.
		if incrErr != redis.Nil {
			log.Warn().Err(incrErr).Msg("upload rate limit: redis unavailable, falling back to DB")
		}
	}

	windowStart := now.Add(-time.Hour)
	recent, err := s.repo.WithContext(ctx).CountCreatedSinceForUser(userID, windowStart)
	if err != nil {
		return apperrors.Internal(err)
	}
	if recent < UploadPresignsPerHour {
		return nil
	}
	return apperrors.TooManyRequests("error.upload_rate_limit")
}
// urlForUploadKey builds the public URL for an object stored under b2Key. It
// joins the storage BaseURL (trailing slashes removed) and the key with a
// single "/", matching the URL format StorageService.Upload produces, so the
// existing media handler can serve presigned uploads unchanged.
//
// If storageService or its config is nil (a programming error in production),
// it returns "/" + b2Key rather than an empty string.
func urlForUploadKey(storageService *StorageService, b2Key string) string {
	if storageService != nil && storageService.cfg != nil {
		return strings.TrimRight(storageService.cfg.BaseURL, "/") + "/" + b2Key
	}
	return "/" + b2Key
}
// extensionForContentType maps a MIME type to the file extension used in the
// storage key. Matching ignores case and surrounding whitespace, and
// "image/jpg" is treated as "image/jpeg". Unknown types get ".bin" so the key
// always has an extension. The extension is cosmetic; the signed policy
// enforces the actual content type.
func extensionForContentType(contentType string) string {
	mime := strings.ToLower(strings.TrimSpace(contentType))

	ext := ".bin"
	switch mime {
	case "image/jpeg", "image/jpg":
		ext = ".jpg"
	case "image/png":
		ext = ".png"
	case "image/heic":
		ext = ".heic"
	case "image/heif":
		ext = ".heif"
	case "image/webp":
		ext = ".webp"
	case "application/pdf":
		ext = ".pdf"
	}
	return ext
}