Files
honeyDueAPI/internal/services/upload_service.go
T
Trey t 7cc5448a7c
Backend CI / Test (push) Has been cancelled
Backend CI / Contract Tests (push) Has been cancelled
Backend CI / Lint (push) Has been cancelled
Backend CI / Secret Scanning (push) Has been cancelled
Backend CI / Build (push) Has been cancelled
fix(uploads): switch from S3 POST policy to presigned PUT
Backblaze B2's S3-compatible endpoint does not implement the S3 POST
Object operation. It returns HTTP 501 to every POST regardless of URL
style — both path-style (https://s3.<region>.backblazeb2.com/<bucket>/)
and virtual-hosted-style (https://<bucket>.s3.<region>.backblazeb2.com/).

Yesterday's BucketLookupDNS fix produced virtual-hosted URLs, which is
correct for AWS but doesn't help here — B2 rejects POST on either form.
Verified with `curl -X POST https://...backblazeb2.com/honeyDueProd/`
returning 501 directly, with no signature involved.

Replace minio-go's PresignedPostPolicy with PresignHeader + http.MethodPut.
The signed URL now points at a single PUT endpoint, with Content-Type
and Content-Length signed via headers — B2/S3/MinIO all accept it. Drop
the min/max content-length range (we sign exactly one length now);
post-upload size verification still happens in VerifyAndClaim via HEAD.

Response shape:
  - URL  (was: signed POST endpoint)  → now: signed PUT URL
  - Fields → renamed to Headers; client sends them as request headers,
    not multipart form parts
  - Method (new): always "PUT", emitted explicitly so clients don't
    have to hardcode

Companion KMP/iOS commits switch the client paths from multipart POST
to single PUT. Existing builds in the field will need to be rebuilt.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-06 15:41:48 -05:00

362 lines
12 KiB
Go

package services
import (
"context"
"fmt"
"strings"
"time"
"github.com/google/uuid"
"github.com/redis/go-redis/v9"
"github.com/rs/zerolog/log"
"github.com/treytartt/honeydue-api/internal/apperrors"
"github.com/treytartt/honeydue-api/internal/config"
"github.com/treytartt/honeydue-api/internal/dto/responses"
"github.com/treytartt/honeydue-api/internal/models"
"github.com/treytartt/honeydue-api/internal/repositories"
)
// Upload policy constants. These are NOT tier-differentiated by request — a
// single number applies to every authenticated user regardless of free/pro
// status. Adjust here only; per-tier overrides should be added via
// config/SubscriptionSettings rather than splitting the constants.
const (
UploadMaxBytes = 10 * 1024 * 1024 // 10 MiB per single object; Presign rejects larger content lengths
UploadPresignSlackBytes = 256 // ± tolerance VerifyAndClaim allows between the expected and the stored object size
UploadPresignTTL = 15 * time.Minute // signed URL lifetime; also the offset used for the pending row's expires_at
UploadCleanupTTL = 24 * time.Hour // unclaimed row reap window (NOTE(review): not referenced by the code visible in this file)
UploadPresignsPerHour = 50 // per-user rate cap, enforced by checkRateLimit
UploadConcurrentUnclaimed = 10 // per-user in-flight cap (unclaimed, unexpired sessions)
uploadRateLimitRedisPrefix = "upload:presign:" // key prefix of the per-user, per-hour Redis counter
)
// allowedContentTypes maps each category to the set of mime types accepted
// for the upload. Anything outside this set is rejected before signing.
//
// Presign lowercases the requested content type before looking it up, so
// every key in the inner maps must be written in lowercase. A category that
// has no entry here is rejected as an invalid category.
var allowedContentTypes = map[models.UploadCategory]map[string]bool{
// Completion uploads: images only.
models.UploadCategoryCompletion: {
"image/jpeg": true,
"image/png": true,
"image/heic": true,
"image/heif": true,
"image/webp": true,
},
// Document images: same image set as completions.
models.UploadCategoryDocumentImage: {
"image/jpeg": true,
"image/png": true,
"image/heic": true,
"image/heif": true,
"image/webp": true,
},
// Document files: the image set plus PDF.
models.UploadCategoryDocumentFile: {
"image/jpeg": true,
"image/png": true,
"image/heic": true,
"image/heif": true,
"image/webp": true,
"application/pdf": true,
},
}
// uploadCategoryToSubdir maps the category to the path prefix used inside the
// bucket. Mirrors the pattern in storage_service.go.
//
// Presign places the value as the second segment of the object key
// (uploads/<subdir>/<userID>/<uuid><ext>) and substitutes "uploads" when a
// category has no entry. Both document categories share the same subdir.
var uploadCategoryToSubdir = map[models.UploadCategory]string{
models.UploadCategoryCompletion: "completions",
models.UploadCategoryDocumentImage: "documents",
models.UploadCategoryDocumentFile: "documents",
}
// UploadService orchestrates presigned-URL upload sessions. It owns:
//
//   - validation (size, mime, category)
//   - quota + rate-limit enforcement (Redis preferred; DB fallback)
//   - signing the presigned PUT URL via the existing S3Backend
//   - tracking the session in pending_uploads so the attach path can verify
//     and claim the object
type UploadService struct {
// repo reads and writes the pending upload rows (counts, create, claim, delete).
repo *repositories.PendingUploadRepository
// s3 signs PUT URLs and stats/deletes stored objects. May be nil, in which
// case Presign, VerifyAndClaim and CleanupExpired return an error.
s3 *S3Backend
// cfg is the storage configuration handed to the constructor.
// NOTE(review): not read by any method visible in this file.
cfg *config.StorageConfig
// cache supplies the Redis client used for the hourly presign counter.
cache *CacheService
// redisEnabled is set by NewUploadService to cache != nil; when false the
// rate limit is computed from the database instead.
redisEnabled bool
}
// NewUploadService assembles an UploadService from its collaborators.
//
// s3 may be nil when storage isn't configured for S3 (local-disk dev mode);
// the service is still constructed, and presign requests then fail with a
// clear error. Redis-backed rate limiting is switched on exactly when a
// non-nil cache is supplied.
func NewUploadService(
	repo *repositories.PendingUploadRepository,
	s3 *S3Backend,
	cfg *config.StorageConfig,
	cache *CacheService,
) *UploadService {
	svc := &UploadService{
		repo:  repo,
		s3:    s3,
		cfg:   cfg,
		cache: cache,
	}
	// Without a cache there is no Redis client, so checkRateLimit uses the DB.
	svc.redisEnabled = cache != nil
	return svc
}
// Presign validates the request, enforces quota + rate limits, signs a
// presigned PUT URL, persists the pending_uploads row, and returns the
// response the client needs to perform the upload.
//
// The content type is lowercased once and that single normalized value is
// used for the allow-list check, the key extension, the signature and the
// stored row, so the Content-Type bound into the signed URL always equals
// pending_uploads.content_type.
//
// Errors are mapped to apperrors so the HTTP layer can pick the status code:
//
//   - apperrors.BadRequest: contentLength is <= 0 or over UploadMaxBytes,
//     the category is unknown, or the content type is not allowed for it
//   - apperrors.TooManyRequests: over the hourly rate limit OR over the
//     concurrent in-flight cap
//   - apperrors.Internal: storage not configured, repository failure, or
//     signing failure
func (s *UploadService) Presign(
	ctx context.Context,
	userID uint,
	category models.UploadCategory,
	contentType string,
	contentLength int64,
) (*responses.PresignUploadResponse, error) {
	if s.s3 == nil {
		return nil, apperrors.Internal(fmt.Errorf("upload: S3 backend not configured"))
	}

	// Size cap. 413 semantically; we use BadRequest because that's the
	// pattern across the codebase for dto-validation rejections. Note that
	// zero and negative lengths are reported with the same error key.
	if contentLength <= 0 || contentLength > UploadMaxBytes {
		return nil, apperrors.BadRequest("error.upload_too_large")
	}

	// Mime check is per-category — completion photos can't be PDFs.
	allowed, ok := allowedContentTypes[category]
	if !ok {
		return nil, apperrors.BadRequest("error.upload_invalid_category")
	}
	// Media types are case-insensitive; normalize once so that what we
	// validate, sign and store are the same string.
	normalizedType := strings.ToLower(contentType)
	if !allowed[normalizedType] {
		return nil, apperrors.BadRequest("error.upload_unsupported_content_type")
	}

	now := time.Now().UTC()

	// Concurrency cap: how many sessions does this user have unclaimed and
	// not yet expired? Cheap COUNT, indexed.
	active, err := s.repo.WithContext(ctx).CountUnclaimedActiveForUser(userID, now)
	if err != nil {
		return nil, apperrors.Internal(err)
	}
	if active >= UploadConcurrentUnclaimed {
		return nil, apperrors.TooManyRequests("error.upload_too_many_in_flight")
	}

	// Rate limit: presigns issued in the last hour. Redis when available
	// (cheap atomic counter); DB fallback for dev/test.
	if err := s.checkRateLimit(ctx, userID, now); err != nil {
		return nil, err
	}

	// Pick a stable storage key. UUID + extension is enough — no need for the
	// timestamp-prefix trick in storage_service.go because we never list this
	// path; lookups go through pending_uploads.id.
	subdir := uploadCategoryToSubdir[category]
	if subdir == "" {
		// Defensive: a category that passed validation but has no subdir entry.
		subdir = "uploads"
	}
	ext := extensionForContentType(normalizedType)
	key := fmt.Sprintf("uploads/%s/%d/%s%s", subdir, userID, uuid.New().String(), ext)

	// Sign a single-use PUT URL that binds Content-Type and Content-Length.
	// B2 does not implement S3's POST Object form upload (returns 501), so
	// we use PUT — which works uniformly against AWS S3, B2, and MinIO.
	// Size mismatch is caught later in VerifyAndClaim; we don't need a
	// min/max range here because the signature pins exactly one size.
	put, err := s.s3.PresignedPut(ctx, key, normalizedType, contentLength, UploadPresignTTL)
	if err != nil {
		return nil, apperrors.Internal(fmt.Errorf("presign upload: %w", err))
	}

	expiresAt := now.Add(UploadPresignTTL)
	row := &models.PendingUpload{
		UserID:        userID,
		Category:      category,
		B2Key:         key,
		ContentType:   normalizedType,
		ExpectedBytes: contentLength,
		ExpiresAt:     expiresAt,
	}
	if err := s.repo.WithContext(ctx).Create(row); err != nil {
		return nil, apperrors.Internal(err)
	}

	return &responses.PresignUploadResponse{
		ID:        row.ID,
		URL:       put.URL,
		Headers:   put.Headers,
		Method:    "PUT",
		Key:       key,
		ExpiresAt: expiresAt.Format(time.RFC3339),
	}, nil
}
// VerifyAndClaim is called from CreateCompletion / CreateDocument. It loads
// the caller's unclaimed pending rows, stats each stored object to confirm
// its size, flips claimed_at, and returns the verified rows so the caller
// can write task_completion_image / document_image rows referencing them.
//
// Verification and claiming run as two separate passes: every row is
// checked (not expired, object present, size within the slack window)
// before any row is marked claimed. If any verification fails, the entire
// batch is rejected and no rows are claimed — one bad upload must not
// half-attach.
//
// NOTE(review): the claim pass issues one MarkClaimed call per row; if the
// repository does not wrap these in a transaction, a database error midway
// can still leave earlier rows claimed. The object's content type is not
// compared against the row here.
//
// Returns (nil, nil) for an empty uploadIDs slice. Errors:
//
//   - apperrors.NotFound: some ID is missing, belongs to another user, or
//     is already claimed
//   - apperrors.BadRequest: a row is expired, its object cannot be stat'ed,
//     or its size is outside the slack window
//   - apperrors.Internal: storage not configured or repository failure
func (s *UploadService) VerifyAndClaim(
	ctx context.Context,
	userID uint,
	uploadIDs []uint,
) ([]models.PendingUpload, error) {
	if len(uploadIDs) == 0 {
		return nil, nil
	}
	if s.s3 == nil {
		return nil, apperrors.Internal(fmt.Errorf("upload: S3 backend not configured"))
	}

	rows, err := s.repo.WithContext(ctx).FindUnclaimedForUser(userID, uploadIDs)
	if err != nil {
		return nil, apperrors.Internal(err)
	}
	if len(rows) != len(uploadIDs) {
		// Either some IDs don't exist, belong to another user, or are already
		// claimed. We don't differentiate — same status either way.
		return nil, apperrors.NotFound("error.upload_not_found")
	}

	now := time.Now().UTC()

	// Pass 1: verify every row. Nothing is written until all of them pass.
	verified := make([]models.PendingUpload, 0, len(rows))
	for i := range rows {
		r := rows[i]
		if r.IsExpired(now) {
			return nil, apperrors.BadRequest("error.upload_expired")
		}
		info, err := s.s3.Stat(ctx, r.B2Key)
		if err != nil {
			log.Warn().Err(err).Uint("upload_id", r.ID).Str("key", r.B2Key).Msg("upload claim: stat failed")
			return nil, apperrors.BadRequest("error.upload_not_uploaded")
		}
		// Size must match the expected bytes within ±UploadPresignSlackBytes.
		// Anything outside it means the client lied or B2 misreported.
		size := info.Size
		if size < r.ExpectedBytes-UploadPresignSlackBytes ||
			size > r.ExpectedBytes+UploadPresignSlackBytes {
			return nil, apperrors.BadRequest("error.upload_size_mismatch")
		}
		// Each row gets its own copy of the size to point at.
		r.ActualBytes = &size
		verified = append(verified, r)
	}

	// Pass 2: the whole batch verified, so claim the rows.
	for i := range verified {
		v := &verified[i]
		if err := s.repo.WithContext(ctx).MarkClaimed(v.ID, *v.ActualBytes, now); err != nil {
			return nil, apperrors.Internal(err)
		}
		v.ClaimedAt = &now
	}
	return verified, nil
}
// CleanupExpired reaps unclaimed upload sessions whose expires_at has
// passed: for each one it deletes the stored object and then the
// pending_uploads row. Called from the Asynq hourly cron in cmd/worker.
//
// batchLimit is forwarded to the repository query that selects the rows.
//
// The returned count is the number of rows actually removed. Per-row
// failures are logged and skipped so that one stuck object cannot block the
// rest; only a missing S3 backend or a failed lookup is returned as an error.
func (s *UploadService) CleanupExpired(ctx context.Context, batchLimit int) (int, error) {
	if s.s3 == nil {
		return 0, fmt.Errorf("upload: S3 backend not configured")
	}

	cutoff := time.Now().UTC()
	expired, findErr := s.repo.WithContext(ctx).FindExpiredUnclaimed(cutoff, batchLimit)
	if findErr != nil {
		return 0, fmt.Errorf("find expired: %w", findErr)
	}

	var reaped int
	for i := range expired {
		id, objectKey := expired[i].ID, expired[i].B2Key

		// A failed object delete does not stop the row delete — the bucket
		// lifecycle backstop will pick up the orphaned B2 object after 7 days.
		if delErr := s.s3.Delete(objectKey); delErr != nil {
			log.Warn().Err(delErr).Uint("upload_id", id).Str("key", objectKey).Msg("upload cleanup: B2 delete failed (continuing)")
		}

		rowErr := s.repo.WithContext(ctx).DeleteByID(id)
		if rowErr == nil {
			reaped++
			continue
		}
		log.Warn().Err(rowErr).Uint("upload_id", id).Msg("upload cleanup: row delete failed")
	}
	return reaped, nil
}
// checkRateLimit applies the UploadPresignsPerHour cap for one user.
//
// With Redis enabled it increments a per-user counter keyed by the hour
// bucket that contains now; the incremented value already includes the
// current request, so the request is rejected once the value exceeds the
// cap. If the increment fails, or Redis is disabled, it counts the rows the
// user created during the hour before now; that count excludes the current
// request, so the request is rejected once the count reaches the cap.
//
// Returns a TooManyRequests apperror when over the limit, an Internal
// apperror when the DB count fails, and nil otherwise.
func (s *UploadService) checkRateLimit(ctx context.Context, userID uint, now time.Time) error {
	if s.redisEnabled {
		hourBucket := now.Truncate(time.Hour).Unix()
		counterKey := fmt.Sprintf("%s%d:%d", uploadRateLimitRedisPrefix, userID, hourBucket)
		rdb := s.cache.Client()

		issued, incrErr := rdb.Incr(ctx, counterKey).Result()
		switch {
		case incrErr == nil:
			// Best-effort EXPIRE, attempted only by the request that
			// created the key (first INCR yields 1).
			if issued == 1 {
				_ = rdb.Expire(ctx, counterKey, 2*time.Hour).Err()
			}
			if issued > UploadPresignsPerHour {
				return apperrors.TooManyRequests("error.upload_rate_limit")
			}
			return nil
		case incrErr != redis.Nil:
			// Redis transient failure — don't open the gates and don't
			// crash the request; the DB check below decides instead.
			log.Warn().Err(incrErr).Msg("upload rate limit: redis unavailable, falling back to DB")
		}
	}

	windowStart := now.Add(-time.Hour)
	recent, err := s.repo.WithContext(ctx).CountCreatedSinceForUser(userID, windowStart)
	if err != nil {
		return apperrors.Internal(err)
	}
	if recent >= UploadPresignsPerHour {
		return apperrors.TooManyRequests("error.upload_rate_limit")
	}
	return nil
}
// urlForUploadKey builds the public URL for a stored object from its B2
// key: the configured BaseURL with any trailing slashes removed, a single
// "/", then the key. Mirrors the URL format used by StorageService.Upload so
// the existing media handler can serve presigned-uploaded objects without
// changes.
//
// When storageService or its config is nil (shouldn't happen in production,
// but defensive) the result is the raw key prefixed with "/" — better than
// an empty URL on a clearly buggy code path.
func urlForUploadKey(storageService *StorageService, b2Key string) string {
	if storageService != nil && storageService.cfg != nil {
		return fmt.Sprintf("%s/%s", strings.TrimRight(storageService.cfg.BaseURL, "/"), b2Key)
	}
	return "/" + b2Key
}
// uploadExtByContentType maps a trimmed, lowercased content type to the file
// extension appended to the storage key.
var uploadExtByContentType = map[string]string{
	"image/jpeg":      ".jpg",
	"image/jpg":       ".jpg",
	"image/png":       ".png",
	"image/heic":      ".heic",
	"image/heif":      ".heif",
	"image/webp":      ".webp",
	"application/pdf": ".pdf",
}

// extensionForContentType returns the file extension to append to a storage
// key for the given content type. Surrounding whitespace and letter case are
// ignored. Unknown types yield ".bin" so the result is never empty; Presign
// checks the content type against the allow-list before calling this, so the
// fallback is purely defensive.
func extensionForContentType(contentType string) string {
	normalized := strings.ToLower(strings.TrimSpace(contentType))
	if ext, found := uploadExtByContentType[normalized]; found {
		return ext
	}
	return ".bin"
}