package services import ( "context" "fmt" "strings" "time" "github.com/google/uuid" "github.com/redis/go-redis/v9" "github.com/rs/zerolog/log" "github.com/treytartt/honeydue-api/internal/apperrors" "github.com/treytartt/honeydue-api/internal/config" "github.com/treytartt/honeydue-api/internal/dto/responses" "github.com/treytartt/honeydue-api/internal/models" "github.com/treytartt/honeydue-api/internal/repositories" ) // Upload policy constants. These are NOT tier-differentiated by request — a // single number applies to every authenticated user regardless of free/pro // status. Adjust here only; per-tier overrides should be added via // config/SubscriptionSettings rather than splitting the constants. const ( UploadMaxBytes = 10 * 1024 * 1024 // 10 MiB per single object UploadPresignSlackBytes = 256 // ± slack for content-length-range UploadPresignTTL = 15 * time.Minute // signed URL lifetime UploadCleanupTTL = 24 * time.Hour // unclaimed row reap window UploadPresignsPerHour = 50 // per-user rate cap UploadConcurrentUnclaimed = 10 // per-user in-flight cap uploadRateLimitRedisPrefix = "upload:presign:" ) // allowedContentTypes maps each category to the set of mime types accepted // for the upload. Anything outside this set is rejected before signing. var allowedContentTypes = map[models.UploadCategory]map[string]bool{ models.UploadCategoryCompletion: { "image/jpeg": true, "image/png": true, "image/heic": true, "image/heif": true, "image/webp": true, }, models.UploadCategoryDocumentImage: { "image/jpeg": true, "image/png": true, "image/heic": true, "image/heif": true, "image/webp": true, }, models.UploadCategoryDocumentFile: { "image/jpeg": true, "image/png": true, "image/heic": true, "image/heif": true, "image/webp": true, "application/pdf": true, }, } // uploadCategoryToSubdir maps the category to the path prefix used inside the // bucket. Mirrors the pattern in storage_service.go. var uploadCategoryToSubdir = map[models.UploadCategory]string{ models.UploadCategoryCompletion: "completions", models.UploadCategoryDocumentImage: "documents", models.UploadCategoryDocumentFile: "documents", } // UploadService orchestrates presigned-URL upload sessions. It owns: // // - validation (size, mime, category) // - quota + rate-limit enforcement (Redis preferred; DB fallback) // - signing the B2 POST policy via the existing S3Backend // - tracking the session in pending_uploads so the attach path can verify // and claim the object type UploadService struct { repo *repositories.PendingUploadRepository s3 *S3Backend cfg *config.StorageConfig cache *CacheService redisEnabled bool } // NewUploadService wires the service. s3 may be nil if storage isn't // configured for S3 (local-disk dev mode); presign requests will then // return a clear error. func NewUploadService( repo *repositories.PendingUploadRepository, s3 *S3Backend, cfg *config.StorageConfig, cache *CacheService, ) *UploadService { return &UploadService{ repo: repo, s3: s3, cfg: cfg, cache: cache, redisEnabled: cache != nil, } } // Presign validates the request, enforces quota + rate limits, signs a B2 // POST policy, persists the pending_uploads row, and returns the response // the client needs to perform the upload. // // Errors are mapped to apperrors so the HTTP layer can return the right // status code: // // 413 — content_length over UploadMaxBytes // 422 — content_type not allowed for the category // 429 — over the rate limit OR over the concurrent in-flight cap // 500 — storage not configured / signing failure func (s *UploadService) Presign( ctx context.Context, userID uint, category models.UploadCategory, contentType string, contentLength int64, ) (*responses.PresignUploadResponse, error) { if s.s3 == nil { return nil, apperrors.Internal(fmt.Errorf("upload: S3 backend not configured")) } // Size cap. 413 semantically; we use BadRequest because that's the // pattern across the codebase for dto-validation rejections. if contentLength <= 0 || contentLength > UploadMaxBytes { return nil, apperrors.BadRequest("error.upload_too_large") } // Mime check is per-category — completion photos can't be PDFs. allowed, ok := allowedContentTypes[category] if !ok { return nil, apperrors.BadRequest("error.upload_invalid_category") } if !allowed[strings.ToLower(contentType)] { return nil, apperrors.BadRequest("error.upload_unsupported_content_type") } now := time.Now().UTC() // Concurrency cap: how many sessions does this user have unclaimed and // not yet expired? Cheap COUNT, indexed. active, err := s.repo.WithContext(ctx).CountUnclaimedActiveForUser(userID, now) if err != nil { return nil, apperrors.Internal(err) } if active >= UploadConcurrentUnclaimed { return nil, apperrors.TooManyRequests("error.upload_too_many_in_flight") } // Rate limit: presigns issued in the last hour. Redis when available // (cheap atomic counter); DB fallback for dev/test. if err := s.checkRateLimit(ctx, userID, now); err != nil { return nil, err } // Pick a stable storage key. UUID + extension is enough — no need for the // timestamp-prefix trick in storage_service.go because we never list this // path; lookups go through pending_uploads.id. subdir := uploadCategoryToSubdir[category] if subdir == "" { subdir = "uploads" } ext := extensionForContentType(contentType) key := fmt.Sprintf("uploads/%s/%d/%s%s", subdir, userID, uuid.New().String(), ext) // Sign a single-use PUT URL that binds Content-Type and Content-Length. // B2 does not implement S3's POST Object form upload (returns 501), so // we use PUT — which works uniformly against AWS S3, B2, and MinIO. // Size mismatch is caught later in VerifyAndClaim via HEAD; we don't // need a min/max range here because the signature pins exactly one size. put, err := s.s3.PresignedPut(ctx, key, contentType, contentLength, UploadPresignTTL) if err != nil { return nil, apperrors.Internal(fmt.Errorf("presign upload: %w", err)) } expiresAt := now.Add(UploadPresignTTL) row := &models.PendingUpload{ UserID: userID, Category: category, B2Key: key, ContentType: strings.ToLower(contentType), ExpectedBytes: contentLength, ExpiresAt: expiresAt, } if err := s.repo.WithContext(ctx).Create(row); err != nil { return nil, apperrors.Internal(err) } return &responses.PresignUploadResponse{ ID: row.ID, URL: put.URL, Headers: put.Headers, Method: "PUT", Key: key, ExpiresAt: expiresAt.Format(time.RFC3339), }, nil } // VerifyAndClaim is called from CreateCompletion / CreateDocument. It locks // the pending rows, HEADs each B2 object to confirm size + content-type, // flips claimed_at, and returns the verified rows so the caller can write // task_completion_image / document_image rows referencing them. // // If any verification fails, the entire batch is rejected and no rows are // claimed — atomic semantics matter so one bad upload doesn't half-attach. func (s *UploadService) VerifyAndClaim( ctx context.Context, userID uint, uploadIDs []uint, ) ([]models.PendingUpload, error) { if len(uploadIDs) == 0 { return nil, nil } if s.s3 == nil { return nil, apperrors.Internal(fmt.Errorf("upload: S3 backend not configured")) } rows, err := s.repo.WithContext(ctx).FindUnclaimedForUser(userID, uploadIDs) if err != nil { return nil, apperrors.Internal(err) } if len(rows) != len(uploadIDs) { // Either some IDs don't exist, belong to another user, or are already // claimed. We don't differentiate — same status either way. return nil, apperrors.NotFound("error.upload_not_found") } now := time.Now().UTC() verified := make([]models.PendingUpload, 0, len(rows)) for i := range rows { r := rows[i] if r.IsExpired(now) { return nil, apperrors.BadRequest("error.upload_expired") } info, err := s.s3.Stat(ctx, r.B2Key) if err != nil { log.Warn().Err(err).Uint("upload_id", r.ID).Str("key", r.B2Key).Msg("upload claim: stat failed") return nil, apperrors.BadRequest("error.upload_not_uploaded") } // Size must match the claimed bytes within the policy slack window. // Anything outside it means the client lied or B2 misreported. if info.Size < r.ExpectedBytes-UploadPresignSlackBytes || info.Size > r.ExpectedBytes+UploadPresignSlackBytes { return nil, apperrors.BadRequest("error.upload_size_mismatch") } if err := s.repo.WithContext(ctx).MarkClaimed(r.ID, info.Size, now); err != nil { return nil, apperrors.Internal(err) } r.ActualBytes = &info.Size r.ClaimedAt = &now verified = append(verified, r) } return verified, nil } // CleanupExpired finds unclaimed rows past their expires_at, deletes the // corresponding B2 objects, and removes the rows. Called from the Asynq // hourly cron in cmd/worker. // // Returns the number of rows reaped. Errors per row are logged and the loop // continues — one stuck object shouldn't block the others. func (s *UploadService) CleanupExpired(ctx context.Context, batchLimit int) (int, error) { if s.s3 == nil { return 0, fmt.Errorf("upload: S3 backend not configured") } now := time.Now().UTC() rows, err := s.repo.WithContext(ctx).FindExpiredUnclaimed(now, batchLimit) if err != nil { return 0, fmt.Errorf("find expired: %w", err) } reaped := 0 for _, r := range rows { if err := s.s3.Delete(r.B2Key); err != nil { log.Warn().Err(err).Uint("upload_id", r.ID).Str("key", r.B2Key).Msg("upload cleanup: B2 delete failed (continuing)") // Continue to row delete anyway — bucket lifecycle backstop will // pick up the orphaned B2 object after 7 days. } if err := s.repo.WithContext(ctx).DeleteByID(r.ID); err != nil { log.Warn().Err(err).Uint("upload_id", r.ID).Msg("upload cleanup: row delete failed") continue } reaped++ } return reaped, nil } // checkRateLimit enforces UploadPresignsPerHour using Redis (preferred) or // the DB as a fallback. Returns a 429 apperror when over. func (s *UploadService) checkRateLimit(ctx context.Context, userID uint, now time.Time) error { if s.redisEnabled { bucket := now.Truncate(time.Hour).Unix() key := fmt.Sprintf("%s%d:%d", uploadRateLimitRedisPrefix, userID, bucket) client := s.cache.Client() count, err := client.Incr(ctx, key).Result() if err == nil { // Best-effort EXPIRE; only set on first INCR (count == 1). if count == 1 { _ = client.Expire(ctx, key, 2*time.Hour).Err() } if count > UploadPresignsPerHour { return apperrors.TooManyRequests("error.upload_rate_limit") } return nil } // Redis transient failure — fall through to DB. Don't open the gates, // don't crash the request. if err != redis.Nil { log.Warn().Err(err).Msg("upload rate limit: redis unavailable, falling back to DB") } } since := now.Add(-1 * time.Hour) count, err := s.repo.WithContext(ctx).CountCreatedSinceForUser(userID, since) if err != nil { return apperrors.Internal(err) } if count >= UploadPresignsPerHour { return apperrors.TooManyRequests("error.upload_rate_limit") } return nil } // urlForUploadKey returns the public URL for a stored object given its B2 // key. Mirrors the URL format used by StorageService.Upload so the existing // media handler can serve presigned-uploaded objects without changes. // // If storageService is nil (shouldn't happen in production but defensive) // we fall back to returning the raw key prefixed with "/" — better than an // empty URL on a clearly buggy code path. func urlForUploadKey(storageService *StorageService, b2Key string) string { if storageService == nil || storageService.cfg == nil { return "/" + b2Key } base := strings.TrimRight(storageService.cfg.BaseURL, "/") return fmt.Sprintf("%s/%s", base, b2Key) } // extensionForContentType picks a sensible file extension for the storage // key. Falls back to .bin so the key is always non-empty even for unknown // types — the policy will reject anything with the wrong content-type // regardless of the extension we pick here. func extensionForContentType(contentType string) string { switch strings.ToLower(strings.TrimSpace(contentType)) { case "image/jpeg", "image/jpg": return ".jpg" case "image/png": return ".png" case "image/heic": return ".heic" case "image/heif": return ".heif" case "image/webp": return ".webp" case "application/pdf": return ".pdf" default: return ".bin" } }