Introduces a StorageBackend interface with local filesystem and S3 implementations. The StorageService delegates raw I/O to the backend while keeping validation, encryption, and URL generation unchanged. Backend selection is config-driven: set B2_ENDPOINT + B2_KEY_ID + B2_APP_KEY + B2_BUCKET_NAME for S3 mode, or STORAGE_UPLOAD_DIR for local mode. STORAGE_USE_SSL=false for in-cluster MinIO (HTTP). All existing tests pass unchanged — the local backend preserves identical behavior to the previous direct-filesystem implementation. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
104 lines
2.7 KiB
Go
104 lines
2.7 KiB
Go
package services
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"time"
|
|
|
|
"github.com/minio/minio-go/v7"
|
|
"github.com/minio/minio-go/v7/pkg/credentials"
|
|
"github.com/rs/zerolog/log"
|
|
)
|
|
|
|
// S3Backend stores files in S3-compatible storage (Backblaze B2, MinIO, AWS S3).
|
|
type S3Backend struct {
|
|
client *minio.Client
|
|
bucket string
|
|
}
|
|
|
|
// NewS3Backend creates an S3-compatible storage backend.
|
|
func NewS3Backend(endpoint, keyID, appKey, bucket string, useSSL bool, region string) (*S3Backend, error) {
|
|
if region == "" {
|
|
region = "us-east-1"
|
|
}
|
|
|
|
client, err := minio.New(endpoint, &minio.Options{
|
|
Creds: credentials.NewStaticV4(keyID, appKey, ""),
|
|
Secure: useSSL,
|
|
Region: region,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create S3 client: %w", err)
|
|
}
|
|
|
|
// Verify bucket exists
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
exists, err := client.BucketExists(ctx, bucket)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to check bucket %q: %w", bucket, err)
|
|
}
|
|
if !exists {
|
|
return nil, fmt.Errorf("bucket %q does not exist", bucket)
|
|
}
|
|
|
|
log.Info().
|
|
Str("endpoint", endpoint).
|
|
Str("bucket", bucket).
|
|
Bool("ssl", useSSL).
|
|
Msg("S3 storage backend initialized")
|
|
|
|
return &S3Backend{client: client, bucket: bucket}, nil
|
|
}
|
|
|
|
func (b *S3Backend) Write(key string, data []byte) error {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
|
|
defer cancel()
|
|
|
|
_, err := b.client.PutObject(ctx, b.bucket, key, bytes.NewReader(data), int64(len(data)), minio.PutObjectOptions{})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to upload to S3: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (b *S3Backend) Read(key string) ([]byte, error) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
|
|
defer cancel()
|
|
|
|
obj, err := b.client.GetObject(ctx, b.bucket, key, minio.GetObjectOptions{})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get S3 object: %w", err)
|
|
}
|
|
defer obj.Close()
|
|
|
|
data, err := io.ReadAll(obj)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to read S3 object: %w", err)
|
|
}
|
|
return data, nil
|
|
}
|
|
|
|
func (b *S3Backend) Delete(key string) error {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
|
|
err := b.client.RemoveObject(ctx, b.bucket, key, minio.RemoveObjectOptions{})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to delete S3 object: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (b *S3Backend) ReadStream(key string) (io.ReadCloser, error) {
|
|
ctx := context.Background() // caller controls lifetime by closing the reader
|
|
obj, err := b.client.GetObject(ctx, b.bucket, key, minio.GetObjectOptions{})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get S3 object stream: %w", err)
|
|
}
|
|
return obj, nil
|
|
}
|