package repositories import ( "context" "time" "gorm.io/gorm" "gorm.io/gorm/clause" "github.com/treytartt/honeydue-api/internal/models" ) // PendingUploadRepository handles persistence for upload sessions. type PendingUploadRepository struct { db *gorm.DB } // NewPendingUploadRepository constructs a repo bound to the given GORM handle. func NewPendingUploadRepository(db *gorm.DB) *PendingUploadRepository { return &PendingUploadRepository{db: db} } // WithContext returns a session bound to ctx so DB spans nest under the // request span in tracing. func (r *PendingUploadRepository) WithContext(ctx context.Context) *PendingUploadRepository { return &PendingUploadRepository{db: r.db.WithContext(ctx)} } // Create inserts a new upload session. func (r *PendingUploadRepository) Create(p *models.PendingUpload) error { return r.db.Create(p).Error } // FindByID returns a single session, or gorm.ErrRecordNotFound. func (r *PendingUploadRepository) FindByID(id uint) (*models.PendingUpload, error) { var p models.PendingUpload if err := r.db.First(&p, id).Error; err != nil { return nil, err } return &p, nil } // FindUnclaimedForUser locks and returns rows belonging to userID matching // ids, where claimed_at IS NULL. Used by the attach path to ensure exactly // one claim per row even under concurrent requests. Postgres applies real // row locks; SQLite (test harness) silently ignores the clause. // // Caller must run inside a transaction for the lock to outlive the call. func (r *PendingUploadRepository) FindUnclaimedForUser(userID uint, ids []uint) ([]models.PendingUpload, error) { if len(ids) == 0 { return nil, nil } var rows []models.PendingUpload err := r.db. Clauses(clause.Locking{Strength: "UPDATE"}). Where("user_id = ? AND id IN ? AND claimed_at IS NULL", userID, ids). Find(&rows).Error return rows, err } // MarkClaimed writes actual_bytes + claimed_at. Returns gorm.ErrRecordNotFound // if the row was claimed by another transaction in the meantime. func (r *PendingUploadRepository) MarkClaimed(id uint, actualBytes int64, now time.Time) error { res := r.db.Model(&models.PendingUpload{}). Where("id = ? AND claimed_at IS NULL", id). Updates(map[string]interface{}{ "actual_bytes": actualBytes, "claimed_at": now, }) if res.Error != nil { return res.Error } if res.RowsAffected == 0 { return gorm.ErrRecordNotFound } return nil } // CountUnclaimedActiveForUser returns how many in-flight (unclaimed, // not-yet-expired) sessions a user holds. Used for the concurrency cap. func (r *PendingUploadRepository) CountUnclaimedActiveForUser(userID uint, now time.Time) (int64, error) { var n int64 err := r.db.Model(&models.PendingUpload{}). Where("user_id = ? AND claimed_at IS NULL AND expires_at > ?", userID, now). Count(&n).Error return n, err } // CountCreatedSinceForUser returns the number of presign requests issued in // the last `since` window. The service layer uses Redis for the rate-limit // hot path; this is a fallback / consistency check. func (r *PendingUploadRepository) CountCreatedSinceForUser(userID uint, since time.Time) (int64, error) { var n int64 err := r.db.Model(&models.PendingUpload{}). Where("user_id = ? AND created_at > ?", userID, since). Count(&n).Error return n, err } // FindExpiredUnclaimed returns up to `limit` sessions ready to reap. Caller // is responsible for deleting the corresponding B2 objects + rows. func (r *PendingUploadRepository) FindExpiredUnclaimed(now time.Time, limit int) ([]models.PendingUpload, error) { var rows []models.PendingUpload err := r.db. Where("claimed_at IS NULL AND expires_at < ?", now). Order("expires_at ASC"). Limit(limit). Find(&rows).Error return rows, err } // DeleteByID removes a single session row. func (r *PendingUploadRepository) DeleteByID(id uint) error { return r.db.Delete(&models.PendingUpload{}, id).Error } // Transaction runs fn inside a DB transaction. Mirrors how task_service // composes multi-step writes. func (r *PendingUploadRepository) Transaction(fn func(tx *gorm.DB) error) error { return r.db.Transaction(fn) }