7cc5448a7c
Backblaze B2's S3-compatible endpoint does not implement the S3 POST Object operation. It returns HTTP 501 to every POST regardless of URL style — both path-style (https://s3.<region>.backblazeb2.com/<bucket>/) and virtual-hosted-style (https://<bucket>.s3.<region>.backblazeb2.com/). Yesterday's BucketLookupDNS fix produced virtual-hosted URLs, which is correct for AWS but doesn't help here — B2 rejects POST on either form. Verified with `curl -X POST https://...backblazeb2.com/honeyDueProd/` returning 501 directly, with no signature involved.

Replace minio-go's PresignedPostPolicy with PresignHeader + http.MethodPut. The signed URL now points at a single PUT endpoint, with Content-Type and Content-Length signed via headers — B2/S3/MinIO all accept it. Drop the min/max content-length range (we sign exactly one length now); post-upload size verification still happens in VerifyAndClaim via HEAD.

Response shape:
- URL (was: signed POST endpoint) → now: signed PUT URL
- Fields → renamed to Headers; client sends them as request headers, not multipart form parts
- Method (new): always "PUT", emitted explicitly so clients don't have to hardcode it.

Companion KMP/iOS commits switch the client paths from multipart POST to single PUT. Existing builds in the field will need to be rebuilt.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
362 lines
12 KiB
Go
362 lines
12 KiB
Go
package services
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/redis/go-redis/v9"
|
|
"github.com/rs/zerolog/log"
|
|
|
|
"github.com/treytartt/honeydue-api/internal/apperrors"
|
|
"github.com/treytartt/honeydue-api/internal/config"
|
|
"github.com/treytartt/honeydue-api/internal/dto/responses"
|
|
"github.com/treytartt/honeydue-api/internal/models"
|
|
"github.com/treytartt/honeydue-api/internal/repositories"
|
|
)
|
|
|
|
// Upload policy limits. One value applies to every authenticated user; none
// of these vary by free/pro tier. If per-tier limits are ever needed, add
// overrides through config/SubscriptionSettings instead of splitting the
// constants below.
const (
	// UploadMaxBytes caps the declared size of a single object (10 MiB).
	UploadMaxBytes = 10 << 20

	// UploadPresignSlackBytes is the ± tolerance VerifyAndClaim allows between
	// the declared size and the size reported for the stored object.
	UploadPresignSlackBytes = 256

	// UploadPresignTTL is how long a signed upload URL stays valid.
	UploadPresignTTL = 15 * time.Minute

	// UploadCleanupTTL is the reap window for unclaimed rows.
	UploadCleanupTTL = 24 * time.Hour

	// UploadPresignsPerHour is the per-user presign rate cap.
	UploadPresignsPerHour = 50

	// UploadConcurrentUnclaimed is the per-user cap on unclaimed, unexpired
	// upload sessions.
	UploadConcurrentUnclaimed = 10

	// uploadRateLimitRedisPrefix prefixes the Redis keys used by the presign
	// rate limiter.
	uploadRateLimitRedisPrefix = "upload:presign:"
)
|
|
|
|
// allowedContentTypes maps each category to the set of mime types accepted
|
|
// for the upload. Anything outside this set is rejected before signing.
|
|
var allowedContentTypes = map[models.UploadCategory]map[string]bool{
|
|
models.UploadCategoryCompletion: {
|
|
"image/jpeg": true,
|
|
"image/png": true,
|
|
"image/heic": true,
|
|
"image/heif": true,
|
|
"image/webp": true,
|
|
},
|
|
models.UploadCategoryDocumentImage: {
|
|
"image/jpeg": true,
|
|
"image/png": true,
|
|
"image/heic": true,
|
|
"image/heif": true,
|
|
"image/webp": true,
|
|
},
|
|
models.UploadCategoryDocumentFile: {
|
|
"image/jpeg": true,
|
|
"image/png": true,
|
|
"image/heic": true,
|
|
"image/heif": true,
|
|
"image/webp": true,
|
|
"application/pdf": true,
|
|
},
|
|
}
|
|
|
|
// uploadCategoryToSubdir maps the category to the path prefix used inside the
|
|
// bucket. Mirrors the pattern in storage_service.go.
|
|
var uploadCategoryToSubdir = map[models.UploadCategory]string{
|
|
models.UploadCategoryCompletion: "completions",
|
|
models.UploadCategoryDocumentImage: "documents",
|
|
models.UploadCategoryDocumentFile: "documents",
|
|
}
|
|
|
|
// UploadService orchestrates presigned-URL upload sessions. It owns:
|
|
//
|
|
// - validation (size, mime, category)
|
|
// - quota + rate-limit enforcement (Redis preferred; DB fallback)
|
|
// - signing the B2 POST policy via the existing S3Backend
|
|
// - tracking the session in pending_uploads so the attach path can verify
|
|
// and claim the object
|
|
type UploadService struct {
|
|
repo *repositories.PendingUploadRepository
|
|
s3 *S3Backend
|
|
cfg *config.StorageConfig
|
|
cache *CacheService
|
|
redisEnabled bool
|
|
}
|
|
|
|
// NewUploadService wires the service. s3 may be nil if storage isn't
|
|
// configured for S3 (local-disk dev mode); presign requests will then
|
|
// return a clear error.
|
|
func NewUploadService(
|
|
repo *repositories.PendingUploadRepository,
|
|
s3 *S3Backend,
|
|
cfg *config.StorageConfig,
|
|
cache *CacheService,
|
|
) *UploadService {
|
|
return &UploadService{
|
|
repo: repo,
|
|
s3: s3,
|
|
cfg: cfg,
|
|
cache: cache,
|
|
redisEnabled: cache != nil,
|
|
}
|
|
}
|
|
|
|
// Presign validates the request, enforces quota + rate limits, signs a B2
|
|
// POST policy, persists the pending_uploads row, and returns the response
|
|
// the client needs to perform the upload.
|
|
//
|
|
// Errors are mapped to apperrors so the HTTP layer can return the right
|
|
// status code:
|
|
//
|
|
// 413 — content_length over UploadMaxBytes
|
|
// 422 — content_type not allowed for the category
|
|
// 429 — over the rate limit OR over the concurrent in-flight cap
|
|
// 500 — storage not configured / signing failure
|
|
func (s *UploadService) Presign(
|
|
ctx context.Context,
|
|
userID uint,
|
|
category models.UploadCategory,
|
|
contentType string,
|
|
contentLength int64,
|
|
) (*responses.PresignUploadResponse, error) {
|
|
if s.s3 == nil {
|
|
return nil, apperrors.Internal(fmt.Errorf("upload: S3 backend not configured"))
|
|
}
|
|
|
|
// Size cap. 413 semantically; we use BadRequest because that's the
|
|
// pattern across the codebase for dto-validation rejections.
|
|
if contentLength <= 0 || contentLength > UploadMaxBytes {
|
|
return nil, apperrors.BadRequest("error.upload_too_large")
|
|
}
|
|
|
|
// Mime check is per-category — completion photos can't be PDFs.
|
|
allowed, ok := allowedContentTypes[category]
|
|
if !ok {
|
|
return nil, apperrors.BadRequest("error.upload_invalid_category")
|
|
}
|
|
if !allowed[strings.ToLower(contentType)] {
|
|
return nil, apperrors.BadRequest("error.upload_unsupported_content_type")
|
|
}
|
|
|
|
now := time.Now().UTC()
|
|
|
|
// Concurrency cap: how many sessions does this user have unclaimed and
|
|
// not yet expired? Cheap COUNT, indexed.
|
|
active, err := s.repo.WithContext(ctx).CountUnclaimedActiveForUser(userID, now)
|
|
if err != nil {
|
|
return nil, apperrors.Internal(err)
|
|
}
|
|
if active >= UploadConcurrentUnclaimed {
|
|
return nil, apperrors.TooManyRequests("error.upload_too_many_in_flight")
|
|
}
|
|
|
|
// Rate limit: presigns issued in the last hour. Redis when available
|
|
// (cheap atomic counter); DB fallback for dev/test.
|
|
if err := s.checkRateLimit(ctx, userID, now); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Pick a stable storage key. UUID + extension is enough — no need for the
|
|
// timestamp-prefix trick in storage_service.go because we never list this
|
|
// path; lookups go through pending_uploads.id.
|
|
subdir := uploadCategoryToSubdir[category]
|
|
if subdir == "" {
|
|
subdir = "uploads"
|
|
}
|
|
ext := extensionForContentType(contentType)
|
|
key := fmt.Sprintf("uploads/%s/%d/%s%s", subdir, userID, uuid.New().String(), ext)
|
|
|
|
// Sign a single-use PUT URL that binds Content-Type and Content-Length.
|
|
// B2 does not implement S3's POST Object form upload (returns 501), so
|
|
// we use PUT — which works uniformly against AWS S3, B2, and MinIO.
|
|
// Size mismatch is caught later in VerifyAndClaim via HEAD; we don't
|
|
// need a min/max range here because the signature pins exactly one size.
|
|
put, err := s.s3.PresignedPut(ctx, key, contentType, contentLength, UploadPresignTTL)
|
|
if err != nil {
|
|
return nil, apperrors.Internal(fmt.Errorf("presign upload: %w", err))
|
|
}
|
|
|
|
expiresAt := now.Add(UploadPresignTTL)
|
|
row := &models.PendingUpload{
|
|
UserID: userID,
|
|
Category: category,
|
|
B2Key: key,
|
|
ContentType: strings.ToLower(contentType),
|
|
ExpectedBytes: contentLength,
|
|
ExpiresAt: expiresAt,
|
|
}
|
|
if err := s.repo.WithContext(ctx).Create(row); err != nil {
|
|
return nil, apperrors.Internal(err)
|
|
}
|
|
|
|
return &responses.PresignUploadResponse{
|
|
ID: row.ID,
|
|
URL: put.URL,
|
|
Headers: put.Headers,
|
|
Method: "PUT",
|
|
Key: key,
|
|
ExpiresAt: expiresAt.Format(time.RFC3339),
|
|
}, nil
|
|
}
|
|
|
|
// VerifyAndClaim is called from CreateCompletion / CreateDocument. It locks
|
|
// the pending rows, HEADs each B2 object to confirm size + content-type,
|
|
// flips claimed_at, and returns the verified rows so the caller can write
|
|
// task_completion_image / document_image rows referencing them.
|
|
//
|
|
// If any verification fails, the entire batch is rejected and no rows are
|
|
// claimed — atomic semantics matter so one bad upload doesn't half-attach.
|
|
func (s *UploadService) VerifyAndClaim(
|
|
ctx context.Context,
|
|
userID uint,
|
|
uploadIDs []uint,
|
|
) ([]models.PendingUpload, error) {
|
|
if len(uploadIDs) == 0 {
|
|
return nil, nil
|
|
}
|
|
if s.s3 == nil {
|
|
return nil, apperrors.Internal(fmt.Errorf("upload: S3 backend not configured"))
|
|
}
|
|
|
|
rows, err := s.repo.WithContext(ctx).FindUnclaimedForUser(userID, uploadIDs)
|
|
if err != nil {
|
|
return nil, apperrors.Internal(err)
|
|
}
|
|
if len(rows) != len(uploadIDs) {
|
|
// Either some IDs don't exist, belong to another user, or are already
|
|
// claimed. We don't differentiate — same status either way.
|
|
return nil, apperrors.NotFound("error.upload_not_found")
|
|
}
|
|
|
|
now := time.Now().UTC()
|
|
verified := make([]models.PendingUpload, 0, len(rows))
|
|
for i := range rows {
|
|
r := rows[i]
|
|
if r.IsExpired(now) {
|
|
return nil, apperrors.BadRequest("error.upload_expired")
|
|
}
|
|
|
|
info, err := s.s3.Stat(ctx, r.B2Key)
|
|
if err != nil {
|
|
log.Warn().Err(err).Uint("upload_id", r.ID).Str("key", r.B2Key).Msg("upload claim: stat failed")
|
|
return nil, apperrors.BadRequest("error.upload_not_uploaded")
|
|
}
|
|
// Size must match the claimed bytes within the policy slack window.
|
|
// Anything outside it means the client lied or B2 misreported.
|
|
if info.Size < r.ExpectedBytes-UploadPresignSlackBytes ||
|
|
info.Size > r.ExpectedBytes+UploadPresignSlackBytes {
|
|
return nil, apperrors.BadRequest("error.upload_size_mismatch")
|
|
}
|
|
|
|
if err := s.repo.WithContext(ctx).MarkClaimed(r.ID, info.Size, now); err != nil {
|
|
return nil, apperrors.Internal(err)
|
|
}
|
|
r.ActualBytes = &info.Size
|
|
r.ClaimedAt = &now
|
|
verified = append(verified, r)
|
|
}
|
|
return verified, nil
|
|
}
|
|
|
|
// CleanupExpired finds unclaimed rows past their expires_at, deletes the
|
|
// corresponding B2 objects, and removes the rows. Called from the Asynq
|
|
// hourly cron in cmd/worker.
|
|
//
|
|
// Returns the number of rows reaped. Errors per row are logged and the loop
|
|
// continues — one stuck object shouldn't block the others.
|
|
func (s *UploadService) CleanupExpired(ctx context.Context, batchLimit int) (int, error) {
|
|
if s.s3 == nil {
|
|
return 0, fmt.Errorf("upload: S3 backend not configured")
|
|
}
|
|
now := time.Now().UTC()
|
|
rows, err := s.repo.WithContext(ctx).FindExpiredUnclaimed(now, batchLimit)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("find expired: %w", err)
|
|
}
|
|
reaped := 0
|
|
for _, r := range rows {
|
|
if err := s.s3.Delete(r.B2Key); err != nil {
|
|
log.Warn().Err(err).Uint("upload_id", r.ID).Str("key", r.B2Key).Msg("upload cleanup: B2 delete failed (continuing)")
|
|
// Continue to row delete anyway — bucket lifecycle backstop will
|
|
// pick up the orphaned B2 object after 7 days.
|
|
}
|
|
if err := s.repo.WithContext(ctx).DeleteByID(r.ID); err != nil {
|
|
log.Warn().Err(err).Uint("upload_id", r.ID).Msg("upload cleanup: row delete failed")
|
|
continue
|
|
}
|
|
reaped++
|
|
}
|
|
return reaped, nil
|
|
}
|
|
|
|
// checkRateLimit enforces UploadPresignsPerHour using Redis (preferred) or
|
|
// the DB as a fallback. Returns a 429 apperror when over.
|
|
func (s *UploadService) checkRateLimit(ctx context.Context, userID uint, now time.Time) error {
|
|
if s.redisEnabled {
|
|
bucket := now.Truncate(time.Hour).Unix()
|
|
key := fmt.Sprintf("%s%d:%d", uploadRateLimitRedisPrefix, userID, bucket)
|
|
client := s.cache.Client()
|
|
count, err := client.Incr(ctx, key).Result()
|
|
if err == nil {
|
|
// Best-effort EXPIRE; only set on first INCR (count == 1).
|
|
if count == 1 {
|
|
_ = client.Expire(ctx, key, 2*time.Hour).Err()
|
|
}
|
|
if count > UploadPresignsPerHour {
|
|
return apperrors.TooManyRequests("error.upload_rate_limit")
|
|
}
|
|
return nil
|
|
}
|
|
// Redis transient failure — fall through to DB. Don't open the gates,
|
|
// don't crash the request.
|
|
if err != redis.Nil {
|
|
log.Warn().Err(err).Msg("upload rate limit: redis unavailable, falling back to DB")
|
|
}
|
|
}
|
|
since := now.Add(-1 * time.Hour)
|
|
count, err := s.repo.WithContext(ctx).CountCreatedSinceForUser(userID, since)
|
|
if err != nil {
|
|
return apperrors.Internal(err)
|
|
}
|
|
if count >= UploadPresignsPerHour {
|
|
return apperrors.TooManyRequests("error.upload_rate_limit")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// urlForUploadKey returns the public URL for a stored object given its B2
|
|
// key. Mirrors the URL format used by StorageService.Upload so the existing
|
|
// media handler can serve presigned-uploaded objects without changes.
|
|
//
|
|
// If storageService is nil (shouldn't happen in production but defensive)
|
|
// we fall back to returning the raw key prefixed with "/" — better than an
|
|
// empty URL on a clearly buggy code path.
|
|
func urlForUploadKey(storageService *StorageService, b2Key string) string {
|
|
if storageService == nil || storageService.cfg == nil {
|
|
return "/" + b2Key
|
|
}
|
|
base := strings.TrimRight(storageService.cfg.BaseURL, "/")
|
|
return fmt.Sprintf("%s/%s", base, b2Key)
|
|
}
|
|
|
|
// extensionForContentType maps a mime type to the file extension appended to
// the storage key. The input is trimmed and lower-cased before matching.
// Unrecognised types get ".bin", so the result is never empty.
func extensionForContentType(contentType string) string {
	normalized := strings.ToLower(strings.TrimSpace(contentType))

	// Start from the fallback and overwrite it for each known type.
	ext := ".bin"
	switch normalized {
	case "application/pdf":
		ext = ".pdf"
	case "image/webp":
		ext = ".webp"
	case "image/heif":
		ext = ".heif"
	case "image/heic":
		ext = ".heic"
	case "image/png":
		ext = ".png"
	case "image/jpeg", "image/jpg":
		ext = ".jpg"
	}
	return ext
}
|