29c9014a33
Replaces the multipart-via-API path for image uploads with a three-step
direct-to-storage flow:
1. Client POSTs /api/uploads/presign with content_length + content_type;
server validates size (10 MB cap), mime allow-list per category, rate
limit (50/hour/user via Redis sliding window), and concurrent unclaimed
cap (10 in-flight per user). On success it persists a pending_uploads
row, signs an S3 POST policy with content-length-range bound to the
claimed length ±256 bytes, and returns the URL+fields.
2. Client POSTs the bytes directly to B2 using the signed policy. B2
enforces size, content-type, and key match before accepting.
3. Client passes upload_ids[] to /api/task-completions/ or /api/documents/.
Service HEADs each B2 object, verifies size matches expected_bytes
within slack, marks pending_uploads claimed_at, and creates the
associated TaskCompletionImage / DocumentImage rows.
Bytes never traverse our API server. The 1 MB Echo BodyLimit middleware
that was rejecting all task-completion image uploads becomes irrelevant
for this path. Existing multipart endpoints stay functional alongside it,
so the new path can be soak-tested before the legacy endpoints are removed.
Cleanup:
- cmd/worker registers a new hourly cron (TypeUploadCleanup, "30 * * * *")
that reaps pending_uploads where claimed_at IS NULL AND expires_at < NOW().
Reaps both the B2 object and the row.
- B2 bucket lifecycle rule on `uploads/` prefix (7 days hide → 1 day delete)
documented in deploy-k3s/manifests/b2-lifecycle.md as a backstop.
Schema:
- migrations/000002_pending_uploads.sql adds the table + partial index for
cleanup + nullable pending_upload_id FKs on task_taskcompletionimage and
task_documentimage.
Policy (single tier, no free/pro split):
- 10 MB cap per upload
- 50 presigns/hour/user
- 10 concurrent unclaimed uploads/user
- allow-list: jpeg/png/heic/heif/webp for image categories;
+ pdf for document_file
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
122 lines
4.0 KiB
Go
122 lines
4.0 KiB
Go
package repositories
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"gorm.io/gorm"
|
|
"gorm.io/gorm/clause"
|
|
|
|
"github.com/treytartt/honeydue-api/internal/models"
|
|
)
|
|
|
|
// PendingUploadRepository handles persistence for upload sessions.
|
|
type PendingUploadRepository struct {
|
|
db *gorm.DB
|
|
}
|
|
|
|
// NewPendingUploadRepository constructs a repo bound to the given GORM handle.
|
|
func NewPendingUploadRepository(db *gorm.DB) *PendingUploadRepository {
|
|
return &PendingUploadRepository{db: db}
|
|
}
|
|
|
|
// WithContext returns a session bound to ctx so DB spans nest under the
|
|
// request span in tracing.
|
|
func (r *PendingUploadRepository) WithContext(ctx context.Context) *PendingUploadRepository {
|
|
return &PendingUploadRepository{db: r.db.WithContext(ctx)}
|
|
}
|
|
|
|
// Create inserts a new upload session.
|
|
func (r *PendingUploadRepository) Create(p *models.PendingUpload) error {
|
|
return r.db.Create(p).Error
|
|
}
|
|
|
|
// FindByID returns a single session, or gorm.ErrRecordNotFound.
|
|
func (r *PendingUploadRepository) FindByID(id uint) (*models.PendingUpload, error) {
|
|
var p models.PendingUpload
|
|
if err := r.db.First(&p, id).Error; err != nil {
|
|
return nil, err
|
|
}
|
|
return &p, nil
|
|
}
|
|
|
|
// FindUnclaimedForUser locks and returns rows belonging to userID matching
|
|
// ids, where claimed_at IS NULL. Used by the attach path to ensure exactly
|
|
// one claim per row even under concurrent requests. Postgres applies real
|
|
// row locks; SQLite (test harness) silently ignores the clause.
|
|
//
|
|
// Caller must run inside a transaction for the lock to outlive the call.
|
|
func (r *PendingUploadRepository) FindUnclaimedForUser(userID uint, ids []uint) ([]models.PendingUpload, error) {
|
|
if len(ids) == 0 {
|
|
return nil, nil
|
|
}
|
|
var rows []models.PendingUpload
|
|
err := r.db.
|
|
Clauses(clause.Locking{Strength: "UPDATE"}).
|
|
Where("user_id = ? AND id IN ? AND claimed_at IS NULL", userID, ids).
|
|
Find(&rows).Error
|
|
return rows, err
|
|
}
|
|
|
|
// MarkClaimed writes actual_bytes + claimed_at. Returns gorm.ErrRecordNotFound
|
|
// if the row was claimed by another transaction in the meantime.
|
|
func (r *PendingUploadRepository) MarkClaimed(id uint, actualBytes int64, now time.Time) error {
|
|
res := r.db.Model(&models.PendingUpload{}).
|
|
Where("id = ? AND claimed_at IS NULL", id).
|
|
Updates(map[string]interface{}{
|
|
"actual_bytes": actualBytes,
|
|
"claimed_at": now,
|
|
})
|
|
if res.Error != nil {
|
|
return res.Error
|
|
}
|
|
if res.RowsAffected == 0 {
|
|
return gorm.ErrRecordNotFound
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// CountUnclaimedActiveForUser returns how many in-flight (unclaimed,
|
|
// not-yet-expired) sessions a user holds. Used for the concurrency cap.
|
|
func (r *PendingUploadRepository) CountUnclaimedActiveForUser(userID uint, now time.Time) (int64, error) {
|
|
var n int64
|
|
err := r.db.Model(&models.PendingUpload{}).
|
|
Where("user_id = ? AND claimed_at IS NULL AND expires_at > ?", userID, now).
|
|
Count(&n).Error
|
|
return n, err
|
|
}
|
|
|
|
// CountCreatedSinceForUser returns the number of presign requests issued in
|
|
// the last `since` window. The service layer uses Redis for the rate-limit
|
|
// hot path; this is a fallback / consistency check.
|
|
func (r *PendingUploadRepository) CountCreatedSinceForUser(userID uint, since time.Time) (int64, error) {
|
|
var n int64
|
|
err := r.db.Model(&models.PendingUpload{}).
|
|
Where("user_id = ? AND created_at > ?", userID, since).
|
|
Count(&n).Error
|
|
return n, err
|
|
}
|
|
|
|
// FindExpiredUnclaimed returns up to `limit` sessions ready to reap. Caller
|
|
// is responsible for deleting the corresponding B2 objects + rows.
|
|
func (r *PendingUploadRepository) FindExpiredUnclaimed(now time.Time, limit int) ([]models.PendingUpload, error) {
|
|
var rows []models.PendingUpload
|
|
err := r.db.
|
|
Where("claimed_at IS NULL AND expires_at < ?", now).
|
|
Order("expires_at ASC").
|
|
Limit(limit).
|
|
Find(&rows).Error
|
|
return rows, err
|
|
}
|
|
|
|
// DeleteByID removes a single session row.
|
|
func (r *PendingUploadRepository) DeleteByID(id uint) error {
|
|
return r.db.Delete(&models.PendingUpload{}, id).Error
|
|
}
|
|
|
|
// Transaction runs fn inside a DB transaction. Mirrors how task_service
|
|
// composes multi-step writes.
|
|
func (r *PendingUploadRepository) Transaction(fn func(tx *gorm.DB) error) error {
|
|
return r.db.Transaction(fn)
|
|
}
|