29c9014a33
Replaces the multipart-via-API path for image uploads with a three-step
direct-to-storage flow:
1. Client POSTs /api/uploads/presign with content_length + content_type;
server validates size (10 MB cap), mime allow-list per category, rate
limit (50/hour/user via a fixed hourly Redis counter, with a DB fallback),
and concurrent unclaimed cap (10 in-flight per user). On success it signs
an S3 POST policy with content-length-range bound to the claimed length
±256 bytes (capped at 10 MB), persists a pending_uploads row, and returns
the URL+fields.
2. Client POSTs the bytes directly to B2 using the signed policy. B2
enforces size, content-type, and key match before accepting.
3. Client passes upload_ids[] to /api/task-completions/ or /api/documents/.
Service HEADs each B2 object, verifies size matches expected_bytes
within slack, marks pending_uploads claimed_at, and creates the
associated TaskCompletionImage / DocumentImage rows.
Bytes never traverse our API server. The 1 MB Echo BodyLimit middleware
that was rejecting all task-completion image uploads becomes irrelevant
for this path. Existing multipart endpoints remain functional alongside the
new path while it is soak-tested, ahead of legacy removal.
Cleanup:
- cmd/worker registers a new hourly cron (TypeUploadCleanup, "30 * * * *")
that reaps pending_uploads where claimed_at IS NULL AND expires_at < NOW().
Reaps both the B2 object and the row.
- B2 bucket lifecycle rule on `uploads/` prefix (7 days hide → 1 day delete)
documented in deploy-k3s/manifests/b2-lifecycle.md as a backstop.
Schema:
- migrations/000002_pending_uploads.sql adds the table + partial index for
cleanup + nullable pending_upload_id FKs on task_taskcompletionimage and
task_documentimage.
Policy (single tier, no free/pro split):
- 10 MB cap per upload
- 50 presigns/hour/user
- 10 concurrent unclaimed uploads/user
- allow-list: jpeg/png/heic/heif/webp for every category;
  document_file additionally accepts pdf
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
367 lines · 12 KiB · Go
package services
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/redis/go-redis/v9"
|
|
"github.com/rs/zerolog/log"
|
|
|
|
"github.com/treytartt/honeydue-api/internal/apperrors"
|
|
"github.com/treytartt/honeydue-api/internal/config"
|
|
"github.com/treytartt/honeydue-api/internal/dto/responses"
|
|
"github.com/treytartt/honeydue-api/internal/models"
|
|
"github.com/treytartt/honeydue-api/internal/repositories"
|
|
)
|
|
|
|
// Upload policy constants. These are NOT tier-differentiated by request — a
|
|
// single number applies to every authenticated user regardless of free/pro
|
|
// status. Adjust here only; per-tier overrides should be added via
|
|
// config/SubscriptionSettings rather than splitting the constants.
|
|
const (
|
|
UploadMaxBytes = 10 * 1024 * 1024 // 10 MiB per single object
|
|
UploadPresignSlackBytes = 256 // ± slack for content-length-range
|
|
UploadPresignTTL = 15 * time.Minute // signed URL lifetime
|
|
UploadCleanupTTL = 24 * time.Hour // unclaimed row reap window
|
|
UploadPresignsPerHour = 50 // per-user rate cap
|
|
UploadConcurrentUnclaimed = 10 // per-user in-flight cap
|
|
uploadRateLimitRedisPrefix = "upload:presign:"
|
|
)
|
|
|
|
// allowedContentTypes maps each category to the set of mime types accepted
|
|
// for the upload. Anything outside this set is rejected before signing.
|
|
var allowedContentTypes = map[models.UploadCategory]map[string]bool{
|
|
models.UploadCategoryCompletion: {
|
|
"image/jpeg": true,
|
|
"image/png": true,
|
|
"image/heic": true,
|
|
"image/heif": true,
|
|
"image/webp": true,
|
|
},
|
|
models.UploadCategoryDocumentImage: {
|
|
"image/jpeg": true,
|
|
"image/png": true,
|
|
"image/heic": true,
|
|
"image/heif": true,
|
|
"image/webp": true,
|
|
},
|
|
models.UploadCategoryDocumentFile: {
|
|
"image/jpeg": true,
|
|
"image/png": true,
|
|
"image/heic": true,
|
|
"image/heif": true,
|
|
"image/webp": true,
|
|
"application/pdf": true,
|
|
},
|
|
}
|
|
|
|
// uploadCategoryToSubdir maps the category to the path prefix used inside the
|
|
// bucket. Mirrors the pattern in storage_service.go.
|
|
var uploadCategoryToSubdir = map[models.UploadCategory]string{
|
|
models.UploadCategoryCompletion: "completions",
|
|
models.UploadCategoryDocumentImage: "documents",
|
|
models.UploadCategoryDocumentFile: "documents",
|
|
}
|
|
|
|
// UploadService orchestrates presigned-URL upload sessions. It owns:
//
//   - validation (size, mime, category)
//   - quota + rate-limit enforcement (Redis preferred; DB fallback)
//   - signing the B2 POST policy via the existing S3Backend
//   - tracking the session in pending_uploads so the attach path can verify
//     and claim the object
//
// Instances are built by NewUploadService.
type UploadService struct {
	// repo reads and writes pending_uploads rows.
	repo *repositories.PendingUploadRepository

	// s3 signs POST policies and stats/deletes objects. It may be nil
	// (local-disk dev mode); Presign, VerifyAndClaim and CleanupExpired all
	// check for nil before doing any other work.
	s3 *S3Backend

	// cfg is the storage configuration passed to NewUploadService.
	// NOTE(review): no method in this file reads it; confirm it is still needed.
	cfg *config.StorageConfig

	// cache supplies the Redis client used by checkRateLimit.
	cache *CacheService

	// redisEnabled is set by NewUploadService to cache != nil. When false,
	// checkRateLimit enforces the limit from the DB count only.
	redisEnabled bool
}
|
|
|
|
// NewUploadService wires the service. s3 may be nil if storage isn't
|
|
// configured for S3 (local-disk dev mode); presign requests will then
|
|
// return a clear error.
|
|
func NewUploadService(
|
|
repo *repositories.PendingUploadRepository,
|
|
s3 *S3Backend,
|
|
cfg *config.StorageConfig,
|
|
cache *CacheService,
|
|
) *UploadService {
|
|
return &UploadService{
|
|
repo: repo,
|
|
s3: s3,
|
|
cfg: cfg,
|
|
cache: cache,
|
|
redisEnabled: cache != nil,
|
|
}
|
|
}
|
|
|
|
// Presign validates the request, enforces quota + rate limits, signs a B2
// POST policy, persists the pending_uploads row, and returns the response
// the client needs to perform the upload.
//
// contentType is normalized once (surrounding whitespace trimmed, then
// lower-cased). That single value drives four things: the allow-list check,
// the storage-key extension, the signed policy and the persisted row. They
// can therefore never disagree about the mime type.
// NOTE(review): a client sending a mixed-case Content-Type must upload with
// the normalized value; confirm post.Fields carries it.
//
// Errors are returned as apperrors so the HTTP layer can choose the status:
//
//   - apperrors.BadRequest — content_length <= 0 or above UploadMaxBytes
//     (413 semantically), unknown category, or content_type not on the
//     category's allow-list (422 semantically)
//   - apperrors.TooManyRequests — over the hourly rate limit OR over the
//     concurrent in-flight cap
//   - apperrors.Internal — storage not configured, repository failure, or
//     signing failure
//
// The in-flight and rate checks are not atomic with the row insert, so
// concurrent requests from one user can briefly exceed the caps.
func (s *UploadService) Presign(
	ctx context.Context,
	userID uint,
	category models.UploadCategory,
	contentType string,
	contentLength int64,
) (*responses.PresignUploadResponse, error) {
	if s.s3 == nil {
		return nil, apperrors.Internal(fmt.Errorf("upload: S3 backend not configured"))
	}

	// Size cap. 413 semantically; we use BadRequest because that's the
	// pattern across the codebase for dto-validation rejections.
	if contentLength <= 0 || contentLength > UploadMaxBytes {
		return nil, apperrors.BadRequest("error.upload_too_large")
	}

	// Normalize once; every later step reads mimeType, never contentType.
	mimeType := strings.ToLower(strings.TrimSpace(contentType))

	// Mime check is per-category — completion photos can't be PDFs.
	allowed, ok := allowedContentTypes[category]
	if !ok {
		return nil, apperrors.BadRequest("error.upload_invalid_category")
	}
	if !allowed[mimeType] {
		return nil, apperrors.BadRequest("error.upload_unsupported_content_type")
	}

	now := time.Now().UTC()

	// Concurrency cap: how many sessions does this user have unclaimed and
	// not yet expired? Cheap COUNT, indexed.
	active, err := s.repo.WithContext(ctx).CountUnclaimedActiveForUser(userID, now)
	if err != nil {
		return nil, apperrors.Internal(err)
	}
	if active >= UploadConcurrentUnclaimed {
		return nil, apperrors.TooManyRequests("error.upload_too_many_in_flight")
	}

	// Rate limit: presigns issued in the last hour. Redis when available
	// (cheap atomic counter); DB fallback for dev/test. On the Redis path
	// this consumes a slot even if signing or persisting fails below.
	if err := s.checkRateLimit(ctx, userID, now); err != nil {
		return nil, err
	}

	// Pick a stable storage key. UUID + extension is enough — no need for the
	// timestamp-prefix trick in storage_service.go because we never list this
	// path; lookups go through pending_uploads.id.
	subdir := uploadCategoryToSubdir[category]
	if subdir == "" {
		subdir = "uploads"
	}
	key := fmt.Sprintf("uploads/%s/%d/%s%s", subdir, userID, uuid.New().String(), extensionForContentType(mimeType))

	// Sign the POST policy. The slack window lets the client encode once;
	// the server still rejects anything materially different from claimed.
	// The range is clamped to [0, UploadMaxBytes].
	minB := contentLength - UploadPresignSlackBytes
	if minB < 0 {
		minB = 0
	}
	maxB := contentLength + UploadPresignSlackBytes
	if maxB > UploadMaxBytes {
		maxB = UploadMaxBytes
	}

	post, err := s.s3.PresignedPost(ctx, key, mimeType, minB, maxB, UploadPresignTTL)
	if err != nil {
		return nil, apperrors.Internal(fmt.Errorf("presign upload: %w", err))
	}

	// The row expires together with the signed policy.
	expiresAt := now.Add(UploadPresignTTL)
	row := &models.PendingUpload{
		UserID:        userID,
		Category:      category,
		B2Key:         key,
		ContentType:   mimeType,
		ExpectedBytes: contentLength,
		ExpiresAt:     expiresAt,
	}
	if err := s.repo.WithContext(ctx).Create(row); err != nil {
		return nil, apperrors.Internal(err)
	}

	return &responses.PresignUploadResponse{
		ID:        row.ID,
		URL:       post.URL,
		Fields:    post.Fields,
		Key:       key,
		ExpiresAt: expiresAt.Format(time.RFC3339),
	}, nil
}
|
|
|
|
// VerifyAndClaim is called from CreateCompletion / CreateDocument. It works
// on the caller's unclaimed pending rows for uploadIDs:
//
//   - it stats each B2 object to confirm the object exists and that its size
//     is within UploadPresignSlackBytes of the length claimed at presign time;
//   - it flips claimed_at;
//   - it returns the verified rows so the caller can write
//     task_completion_image / document_image rows referencing them.
//
// Verification and claiming happen in two passes. Every row is checked
// against storage first, and rows are marked claimed only if all of them
// pass. One bad upload therefore rejects the whole batch without
// half-attaching the others.
//
// Only size is verified here. The content type was bound into the signed
// POST policy at presign time.
//
// Returns nil, nil for an empty uploadIDs. Errors:
//
//   - apperrors.NotFound — an ID is missing, owned by another user, or
//     already claimed (presumably also when an ID is repeated, since the
//     row count then won't match)
//   - apperrors.BadRequest — a row has expired, its object is not in
//     storage, or its size is outside the slack window
//   - apperrors.Internal — storage not configured or repository failure
//
// NOTE(review): claims are still written one row at a time, outside any
// visible transaction. A repository error part-way through the second pass
// can leave earlier rows claimed. Nothing here visibly locks the rows
// against a concurrent claim of the same IDs; confirm whether
// FindUnclaimedForUser / MarkClaimed guard against that.
func (s *UploadService) VerifyAndClaim(
	ctx context.Context,
	userID uint,
	uploadIDs []uint,
) ([]models.PendingUpload, error) {
	if len(uploadIDs) == 0 {
		return nil, nil
	}
	if s.s3 == nil {
		return nil, apperrors.Internal(fmt.Errorf("upload: S3 backend not configured"))
	}

	rows, err := s.repo.WithContext(ctx).FindUnclaimedForUser(userID, uploadIDs)
	if err != nil {
		return nil, apperrors.Internal(err)
	}
	if len(rows) != len(uploadIDs) {
		// Either some IDs don't exist, belong to another user, or are already
		// claimed. We don't differentiate — same status either way.
		return nil, apperrors.NotFound("error.upload_not_found")
	}

	now := time.Now().UTC()

	// Pass 1: verify every row against storage. Nothing is written to the DB
	// here, so an early return leaves every row unclaimed.
	sizes := make([]int64, len(rows))
	for i := range rows {
		r := &rows[i]
		if r.IsExpired(now) {
			return nil, apperrors.BadRequest("error.upload_expired")
		}

		info, err := s.s3.Stat(ctx, r.B2Key)
		if err != nil {
			log.Warn().Err(err).Uint("upload_id", r.ID).Str("key", r.B2Key).Msg("upload claim: stat failed")
			return nil, apperrors.BadRequest("error.upload_not_uploaded")
		}
		// Size must match the claimed bytes within the policy slack window.
		// Anything outside it means the client lied or B2 misreported.
		if info.Size < r.ExpectedBytes-UploadPresignSlackBytes ||
			info.Size > r.ExpectedBytes+UploadPresignSlackBytes {
			return nil, apperrors.BadRequest("error.upload_size_mismatch")
		}
		sizes[i] = info.Size
	}

	// Pass 2: every row verified — claim them.
	verified := make([]models.PendingUpload, 0, len(rows))
	for i := range rows {
		r := rows[i]
		size := sizes[i] // fresh variable per iteration, so &size is distinct per row
		if err := s.repo.WithContext(ctx).MarkClaimed(r.ID, size, now); err != nil {
			return nil, apperrors.Internal(err)
		}
		r.ActualBytes = &size
		r.ClaimedAt = &now
		verified = append(verified, r)
	}
	return verified, nil
}
|
|
|
|
// CleanupExpired finds unclaimed rows past their expires_at, deletes the
// corresponding B2 objects, and removes the rows. Called from the Asynq
// hourly cron in cmd/worker.
//
// batchLimit is passed straight to FindExpiredUnclaimed to bound one run.
// Per-row failures are logged and the loop continues, because one stuck
// object shouldn't block the others. Cancellation of ctx does stop the loop:
// the method then returns the count reaped so far together with the context
// error, rather than issuing more repository calls on a context that is
// already done.
//
// Returns the number of rows whose DB record was deleted.
func (s *UploadService) CleanupExpired(ctx context.Context, batchLimit int) (int, error) {
	if s.s3 == nil {
		return 0, fmt.Errorf("upload: S3 backend not configured")
	}
	now := time.Now().UTC()
	rows, err := s.repo.WithContext(ctx).FindExpiredUnclaimed(now, batchLimit)
	if err != nil {
		return 0, fmt.Errorf("find expired: %w", err)
	}

	reaped := 0
	for _, r := range rows {
		if err := ctx.Err(); err != nil {
			return reaped, fmt.Errorf("upload cleanup interrupted after %d rows: %w", reaped, err)
		}
		if err := s.s3.Delete(r.B2Key); err != nil {
			log.Warn().Err(err).Uint("upload_id", r.ID).Str("key", r.B2Key).Msg("upload cleanup: B2 delete failed (continuing)")
			// Continue to row delete anyway — bucket lifecycle backstop will
			// pick up the orphaned B2 object after 7 days.
			//
			// NOTE(review): claimed objects keep the uploads/... key that
			// Presign generated. If the bucket lifecycle rule covers the whole
			// uploads/ prefix, it would eventually hide attached images too.
			// Confirm the rule's scope.
		}
		if err := s.repo.WithContext(ctx).DeleteByID(r.ID); err != nil {
			log.Warn().Err(err).Uint("upload_id", r.ID).Msg("upload cleanup: row delete failed")
			continue
		}
		reaped++
	}
	return reaped, nil
}
|
|
|
|
// checkRateLimit enforces UploadPresignsPerHour using Redis (preferred) or
// the DB as a fallback. Returns apperrors.TooManyRequests (429) when over.
//
// The Redis path is a fixed window: one counter per user per wall-clock hour
// (now truncated to the hour). Up to twice the limit can therefore be issued
// across an hour boundary. The DB fallback instead counts rows created in
// the trailing hour.
// NOTE(review): CleanupExpired deletes unclaimed rows once they expire, so
// rows reaped within that hour presumably no longer count here.
//
// Any Redis error falls through to the DB check. Redis being down neither
// opens the gates nor fails the request.
func (s *UploadService) checkRateLimit(ctx context.Context, userID uint, now time.Time) error {
	if s.redisEnabled {
		bucket := now.Truncate(time.Hour).Unix()
		key := fmt.Sprintf("%s%d:%d", uploadRateLimitRedisPrefix, userID, bucket)

		// INCR and EXPIRE travel together in one MULTI/EXEC, so the counter is
		// never left without a TTL. The previous approach issued EXPIRE as a
		// separate best-effort call after the first INCR, which leaked the key
		// forever if that call was lost. Refreshing the TTL on every hit is
		// harmless: the key embeds its hour bucket, so it is only read during
		// that hour, and 2h still bounds its lifetime after the last request.
		pipe := s.cache.Client().TxPipeline()
		incr := pipe.Incr(ctx, key)
		pipe.Expire(ctx, key, 2*time.Hour)
		_, err := pipe.Exec(ctx)
		if err == nil {
			// The counter already includes this request, hence `>`.
			if incr.Val() > UploadPresignsPerHour {
				return apperrors.TooManyRequests("error.upload_rate_limit")
			}
			return nil
		}
		// Redis transient failure — fall through to DB. Don't open the gates,
		// don't crash the request.
		if err != redis.Nil {
			log.Warn().Err(err).Msg("upload rate limit: redis unavailable, falling back to DB")
		}
	}

	// DB fallback. The current request has no row yet, hence `>=`.
	since := now.Add(-1 * time.Hour)
	count, err := s.repo.WithContext(ctx).CountCreatedSinceForUser(userID, since)
	if err != nil {
		return apperrors.Internal(err)
	}
	if count >= UploadPresignsPerHour {
		return apperrors.TooManyRequests("error.upload_rate_limit")
	}
	return nil
}
|
|
|
|
// urlForUploadKey builds the public URL for an object stored under b2Key. The
// URL is the storage BaseURL with all trailing slashes stripped, then a
// single "/", then the key. It is meant to match the URL shape produced by
// StorageService.Upload, so the existing media handler can serve
// presigned-uploaded objects unchanged.
//
// A nil storageService, or one without config, should not happen in
// production. Rather than emit an empty URL on that clearly buggy path, the
// key is returned as a root-relative path.
func urlForUploadKey(storageService *StorageService, b2Key string) string {
	if storageService != nil && storageService.cfg != nil {
		return strings.TrimRight(storageService.cfg.BaseURL, "/") + "/" + b2Key
	}
	return "/" + b2Key
}
|
|
|
|
// extensionForContentType maps a mime type to the file extension used in the
// storage key. Matching ignores case and surrounding whitespace, and
// "image/jpg" is accepted as an alias for JPEG. Any other type yields ".bin",
// so the key is never extension-less. The signed policy still pins the real
// content type, so the extension chosen here has no effect on what is
// accepted.
func extensionForContentType(contentType string) string {
	ext := ".bin"
	switch mt := strings.ToLower(strings.TrimSpace(contentType)); mt {
	case "image/jpeg", "image/jpg":
		ext = ".jpg"
	case "application/pdf":
		ext = ".pdf"
	case "image/png", "image/heic", "image/heif", "image/webp":
		// For these subtypes the extension is just the subtype itself.
		ext = "." + strings.TrimPrefix(mt, "image/")
	}
	return ext
}
|