feat(uploads): direct-to-B2 presigned uploads with content-length-range policy
Replaces the multipart-via-API path for image uploads with a three-step
direct-to-storage flow:
1. Client POSTs /api/uploads/presign with content_length + content_type;
server validates size (10 MB cap), mime allow-list per category, rate
limit (50/hour/user via Redis sliding window), and concurrent unclaimed
cap (10 in-flight per user). On success it persists a pending_uploads
row, signs an S3 POST policy with content-length-range bound to the
claimed length ±256 bytes, and returns the URL+fields.
2. Client POSTs the bytes directly to B2 using the signed policy. B2
enforces size, content-type, and key match before accepting.
3. Client passes upload_ids[] to /api/task-completions/ or /api/documents/.
Service HEADs each B2 object, verifies size matches expected_bytes
within slack, marks pending_uploads claimed_at, and creates the
associated TaskCompletionImage / DocumentImage rows.
Bytes never traverse our API server. The 1 MB Echo BodyLimit middleware
that was rejecting all task-completion image uploads becomes irrelevant
for this path. Existing multipart endpoints stay functional alongside it so
the new path can be soak-tested before the legacy path is removed.
Cleanup:
- cmd/worker registers a new hourly cron (TypeUploadCleanup, "30 * * * *")
that reaps pending_uploads where claimed_at IS NULL AND expires_at < NOW().
Reaps both the B2 object and the row.
- B2 bucket lifecycle rule on `uploads/` prefix (7 days hide → 1 day delete)
documented in deploy-k3s/manifests/b2-lifecycle.md as a backstop.
Schema:
- migrations/000002_pending_uploads.sql adds the table + partial index for
cleanup + nullable pending_upload_id FKs on task_taskcompletionimage and
task_documentimage.
Policy (single tier, no free/pro split):
- 10 MB cap per upload
- 50 presigns/hour/user
- 10 concurrent unclaimed uploads/user
- allow-list: jpeg/png/heic/heif/webp for image categories;
+ pdf for document_file
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -167,6 +167,20 @@ func main() {
|
||||
// Create job handler
|
||||
jobHandler := jobs.NewHandler(db, pushClient, emailService, notificationService, cfg)
|
||||
|
||||
// Wire upload service for the pending_uploads cleanup cron. Storage may
|
||||
// be local-disk (no S3 backend), in which case the upload service stays
|
||||
// nil and the cleanup handler no-ops. Cache is optional — the cleanup
|
||||
// path doesn't rate-limit and works fine with a nil cache.
|
||||
if storageService, sErr := services.NewStorageService(&cfg.Storage); sErr == nil {
|
||||
if s3 := storageService.S3Backend(); s3 != nil {
|
||||
pendingUploadRepo := repositories.NewPendingUploadRepository(db)
|
||||
uploadService := services.NewUploadService(pendingUploadRepo, s3, &cfg.Storage, nil)
|
||||
jobHandler.SetUploadService(uploadService)
|
||||
}
|
||||
} else {
|
||||
log.Warn().Err(sErr).Msg("Failed to initialize storage service for upload cleanup; cleanup cron will no-op")
|
||||
}
|
||||
|
||||
// Create Asynq mux and register handlers
|
||||
mux := asynq.NewServeMux()
|
||||
|
||||
@@ -180,6 +194,7 @@ func main() {
|
||||
mux.HandleFunc(jobs.TypeSendPush, jobHandler.HandleSendPush)
|
||||
mux.HandleFunc(jobs.TypeOnboardingEmails, jobHandler.HandleOnboardingEmails)
|
||||
mux.HandleFunc(jobs.TypeReminderLogCleanup, jobHandler.HandleReminderLogCleanup)
|
||||
mux.HandleFunc(jobs.TypeUploadCleanup, jobHandler.HandleUploadCleanup)
|
||||
|
||||
// Register email job handlers (welcome, verification, password reset, password changed)
|
||||
if emailService != nil {
|
||||
@@ -219,6 +234,15 @@ func main() {
|
||||
}
|
||||
log.Info().Str("cron", "0 3 * * *").Msg("Registered reminder log cleanup job (runs daily at 3:00 AM UTC)")
|
||||
|
||||
// Schedule pending_uploads cleanup (hourly at :30 to avoid colliding with
|
||||
// the top-of-hour reminder + digest crons). Reaps unclaimed expired
|
||||
// upload sessions; the B2 bucket lifecycle (7 days on uploads/ prefix)
|
||||
// is the backstop if this worker is offline for an extended period.
|
||||
if _, err := scheduler.Register("30 * * * *", asynq.NewTask(jobs.TypeUploadCleanup, nil)); err != nil {
|
||||
log.Fatal().Err(err).Msg("Failed to register upload cleanup job")
|
||||
}
|
||||
log.Info().Str("cron", "30 * * * *").Msg("Registered pending_uploads cleanup job (runs hourly)")
|
||||
|
||||
// Handle graceful shutdown
|
||||
quit := make(chan os.Signal, 1)
|
||||
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
@@ -0,0 +1,57 @@
|
||||
# B2 bucket lifecycle — `uploads/` prefix
|
||||
|
||||
The `pending_uploads` cleanup worker (cron `30 * * * *`, see
|
||||
`internal/worker/jobs/handler.go::HandleUploadCleanup`) reaps unclaimed
|
||||
upload sessions every hour, deleting both the row and the corresponding B2
|
||||
object. This bucket-level lifecycle rule is a **backstop** — it catches B2
|
||||
objects that survive the row deletion (e.g. worker crashed mid-loop, B2
|
||||
delete errored, manual DB tampering).
|
||||
|
||||
## Rule
|
||||
|
||||
Apply via the Backblaze web console: **Bucket → `honeyDueProd` → Lifecycle Settings → Custom**
|
||||
|
||||
```json
|
||||
[
|
||||
{
|
||||
"fileNamePrefix": "uploads/",
|
||||
"daysFromUploadingToHiding": 7,
|
||||
"daysFromHidingToDeleting": 1
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
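Effect: any object under the `uploads/` prefix is hidden 7 days after
|
||||
upload, then permanently deleted 1 day after that. Total maximum lifetime
|
||||
of an orphaned object: roughly 8 days (allow up to about one extra day, since B2 evaluates lifecycle rules about once per day). The rule matches every object under `uploads/`, not only orphans, so confirm that claimed uploads are not left under this prefix if they must outlive that window.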
|
||||
|
||||
This rule does NOT affect:
|
||||
|
||||
- `images/`, `documents/`, `completions/` — legacy multipart-uploaded
|
||||
objects, which are managed by the existing `task_completion_image` /
|
||||
`document_image` / `document.file_url` references.
|
||||
|
||||
## Why a backstop, not the primary mechanism
|
||||
|
||||
The application worker is the primary mechanism because:
|
||||
|
||||
1. It can delete the **DB row** alongside the B2 object — lifecycle alone
|
||||
would leave dangling `pending_uploads` rows.
|
||||
2. It runs hourly vs. lifecycle's once-per-day evaluation — much tighter
|
||||
recovery window for the common case.
|
||||
3. It produces logs / metrics for orphan rate observability.
|
||||
|
||||
## Verification
|
||||
|
||||
After applying:
|
||||
|
||||
```bash
|
||||
# B2 CLI v4 syntax; on CLI v3 use `b2 get-bucket honeyDueProd` instead.
b2 bucket get honeyDueProd | jq '.lifecycleRules'
|
||||
```
|
||||
|
||||
Should show the rule above. If you don't have the B2 CLI:
|
||||
|
||||
```bash
|
||||
curl -u "$B2_KEY_ID:$B2_APP_KEY" https://api.backblazeb2.com/b2api/v3/b2_authorize_account
|
||||
# Then use the returned authorizationToken + apiUrl to call b2_list_buckets (filter by bucketName) and inspect lifecycleRules
|
||||
```
|
||||
@@ -25,7 +25,13 @@ type CreateDocumentRequest struct {
|
||||
SerialNumber string `json:"serial_number" validate:"max=100"`
|
||||
ModelNumber string `json:"model_number" validate:"max=100"`
|
||||
TaskID *uint `json:"task_id"`
|
||||
ImageURLs []string `json:"image_urls" validate:"omitempty,max=20,dive,max=500"` // Multiple image URLs
|
||||
ImageURLs []string `json:"image_urls" validate:"omitempty,max=20,dive,max=500"` // Legacy multipart upload path
|
||||
// UploadIDs claims pending_uploads rows produced by the presigned-URL
|
||||
// upload flow and turns them into document_image rows. May be combined
|
||||
// with ImageURLs during the rollout window. UploadIDs of category
|
||||
// "document_file" attach to the document's main FileURL/FileName fields
|
||||
// instead — the service infers placement from the row's category.
|
||||
UploadIDs []uint `json:"upload_ids" validate:"omitempty,max=20"`
|
||||
}
|
||||
|
||||
// UpdateDocumentRequest represents the request to update a document
|
||||
|
||||
@@ -107,7 +107,21 @@ type CreateTaskCompletionRequest struct {
|
||||
Notes string `json:"notes" validate:"max=10000"`
|
||||
ActualCost *decimal.Decimal `json:"actual_cost"`
|
||||
Rating *int `json:"rating" validate:"omitempty,min=1,max=5"` // 1-5 star rating
|
||||
ImageURLs []string `json:"image_urls" validate:"omitempty,max=20,dive,max=500"` // Multiple image URLs
|
||||
|
||||
// ImageURLs is the legacy multipart-upload path: the handler uploaded the
|
||||
// images first via the same request and produced URLs. Still supported for
|
||||
// older client builds.
|
||||
ImageURLs []string `json:"image_urls" validate:"omitempty,max=20,dive,max=500"`
|
||||
|
||||
// UploadIDs is the new direct-to-B2 path: the client uploaded each image
|
||||
// via a presigned URL and now claims the resulting pending_uploads rows
|
||||
// by id. The service verifies ownership + size, marks each row claimed,
|
||||
// and creates task_completion_image rows from them.
|
||||
//
|
||||
// If both ImageURLs and UploadIDs are present, both contribute to the
|
||||
// final set of images so a single completion can mix legacy and new
|
||||
// uploads (helps during the rollout window).
|
||||
UploadIDs []uint `json:"upload_ids" validate:"omitempty,max=20"`
|
||||
}
|
||||
|
||||
// UpdateTaskCompletionRequest represents the request to update a task completion
|
||||
|
||||
@@ -0,0 +1,22 @@
|
||||
package requests
|
||||
|
||||
// PresignUploadRequest is the JSON body for POST /api/uploads/presign. The client
|
||||
// describes what it's about to upload; the server validates against quota,
|
||||
// rate limits, and per-category caps before returning a signed POST policy.
|
||||
type PresignUploadRequest struct {
|
||||
// Category gates allowed mime types and the size cap. Required; one of:
|
||||
// "completion" — task completion photos
|
||||
// "document_image" — image attached to a Document
|
||||
// "document_file" — file (e.g. PDF) attached to a Document
|
||||
Category string `json:"category" validate:"required,oneof=completion document_image document_file"`
|
||||
|
||||
// ContentType is the MIME type the client will upload (e.g. image/jpeg).
|
||||
// Bound to the policy so the actual upload must match exactly. The tag only bounds its length (3-127); the allow-list check is not visible in this file — presumably done by the upload service.
|
||||
ContentType string `json:"content_type" validate:"required,min=3,max=127"`
|
||||
|
||||
// ContentLength is the exact byte count the client intends to upload.
|
||||
// The signed policy permits a small slack window around this value
|
||||
// (server-side constant) so the client can encode in one pass without
|
||||
// having to predict the byte count perfectly. The tag only enforces a minimum of 1; the upper size cap is not checked here — presumably by the upload service.
|
||||
ContentLength int64 `json:"content_length" validate:"required,min=1"`
|
||||
}
|
||||
@@ -0,0 +1,28 @@
|
||||
package responses
|
||||
|
||||
// PresignUploadResponse is what /api/uploads/presign returns to the client.
|
||||
//
|
||||
// The client uses URL + Fields to build a multipart/form-data POST directly
|
||||
// to S3-compatible storage (B2). Once the upload completes, the client calls
|
||||
// the relevant entity-creation endpoint (POST /api/task-completions/, POST
|
||||
// /api/documents/) with `upload_ids: [<id>]` to claim and attach the object.
|
||||
type PresignUploadResponse struct {
|
||||
// ID is the pending_uploads.id the client passes back via upload_ids[].
|
||||
ID uint `json:"id"`
|
||||
|
||||
// URL is the storage endpoint to POST to (no query string). Serialized as "upload_url".
|
||||
URL string `json:"upload_url"`
|
||||
|
||||
// Fields are the form fields (policy, signature, key, etc.) that must be
|
||||
// submitted with the multipart form. The file part must be named "file"
|
||||
// and come last per S3 POST policy rules. Being a map, Fields carries no ordering of its own.
|
||||
Fields map[string]string `json:"fields"`
|
||||
|
||||
// Key is the object key chosen by the server. Echoed for client logging
|
||||
// and debugging; the canonical reference is via ID.
|
||||
Key string `json:"key"`
|
||||
|
||||
// ExpiresAt is when the signed URL stops working. Clients should retry
|
||||
// with a fresh presign rather than relying on long-lived URLs. String-encoded; the timestamp format is not visible here — presumably RFC 3339, TODO confirm.
|
||||
ExpiresAt string `json:"expires_at"`
|
||||
}
|
||||
@@ -7,9 +7,11 @@ import (
|
||||
"github.com/rs/zerolog/log"
|
||||
|
||||
"github.com/treytartt/honeydue-api/internal/apperrors"
|
||||
"github.com/treytartt/honeydue-api/internal/dto/requests"
|
||||
"github.com/treytartt/honeydue-api/internal/dto/responses"
|
||||
"github.com/treytartt/honeydue-api/internal/i18n"
|
||||
"github.com/treytartt/honeydue-api/internal/middleware"
|
||||
"github.com/treytartt/honeydue-api/internal/models"
|
||||
"github.com/treytartt/honeydue-api/internal/services"
|
||||
)
|
||||
|
||||
@@ -22,18 +24,26 @@ type FileOwnershipChecker interface {
|
||||
|
||||
// UploadHandler handles file upload endpoints
|
||||
type UploadHandler struct {
|
||||
storageService *services.StorageService
|
||||
storageService *services.StorageService
|
||||
uploadService *services.UploadService // optional — only set when S3 storage is configured
|
||||
fileOwnershipChecker FileOwnershipChecker
|
||||
}
|
||||
|
||||
// NewUploadHandler creates a new upload handler
|
||||
func NewUploadHandler(storageService *services.StorageService, fileOwnershipChecker FileOwnershipChecker) *UploadHandler {
|
||||
return &UploadHandler{
|
||||
storageService: storageService,
|
||||
storageService: storageService,
|
||||
fileOwnershipChecker: fileOwnershipChecker,
|
||||
}
|
||||
}
|
||||
|
||||
// SetUploadService wires the presigned-URL upload service. Called from the
|
||||
// router only when S3 storage is configured; with local-disk storage the
|
||||
// service stays nil and PresignUpload answers with apperrors.Internal(nil) — an internal error rather than a 503 (NOTE(review): confirm the status code apperrors.Internal maps to).
|
||||
func (h *UploadHandler) SetUploadService(s *services.UploadService) {
|
||||
h.uploadService = s
|
||||
}
|
||||
|
||||
// UploadImage handles POST /api/uploads/image
|
||||
// Accepts multipart/form-data with "file" field
|
||||
func (h *UploadHandler) UploadImage(c echo.Context) error {
|
||||
@@ -138,3 +148,39 @@ func (h *UploadHandler) DeleteFile(c echo.Context) error {
|
||||
|
||||
return c.JSON(http.StatusOK, responses.MessageResponse{Message: i18n.LocalizedMessage(c, "message.file_deleted")})
|
||||
}
|
||||
|
||||
// PresignUpload handles POST /api/uploads/presign.
|
||||
//
|
||||
// Returns a short-lived signed POST policy that the client uses to upload an
|
||||
// image or document directly to B2, bypassing the API entirely for the byte
|
||||
// transfer. The returned `id` is later passed in `upload_ids[]` on the
|
||||
// task-completion or document creation endpoints to attach the object.
|
||||
func (h *UploadHandler) PresignUpload(c echo.Context) error {
|
||||
if h.uploadService == nil {
|
||||
return apperrors.Internal(nil)
|
||||
}
|
||||
user, err := middleware.MustGetAuthUser(c)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var req requests.PresignUploadRequest
|
||||
if err := c.Bind(&req); err != nil {
|
||||
return apperrors.BadRequest("error.invalid_request")
|
||||
}
|
||||
if err := c.Validate(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
resp, err := h.uploadService.Presign(
|
||||
c.Request().Context(),
|
||||
user.ID,
|
||||
models.UploadCategory(req.Category),
|
||||
req.ContentType,
|
||||
req.ContentLength,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return c.JSON(http.StatusCreated, resp)
|
||||
}
|
||||
|
||||
@@ -91,6 +91,8 @@ type DocumentImage struct {
|
||||
DocumentID uint `gorm:"column:document_id;index;not null" json:"document_id"`
|
||||
ImageURL string `gorm:"column:image_url;size:500;not null" json:"image_url"`
|
||||
Caption string `gorm:"column:caption;size:255" json:"caption"`
|
||||
// PendingUploadID — see TaskCompletionImage.PendingUploadID.
|
||||
PendingUploadID *uint `gorm:"column:pending_upload_id" json:"pending_upload_id,omitempty"`
|
||||
}
|
||||
|
||||
// TableName returns the table name for GORM
|
||||
|
||||
@@ -0,0 +1,53 @@
|
||||
package models
|
||||
|
||||
import "time"
|
||||
|
||||
// UploadCategory names a kind of object that may be uploaded through the
// presigned-URL flow. The service layer applies a size cap and a mime-type
// allow-list per category.
type UploadCategory string

// Known upload categories. The string values are what clients send in the
// presign request and what is stored in pending_uploads.category.
const (
	UploadCategoryCompletion    UploadCategory = "completion"
	UploadCategoryDocumentImage UploadCategory = "document_image"
	UploadCategoryDocumentFile  UploadCategory = "document_file"
)

// PendingUpload records one upload session, created when a client requests a
// presigned POST policy. Persisting the intent lets the server check quota,
// rate limit and size up front, and later attach the uploaded B2 object to a
// task_completion_image or document_image row.
//
// Lifecycle:
//
//	created → upload to B2 → attach via /api/task-completions/ or /documents/
//	   ↑                                  │
//	   └─ if not claimed before expires_at, the cleanup worker (see
//	      internal/worker/jobs) deletes the B2 object and the row.
type PendingUpload struct {
	ID            uint           `gorm:"primaryKey" json:"id"`
	UserID        uint           `gorm:"column:user_id;not null;index:idx_pending_uploads_user_created,priority:1" json:"user_id"`
	Category      UploadCategory `gorm:"column:category;size:32;not null" json:"category"`
	B2Key         string         `gorm:"column:b2_key;size:255;uniqueIndex" json:"b2_key"`
	ContentType   string         `gorm:"column:content_type;size:127;not null" json:"content_type"`
	ExpectedBytes int64          `gorm:"column:expected_bytes;not null" json:"expected_bytes"`
	ActualBytes   *int64         `gorm:"column:actual_bytes" json:"actual_bytes,omitempty"`
	ClaimedAt     *time.Time     `gorm:"column:claimed_at" json:"claimed_at,omitempty"`
	CreatedAt     time.Time      `gorm:"column:created_at;autoCreateTime;index:idx_pending_uploads_user_created,priority:2,sort:desc" json:"created_at"`
	ExpiresAt     time.Time      `gorm:"column:expires_at;not null" json:"expires_at"`
}

// TableName pins the GORM table to the one created by the goose migration.
func (PendingUpload) TableName() string {
	return "pending_uploads"
}

// IsClaimed reports whether the session has been attached to an entity,
// i.e. whether ClaimedAt has been set.
func (p *PendingUpload) IsClaimed() bool {
	return p.ClaimedAt != nil
}

// IsExpired reports whether now is strictly later than ExpiresAt. A session
// whose ExpiresAt equals now is not yet considered expired.
func (p *PendingUpload) IsExpired(now time.Time) bool {
	return p.ExpiresAt.Before(now)
}
|
||||
@@ -215,6 +215,10 @@ type TaskCompletionImage struct {
|
||||
CompletionID uint `gorm:"column:completion_id;index;not null" json:"completion_id"`
|
||||
ImageURL string `gorm:"column:image_url;size:500;not null" json:"image_url"`
|
||||
Caption string `gorm:"column:caption;size:255" json:"caption"`
|
||||
// PendingUploadID links to the pending_uploads row that produced this
|
||||
// image when uploaded via the presigned-URL flow. Nullable: legacy rows
|
||||
// uploaded through the multipart path don't have one.
|
||||
PendingUploadID *uint `gorm:"column:pending_upload_id" json:"pending_upload_id,omitempty"`
|
||||
}
|
||||
|
||||
// TableName returns the table name for GORM
|
||||
|
||||
@@ -0,0 +1,121 @@
|
||||
package repositories
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"gorm.io/gorm"
|
||||
"gorm.io/gorm/clause"
|
||||
|
||||
"github.com/treytartt/honeydue-api/internal/models"
|
||||
)
|
||||
|
||||
// PendingUploadRepository handles persistence for upload sessions.
|
||||
type PendingUploadRepository struct {
|
||||
db *gorm.DB
|
||||
}
|
||||
|
||||
// NewPendingUploadRepository constructs a repo bound to the given GORM handle.
|
||||
func NewPendingUploadRepository(db *gorm.DB) *PendingUploadRepository {
|
||||
return &PendingUploadRepository{db: db}
|
||||
}
|
||||
|
||||
// WithContext returns a session bound to ctx so DB spans nest under the
|
||||
// request span in tracing.
|
||||
func (r *PendingUploadRepository) WithContext(ctx context.Context) *PendingUploadRepository {
|
||||
return &PendingUploadRepository{db: r.db.WithContext(ctx)}
|
||||
}
|
||||
|
||||
// Create inserts a new upload session.
|
||||
func (r *PendingUploadRepository) Create(p *models.PendingUpload) error {
|
||||
return r.db.Create(p).Error
|
||||
}
|
||||
|
||||
// FindByID returns a single session, or gorm.ErrRecordNotFound.
|
||||
func (r *PendingUploadRepository) FindByID(id uint) (*models.PendingUpload, error) {
|
||||
var p models.PendingUpload
|
||||
if err := r.db.First(&p, id).Error; err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &p, nil
|
||||
}
|
||||
|
||||
// FindUnclaimedForUser locks and returns rows belonging to userID matching
|
||||
// ids, where claimed_at IS NULL. Used by the attach path to ensure exactly
|
||||
// one claim per row even under concurrent requests. Postgres applies real
|
||||
// row locks; SQLite (test harness) silently ignores the clause.
|
||||
//
|
||||
// Caller must run inside a transaction for the lock to outlive the call.
|
||||
func (r *PendingUploadRepository) FindUnclaimedForUser(userID uint, ids []uint) ([]models.PendingUpload, error) {
|
||||
if len(ids) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
var rows []models.PendingUpload
|
||||
err := r.db.
|
||||
Clauses(clause.Locking{Strength: "UPDATE"}).
|
||||
Where("user_id = ? AND id IN ? AND claimed_at IS NULL", userID, ids).
|
||||
Find(&rows).Error
|
||||
return rows, err
|
||||
}
|
||||
|
||||
// MarkClaimed writes actual_bytes + claimed_at. Returns gorm.ErrRecordNotFound
|
||||
// if the row was claimed by another transaction in the meantime.
|
||||
func (r *PendingUploadRepository) MarkClaimed(id uint, actualBytes int64, now time.Time) error {
|
||||
res := r.db.Model(&models.PendingUpload{}).
|
||||
Where("id = ? AND claimed_at IS NULL", id).
|
||||
Updates(map[string]interface{}{
|
||||
"actual_bytes": actualBytes,
|
||||
"claimed_at": now,
|
||||
})
|
||||
if res.Error != nil {
|
||||
return res.Error
|
||||
}
|
||||
if res.RowsAffected == 0 {
|
||||
return gorm.ErrRecordNotFound
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// CountUnclaimedActiveForUser returns how many in-flight (unclaimed,
|
||||
// not-yet-expired) sessions a user holds. Used for the concurrency cap.
|
||||
func (r *PendingUploadRepository) CountUnclaimedActiveForUser(userID uint, now time.Time) (int64, error) {
|
||||
var n int64
|
||||
err := r.db.Model(&models.PendingUpload{}).
|
||||
Where("user_id = ? AND claimed_at IS NULL AND expires_at > ?", userID, now).
|
||||
Count(&n).Error
|
||||
return n, err
|
||||
}
|
||||
|
||||
// CountCreatedSinceForUser returns the number of presign requests issued in
|
||||
// the last `since` window. The service layer uses Redis for the rate-limit
|
||||
// hot path; this is a fallback / consistency check.
|
||||
func (r *PendingUploadRepository) CountCreatedSinceForUser(userID uint, since time.Time) (int64, error) {
|
||||
var n int64
|
||||
err := r.db.Model(&models.PendingUpload{}).
|
||||
Where("user_id = ? AND created_at > ?", userID, since).
|
||||
Count(&n).Error
|
||||
return n, err
|
||||
}
|
||||
|
||||
// FindExpiredUnclaimed returns up to `limit` sessions ready to reap. Caller
|
||||
// is responsible for deleting the corresponding B2 objects + rows.
|
||||
func (r *PendingUploadRepository) FindExpiredUnclaimed(now time.Time, limit int) ([]models.PendingUpload, error) {
|
||||
var rows []models.PendingUpload
|
||||
err := r.db.
|
||||
Where("claimed_at IS NULL AND expires_at < ?", now).
|
||||
Order("expires_at ASC").
|
||||
Limit(limit).
|
||||
Find(&rows).Error
|
||||
return rows, err
|
||||
}
|
||||
|
||||
// DeleteByID removes a single session row.
|
||||
func (r *PendingUploadRepository) DeleteByID(id uint) error {
|
||||
return r.db.Delete(&models.PendingUpload{}, id).Error
|
||||
}
|
||||
|
||||
// Transaction runs fn inside a DB transaction. Mirrors how task_service
|
||||
// composes multi-step writes.
|
||||
func (r *PendingUploadRepository) Transaction(fn func(tx *gorm.DB) error) error {
|
||||
return r.db.Transaction(fn)
|
||||
}
|
||||
@@ -260,6 +260,20 @@ func SetupRouter(deps *Dependencies) *echo.Echo {
|
||||
if deps.StorageService != nil {
|
||||
uploadHandler = handlers.NewUploadHandler(deps.StorageService, services.NewFileOwnershipService(deps.DB))
|
||||
mediaHandler = handlers.NewMediaHandler(documentRepo, taskRepo, residenceRepo, deps.StorageService)
|
||||
|
||||
// Presigned-URL upload path requires S3-compatible backend. With local
|
||||
// disk we silently skip; the route returns 500 if hit.
|
||||
if s3 := deps.StorageService.S3Backend(); s3 != nil {
|
||||
pendingUploadRepo := repositories.NewPendingUploadRepository(deps.DB)
|
||||
uploadService := services.NewUploadService(pendingUploadRepo, s3, &cfg.Storage, deps.Cache)
|
||||
uploadHandler.SetUploadService(uploadService)
|
||||
// Task and document services need the upload service to claim
|
||||
// pending_uploads rows when /api/task-completions/ or /api/documents/
|
||||
// is called with `upload_ids: [..]` instead of multipart.
|
||||
taskService.SetUploadService(uploadService)
|
||||
documentService.SetStorageService(deps.StorageService)
|
||||
documentService.SetUploadService(uploadService)
|
||||
}
|
||||
}
|
||||
|
||||
// Legacy Prometheus-shaped metrics from internal/monitoring (consumed by
|
||||
@@ -724,6 +738,7 @@ func setupUploadRoutes(api *echo.Group, uploadHandler *handlers.UploadHandler) {
|
||||
uploads.POST("/image/", uploadHandler.UploadImage)
|
||||
uploads.POST("/document/", uploadHandler.UploadDocument)
|
||||
uploads.POST("/completion/", uploadHandler.UploadCompletion)
|
||||
uploads.POST("/presign/", uploadHandler.PresignUpload)
|
||||
uploads.DELETE("/", uploadHandler.DeleteFile)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,9 +22,11 @@ import (
|
||||
|
||||
// DocumentService handles document business logic
|
||||
type DocumentService struct {
|
||||
documentRepo *repositories.DocumentRepository
|
||||
residenceRepo *repositories.ResidenceRepository
|
||||
cache *CacheService
|
||||
documentRepo *repositories.DocumentRepository
|
||||
residenceRepo *repositories.ResidenceRepository
|
||||
storageService *StorageService
|
||||
uploadService *UploadService
|
||||
cache *CacheService
|
||||
}
|
||||
|
||||
// NewDocumentService creates a new document service
|
||||
@@ -40,6 +42,19 @@ func (s *DocumentService) SetCacheService(cache *CacheService) {
|
||||
s.cache = cache
|
||||
}
|
||||
|
||||
// SetStorageService wires the storage service so URLs for presigned uploads
|
||||
// can be generated using the same BaseURL the legacy uploader uses. CreateDocument hands it to urlForUploadKey when turning a claimed upload's B2 key into a URL.
|
||||
func (s *DocumentService) SetStorageService(ss *StorageService) {
|
||||
s.storageService = ss
|
||||
}
|
||||
|
||||
// SetUploadService wires the presigned-URL upload service so CreateDocument
|
||||
// can claim pending_uploads rows by id and convert them into document_image
|
||||
// rows (or, for category=document_file, set the document's main file fields). While it is unset, CreateDocument's claim step is skipped by a nil check, so any upload_ids in the request are ignored.
|
||||
func (s *DocumentService) SetUploadService(us *UploadService) {
|
||||
s.uploadService = us
|
||||
}
|
||||
|
||||
// GetDocument gets a document by ID with access check
|
||||
func (s *DocumentService) GetDocument(ctx context.Context, documentID, userID uint) (*responses.DocumentResponse, error) {
|
||||
document, err := s.documentRepo.WithContext(ctx).FindByID(documentID)
|
||||
@@ -154,11 +169,42 @@ func (s *DocumentService) CreateDocument(ctx context.Context, req *requests.Crea
|
||||
IsActive: true,
|
||||
}
|
||||
|
||||
// Claim presigned uploads BEFORE the document insert. If the client
|
||||
// passed a category=document_file row, lift it onto the document's
|
||||
// FileURL/MimeType/FileSize fields (FileName is not derived from the upload here) rather than creating an
|
||||
// image row for it. Image categories produce DocumentImage rows below.
|
||||
var claimedUploads []models.PendingUpload
|
||||
if len(req.UploadIDs) > 0 && s.uploadService != nil {
|
||||
var claimErr error
|
||||
claimedUploads, claimErr = s.uploadService.VerifyAndClaim(ctx, userID, req.UploadIDs)
|
||||
if claimErr != nil {
|
||||
return nil, claimErr
|
||||
}
|
||||
// Lift the (single) document_file upload, if present, onto the
|
||||
// document fields. Multiple document_file claims aren't meaningful;
|
||||
// take the first and ignore extras to keep the surface narrow.
|
||||
for _, pu := range claimedUploads {
|
||||
if pu.Category == models.UploadCategoryDocumentFile {
|
||||
if document.FileURL == "" {
|
||||
document.FileURL = urlForUploadKey(s.storageService, pu.B2Key)
|
||||
}
|
||||
if document.MimeType == "" {
|
||||
document.MimeType = pu.ContentType
|
||||
}
|
||||
if document.FileSize == nil && pu.ActualBytes != nil {
|
||||
b := *pu.ActualBytes
|
||||
document.FileSize = &b
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := s.documentRepo.WithContext(ctx).Create(document); err != nil {
|
||||
return nil, apperrors.Internal(err)
|
||||
}
|
||||
|
||||
// Create images if provided
|
||||
// Legacy multipart path — already-uploaded URLs.
|
||||
for _, imageURL := range req.ImageURLs {
|
||||
if imageURL != "" {
|
||||
img := &models.DocumentImage{
|
||||
@@ -172,6 +218,26 @@ func (s *DocumentService) CreateDocument(ctx context.Context, req *requests.Crea
|
||||
}
|
||||
}
|
||||
|
||||
// New presigned path — claimed image uploads become DocumentImage rows.
|
||||
// The document_file row (if any) was already lifted onto the document above.
|
||||
for i := range claimedUploads {
|
||||
pu := claimedUploads[i]
|
||||
if pu.Category == models.UploadCategoryDocumentFile {
|
||||
continue
|
||||
}
|
||||
img := &models.DocumentImage{
|
||||
DocumentID: document.ID,
|
||||
ImageURL: urlForUploadKey(s.storageService, pu.B2Key),
|
||||
PendingUploadID: &pu.ID,
|
||||
}
|
||||
if err := s.documentRepo.WithContext(ctx).CreateDocumentImage(img); err != nil {
|
||||
// Don't fail the whole document for an image insert failure;
|
||||
// matches the legacy ImageURLs behavior. The orphaned upload
|
||||
// row is benign (still claimed, just unreferenced).
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// Reload with relations
|
||||
document, err = s.documentRepo.WithContext(ctx).FindByID(document.ID)
|
||||
if err != nil {
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
"github.com/minio/minio-go/v7"
|
||||
@@ -101,3 +102,81 @@ func (b *S3Backend) ReadStream(key string) (io.ReadCloser, error) {
|
||||
}
|
||||
return obj, nil
|
||||
}
|
||||
|
||||
// PresignedPostResult is the data a client needs to perform a direct multipart
|
||||
// POST to S3-compatible storage. The caller assembles a multipart/form-data
|
||||
// request with every entry of Fields as a form part and the file part last.
|
||||
type PresignedPostResult struct {
|
||||
URL string // e.g. https://s3.us-east-005.backblazeb2.com/honeyDueProd
|
||||
Fields map[string]string // policy, x-amz-*, key, Content-Type, etc.; a map, so no ordering among these parts is implied
|
||||
}
|
||||
|
||||
// PresignedPost generates a POST policy that constrains uploads at the protocol
|
||||
// level: only the named key, only the named content-type, only sizes within
|
||||
// the requested range. S3 (and B2's S3-compatible endpoint) reject anything
|
||||
// that doesn't satisfy every condition before accepting the body.
|
||||
//
|
||||
// minBytes/maxBytes are inclusive. The returned URL + Fields can be sent
|
||||
// straight to the client.
|
||||
func (b *S3Backend) PresignedPost(ctx context.Context, key, contentType string, minBytes, maxBytes int64, ttl time.Duration) (*PresignedPostResult, error) {
|
||||
policy := minio.NewPostPolicy()
|
||||
if err := policy.SetBucket(b.bucket); err != nil {
|
||||
return nil, fmt.Errorf("set bucket: %w", err)
|
||||
}
|
||||
if err := policy.SetKey(key); err != nil {
|
||||
return nil, fmt.Errorf("set key: %w", err)
|
||||
}
|
||||
if err := policy.SetContentType(contentType); err != nil {
|
||||
return nil, fmt.Errorf("set content-type: %w", err)
|
||||
}
|
||||
if err := policy.SetContentLengthRange(minBytes, maxBytes); err != nil {
|
||||
return nil, fmt.Errorf("set content-length-range: %w", err)
|
||||
}
|
||||
if err := policy.SetExpires(time.Now().UTC().Add(ttl)); err != nil {
|
||||
return nil, fmt.Errorf("set expires: %w", err)
|
||||
}
|
||||
|
||||
u, fields, err := b.client.PresignedPostPolicy(ctx, policy)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("presign post policy: %w", err)
|
||||
}
|
||||
return &PresignedPostResult{
|
||||
URL: stripQuery(u),
|
||||
Fields: fields,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// ObjectInfo is the subset of stored-object metadata that Stat reports. It is
// obtained without fetching the object body.
type ObjectInfo struct {
	// Size is the object's size as reported by the storage backend.
	Size int64

	// ContentType is the content type recorded on the stored object.
	ContentType string

	// ETag is the entity tag reported by the storage backend.
	ETag string
}
|
||||
|
||||
func (b *S3Backend) Stat(ctx context.Context, key string) (*ObjectInfo, error) {
|
||||
info, err := b.client.StatObject(ctx, b.bucket, key, minio.StatObjectOptions{})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("stat S3 object: %w", err)
|
||||
}
|
||||
return &ObjectInfo{
|
||||
Size: info.Size,
|
||||
ContentType: info.ContentType,
|
||||
ETag: info.ETag,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// stripQuery returns u rendered as a string with its query component removed.
// A nil u yields "". The input URL is never modified; the work is done on a
// shallow copy.
//
// Both RawQuery and ForceQuery are cleared: a URL parsed from "…/bucket?" has
// an empty RawQuery but ForceQuery set, and would otherwise still render with
// a trailing "?".
//
// Rationale recorded by the original author: the form fields are the source of
// truth for POST-policy uploads, and some clients reportedly reject a request
// whose signature also appears in the URL query, so the URL handed to clients
// must carry no query at all.
func stripQuery(u *url.URL) string {
	if u == nil {
		return ""
	}
	clone := *u
	clone.RawQuery = ""
	clone.ForceQuery = false
	return clone.String()
}
|
||||
|
||||
@@ -40,6 +40,16 @@ type UploadResult struct {
|
||||
MimeType string `json:"mime_type"`
|
||||
}
|
||||
|
||||
// S3Backend returns the underlying S3-compatible backend if one is in use,
|
||||
// or nil for local-disk storage. Used by the presigned-URL upload path which
|
||||
// requires features (POST policies, StatObject) only available on S3.
|
||||
func (s *StorageService) S3Backend() *S3Backend {
|
||||
if b, ok := s.backend.(*S3Backend); ok {
|
||||
return b
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewStorageService creates a new storage service with the appropriate backend.
|
||||
// If S3 config is set, uses S3-compatible storage (B2, MinIO).
|
||||
// Otherwise, uses local filesystem.
|
||||
|
||||
@@ -37,9 +37,18 @@ type TaskService struct {
|
||||
notificationService *NotificationService
|
||||
emailService *EmailService
|
||||
storageService *StorageService
|
||||
uploadService *UploadService // optional — only set when S3 storage is configured
|
||||
cache *CacheService
|
||||
}
|
||||
|
||||
// SetUploadService wires the presigned-URL upload service so CreateCompletion
|
||||
// can claim pending_uploads rows by id and convert them into completion image
|
||||
// rows. Optional: with local-disk storage there's no presigned flow and the
|
||||
// service is left nil.
|
||||
func (s *TaskService) SetUploadService(us *UploadService) {
|
||||
s.uploadService = us
|
||||
}
|
||||
|
||||
// SetCacheService wires Redis caching for residence-ID lookups.
|
||||
func (s *TaskService) SetCacheService(cache *CacheService) {
|
||||
s.cache = cache
|
||||
@@ -694,6 +703,21 @@ func (s *TaskService) CreateCompletion(ctx context.Context, req *requests.Create
|
||||
task.InProgress = false
|
||||
}
|
||||
|
||||
// New presigned-URL path: claim pending_uploads rows that the client
|
||||
// already POSTed to B2. We do this BEFORE the txn because VerifyAndClaim
|
||||
// HEADs each B2 object — we don't want to hold a Postgres transaction
|
||||
// open across HTTP calls. If the txn rolls back later, the rows stay
|
||||
// claimed but unreferenced; they're cents of storage and visible via
|
||||
// admin queries if cleanup ever matters.
|
||||
var claimedUploads []models.PendingUpload
|
||||
if len(req.UploadIDs) > 0 && s.uploadService != nil {
|
||||
var claimErr error
|
||||
claimedUploads, claimErr = s.uploadService.VerifyAndClaim(ctx, userID, req.UploadIDs)
|
||||
if claimErr != nil {
|
||||
return nil, claimErr
|
||||
}
|
||||
}
|
||||
|
||||
// P1-5 + B-07: Wrap completion creation, task update, and image creation
|
||||
// in a single transaction for atomicity. If any operation fails, all are rolled back.
|
||||
txErr := s.taskRepo.WithContext(ctx).DB().Transaction(func(tx *gorm.DB) error {
|
||||
@@ -703,7 +727,12 @@ func (s *TaskService) CreateCompletion(ctx context.Context, req *requests.Create
|
||||
if err := s.taskRepo.WithContext(ctx).UpdateTx(tx, task); err != nil {
|
||||
return err
|
||||
}
|
||||
// B-07: Create images inside the same transaction as completion
|
||||
// B-07: Create images inside the same transaction as completion.
|
||||
// Two sources contribute, both produce TaskCompletionImage rows:
|
||||
// 1. Legacy multipart path — client uploaded via the API and got
|
||||
// back URLs in req.ImageURLs.
|
||||
// 2. New presigned path — client uploaded direct to B2 and we
|
||||
// claimed the pending_uploads rows above.
|
||||
for _, imageURL := range req.ImageURLs {
|
||||
if imageURL != "" {
|
||||
img := &models.TaskCompletionImage{
|
||||
@@ -715,6 +744,17 @@ func (s *TaskService) CreateCompletion(ctx context.Context, req *requests.Create
|
||||
}
|
||||
}
|
||||
}
|
||||
for i := range claimedUploads {
|
||||
pu := claimedUploads[i]
|
||||
img := &models.TaskCompletionImage{
|
||||
CompletionID: completion.ID,
|
||||
ImageURL: urlForUploadKey(s.storageService, pu.B2Key),
|
||||
PendingUploadID: &pu.ID,
|
||||
}
|
||||
if err := tx.Create(img).Error; err != nil {
|
||||
return fmt.Errorf("failed to create completion image from upload %d: %w", pu.ID, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if txErr != nil {
|
||||
|
||||
@@ -0,0 +1,366 @@
|
||||
package services
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/redis/go-redis/v9"
|
||||
"github.com/rs/zerolog/log"
|
||||
|
||||
"github.com/treytartt/honeydue-api/internal/apperrors"
|
||||
"github.com/treytartt/honeydue-api/internal/config"
|
||||
"github.com/treytartt/honeydue-api/internal/dto/responses"
|
||||
"github.com/treytartt/honeydue-api/internal/models"
|
||||
"github.com/treytartt/honeydue-api/internal/repositories"
|
||||
)
|
||||
|
||||
// Upload policy limits. One value applies to every authenticated user; there
// is no free/pro split. Change the numbers here only — if per-tier overrides
// are ever needed, add them through config/SubscriptionSettings instead of
// forking these constants.
const (
	// UploadMaxBytes is the largest single object accepted: 10 MiB.
	UploadMaxBytes = 10 << 20

	// UploadPresignSlackBytes is the ± tolerance applied around the claimed
	// length when building the content-length-range and when verifying the
	// stored object's size.
	UploadPresignSlackBytes = 256

	// UploadPresignTTL is how long a signed POST policy stays valid.
	UploadPresignTTL = 15 * time.Minute

	// UploadCleanupTTL is the intended reap window for unclaimed rows.
	// NOTE(review): not referenced by the code visible in this change.
	UploadCleanupTTL = 24 * time.Hour

	// UploadPresignsPerHour is the per-user presign rate cap.
	UploadPresignsPerHour = 50

	// UploadConcurrentUnclaimed is the per-user cap on unclaimed, unexpired
	// upload sessions.
	UploadConcurrentUnclaimed = 10

	// uploadRateLimitRedisPrefix prefixes the Redis rate-limit counter keys.
	uploadRateLimitRedisPrefix = "upload:presign:"
)
|
||||
|
||||
// allowedContentTypes maps each category to the set of mime types accepted
|
||||
// for the upload. Anything outside this set is rejected before signing.
|
||||
var allowedContentTypes = map[models.UploadCategory]map[string]bool{
|
||||
models.UploadCategoryCompletion: {
|
||||
"image/jpeg": true,
|
||||
"image/png": true,
|
||||
"image/heic": true,
|
||||
"image/heif": true,
|
||||
"image/webp": true,
|
||||
},
|
||||
models.UploadCategoryDocumentImage: {
|
||||
"image/jpeg": true,
|
||||
"image/png": true,
|
||||
"image/heic": true,
|
||||
"image/heif": true,
|
||||
"image/webp": true,
|
||||
},
|
||||
models.UploadCategoryDocumentFile: {
|
||||
"image/jpeg": true,
|
||||
"image/png": true,
|
||||
"image/heic": true,
|
||||
"image/heif": true,
|
||||
"image/webp": true,
|
||||
"application/pdf": true,
|
||||
},
|
||||
}
|
||||
|
||||
// uploadCategoryToSubdir maps the category to the path prefix used inside the
|
||||
// bucket. Mirrors the pattern in storage_service.go.
|
||||
var uploadCategoryToSubdir = map[models.UploadCategory]string{
|
||||
models.UploadCategoryCompletion: "completions",
|
||||
models.UploadCategoryDocumentImage: "documents",
|
||||
models.UploadCategoryDocumentFile: "documents",
|
||||
}
|
||||
|
||||
// UploadService orchestrates presigned-URL upload sessions. It owns:
|
||||
//
|
||||
// - validation (size, mime, category)
|
||||
// - quota + rate-limit enforcement (Redis preferred; DB fallback)
|
||||
// - signing the B2 POST policy via the existing S3Backend
|
||||
// - tracking the session in pending_uploads so the attach path can verify
|
||||
// and claim the object
|
||||
type UploadService struct {
|
||||
repo *repositories.PendingUploadRepository
|
||||
s3 *S3Backend
|
||||
cfg *config.StorageConfig
|
||||
cache *CacheService
|
||||
redisEnabled bool
|
||||
}
|
||||
|
||||
// NewUploadService wires the service. s3 may be nil if storage isn't
|
||||
// configured for S3 (local-disk dev mode); presign requests will then
|
||||
// return a clear error.
|
||||
func NewUploadService(
|
||||
repo *repositories.PendingUploadRepository,
|
||||
s3 *S3Backend,
|
||||
cfg *config.StorageConfig,
|
||||
cache *CacheService,
|
||||
) *UploadService {
|
||||
return &UploadService{
|
||||
repo: repo,
|
||||
s3: s3,
|
||||
cfg: cfg,
|
||||
cache: cache,
|
||||
redisEnabled: cache != nil,
|
||||
}
|
||||
}
|
||||
|
||||
// Presign validates the request, enforces quota + rate limits, signs a B2
|
||||
// POST policy, persists the pending_uploads row, and returns the response
|
||||
// the client needs to perform the upload.
|
||||
//
|
||||
// Errors are mapped to apperrors so the HTTP layer can return the right
|
||||
// status code:
|
||||
//
|
||||
// 413 — content_length over UploadMaxBytes
|
||||
// 422 — content_type not allowed for the category
|
||||
// 429 — over the rate limit OR over the concurrent in-flight cap
|
||||
// 500 — storage not configured / signing failure
|
||||
func (s *UploadService) Presign(
|
||||
ctx context.Context,
|
||||
userID uint,
|
||||
category models.UploadCategory,
|
||||
contentType string,
|
||||
contentLength int64,
|
||||
) (*responses.PresignUploadResponse, error) {
|
||||
if s.s3 == nil {
|
||||
return nil, apperrors.Internal(fmt.Errorf("upload: S3 backend not configured"))
|
||||
}
|
||||
|
||||
// Size cap. 413 semantically; we use BadRequest because that's the
|
||||
// pattern across the codebase for dto-validation rejections.
|
||||
if contentLength <= 0 || contentLength > UploadMaxBytes {
|
||||
return nil, apperrors.BadRequest("error.upload_too_large")
|
||||
}
|
||||
|
||||
// Mime check is per-category — completion photos can't be PDFs.
|
||||
allowed, ok := allowedContentTypes[category]
|
||||
if !ok {
|
||||
return nil, apperrors.BadRequest("error.upload_invalid_category")
|
||||
}
|
||||
if !allowed[strings.ToLower(contentType)] {
|
||||
return nil, apperrors.BadRequest("error.upload_unsupported_content_type")
|
||||
}
|
||||
|
||||
now := time.Now().UTC()
|
||||
|
||||
// Concurrency cap: how many sessions does this user have unclaimed and
|
||||
// not yet expired? Cheap COUNT, indexed.
|
||||
active, err := s.repo.WithContext(ctx).CountUnclaimedActiveForUser(userID, now)
|
||||
if err != nil {
|
||||
return nil, apperrors.Internal(err)
|
||||
}
|
||||
if active >= UploadConcurrentUnclaimed {
|
||||
return nil, apperrors.TooManyRequests("error.upload_too_many_in_flight")
|
||||
}
|
||||
|
||||
// Rate limit: presigns issued in the last hour. Redis when available
|
||||
// (cheap atomic counter); DB fallback for dev/test.
|
||||
if err := s.checkRateLimit(ctx, userID, now); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Pick a stable storage key. UUID + extension is enough — no need for the
|
||||
// timestamp-prefix trick in storage_service.go because we never list this
|
||||
// path; lookups go through pending_uploads.id.
|
||||
subdir := uploadCategoryToSubdir[category]
|
||||
if subdir == "" {
|
||||
subdir = "uploads"
|
||||
}
|
||||
ext := extensionForContentType(contentType)
|
||||
key := fmt.Sprintf("uploads/%s/%d/%s%s", subdir, userID, uuid.New().String(), ext)
|
||||
|
||||
// Sign the POST policy. The slack window lets the client encode once;
|
||||
// the server still rejects anything materially different from claimed.
|
||||
minB := contentLength - UploadPresignSlackBytes
|
||||
if minB < 0 {
|
||||
minB = 0
|
||||
}
|
||||
maxB := contentLength + UploadPresignSlackBytes
|
||||
if maxB > UploadMaxBytes {
|
||||
maxB = UploadMaxBytes
|
||||
}
|
||||
|
||||
post, err := s.s3.PresignedPost(ctx, key, contentType, minB, maxB, UploadPresignTTL)
|
||||
if err != nil {
|
||||
return nil, apperrors.Internal(fmt.Errorf("presign upload: %w", err))
|
||||
}
|
||||
|
||||
expiresAt := now.Add(UploadPresignTTL)
|
||||
row := &models.PendingUpload{
|
||||
UserID: userID,
|
||||
Category: category,
|
||||
B2Key: key,
|
||||
ContentType: strings.ToLower(contentType),
|
||||
ExpectedBytes: contentLength,
|
||||
ExpiresAt: expiresAt,
|
||||
}
|
||||
if err := s.repo.WithContext(ctx).Create(row); err != nil {
|
||||
return nil, apperrors.Internal(err)
|
||||
}
|
||||
|
||||
return &responses.PresignUploadResponse{
|
||||
ID: row.ID,
|
||||
URL: post.URL,
|
||||
Fields: post.Fields,
|
||||
Key: key,
|
||||
ExpiresAt: expiresAt.Format(time.RFC3339),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// VerifyAndClaim is called from CreateCompletion / CreateDocument. It locks
|
||||
// the pending rows, HEADs each B2 object to confirm size + content-type,
|
||||
// flips claimed_at, and returns the verified rows so the caller can write
|
||||
// task_completion_image / document_image rows referencing them.
|
||||
//
|
||||
// If any verification fails, the entire batch is rejected and no rows are
|
||||
// claimed — atomic semantics matter so one bad upload doesn't half-attach.
|
||||
func (s *UploadService) VerifyAndClaim(
|
||||
ctx context.Context,
|
||||
userID uint,
|
||||
uploadIDs []uint,
|
||||
) ([]models.PendingUpload, error) {
|
||||
if len(uploadIDs) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
if s.s3 == nil {
|
||||
return nil, apperrors.Internal(fmt.Errorf("upload: S3 backend not configured"))
|
||||
}
|
||||
|
||||
rows, err := s.repo.WithContext(ctx).FindUnclaimedForUser(userID, uploadIDs)
|
||||
if err != nil {
|
||||
return nil, apperrors.Internal(err)
|
||||
}
|
||||
if len(rows) != len(uploadIDs) {
|
||||
// Either some IDs don't exist, belong to another user, or are already
|
||||
// claimed. We don't differentiate — same status either way.
|
||||
return nil, apperrors.NotFound("error.upload_not_found")
|
||||
}
|
||||
|
||||
now := time.Now().UTC()
|
||||
verified := make([]models.PendingUpload, 0, len(rows))
|
||||
for i := range rows {
|
||||
r := rows[i]
|
||||
if r.IsExpired(now) {
|
||||
return nil, apperrors.BadRequest("error.upload_expired")
|
||||
}
|
||||
|
||||
info, err := s.s3.Stat(ctx, r.B2Key)
|
||||
if err != nil {
|
||||
log.Warn().Err(err).Uint("upload_id", r.ID).Str("key", r.B2Key).Msg("upload claim: stat failed")
|
||||
return nil, apperrors.BadRequest("error.upload_not_uploaded")
|
||||
}
|
||||
// Size must match the claimed bytes within the policy slack window.
|
||||
// Anything outside it means the client lied or B2 misreported.
|
||||
if info.Size < r.ExpectedBytes-UploadPresignSlackBytes ||
|
||||
info.Size > r.ExpectedBytes+UploadPresignSlackBytes {
|
||||
return nil, apperrors.BadRequest("error.upload_size_mismatch")
|
||||
}
|
||||
|
||||
if err := s.repo.WithContext(ctx).MarkClaimed(r.ID, info.Size, now); err != nil {
|
||||
return nil, apperrors.Internal(err)
|
||||
}
|
||||
r.ActualBytes = &info.Size
|
||||
r.ClaimedAt = &now
|
||||
verified = append(verified, r)
|
||||
}
|
||||
return verified, nil
|
||||
}
|
||||
|
||||
// CleanupExpired finds unclaimed rows past their expires_at, deletes the
|
||||
// corresponding B2 objects, and removes the rows. Called from the Asynq
|
||||
// hourly cron in cmd/worker.
|
||||
//
|
||||
// Returns the number of rows reaped. Errors per row are logged and the loop
|
||||
// continues — one stuck object shouldn't block the others.
|
||||
func (s *UploadService) CleanupExpired(ctx context.Context, batchLimit int) (int, error) {
|
||||
if s.s3 == nil {
|
||||
return 0, fmt.Errorf("upload: S3 backend not configured")
|
||||
}
|
||||
now := time.Now().UTC()
|
||||
rows, err := s.repo.WithContext(ctx).FindExpiredUnclaimed(now, batchLimit)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("find expired: %w", err)
|
||||
}
|
||||
reaped := 0
|
||||
for _, r := range rows {
|
||||
if err := s.s3.Delete(r.B2Key); err != nil {
|
||||
log.Warn().Err(err).Uint("upload_id", r.ID).Str("key", r.B2Key).Msg("upload cleanup: B2 delete failed (continuing)")
|
||||
// Continue to row delete anyway — bucket lifecycle backstop will
|
||||
// pick up the orphaned B2 object after 7 days.
|
||||
}
|
||||
if err := s.repo.WithContext(ctx).DeleteByID(r.ID); err != nil {
|
||||
log.Warn().Err(err).Uint("upload_id", r.ID).Msg("upload cleanup: row delete failed")
|
||||
continue
|
||||
}
|
||||
reaped++
|
||||
}
|
||||
return reaped, nil
|
||||
}
|
||||
|
||||
// checkRateLimit enforces UploadPresignsPerHour using Redis (preferred) or
|
||||
// the DB as a fallback. Returns a 429 apperror when over.
|
||||
func (s *UploadService) checkRateLimit(ctx context.Context, userID uint, now time.Time) error {
|
||||
if s.redisEnabled {
|
||||
bucket := now.Truncate(time.Hour).Unix()
|
||||
key := fmt.Sprintf("%s%d:%d", uploadRateLimitRedisPrefix, userID, bucket)
|
||||
client := s.cache.Client()
|
||||
count, err := client.Incr(ctx, key).Result()
|
||||
if err == nil {
|
||||
// Best-effort EXPIRE; only set on first INCR (count == 1).
|
||||
if count == 1 {
|
||||
_ = client.Expire(ctx, key, 2*time.Hour).Err()
|
||||
}
|
||||
if count > UploadPresignsPerHour {
|
||||
return apperrors.TooManyRequests("error.upload_rate_limit")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
// Redis transient failure — fall through to DB. Don't open the gates,
|
||||
// don't crash the request.
|
||||
if err != redis.Nil {
|
||||
log.Warn().Err(err).Msg("upload rate limit: redis unavailable, falling back to DB")
|
||||
}
|
||||
}
|
||||
since := now.Add(-1 * time.Hour)
|
||||
count, err := s.repo.WithContext(ctx).CountCreatedSinceForUser(userID, since)
|
||||
if err != nil {
|
||||
return apperrors.Internal(err)
|
||||
}
|
||||
if count >= UploadPresignsPerHour {
|
||||
return apperrors.TooManyRequests("error.upload_rate_limit")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// urlForUploadKey returns the public URL for a stored object given its B2
|
||||
// key. Mirrors the URL format used by StorageService.Upload so the existing
|
||||
// media handler can serve presigned-uploaded objects without changes.
|
||||
//
|
||||
// If storageService is nil (shouldn't happen in production but defensive)
|
||||
// we fall back to returning the raw key prefixed with "/" — better than an
|
||||
// empty URL on a clearly buggy code path.
|
||||
func urlForUploadKey(storageService *StorageService, b2Key string) string {
|
||||
if storageService == nil || storageService.cfg == nil {
|
||||
return "/" + b2Key
|
||||
}
|
||||
base := strings.TrimRight(storageService.cfg.BaseURL, "/")
|
||||
return fmt.Sprintf("%s/%s", base, b2Key)
|
||||
}
|
||||
|
||||
// extensionForContentType maps a mime type to the file extension (leading dot
// included) appended to the storage key. Matching ignores case and
// surrounding whitespace. Any type not listed yields ".bin", so the result is
// never empty.
func extensionForContentType(contentType string) string {
	normalized := strings.TrimSpace(contentType)
	normalized = strings.ToLower(normalized)

	ext := ".bin"
	switch normalized {
	case "application/pdf":
		ext = ".pdf"
	case "image/webp":
		ext = ".webp"
	case "image/heif":
		ext = ".heif"
	case "image/heic":
		ext = ".heic"
	case "image/png":
		ext = ".png"
	case "image/jpg", "image/jpeg":
		ext = ".jpg"
	}
	return ext
}
|
||||
@@ -81,6 +81,7 @@ func SetupTestDB(t *testing.T) *gorm.DB {
|
||||
&models.TierLimits{},
|
||||
&models.FeatureBenefit{},
|
||||
&models.UpgradeTrigger{},
|
||||
&models.PendingUpload{},
|
||||
&models.Promotion{},
|
||||
&models.AuditLog{},
|
||||
&models.TaskTemplate{},
|
||||
|
||||
@@ -26,6 +26,7 @@ const (
|
||||
TypeSendPush = "push:send"
|
||||
TypeOnboardingEmails = "email:onboarding"
|
||||
TypeReminderLogCleanup = "maintenance:reminder_log_cleanup"
|
||||
TypeUploadCleanup = "maintenance:upload_cleanup" // Reaps expired pending_uploads
|
||||
)
|
||||
|
||||
// Handler handles background job processing
|
||||
@@ -39,9 +40,17 @@ type Handler struct {
|
||||
emailService EmailSender
|
||||
notificationService NotificationSender
|
||||
onboardingService OnboardingEmailSender
|
||||
uploadService *services.UploadService
|
||||
config *config.Config
|
||||
}
|
||||
|
||||
// SetUploadService wires the upload service so HandleUploadCleanup can reap
|
||||
// expired pending_uploads rows. Optional; nil-safe — the cleanup handler
|
||||
// no-ops when not configured (e.g. local-disk dev environments).
|
||||
func (h *Handler) SetUploadService(us *services.UploadService) {
|
||||
h.uploadService = us
|
||||
}
|
||||
|
||||
// NewHandler creates a new job handler
|
||||
func NewHandler(db *gorm.DB, pushClient *push.Client, emailService *services.EmailService, notificationService *services.NotificationService, cfg *config.Config) *Handler {
|
||||
h := &Handler{
|
||||
@@ -647,3 +656,24 @@ func (h *Handler) HandleReminderLogCleanup(ctx context.Context, task *asynq.Task
|
||||
log.Info().Int64("deleted", deleted).Msg("Reminder log cleanup completed")
|
||||
return nil
|
||||
}
|
||||
|
||||
// HandleUploadCleanup reaps expired pending_uploads rows and their B2 objects.
|
||||
//
|
||||
// Runs hourly. Each tick processes up to 500 expired sessions; if the queue
|
||||
// is deeper than that, the next hourly run picks up the rest. The B2 bucket
|
||||
// also has a 7-day lifecycle rule on the uploads/ prefix as a backstop in
|
||||
// case this worker is offline for long stretches.
|
||||
func (h *Handler) HandleUploadCleanup(ctx context.Context, task *asynq.Task) error {
|
||||
if h.uploadService == nil {
|
||||
log.Debug().Msg("Upload cleanup skipped: upload service not configured (local-disk storage)")
|
||||
return nil
|
||||
}
|
||||
log.Info().Msg("Processing pending_uploads cleanup...")
|
||||
reaped, err := h.uploadService.CleanupExpired(ctx, 500)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Pending uploads cleanup failed")
|
||||
return err
|
||||
}
|
||||
log.Info().Int("reaped", reaped).Msg("Pending uploads cleanup completed")
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -0,0 +1,39 @@
|
||||
-- +goose Up
-- pending_uploads tracks short-lived presigned-URL upload sessions for direct
-- client-to-B2 uploads. A row is created when the client requests a presigned
-- POST policy, and is either claimed (linked to a task completion image or
-- document image) or reaped by the cleanup worker after expiry.
CREATE TABLE pending_uploads (
    id             BIGSERIAL PRIMARY KEY,
    user_id        BIGINT NOT NULL REFERENCES auth_user(id) ON DELETE CASCADE,
    category       VARCHAR(32) NOT NULL,
    b2_key         VARCHAR(255) NOT NULL UNIQUE,
    content_type   VARCHAR(127) NOT NULL,
    expected_bytes BIGINT NOT NULL,
    actual_bytes   BIGINT,
    claimed_at     TIMESTAMPTZ,
    created_at     TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    expires_at     TIMESTAMPTZ NOT NULL
);

-- Quota lookups: COUNT by user, most recent first.
CREATE INDEX idx_pending_uploads_user_created
    ON pending_uploads (user_id, created_at DESC);

-- Cleanup worker scan: only unclaimed rows are indexed, so the index stays
-- small.
CREATE INDEX idx_pending_uploads_cleanup
    ON pending_uploads (expires_at) WHERE claimed_at IS NULL;

-- The two image tables gain an optional FK to the pending_uploads row that
-- produced them. Nullable so legacy rows (uploaded through the multipart
-- path) keep working.
ALTER TABLE task_taskcompletionimage
    ADD COLUMN pending_upload_id BIGINT REFERENCES pending_uploads(id) ON DELETE SET NULL;

ALTER TABLE task_documentimage
    ADD COLUMN pending_upload_id BIGINT REFERENCES pending_uploads(id) ON DELETE SET NULL;

-- Index the referencing columns. PostgreSQL does not index FK columns
-- automatically, and every DELETE on pending_uploads (the cleanup worker
-- deletes rows in batches) must find referencing rows to apply
-- ON DELETE SET NULL. Without these indexes each delete scans both image
-- tables. Partial, because legacy rows are all NULL.
CREATE INDEX idx_taskcompletionimage_pending_upload
    ON task_taskcompletionimage (pending_upload_id)
    WHERE pending_upload_id IS NOT NULL;

CREATE INDEX idx_documentimage_pending_upload
    ON task_documentimage (pending_upload_id)
    WHERE pending_upload_id IS NOT NULL;

-- +goose Down
-- Dropping the columns also drops the indexes defined on them.
ALTER TABLE task_documentimage DROP COLUMN IF EXISTS pending_upload_id;
ALTER TABLE task_taskcompletionimage DROP COLUMN IF EXISTS pending_upload_id;
DROP TABLE IF EXISTS pending_uploads;
|
||||
Reference in New Issue
Block a user