diff --git a/cmd/worker/main.go b/cmd/worker/main.go index 0a7bf05..7e73199 100644 --- a/cmd/worker/main.go +++ b/cmd/worker/main.go @@ -167,6 +167,20 @@ func main() { // Create job handler jobHandler := jobs.NewHandler(db, pushClient, emailService, notificationService, cfg) + // Wire upload service for the pending_uploads cleanup cron. Storage may + // be local-disk (no S3 backend), in which case the upload service stays + // nil and the cleanup handler no-ops. Cache is optional — the cleanup + // path doesn't rate-limit and works fine with a nil cache. + if storageService, sErr := services.NewStorageService(&cfg.Storage); sErr == nil { + if s3 := storageService.S3Backend(); s3 != nil { + pendingUploadRepo := repositories.NewPendingUploadRepository(db) + uploadService := services.NewUploadService(pendingUploadRepo, s3, &cfg.Storage, nil) + jobHandler.SetUploadService(uploadService) + } + } else { + log.Warn().Err(sErr).Msg("Failed to initialize storage service for upload cleanup; cleanup cron will no-op") + } + // Create Asynq mux and register handlers mux := asynq.NewServeMux() @@ -180,6 +194,7 @@ func main() { mux.HandleFunc(jobs.TypeSendPush, jobHandler.HandleSendPush) mux.HandleFunc(jobs.TypeOnboardingEmails, jobHandler.HandleOnboardingEmails) mux.HandleFunc(jobs.TypeReminderLogCleanup, jobHandler.HandleReminderLogCleanup) + mux.HandleFunc(jobs.TypeUploadCleanup, jobHandler.HandleUploadCleanup) // Register email job handlers (welcome, verification, password reset, password changed) if emailService != nil { @@ -219,6 +234,15 @@ func main() { } log.Info().Str("cron", "0 3 * * *").Msg("Registered reminder log cleanup job (runs daily at 3:00 AM UTC)") + // Schedule pending_uploads cleanup (hourly at :30 to avoid colliding with + // the top-of-hour reminder + digest crons). Reaps unclaimed expired + // upload sessions; the B2 bucket lifecycle (7 days on uploads/ prefix) + // is the backstop if this worker is offline for an extended period. 
+ if _, err := scheduler.Register("30 * * * *", asynq.NewTask(jobs.TypeUploadCleanup, nil)); err != nil { + log.Fatal().Err(err).Msg("Failed to register upload cleanup job") + } + log.Info().Str("cron", "30 * * * *").Msg("Registered pending_uploads cleanup job (runs hourly)") + // Handle graceful shutdown quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) diff --git a/deploy-k3s/manifests/b2-lifecycle.md b/deploy-k3s/manifests/b2-lifecycle.md new file mode 100644 index 0000000..e698673 --- /dev/null +++ b/deploy-k3s/manifests/b2-lifecycle.md @@ -0,0 +1,57 @@ +# B2 bucket lifecycle — `uploads/` prefix + +The `pending_uploads` cleanup worker (cron `30 * * * *`, see +`internal/worker/jobs/handler.go::HandleUploadCleanup`) reaps unclaimed +upload sessions every hour, deleting both the row and the corresponding B2 +object. This bucket-level lifecycle rule is a **backstop** — it catches B2 +objects that survive the row deletion (e.g. worker crashed mid-loop, B2 +delete errored, manual DB tampering). + +## Rule + +Apply via the Backblaze web console: **Bucket → `honeyDueProd` → Lifecycle Settings → Custom** + +```json +[ + { + "fileNamePrefix": "uploads/", + "daysFromUploadingToHiding": 7, + "daysFromHidingToDeleting": 1 + } +] +``` + +Effect: any object under the `uploads/` prefix is hidden 7 days after +upload, then permanently deleted 1 day after that. Total maximum lifetime +of an orphaned object: 8 days. + +This rule does NOT affect: + +- `images/`, `documents/`, `completions/` — legacy multipart-uploaded + objects, which are managed by the existing `task_completion_image` / + `document_image` / `document.file_url` references. + +## Why a backstop, not the primary mechanism + +The application worker is the primary mechanism because: + +1. It can delete the **DB row** alongside the B2 object — lifecycle alone + would leave dangling `pending_uploads` rows. +2. It runs hourly vs. 
lifecycle's once-per-day evaluation — much tighter + recovery window for the common case. +3. It produces logs / metrics for orphan rate observability. + +## Verification + +After applying: + +```bash +b2 bucket get-info honeyDueProd | jq '.lifecycleRules' +``` + +Should show the rule above. If you don't have the B2 CLI: + +```bash +curl -u "$B2_KEY_ID:$B2_APP_KEY" https://api.backblazeb2.com/b2api/v3/b2_authorize_account +# Then use the returned authorizationToken + apiUrl to call b2_list_buckets +``` diff --git a/internal/dto/requests/document.go b/internal/dto/requests/document.go index 4dee0e9..e02e60d 100644 --- a/internal/dto/requests/document.go +++ b/internal/dto/requests/document.go @@ -25,7 +25,13 @@ type CreateDocumentRequest struct { SerialNumber string `json:"serial_number" validate:"max=100"` ModelNumber string `json:"model_number" validate:"max=100"` TaskID *uint `json:"task_id"` - ImageURLs []string `json:"image_urls" validate:"omitempty,max=20,dive,max=500"` // Multiple image URLs + ImageURLs []string `json:"image_urls" validate:"omitempty,max=20,dive,max=500"` // Legacy multipart upload path + // UploadIDs claims pending_uploads rows produced by the presigned-URL + // upload flow and turns them into document_image rows. May be combined + // with ImageURLs during the rollout window. UploadIDs of category + // "document_file" populate the document's FileURL/MimeType/FileSize fields + // instead — the service infers placement from the row's category. 
+ UploadIDs []uint `json:"upload_ids" validate:"omitempty,max=20"` } // UpdateDocumentRequest represents the request to update a document diff --git a/internal/dto/requests/task.go b/internal/dto/requests/task.go index a8a20df..4db162a 100644 --- a/internal/dto/requests/task.go +++ b/internal/dto/requests/task.go @@ -107,7 +107,21 @@ type CreateTaskCompletionRequest struct { Notes string `json:"notes" validate:"max=10000"` ActualCost *decimal.Decimal `json:"actual_cost"` Rating *int `json:"rating" validate:"omitempty,min=1,max=5"` // 1-5 star rating - ImageURLs []string `json:"image_urls" validate:"omitempty,max=20,dive,max=500"` // Multiple image URLs + + // ImageURLs is the legacy multipart-upload path: the handler uploaded the + // images first via the same request and produced URLs. Still supported for + // older client builds. + ImageURLs []string `json:"image_urls" validate:"omitempty,max=20,dive,max=500"` + + // UploadIDs is the new direct-to-B2 path: the client uploaded each image + // via a presigned URL and now claims the resulting pending_uploads rows + // by id. The service verifies ownership + size, marks each row claimed, + // and creates task_completion_image rows from them. + // + // If both ImageURLs and UploadIDs are present, both contribute to the + // final set of images so a single completion can mix legacy and new + // uploads (helps during the rollout window). + UploadIDs []uint `json:"upload_ids" validate:"omitempty,max=20"` } // UpdateTaskCompletionRequest represents the request to update a task completion diff --git a/internal/dto/requests/upload.go b/internal/dto/requests/upload.go new file mode 100644 index 0000000..8097266 --- /dev/null +++ b/internal/dto/requests/upload.go @@ -0,0 +1,22 @@ +package requests + +// PresignUploadRequest is the body for POST /api/uploads/presign. 
The client +// describes what it's about to upload; the server validates against quota, +// rate limits, and per-category caps before returning a signed POST policy. +type PresignUploadRequest struct { + // Category gates allowed mime types and the size cap. One of: + // "completion" — task completion photos + // "document_image" — image attached to a Document + // "document_file" — file (e.g. PDF) attached to a Document + Category string `json:"category" validate:"required,oneof=completion document_image document_file"` + + // ContentType is the MIME type the client will upload (e.g. image/jpeg). + // Bound to the policy so the actual upload must match exactly. + ContentType string `json:"content_type" validate:"required,min=3,max=127"` + + // ContentLength is the exact byte count the client intends to upload. + // The signed policy permits a small slack window around this value + // (server-side constant) so the client can encode in one pass without + // having to predict the byte count perfectly. + ContentLength int64 `json:"content_length" validate:"required,min=1"` +} diff --git a/internal/dto/responses/upload.go b/internal/dto/responses/upload.go new file mode 100644 index 0000000..0285e42 --- /dev/null +++ b/internal/dto/responses/upload.go @@ -0,0 +1,28 @@ +package responses + +// PresignUploadResponse is what /api/uploads/presign returns to the client. +// +// The client uses URL + Fields to build a multipart/form-data POST directly +// to S3-compatible storage (B2). Once the upload completes, the client calls +// the relevant entity-creation endpoint (POST /api/task-completions/, POST +// /api/documents/) with `upload_ids: [Id]` to claim and attach the object. +type PresignUploadResponse struct { + // ID is the pending_uploads.id the client passes back via upload_ids[]. + ID uint `json:"id"` + + // URL is the storage endpoint to POST to (no query string). + URL string `json:"upload_url"` + + // Fields are the form fields (policy, signature, key, etc.) 
that must be + // submitted with the multipart form. The file part must be named "file" + // and come last per S3 POST policy rules. + Fields map[string]string `json:"fields"` + + // Key is the object key chosen by the server. Echoed for client logging + // and debugging; the canonical reference is via ID. + Key string `json:"key"` + + // ExpiresAt is when the signed URL stops working. Clients should retry + // with a fresh presign rather than relying on long-lived URLs. + ExpiresAt string `json:"expires_at"` +} diff --git a/internal/handlers/upload_handler.go b/internal/handlers/upload_handler.go index a3373b5..f710634 100644 --- a/internal/handlers/upload_handler.go +++ b/internal/handlers/upload_handler.go @@ -7,9 +7,11 @@ import ( "github.com/rs/zerolog/log" "github.com/treytartt/honeydue-api/internal/apperrors" + "github.com/treytartt/honeydue-api/internal/dto/requests" "github.com/treytartt/honeydue-api/internal/dto/responses" "github.com/treytartt/honeydue-api/internal/i18n" "github.com/treytartt/honeydue-api/internal/middleware" + "github.com/treytartt/honeydue-api/internal/models" "github.com/treytartt/honeydue-api/internal/services" ) @@ -22,18 +24,26 @@ type FileOwnershipChecker interface { // UploadHandler handles file upload endpoints type UploadHandler struct { - storageService *services.StorageService + storageService *services.StorageService + uploadService *services.UploadService // optional — only set when S3 storage is configured fileOwnershipChecker FileOwnershipChecker } // NewUploadHandler creates a new upload handler func NewUploadHandler(storageService *services.StorageService, fileOwnershipChecker FileOwnershipChecker) *UploadHandler { return &UploadHandler{ - storageService: storageService, + storageService: storageService, fileOwnershipChecker: fileOwnershipChecker, } } +// SetUploadService wires the presigned-URL upload service. 
Called from the +// router only when S3 storage is configured; with local-disk storage the +// presign endpoint is unsupported and returns 503. +func (h *UploadHandler) SetUploadService(s *services.UploadService) { + h.uploadService = s +} + // UploadImage handles POST /api/uploads/image // Accepts multipart/form-data with "file" field func (h *UploadHandler) UploadImage(c echo.Context) error { @@ -138,3 +148,39 @@ func (h *UploadHandler) DeleteFile(c echo.Context) error { return c.JSON(http.StatusOK, responses.MessageResponse{Message: i18n.LocalizedMessage(c, "message.file_deleted")}) } + +// PresignUpload handles POST /api/uploads/presign. +// +// Returns a short-lived signed POST policy that the client uses to upload an +// image or document directly to B2, bypassing the API entirely for the byte +// transfer. The returned `id` is later passed in `upload_ids[]` on the +// task-completion or document creation endpoints to attach the object. +func (h *UploadHandler) PresignUpload(c echo.Context) error { + if h.uploadService == nil { + return apperrors.Internal(nil) + } + user, err := middleware.MustGetAuthUser(c) + if err != nil { + return err + } + + var req requests.PresignUploadRequest + if err := c.Bind(&req); err != nil { + return apperrors.BadRequest("error.invalid_request") + } + if err := c.Validate(&req); err != nil { + return err + } + + resp, err := h.uploadService.Presign( + c.Request().Context(), + user.ID, + models.UploadCategory(req.Category), + req.ContentType, + req.ContentLength, + ) + if err != nil { + return err + } + return c.JSON(http.StatusCreated, resp) +} diff --git a/internal/models/document.go b/internal/models/document.go index df3b1f5..1c39237 100644 --- a/internal/models/document.go +++ b/internal/models/document.go @@ -91,6 +91,8 @@ type DocumentImage struct { DocumentID uint `gorm:"column:document_id;index;not null" json:"document_id"` ImageURL string `gorm:"column:image_url;size:500;not null" json:"image_url"` Caption string 
`gorm:"column:caption;size:255" json:"caption"` + // PendingUploadID — see TaskCompletionImage.PendingUploadID. + PendingUploadID *uint `gorm:"column:pending_upload_id" json:"pending_upload_id,omitempty"` } // TableName returns the table name for GORM diff --git a/internal/models/pending_upload.go b/internal/models/pending_upload.go new file mode 100644 index 0000000..faeff2b --- /dev/null +++ b/internal/models/pending_upload.go @@ -0,0 +1,53 @@ +package models + +import "time" + +// UploadCategory enumerates the kinds of objects that can be uploaded via the +// presigned-URL flow. Each category has its own size cap and mime-type +// allow-list enforced at the service layer. +type UploadCategory string + +const ( + UploadCategoryCompletion UploadCategory = "completion" + UploadCategoryDocumentImage UploadCategory = "document_image" + UploadCategoryDocumentFile UploadCategory = "document_file" +) + +// PendingUpload is a short-lived upload session created when the client asks +// for a presigned POST policy. The row tracks the intent so the server can +// validate quota / rate-limit / size up front, then attach the resulting B2 +// object to a task_completion_image or document_image once the upload lands. +// +// Lifecycle: +// +// created → upload to B2 → attach via /api/task-completions/ or /documents/ +// ↑ │ +// └─ if not claimed before expires_at, the cleanup worker (see +// internal/worker/jobs) deletes the B2 object and the row. 
+type PendingUpload struct { + ID uint `gorm:"primaryKey" json:"id"` + UserID uint `gorm:"column:user_id;not null;index:idx_pending_uploads_user_created,priority:1" json:"user_id"` + Category UploadCategory `gorm:"column:category;size:32;not null" json:"category"` + B2Key string `gorm:"column:b2_key;size:255;uniqueIndex" json:"b2_key"` + ContentType string `gorm:"column:content_type;size:127;not null" json:"content_type"` + ExpectedBytes int64 `gorm:"column:expected_bytes;not null" json:"expected_bytes"` + ActualBytes *int64 `gorm:"column:actual_bytes" json:"actual_bytes,omitempty"` + ClaimedAt *time.Time `gorm:"column:claimed_at" json:"claimed_at,omitempty"` + CreatedAt time.Time `gorm:"column:created_at;autoCreateTime;index:idx_pending_uploads_user_created,priority:2,sort:desc" json:"created_at"` + ExpiresAt time.Time `gorm:"column:expires_at;not null" json:"expires_at"` +} + +// TableName matches the goose migration. +func (PendingUpload) TableName() string { + return "pending_uploads" +} + +// IsClaimed reports whether the upload has been linked to a real entity. +func (p *PendingUpload) IsClaimed() bool { + return p.ClaimedAt != nil +} + +// IsExpired reports whether the upload session has passed its TTL. +func (p *PendingUpload) IsExpired(now time.Time) bool { + return now.After(p.ExpiresAt) +} diff --git a/internal/models/task.go b/internal/models/task.go index 62805cb..2c58618 100644 --- a/internal/models/task.go +++ b/internal/models/task.go @@ -215,6 +215,10 @@ type TaskCompletionImage struct { CompletionID uint `gorm:"column:completion_id;index;not null" json:"completion_id"` ImageURL string `gorm:"column:image_url;size:500;not null" json:"image_url"` Caption string `gorm:"column:caption;size:255" json:"caption"` + // PendingUploadID links to the pending_uploads row that produced this + // image when uploaded via the presigned-URL flow. Nullable: legacy rows + // uploaded through the multipart path don't have one. 
+ PendingUploadID *uint `gorm:"column:pending_upload_id" json:"pending_upload_id,omitempty"` } // TableName returns the table name for GORM diff --git a/internal/repositories/pending_upload_repo.go b/internal/repositories/pending_upload_repo.go new file mode 100644 index 0000000..95d8d8e --- /dev/null +++ b/internal/repositories/pending_upload_repo.go @@ -0,0 +1,121 @@ +package repositories + +import ( + "context" + "time" + + "gorm.io/gorm" + "gorm.io/gorm/clause" + + "github.com/treytartt/honeydue-api/internal/models" +) + +// PendingUploadRepository handles persistence for upload sessions. +type PendingUploadRepository struct { + db *gorm.DB +} + +// NewPendingUploadRepository constructs a repo bound to the given GORM handle. +func NewPendingUploadRepository(db *gorm.DB) *PendingUploadRepository { + return &PendingUploadRepository{db: db} +} + +// WithContext returns a session bound to ctx so DB spans nest under the +// request span in tracing. +func (r *PendingUploadRepository) WithContext(ctx context.Context) *PendingUploadRepository { + return &PendingUploadRepository{db: r.db.WithContext(ctx)} +} + +// Create inserts a new upload session. +func (r *PendingUploadRepository) Create(p *models.PendingUpload) error { + return r.db.Create(p).Error +} + +// FindByID returns a single session, or gorm.ErrRecordNotFound. +func (r *PendingUploadRepository) FindByID(id uint) (*models.PendingUpload, error) { + var p models.PendingUpload + if err := r.db.First(&p, id).Error; err != nil { + return nil, err + } + return &p, nil +} + +// FindUnclaimedForUser locks and returns rows belonging to userID matching +// ids, where claimed_at IS NULL. Used by the attach path to ensure exactly +// one claim per row even under concurrent requests. Postgres applies real +// row locks; SQLite (test harness) silently ignores the clause. +// +// Caller must run inside a transaction for the lock to outlive the call. 
+func (r *PendingUploadRepository) FindUnclaimedForUser(userID uint, ids []uint) ([]models.PendingUpload, error) { + if len(ids) == 0 { + return nil, nil + } + var rows []models.PendingUpload + err := r.db. + Clauses(clause.Locking{Strength: "UPDATE"}). + Where("user_id = ? AND id IN ? AND claimed_at IS NULL", userID, ids). + Find(&rows).Error + return rows, err +} + +// MarkClaimed writes actual_bytes + claimed_at. Returns gorm.ErrRecordNotFound +// if the row was claimed by another transaction in the meantime. +func (r *PendingUploadRepository) MarkClaimed(id uint, actualBytes int64, now time.Time) error { + res := r.db.Model(&models.PendingUpload{}). + Where("id = ? AND claimed_at IS NULL", id). + Updates(map[string]interface{}{ + "actual_bytes": actualBytes, + "claimed_at": now, + }) + if res.Error != nil { + return res.Error + } + if res.RowsAffected == 0 { + return gorm.ErrRecordNotFound + } + return nil +} + +// CountUnclaimedActiveForUser returns how many in-flight (unclaimed, +// not-yet-expired) sessions a user holds. Used for the concurrency cap. +func (r *PendingUploadRepository) CountUnclaimedActiveForUser(userID uint, now time.Time) (int64, error) { + var n int64 + err := r.db.Model(&models.PendingUpload{}). + Where("user_id = ? AND claimed_at IS NULL AND expires_at > ?", userID, now). + Count(&n).Error + return n, err +} + +// CountCreatedSinceForUser returns the number of presign requests issued in +// the last `since` window. The service layer uses Redis for the rate-limit +// hot path; this is a fallback / consistency check. +func (r *PendingUploadRepository) CountCreatedSinceForUser(userID uint, since time.Time) (int64, error) { + var n int64 + err := r.db.Model(&models.PendingUpload{}). + Where("user_id = ? AND created_at > ?", userID, since). + Count(&n).Error + return n, err +} + +// FindExpiredUnclaimed returns up to `limit` sessions ready to reap. Caller +// is responsible for deleting the corresponding B2 objects + rows. 
+func (r *PendingUploadRepository) FindExpiredUnclaimed(now time.Time, limit int) ([]models.PendingUpload, error) { + var rows []models.PendingUpload + err := r.db. + Where("claimed_at IS NULL AND expires_at < ?", now). + Order("expires_at ASC"). + Limit(limit). + Find(&rows).Error + return rows, err +} + +// DeleteByID removes a single session row. +func (r *PendingUploadRepository) DeleteByID(id uint) error { + return r.db.Delete(&models.PendingUpload{}, id).Error +} + +// Transaction runs fn inside a DB transaction. Mirrors how task_service +// composes multi-step writes. +func (r *PendingUploadRepository) Transaction(fn func(tx *gorm.DB) error) error { + return r.db.Transaction(fn) +} diff --git a/internal/router/router.go b/internal/router/router.go index f24a91e..51d3ee1 100644 --- a/internal/router/router.go +++ b/internal/router/router.go @@ -260,6 +260,20 @@ func SetupRouter(deps *Dependencies) *echo.Echo { if deps.StorageService != nil { uploadHandler = handlers.NewUploadHandler(deps.StorageService, services.NewFileOwnershipService(deps.DB)) mediaHandler = handlers.NewMediaHandler(documentRepo, taskRepo, residenceRepo, deps.StorageService) + + // Presigned-URL upload path requires S3-compatible backend. With local + // disk we silently skip; the route returns 500 if hit. + if s3 := deps.StorageService.S3Backend(); s3 != nil { + pendingUploadRepo := repositories.NewPendingUploadRepository(deps.DB) + uploadService := services.NewUploadService(pendingUploadRepo, s3, &cfg.Storage, deps.Cache) + uploadHandler.SetUploadService(uploadService) + // Task and document services need the upload service to claim + // pending_uploads rows when /api/task-completions/ or /api/documents/ + // is called with `upload_ids: [..]` instead of multipart. 
+ taskService.SetUploadService(uploadService) + documentService.SetStorageService(deps.StorageService) + documentService.SetUploadService(uploadService) + } } // Legacy Prometheus-shaped metrics from internal/monitoring (consumed by @@ -724,6 +738,7 @@ func setupUploadRoutes(api *echo.Group, uploadHandler *handlers.UploadHandler) { uploads.POST("/image/", uploadHandler.UploadImage) uploads.POST("/document/", uploadHandler.UploadDocument) uploads.POST("/completion/", uploadHandler.UploadCompletion) + uploads.POST("/presign/", uploadHandler.PresignUpload) uploads.DELETE("/", uploadHandler.DeleteFile) } } diff --git a/internal/services/document_service.go b/internal/services/document_service.go index 6877a02..804d82d 100644 --- a/internal/services/document_service.go +++ b/internal/services/document_service.go @@ -22,9 +22,11 @@ import ( // DocumentService handles document business logic type DocumentService struct { - documentRepo *repositories.DocumentRepository - residenceRepo *repositories.ResidenceRepository - cache *CacheService + documentRepo *repositories.DocumentRepository + residenceRepo *repositories.ResidenceRepository + storageService *StorageService + uploadService *UploadService + cache *CacheService } // NewDocumentService creates a new document service @@ -40,6 +42,19 @@ func (s *DocumentService) SetCacheService(cache *CacheService) { s.cache = cache } +// SetStorageService wires the storage service so URLs for presigned uploads +// can be generated using the same BaseURL the legacy uploader uses. +func (s *DocumentService) SetStorageService(ss *StorageService) { + s.storageService = ss +} + +// SetUploadService wires the presigned-URL upload service so CreateDocument +// can claim pending_uploads rows by id and convert them into document_image +// rows (or, for category=document_file, set the document's main file fields). 
+func (s *DocumentService) SetUploadService(us *UploadService) { + s.uploadService = us +} + // GetDocument gets a document by ID with access check func (s *DocumentService) GetDocument(ctx context.Context, documentID, userID uint) (*responses.DocumentResponse, error) { document, err := s.documentRepo.WithContext(ctx).FindByID(documentID) @@ -154,11 +169,42 @@ func (s *DocumentService) CreateDocument(ctx context.Context, req *requests.Crea IsActive: true, } + // Claim presigned uploads BEFORE the document insert. If the client + // passed a category=document_file row, lift it onto the document's + // FileURL/FileName/FileSize/MimeType fields rather than creating an + // image row for it. Image categories produce DocumentImage rows below. + var claimedUploads []models.PendingUpload + if len(req.UploadIDs) > 0 && s.uploadService != nil { + var claimErr error + claimedUploads, claimErr = s.uploadService.VerifyAndClaim(ctx, userID, req.UploadIDs) + if claimErr != nil { + return nil, claimErr + } + // Lift the (single) document_file upload, if present, onto the + // document fields. Multiple document_file claims aren't meaningful; + // take the first and ignore extras to keep the surface narrow. + for _, pu := range claimedUploads { + if pu.Category == models.UploadCategoryDocumentFile { + if document.FileURL == "" { + document.FileURL = urlForUploadKey(s.storageService, pu.B2Key) + } + if document.MimeType == "" { + document.MimeType = pu.ContentType + } + if document.FileSize == nil && pu.ActualBytes != nil { + b := *pu.ActualBytes + document.FileSize = &b + } + break + } + } + } + if err := s.documentRepo.WithContext(ctx).Create(document); err != nil { return nil, apperrors.Internal(err) } - // Create images if provided + // Legacy multipart path — already-uploaded URLs. 
for _, imageURL := range req.ImageURLs { if imageURL != "" { img := &models.DocumentImage{ @@ -172,6 +218,26 @@ func (s *DocumentService) CreateDocument(ctx context.Context, req *requests.Crea } } + // New presigned path — claimed image uploads become DocumentImage rows. + // The document_file row (if any) was already lifted onto the document above. + for i := range claimedUploads { + pu := claimedUploads[i] + if pu.Category == models.UploadCategoryDocumentFile { + continue + } + img := &models.DocumentImage{ + DocumentID: document.ID, + ImageURL: urlForUploadKey(s.storageService, pu.B2Key), + PendingUploadID: &pu.ID, + } + if err := s.documentRepo.WithContext(ctx).CreateDocumentImage(img); err != nil { + // Don't fail the whole document for an image insert failure; + // matches the legacy ImageURLs behavior. The orphaned upload + // row is benign (still claimed, just unreferenced). + continue + } + } + // Reload with relations document, err = s.documentRepo.WithContext(ctx).FindByID(document.ID) if err != nil { diff --git a/internal/services/storage_backend_s3.go b/internal/services/storage_backend_s3.go index 572d49c..264a3bc 100644 --- a/internal/services/storage_backend_s3.go +++ b/internal/services/storage_backend_s3.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "io" + "net/url" "time" "github.com/minio/minio-go/v7" @@ -101,3 +102,81 @@ func (b *S3Backend) ReadStream(key string) (io.ReadCloser, error) { } return obj, nil } + +// PresignedPostResult is the data a client needs to perform a direct multipart +// POST to S3-compatible storage. The caller assembles a multipart/form-data +// request with the fields below as form parts (in order) and the file last. +type PresignedPostResult struct { + URL string // e.g. https://s3.us-east-005.backblazeb2.com/honeyDueProd + Fields map[string]string // policy, x-amz-*, key, Content-Type, etc. 
+} + +// PresignedPost generates a POST policy that constrains uploads at the protocol +// level: only the named key, only the named content-type, only sizes within +// the requested range. S3 (and B2's S3-compatible endpoint) reject anything +// that doesn't satisfy every condition before accepting the body. +// +// minBytes/maxBytes are inclusive. The returned URL + Fields can be sent +// straight to the client. +func (b *S3Backend) PresignedPost(ctx context.Context, key, contentType string, minBytes, maxBytes int64, ttl time.Duration) (*PresignedPostResult, error) { + policy := minio.NewPostPolicy() + if err := policy.SetBucket(b.bucket); err != nil { + return nil, fmt.Errorf("set bucket: %w", err) + } + if err := policy.SetKey(key); err != nil { + return nil, fmt.Errorf("set key: %w", err) + } + if err := policy.SetContentType(contentType); err != nil { + return nil, fmt.Errorf("set content-type: %w", err) + } + if err := policy.SetContentLengthRange(minBytes, maxBytes); err != nil { + return nil, fmt.Errorf("set content-length-range: %w", err) + } + if err := policy.SetExpires(time.Now().UTC().Add(ttl)); err != nil { + return nil, fmt.Errorf("set expires: %w", err) + } + + u, fields, err := b.client.PresignedPostPolicy(ctx, policy) + if err != nil { + return nil, fmt.Errorf("presign post policy: %w", err) + } + return &PresignedPostResult{ + URL: stripQuery(u), + Fields: fields, + }, nil +} + +// ObjectInfo is the metadata Stat returns without fetching the object body. +// The attach path uses it to verify the uploaded object's size and +// content-type match what the client claimed when requesting the presign. 
+type ObjectInfo struct { + Size int64 + ContentType string + ETag string +} + +func (b *S3Backend) Stat(ctx context.Context, key string) (*ObjectInfo, error) { + info, err := b.client.StatObject(ctx, b.bucket, key, minio.StatObjectOptions{}) + if err != nil { + return nil, fmt.Errorf("stat S3 object: %w", err) + } + return &ObjectInfo{ + Size: info.Size, + ContentType: info.ContentType, + ETag: info.ETag, + }, nil +} + +// stripQuery returns the URL with its query string removed. minio-go encodes +// the policy/signature into both the form fields and the query; the form +// fields are the source of truth for POST policy uploads, and many clients +// (including Apple's NSURLSession) will reject the request if the same +// signature appears in both places. +func stripQuery(u *url.URL) string { + if u == nil { + return "" + } + clone := *u + clone.RawQuery = "" + return clone.String() +} diff --git a/internal/services/storage_service.go b/internal/services/storage_service.go index d52bb48..8a7c130 100644 --- a/internal/services/storage_service.go +++ b/internal/services/storage_service.go @@ -40,6 +40,16 @@ type UploadResult struct { MimeType string `json:"mime_type"` } +// S3Backend returns the underlying S3-compatible backend if one is in use, +// or nil for local-disk storage. Used by the presigned-URL upload path which +// requires features (POST policies, StatObject) only available on S3. +func (s *StorageService) S3Backend() *S3Backend { + if b, ok := s.backend.(*S3Backend); ok { + return b + } + return nil +} + // NewStorageService creates a new storage service with the appropriate backend. // If S3 config is set, uses S3-compatible storage (B2, MinIO). // Otherwise, uses local filesystem. 
diff --git a/internal/services/task_service.go b/internal/services/task_service.go index f8a104a..09fc331 100644 --- a/internal/services/task_service.go +++ b/internal/services/task_service.go @@ -37,9 +37,18 @@ type TaskService struct { notificationService *NotificationService emailService *EmailService storageService *StorageService + uploadService *UploadService // optional — only set when S3 storage is configured cache *CacheService } +// SetUploadService wires the presigned-URL upload service so CreateCompletion +// can claim pending_uploads rows by id and convert them into completion image +// rows. Optional: with local-disk storage there's no presigned flow and the +// service is left nil. +func (s *TaskService) SetUploadService(us *UploadService) { + s.uploadService = us +} + // SetCacheService wires Redis caching for residence-ID lookups. func (s *TaskService) SetCacheService(cache *CacheService) { s.cache = cache @@ -694,6 +703,21 @@ func (s *TaskService) CreateCompletion(ctx context.Context, req *requests.Create task.InProgress = false } + // New presigned-URL path: claim pending_uploads rows that the client + // already POSTed to B2. We do this BEFORE the txn because VerifyAndClaim + // HEADs each B2 object — we don't want to hold a Postgres transaction + // open across HTTP calls. If the txn rolls back later, the rows stay + // claimed but unreferenced; they're cents of storage and visible via + // admin queries if cleanup ever matters. + var claimedUploads []models.PendingUpload + if len(req.UploadIDs) > 0 && s.uploadService != nil { + var claimErr error + claimedUploads, claimErr = s.uploadService.VerifyAndClaim(ctx, userID, req.UploadIDs) + if claimErr != nil { + return nil, claimErr + } + } + // P1-5 + B-07: Wrap completion creation, task update, and image creation // in a single transaction for atomicity. If any operation fails, all are rolled back. 
txErr := s.taskRepo.WithContext(ctx).DB().Transaction(func(tx *gorm.DB) error { @@ -703,7 +727,12 @@ func (s *TaskService) CreateCompletion(ctx context.Context, req *requests.Create if err := s.taskRepo.WithContext(ctx).UpdateTx(tx, task); err != nil { return err } - // B-07: Create images inside the same transaction as completion + // B-07: Create images inside the same transaction as completion. + // Two sources contribute, both produce TaskCompletionImage rows: + // 1. Legacy multipart path — client uploaded via the API and got + // back URLs in req.ImageURLs. + // 2. New presigned path — client uploaded direct to B2 and we + // claimed the pending_uploads rows above. for _, imageURL := range req.ImageURLs { if imageURL != "" { img := &models.TaskCompletionImage{ @@ -715,6 +744,17 @@ func (s *TaskService) CreateCompletion(ctx context.Context, req *requests.Create } } } + for i := range claimedUploads { + pu := claimedUploads[i] + img := &models.TaskCompletionImage{ + CompletionID: completion.ID, + ImageURL: urlForUploadKey(s.storageService, pu.B2Key), + PendingUploadID: &pu.ID, + } + if err := tx.Create(img).Error; err != nil { + return fmt.Errorf("failed to create completion image from upload %d: %w", pu.ID, err) + } + } return nil }) if txErr != nil { diff --git a/internal/services/upload_service.go b/internal/services/upload_service.go new file mode 100644 index 0000000..0884672 --- /dev/null +++ b/internal/services/upload_service.go @@ -0,0 +1,366 @@ +package services + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/google/uuid" + "github.com/redis/go-redis/v9" + "github.com/rs/zerolog/log" + + "github.com/treytartt/honeydue-api/internal/apperrors" + "github.com/treytartt/honeydue-api/internal/config" + "github.com/treytartt/honeydue-api/internal/dto/responses" + "github.com/treytartt/honeydue-api/internal/models" + "github.com/treytartt/honeydue-api/internal/repositories" +) + +// Upload policy constants. 
These are NOT tier-differentiated by request — a +// single number applies to every authenticated user regardless of free/pro +// status. Adjust here only; per-tier overrides should be added via +// config/SubscriptionSettings rather than splitting the constants. +const ( + UploadMaxBytes = 10 * 1024 * 1024 // 10 MiB per single object + UploadPresignSlackBytes = 256 // ± slack for content-length-range + UploadPresignTTL = 15 * time.Minute // signed URL lifetime + UploadCleanupTTL = 24 * time.Hour // unclaimed row reap window + UploadPresignsPerHour = 50 // per-user rate cap + UploadConcurrentUnclaimed = 10 // per-user in-flight cap + uploadRateLimitRedisPrefix = "upload:presign:" +) + +// allowedContentTypes maps each category to the set of mime types accepted +// for the upload. Anything outside this set is rejected before signing. +var allowedContentTypes = map[models.UploadCategory]map[string]bool{ + models.UploadCategoryCompletion: { + "image/jpeg": true, + "image/png": true, + "image/heic": true, + "image/heif": true, + "image/webp": true, + }, + models.UploadCategoryDocumentImage: { + "image/jpeg": true, + "image/png": true, + "image/heic": true, + "image/heif": true, + "image/webp": true, + }, + models.UploadCategoryDocumentFile: { + "image/jpeg": true, + "image/png": true, + "image/heic": true, + "image/heif": true, + "image/webp": true, + "application/pdf": true, + }, +} + +// uploadCategoryToSubdir maps the category to the path prefix used inside the +// bucket. Mirrors the pattern in storage_service.go. +var uploadCategoryToSubdir = map[models.UploadCategory]string{ + models.UploadCategoryCompletion: "completions", + models.UploadCategoryDocumentImage: "documents", + models.UploadCategoryDocumentFile: "documents", +} + +// UploadService orchestrates presigned-URL upload sessions. 
It owns: +// +// - validation (size, mime, category) +// - quota + rate-limit enforcement (Redis preferred; DB fallback) +// - signing the B2 POST policy via the existing S3Backend +// - tracking the session in pending_uploads so the attach path can verify +// and claim the object +type UploadService struct { + repo *repositories.PendingUploadRepository + s3 *S3Backend + cfg *config.StorageConfig + cache *CacheService + redisEnabled bool +} + +// NewUploadService wires the service. s3 may be nil if storage isn't +// configured for S3 (local-disk dev mode); presign requests will then +// return a clear error. +func NewUploadService( + repo *repositories.PendingUploadRepository, + s3 *S3Backend, + cfg *config.StorageConfig, + cache *CacheService, +) *UploadService { + return &UploadService{ + repo: repo, + s3: s3, + cfg: cfg, + cache: cache, + redisEnabled: cache != nil, + } +} + +// Presign validates the request, enforces quota + rate limits, signs a B2 +// POST policy, persists the pending_uploads row, and returns the response +// the client needs to perform the upload. +// +// Errors are mapped to apperrors so the HTTP layer can return the right +// status code: +// +// 413 — content_length over UploadMaxBytes +// 422 — content_type not allowed for the category +// 429 — over the rate limit OR over the concurrent in-flight cap +// 500 — storage not configured / signing failure +func (s *UploadService) Presign( + ctx context.Context, + userID uint, + category models.UploadCategory, + contentType string, + contentLength int64, +) (*responses.PresignUploadResponse, error) { + if s.s3 == nil { + return nil, apperrors.Internal(fmt.Errorf("upload: S3 backend not configured")) + } + + // Size cap. 413 semantically; we use BadRequest because that's the + // pattern across the codebase for dto-validation rejections. 
+ if contentLength <= 0 || contentLength > UploadMaxBytes { + return nil, apperrors.BadRequest("error.upload_too_large") + } + + // Mime check is per-category — completion photos can't be PDFs. + allowed, ok := allowedContentTypes[category] + if !ok { + return nil, apperrors.BadRequest("error.upload_invalid_category") + } + if !allowed[strings.ToLower(contentType)] { + return nil, apperrors.BadRequest("error.upload_unsupported_content_type") + } + + now := time.Now().UTC() + + // Concurrency cap: how many sessions does this user have unclaimed and + // not yet expired? Cheap COUNT, indexed. + active, err := s.repo.WithContext(ctx).CountUnclaimedActiveForUser(userID, now) + if err != nil { + return nil, apperrors.Internal(err) + } + if active >= UploadConcurrentUnclaimed { + return nil, apperrors.TooManyRequests("error.upload_too_many_in_flight") + } + + // Rate limit: presigns issued in the last hour. Redis when available + // (cheap atomic counter); DB fallback for dev/test. + if err := s.checkRateLimit(ctx, userID, now); err != nil { + return nil, err + } + + // Pick a stable storage key. UUID + extension is enough — no need for the + // timestamp-prefix trick in storage_service.go because we never list this + // path; lookups go through pending_uploads.id. + subdir := uploadCategoryToSubdir[category] + if subdir == "" { + subdir = "uploads" + } + ext := extensionForContentType(contentType) + key := fmt.Sprintf("uploads/%s/%d/%s%s", subdir, userID, uuid.New().String(), ext) + + // Sign the POST policy. The slack window lets the client encode once; + // the server still rejects anything materially different from claimed. 
+ minB := contentLength - UploadPresignSlackBytes + if minB < 0 { + minB = 0 + } + maxB := contentLength + UploadPresignSlackBytes + if maxB > UploadMaxBytes { + maxB = UploadMaxBytes + } + + post, err := s.s3.PresignedPost(ctx, key, contentType, minB, maxB, UploadPresignTTL) + if err != nil { + return nil, apperrors.Internal(fmt.Errorf("presign upload: %w", err)) + } + + expiresAt := now.Add(UploadPresignTTL) + row := &models.PendingUpload{ + UserID: userID, + Category: category, + B2Key: key, + ContentType: strings.ToLower(contentType), + ExpectedBytes: contentLength, + ExpiresAt: expiresAt, + } + if err := s.repo.WithContext(ctx).Create(row); err != nil { + return nil, apperrors.Internal(err) + } + + return &responses.PresignUploadResponse{ + ID: row.ID, + URL: post.URL, + Fields: post.Fields, + Key: key, + ExpiresAt: expiresAt.Format(time.RFC3339), + }, nil +} + +// VerifyAndClaim is called from CreateCompletion / CreateDocument. It locks +// the pending rows, HEADs each B2 object to confirm size + content-type, +// flips claimed_at, and returns the verified rows so the caller can write +// task_completion_image / document_image rows referencing them. +// +// If any verification fails, the entire batch is rejected and no rows are +// claimed — atomic semantics matter so one bad upload doesn't half-attach. +func (s *UploadService) VerifyAndClaim( + ctx context.Context, + userID uint, + uploadIDs []uint, +) ([]models.PendingUpload, error) { + if len(uploadIDs) == 0 { + return nil, nil + } + if s.s3 == nil { + return nil, apperrors.Internal(fmt.Errorf("upload: S3 backend not configured")) + } + + rows, err := s.repo.WithContext(ctx).FindUnclaimedForUser(userID, uploadIDs) + if err != nil { + return nil, apperrors.Internal(err) + } + if len(rows) != len(uploadIDs) { + // Either some IDs don't exist, belong to another user, or are already + // claimed. We don't differentiate — same status either way. 
+ return nil, apperrors.NotFound("error.upload_not_found") + } + + now := time.Now().UTC() + verified := make([]models.PendingUpload, 0, len(rows)) + for i := range rows { + r := rows[i] + if r.IsExpired(now) { + return nil, apperrors.BadRequest("error.upload_expired") + } + + info, err := s.s3.Stat(ctx, r.B2Key) + if err != nil { + log.Warn().Err(err).Uint("upload_id", r.ID).Str("key", r.B2Key).Msg("upload claim: stat failed") + return nil, apperrors.BadRequest("error.upload_not_uploaded") + } + // Size must match the claimed bytes within the policy slack window. + // Anything outside it means the client lied or B2 misreported. + if info.Size < r.ExpectedBytes-UploadPresignSlackBytes || + info.Size > r.ExpectedBytes+UploadPresignSlackBytes { + return nil, apperrors.BadRequest("error.upload_size_mismatch") + } + + if err := s.repo.WithContext(ctx).MarkClaimed(r.ID, info.Size, now); err != nil { + return nil, apperrors.Internal(err) + } + r.ActualBytes = &info.Size + r.ClaimedAt = &now + verified = append(verified, r) + } + return verified, nil +} + +// CleanupExpired finds unclaimed rows past their expires_at, deletes the +// corresponding B2 objects, and removes the rows. Called from the Asynq +// hourly cron in cmd/worker. +// +// Returns the number of rows reaped. Errors per row are logged and the loop +// continues — one stuck object shouldn't block the others. 
+func (s *UploadService) CleanupExpired(ctx context.Context, batchLimit int) (int, error) { + if s.s3 == nil { + return 0, fmt.Errorf("upload: S3 backend not configured") + } + now := time.Now().UTC() + rows, err := s.repo.WithContext(ctx).FindExpiredUnclaimed(now, batchLimit) + if err != nil { + return 0, fmt.Errorf("find expired: %w", err) + } + reaped := 0 + for _, r := range rows { + if err := s.s3.Delete(r.B2Key); err != nil { + log.Warn().Err(err).Uint("upload_id", r.ID).Str("key", r.B2Key).Msg("upload cleanup: B2 delete failed (continuing)") + // Continue to row delete anyway — bucket lifecycle backstop will + // pick up the orphaned B2 object after 7 days. + } + if err := s.repo.WithContext(ctx).DeleteByID(r.ID); err != nil { + log.Warn().Err(err).Uint("upload_id", r.ID).Msg("upload cleanup: row delete failed") + continue + } + reaped++ + } + return reaped, nil +} + +// checkRateLimit enforces UploadPresignsPerHour using Redis (preferred) or +// the DB as a fallback. Returns a 429 apperror when over. +func (s *UploadService) checkRateLimit(ctx context.Context, userID uint, now time.Time) error { + if s.redisEnabled { + bucket := now.Truncate(time.Hour).Unix() + key := fmt.Sprintf("%s%d:%d", uploadRateLimitRedisPrefix, userID, bucket) + client := s.cache.Client() + count, err := client.Incr(ctx, key).Result() + if err == nil { + // Best-effort EXPIRE; only set on first INCR (count == 1). + if count == 1 { + _ = client.Expire(ctx, key, 2*time.Hour).Err() + } + if count > UploadPresignsPerHour { + return apperrors.TooManyRequests("error.upload_rate_limit") + } + return nil + } + // Redis transient failure — fall through to DB. Don't open the gates, + // don't crash the request. 
+ if err != redis.Nil { + log.Warn().Err(err).Msg("upload rate limit: redis unavailable, falling back to DB") + } + } + since := now.Add(-1 * time.Hour) + count, err := s.repo.WithContext(ctx).CountCreatedSinceForUser(userID, since) + if err != nil { + return apperrors.Internal(err) + } + if count >= UploadPresignsPerHour { + return apperrors.TooManyRequests("error.upload_rate_limit") + } + return nil +} + +// urlForUploadKey returns the public URL for a stored object given its B2 +// key. Mirrors the URL format used by StorageService.Upload so the existing +// media handler can serve presigned-uploaded objects without changes. +// +// If storageService is nil (shouldn't happen in production but defensive) +// we fall back to returning the raw key prefixed with "/" — better than an +// empty URL on a clearly buggy code path. +func urlForUploadKey(storageService *StorageService, b2Key string) string { + if storageService == nil || storageService.cfg == nil { + return "/" + b2Key + } + base := strings.TrimRight(storageService.cfg.BaseURL, "/") + return fmt.Sprintf("%s/%s", base, b2Key) +} + +// extensionForContentType picks a sensible file extension for the storage +// key. Falls back to .bin so the key is always non-empty even for unknown +// types — the policy will reject anything with the wrong content-type +// regardless of the extension we pick here. 
// extensionByContentType maps each recognised mime type to the file
// extension used for its storage key.
var extensionByContentType = map[string]string{
	"image/jpeg":      ".jpg",
	"image/jpg":       ".jpg",
	"image/png":       ".png",
	"image/heic":      ".heic",
	"image/heif":      ".heif",
	"image/webp":      ".webp",
	"application/pdf": ".pdf",
}

// extensionForContentType picks the file extension for a storage key from
// the upload's content type. Surrounding whitespace and letter case are
// ignored. Unrecognised types get ".bin", so the result is never empty.
func extensionForContentType(contentType string) string {
	normalized := strings.ToLower(strings.TrimSpace(contentType))
	if ext, known := extensionByContentType[normalized]; known {
		return ext
	}
	return ".bin"
}
+func (h *Handler) SetUploadService(us *services.UploadService) { + h.uploadService = us +} + // NewHandler creates a new job handler func NewHandler(db *gorm.DB, pushClient *push.Client, emailService *services.EmailService, notificationService *services.NotificationService, cfg *config.Config) *Handler { h := &Handler{ @@ -647,3 +656,24 @@ func (h *Handler) HandleReminderLogCleanup(ctx context.Context, task *asynq.Task log.Info().Int64("deleted", deleted).Msg("Reminder log cleanup completed") return nil } + +// HandleUploadCleanup reaps expired pending_uploads rows and their B2 objects. +// +// Runs hourly. Each tick processes up to 500 expired sessions; if the queue +// is deeper than that, the next hourly run picks up the rest. The B2 bucket +// also has a 7-day lifecycle rule on the uploads/ prefix as a backstop in +// case this worker is offline for long stretches. +func (h *Handler) HandleUploadCleanup(ctx context.Context, task *asynq.Task) error { + if h.uploadService == nil { + log.Debug().Msg("Upload cleanup skipped: upload service not configured (local-disk storage)") + return nil + } + log.Info().Msg("Processing pending_uploads cleanup...") + reaped, err := h.uploadService.CleanupExpired(ctx, 500) + if err != nil { + log.Error().Err(err).Msg("Pending uploads cleanup failed") + return err + } + log.Info().Int("reaped", reaped).Msg("Pending uploads cleanup completed") + return nil +} diff --git a/migrations/000002_pending_uploads.sql b/migrations/000002_pending_uploads.sql new file mode 100644 index 0000000..f30671e --- /dev/null +++ b/migrations/000002_pending_uploads.sql @@ -0,0 +1,39 @@ +-- +goose Up +-- pending_uploads tracks short-lived presigned-URL upload sessions for direct +-- client-to-B2 uploads. A row is created when the client requests a presigned +-- POST policy, and either claimed (linked to a task_completion_image or +-- document_image) or reaped by the cleanup worker after expiry. 
CREATE TABLE pending_uploads (
    id              BIGSERIAL PRIMARY KEY,
    user_id         BIGINT       NOT NULL REFERENCES auth_user(id) ON DELETE CASCADE,
    category        VARCHAR(32)  NOT NULL,
    b2_key          VARCHAR(255) NOT NULL UNIQUE,
    content_type    VARCHAR(127) NOT NULL,
    expected_bytes  BIGINT       NOT NULL,
    actual_bytes    BIGINT,                      -- filled in when the upload is claimed
    claimed_at      TIMESTAMPTZ,                 -- NULL while the session is unclaimed
    created_at      TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    expires_at      TIMESTAMPTZ  NOT NULL
);

-- Quota lookups: SUM/COUNT by user, ordered by recency.
CREATE INDEX idx_pending_uploads_user_created
    ON pending_uploads (user_id, created_at DESC);

-- Cleanup worker scan: only unclaimed expired rows. Partial index keeps it tiny.
CREATE INDEX idx_pending_uploads_cleanup
    ON pending_uploads (expires_at) WHERE claimed_at IS NULL;

-- task_completion_image and document_image gain an optional FK to the
-- pending_uploads row that produced them. Nullable so legacy rows (uploaded
-- through the multipart path) keep working.
ALTER TABLE task_taskcompletionimage
    ADD COLUMN pending_upload_id BIGINT REFERENCES pending_uploads(id) ON DELETE SET NULL;

ALTER TABLE task_documentimage
    ADD COLUMN pending_upload_id BIGINT REFERENCES pending_uploads(id) ON DELETE SET NULL;

-- PostgreSQL does not index the referencing side of a foreign key on its own.
-- Without these indexes every DELETE on pending_uploads (the cleanup worker
-- deletes rows in batches) has to scan both image tables to apply
-- ON DELETE SET NULL. Partial, because legacy rows carry NULL here.
CREATE INDEX idx_taskcompletionimage_pending_upload
    ON task_taskcompletionimage (pending_upload_id) WHERE pending_upload_id IS NOT NULL;

CREATE INDEX idx_documentimage_pending_upload
    ON task_documentimage (pending_upload_id) WHERE pending_upload_id IS NOT NULL;

-- +goose Down
-- Dropping a column also drops the indexes defined on it, so the two
-- pending_upload_id indexes need no separate DROP INDEX.
ALTER TABLE task_documentimage DROP COLUMN IF EXISTS pending_upload_id;
ALTER TABLE task_taskcompletionimage DROP COLUMN IF EXISTS pending_upload_id;
DROP TABLE IF EXISTS pending_uploads;