diff --git a/cmd/worker/main.go b/cmd/worker/main.go index 1db3156..7066d0e 100644 --- a/cmd/worker/main.go +++ b/cmd/worker/main.go @@ -232,7 +232,11 @@ func main() { mux.HandleFunc(jobs.TypeOnboardingEmails, jobHandler.HandleOnboardingEmails) mux.HandleFunc(jobs.TypeReminderLogCleanup, jobHandler.HandleReminderLogCleanup) mux.HandleFunc(jobs.TypeUploadCleanup, jobHandler.HandleUploadCleanup) + mux.HandleFunc(jobs.TypeNotificationCleanup, jobHandler.HandleNotificationCleanup) + mux.HandleFunc(jobs.TypeWebhookLogCleanup, jobHandler.HandleWebhookLogCleanup) + mux.HandleFunc(jobs.TypeAuditLogCleanup, jobHandler.HandleAuditLogCleanup) mux.HandleFunc(worker.TypeTaskCompletedNotification, jobHandler.HandleTaskCompletedNotification) + mux.HandleFunc(worker.TypeDataExport, jobHandler.HandleDataExport) // Register email job handlers (welcome, verification, password reset, password changed) if emailService != nil { @@ -281,6 +285,23 @@ func main() { } log.Info().Str("cron", "30 * * * *").Msg("Registered pending_uploads cleanup job (runs hourly)") + // Data-retention cleanups (BE-2). Staggered off the 3:00 reminder cleanup to + // avoid piling DELETEs onto the same Neon connection window. + if _, err := scheduler.Register("0 2 * * *", asynq.NewTask(jobs.TypeNotificationCleanup, nil)); err != nil { + log.Fatal().Err(err).Msg("Failed to register notification cleanup job") + } + log.Info().Str("cron", "0 2 * * *").Msg("Registered notification cleanup job (daily 02:00 UTC, 90d retention)") + + if _, err := scheduler.Register("30 2 * * 0", asynq.NewTask(jobs.TypeWebhookLogCleanup, nil)); err != nil { + log.Fatal().Err(err).Msg("Failed to register webhook log cleanup job") + } + log.Info().Str("cron", "30 2 * * 0").Msg("Registered webhook log cleanup job (weekly Sun 02:30 UTC, 180d retention)") + + if _, err := scheduler.Register("30 3 * * 0", asynq.NewTask(jobs.TypeAuditLogCleanup, nil)); err != nil { + log.Fatal().Err(err).Msg("Failed to register audit log cleanup job") + } + log.Info().Str("cron", "30 3 * * 0").Msg("Registered audit log cleanup job (weekly Sun 03:30 UTC, 365d retention)") + // Handle graceful shutdown quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) @@ -292,6 +313,12 @@ func main() { w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte(`{"status":"ok"}`)) }) + // Expose Prometheus metrics so vmagent can scrape the worker. The + // apns_send_*, fcm_send_*, asynq_job_* and cache_ops_* series have been + // recorded on this process all along — they were just never exposed, which + // is why those dashboard panels read empty. Same :6060 as health; in-cluster + // only (not externally published). + healthMux.Handle("/metrics", prom.HTTPHandler()) healthSrv := &http.Server{ Addr: workerHealthAddr, Handler: healthMux, diff --git a/deploy-k3s/manifests/observability/vmagent.yaml b/deploy-k3s/manifests/observability/vmagent.yaml index 5a88cb9..bda2aed 100644 --- a/deploy-k3s/manifests/observability/vmagent.yaml +++ b/deploy-k3s/manifests/observability/vmagent.yaml @@ -78,18 +78,25 @@ data: - target_label: service replacement: node-exporter - # honeyDue worker — also exposes /metrics if/when we add it. - # Keep this stanza commented until the worker has a /metrics endpoint; - # uncommented form drops scrapes silently. - # - job_name: worker - # kubernetes_sd_configs: - # - role: pod - # namespaces: - # names: [honeydue] - # relabel_configs: - # - source_labels: [__meta_kubernetes_pod_label_app_kubernetes_io_name] - # action: keep - # regex: worker + # honeyDue worker — exposes /metrics on :6060 (apns/fcm/asynq/cache series). + - job_name: worker + kubernetes_sd_configs: + - role: pod + namespaces: + names: [honeydue] + relabel_configs: + - source_labels: [__meta_kubernetes_pod_label_app_kubernetes_io_name] + action: keep + regex: worker + - source_labels: [__meta_kubernetes_pod_container_port_number] + action: keep + regex: "6060" + - source_labels: [__meta_kubernetes_pod_name] + target_label: pod + - source_labels: [__meta_kubernetes_pod_node_name] + target_label: node + - target_label: service + replacement: worker --- apiVersion: v1 diff --git a/deploy-k3s/manifests/worker/deployment.yaml b/deploy-k3s/manifests/worker/deployment.yaml index 53f8f8d..a7fe0d8 100644 --- a/deploy-k3s/manifests/worker/deployment.yaml +++ b/deploy-k3s/manifests/worker/deployment.yaml @@ -43,6 +43,11 @@ spec: - name: worker image: IMAGE_PLACEHOLDER # Replaced by 03-deploy.sh imagePullPolicy: IfNotPresent # audit CODE-L4 — explicit; images are SHA/digest-pinned + ports: + # health + Prometheus /metrics (in-cluster only; scraped by vmagent) + - name: metrics + containerPort: 6060 + protocol: TCP securityContext: allowPrivilegeEscalation: false readOnlyRootFilesystem: true @@ -95,3 +100,46 @@ spec: - name: tmp emptyDir: sizeLimit: 64Mi +--- +# Allow vmagent to scrape the worker's /metrics on :6060 (default-deny-all is in +# force; the worker otherwise receives no ingress). Additive — see node-exporter. +apiVersion: networking.k8s.io/v1 +kind: NetworkPolicy +metadata: + name: allow-ingress-to-worker-metrics + namespace: honeydue +spec: + podSelector: + matchLabels: + app.kubernetes.io/name: worker + policyTypes: + - Ingress + ingress: + - from: + - podSelector: + matchLabels: + app.kubernetes.io/name: vmagent + ports: + - port: 6060 + protocol: TCP +--- +# vmagent's base egress policy only opens :8000/:8080 to the pod CIDR; this +# additive policy opens :6060 for the worker scrape (leaves the base untouched). +apiVersion: networking.k8s.io/v1 +kind: NetworkPolicy +metadata: + name: allow-egress-from-vmagent-to-worker + namespace: honeydue +spec: + podSelector: + matchLabels: + app.kubernetes.io/name: vmagent + policyTypes: + - Egress + egress: + - to: + - ipBlock: + cidr: 10.42.0.0/16 + ports: + - port: 6060 + protocol: TCP diff --git a/internal/handlers/auth_handler.go b/internal/handlers/auth_handler.go index d86a73c..9b03a7b 100644 --- a/internal/handlers/auth_handler.go +++ b/internal/handlers/auth_handler.go @@ -12,6 +12,7 @@ import ( "github.com/treytartt/honeydue-api/internal/middleware" "github.com/treytartt/honeydue-api/internal/services" "github.com/treytartt/honeydue-api/internal/validator" + "github.com/treytartt/honeydue-api/internal/worker" ) // AuthHandler handles user profile and account management endpoints. @@ -23,6 +24,7 @@ type AuthHandler struct { cache *services.CacheService storageService *services.StorageService auditService *services.AuditService + enqueuer worker.Enqueuer } // NewAuthHandler creates a new auth handler. @@ -44,6 +46,38 @@ func (h *AuthHandler) SetAuditService(auditService *services.AuditService) { h.auditService = auditService } +// SetEnqueuer sets the async task enqueuer (used by the GDPR data-export endpoint). +func (h *AuthHandler) SetEnqueuer(enqueuer worker.Enqueuer) { + h.enqueuer = enqueuer +} + +// ExportData handles POST /api/auth/export/ — queues a GDPR data-export job that +// emails the user a zip of all their data. Async (202) because gathering, +// zipping, and emailing can take seconds; doing it inline would block the request. +func (h *AuthHandler) ExportData(c echo.Context) error { + noStore(c) + user, err := middleware.MustGetAuthUser(c) + if err != nil { + return err + } + if h.enqueuer == nil { + return echo.NewHTTPError(http.StatusServiceUnavailable, "data export is temporarily unavailable") + } + if err := h.enqueuer.EnqueueDataExport(user.ID); err != nil { + log.Error().Err(err).Uint("user_id", user.ID).Msg("Failed to enqueue data export") + return echo.NewHTTPError(http.StatusInternalServerError, "failed to queue data export") + } + if h.auditService != nil { + h.auditService.LogEvent(c, &user.ID, services.AuditEventDataExport, map[string]interface{}{ + "user_id": user.ID, + "email": user.Email, + }) + } + return c.JSON(http.StatusAccepted, map[string]string{ + "message": "Your data export has been queued. You'll receive an email with your data shortly.", + }) +} + // noStore marks a response as non-cacheable. func noStore(c echo.Context) { c.Response().Header().Set("Cache-Control", "no-store") diff --git a/internal/prom/metrics.go b/internal/prom/metrics.go index 3b4daba..dd130b3 100644 --- a/internal/prom/metrics.go +++ b/internal/prom/metrics.go @@ -1,6 +1,7 @@ package prom import ( + "net/http" "strconv" "time" @@ -54,6 +55,11 @@ var ( Help: "Duration of asynq background job execution in seconds.", Buckets: []float64{0.01, 0.05, 0.1, 0.5, 1, 5, 10, 30, 60, 300}, }, []string{"task_type", "result"}) + + cacheOps = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "cache_ops_total", + Help: "Redis cache operations by type and result.", + }, []string{"operation", "result"}) // operation: get|set; result: hit|miss|ok|error ) func init() { @@ -67,6 +73,7 @@ func init() { apnsSendDuration, fcmSendDuration, asynqJobDuration, + cacheOps, ) } @@ -77,6 +84,20 @@ func Handler() echo.HandlerFunc { return echo.WrapHandler(h) } +// HTTPHandler returns a net/http handler bound to the package Registry, for the +// worker's plain http.ServeMux (the api uses Handler() for Echo). This is what +// lets the worker's apns/fcm/asynq histograms actually get scraped — they were +// recorded all along but the worker exposed no /metrics endpoint. +func HTTPHandler() http.Handler { + return promhttp.HandlerFor(Registry, promhttp.HandlerOpts{Registry: Registry}) +} + +// ObserveCacheOp records a Redis cache operation. operation is "get" or "set"; +// result is "hit"/"miss"/"error" for gets and "ok"/"error" for sets. +func ObserveCacheOp(operation, result string) { + cacheOps.WithLabelValues(operation, result).Inc() +} + // HTTPMiddleware records http_request_duration_seconds for every request, // labeled by Echo route pattern, method, and status code. func HTTPMiddleware() echo.MiddlewareFunc { diff --git a/internal/router/router.go b/internal/router/router.go index beb86e5..1cd6333 100644 --- a/internal/router/router.go +++ b/internal/router/router.go @@ -270,6 +270,9 @@ func SetupRouter(deps *Dependencies) *echo.Echo { authHandler := handlers.NewAuthHandler(authService, deps.EmailService, deps.Cache) authHandler.SetStorageService(deps.StorageService) authHandler.SetAuditService(auditService) + if deps.TaskEnqueuer != nil { + authHandler.SetEnqueuer(deps.TaskEnqueuer) + } userHandler := handlers.NewUserHandler(userService) residenceHandler := handlers.NewResidenceHandler(residenceService, deps.PDFService, deps.EmailService, cfg.Features.PDFReportsEnabled) taskHandler := handlers.NewTaskHandler(taskService, deps.StorageService) @@ -559,6 +562,7 @@ func setupProtectedAuthRoutes(api *echo.Group, authHandler *handlers.AuthHandler auth.PUT("/profile/", authHandler.UpdateProfile) auth.PATCH("/profile/", authHandler.UpdateProfile) auth.DELETE("/account/", authHandler.DeleteAccount) + auth.POST("/export/", authHandler.ExportData) } } diff --git a/internal/services/audit_service.go b/internal/services/audit_service.go index 703e1d2..970738a 100644 --- a/internal/services/audit_service.go +++ b/internal/services/audit_service.go @@ -19,6 +19,7 @@ const ( AuditEventPasswordReset = "auth.password_reset" AuditEventPasswordChanged = "auth.password_changed" AuditEventAccountDeleted = "auth.account_deleted" + AuditEventDataExport = "auth.data_export_requested" ) // AuditService handles audit logging for security-relevant events. diff --git a/internal/services/cache_service.go b/internal/services/cache_service.go index 454821e..2859722 100644 --- a/internal/services/cache_service.go +++ b/internal/services/cache_service.go @@ -3,6 +3,7 @@ package services import ( "context" "encoding/json" + "errors" "fmt" "hash/fnv" "sync" @@ -13,6 +14,7 @@ import ( "github.com/treytartt/honeydue-api/internal/config" "github.com/treytartt/honeydue-api/internal/i18n" + "github.com/treytartt/honeydue-api/internal/prom" ) // CacheService provides Redis caching functionality @@ -93,16 +95,28 @@ func (c *CacheService) Set(ctx context.Context, key string, value interface{}, e return fmt.Errorf("failed to marshal value: %w", err) } - return c.client.Set(ctx, key, data, expiration).Err() + err = c.client.Set(ctx, key, data, expiration).Err() + if err != nil { + prom.ObserveCacheOp("set", "error") + } else { + prom.ObserveCacheOp("set", "ok") + } + return err } // Get retrieves a value by key func (c *CacheService) Get(ctx context.Context, key string, dest interface{}) error { data, err := c.client.Get(ctx, key).Bytes() if err != nil { + if errors.Is(err, redis.Nil) { + prom.ObserveCacheOp("get", "miss") + } else { + prom.ObserveCacheOp("get", "error") + } return err } + prom.ObserveCacheOp("get", "hit") return json.Unmarshal(data, dest) } diff --git a/internal/worker/enqueuer.go b/internal/worker/enqueuer.go index 5991dd4..99f17fd 100644 --- a/internal/worker/enqueuer.go +++ b/internal/worker/enqueuer.go @@ -12,11 +12,17 @@ type Enqueuer interface { EnqueuePasswordResetEmail(to, firstName, code, resetToken string) error EnqueuePasswordChangedEmail(to, firstName string) error EnqueueTaskCompletedNotification(taskID, completionID uint) error + EnqueueDataExport(userID uint) error } // Verify TaskClient satisfies the interface at compile time. var _ Enqueuer = (*TaskClient)(nil) +// BuildDataExportPayload marshals a DataExportPayload to JSON bytes. +func BuildDataExportPayload(userID uint) ([]byte, error) { + return json.Marshal(DataExportPayload{UserID: userID}) +} + // BuildWelcomeEmailPayload marshals a WelcomeEmailPayload to JSON bytes. func BuildWelcomeEmailPayload(to, firstName, code string) ([]byte, error) { return json.Marshal(WelcomeEmailPayload{ diff --git a/internal/worker/jobs/cleanup.go b/internal/worker/jobs/cleanup.go new file mode 100644 index 0000000..2565e32 --- /dev/null +++ b/internal/worker/jobs/cleanup.go @@ -0,0 +1,69 @@ +package jobs + +import ( + "context" + "time" + + "github.com/hibiken/asynq" + "github.com/rs/zerolog/log" + + "github.com/treytartt/honeydue-api/internal/models" + "github.com/treytartt/honeydue-api/internal/repositories" +) + +// Data-retention cleanup job types. Registered as periodic crons in +// cmd/worker/main.go. These keep transient/log tables from growing unbounded; +// none touch user-facing data that the app reads back. +const ( + TypeNotificationCleanup = "maintenance:notification_cleanup" + TypeWebhookLogCleanup = "maintenance:webhook_log_cleanup" + TypeAuditLogCleanup = "maintenance:audit_log_cleanup" +) + +// Retention windows (days). +const ( + notificationRetentionDays = 90 + webhookLogRetentionDays = 180 + auditLogRetentionDays = 365 // keep 1 year of security events +) + +// HandleNotificationCleanup deletes notification rows older than the retention +// window. Notifications are delivery records (push/digest history); 90 days is +// ample for any in-app history a client might show. +func (h *Handler) HandleNotificationCleanup(ctx context.Context, _ *asynq.Task) error { + cutoff := time.Now().UTC().AddDate(0, 0, -notificationRetentionDays) + res := h.db.WithContext(ctx).Where("created_at < ?", cutoff).Delete(&models.Notification{}) + if res.Error != nil { + log.Error().Err(res.Error).Msg("notification cleanup failed") + return res.Error + } + log.Info().Int64("deleted", res.RowsAffected).Int("retention_days", notificationRetentionDays).Msg("notification cleanup completed") + return nil +} + +// HandleWebhookLogCleanup prunes the webhook dedup log. Rows only matter for the +// window in which a provider (Apple/Google) might redeliver an event; 180 days +// is a generous safety margin past any real redelivery. +func (h *Handler) HandleWebhookLogCleanup(ctx context.Context, _ *asynq.Task) error { + cutoff := time.Now().UTC().AddDate(0, 0, -webhookLogRetentionDays) + res := h.db.WithContext(ctx).Where("processed_at < ?", cutoff).Delete(&repositories.WebhookEvent{}) + if res.Error != nil { + log.Error().Err(res.Error).Msg("webhook log cleanup failed") + return res.Error + } + log.Info().Int64("deleted", res.RowsAffected).Int("retention_days", webhookLogRetentionDays).Msg("webhook log cleanup completed") + return nil +} + +// HandleAuditLogCleanup prunes audit events older than the retention window. +// One year of security events is retained for compliance/forensics. +func (h *Handler) HandleAuditLogCleanup(ctx context.Context, _ *asynq.Task) error { + cutoff := time.Now().UTC().AddDate(0, 0, -auditLogRetentionDays) + res := h.db.WithContext(ctx).Where("created_at < ?", cutoff).Delete(&models.AuditLog{}) + if res.Error != nil { + log.Error().Err(res.Error).Msg("audit log cleanup failed") + return res.Error + } + log.Info().Int64("deleted", res.RowsAffected).Int("retention_days", auditLogRetentionDays).Msg("audit log cleanup completed") + return nil +} diff --git a/internal/worker/jobs/data_export.go b/internal/worker/jobs/data_export.go new file mode 100644 index 0000000..68868ac --- /dev/null +++ b/internal/worker/jobs/data_export.go @@ -0,0 +1,138 @@ +package jobs + +import ( + "archive/zip" + "bytes" + "context" + "encoding/json" + "fmt" + "time" + + "github.com/hibiken/asynq" + "github.com/rs/zerolog/log" + + "github.com/treytartt/honeydue-api/internal/models" + "github.com/treytartt/honeydue-api/internal/services" + "github.com/treytartt/honeydue-api/internal/worker" +) + +// HandleDataExport gathers all of a user's data (GDPR data portability), zips it +// as one JSON file per category, and emails the archive as an attachment. +// Triggered by POST /api/auth/export/ -> Enqueuer.EnqueueDataExport. +// +// Residence-scoped data (tasks, contractors, documents, share codes) covers only +// residences the user OWNS — shared residences belong to their owner and are +// intentionally excluded. Document/photo *files* are referenced by URL (in +// documents.json); the bytes live in B2 and aren't inlined. +func (h *Handler) HandleDataExport(ctx context.Context, task *asynq.Task) error { + var payload worker.DataExportPayload + if err := json.Unmarshal(task.Payload(), &payload); err != nil { + log.Error().Err(err).Msg("data export: malformed payload") + return asynq.SkipRetry + } + db := h.db.WithContext(ctx) + + var user models.User + if err := db.First(&user, payload.UserID).Error; err != nil { + log.Error().Err(err).Uint("user_id", payload.UserID).Msg("data export: user not found") + return err + } + if h.emailService == nil { + log.Warn().Uint("user_id", payload.UserID).Msg("data export: email service unavailable; cannot deliver") + return nil // retrying won't help a structurally-disabled mailer + } + + var ownedIDs []uint + db.Model(&models.Residence{}).Where("owner_id = ?", payload.UserID).Pluck("id", &ownedIDs) + + var ( + profile []models.UserProfile + residences []models.Residence + tasks []models.Task + contractors []models.Contractor + documents []models.Document + shareCodes []models.ResidenceShareCode + notifs []models.Notification + notifPrefs []models.NotificationPreference + apnsDevices []models.APNSDevice + gcmDevices []models.GCMDevice + subscription []models.UserSubscription + auditLog []models.AuditLog + ) + db.Where("user_id = ?", payload.UserID).Find(&profile) + db.Where("owner_id = ?", payload.UserID).Find(&residences) + if len(ownedIDs) > 0 { + db.Where("residence_id IN ?", ownedIDs).Find(&tasks) + db.Where("residence_id IN ?", ownedIDs).Find(&contractors) + db.Where("residence_id IN ?", ownedIDs).Find(&documents) + db.Where("residence_id IN ?", ownedIDs).Find(&shareCodes) + } + db.Where("user_id = ?", payload.UserID).Find(¬ifs) + db.Where("user_id = ?", payload.UserID).Find(¬ifPrefs) + db.Where("user_id = ?", payload.UserID).Find(&apnsDevices) + db.Where("user_id = ?", payload.UserID).Find(&gcmDevices) + db.Where("user_id = ?", payload.UserID).Find(&subscription) + db.Where("user_id = ?", payload.UserID).Find(&auditLog) + + sections := []struct { + name string + data interface{} + }{ + {"account", user}, + {"profile", profile}, + {"residences", residences}, + {"tasks", tasks}, + {"contractors", contractors}, + {"documents", documents}, + {"share_codes", shareCodes}, + {"notifications", notifs}, + {"notification_preferences", notifPrefs}, + {"push_tokens_ios", apnsDevices}, + {"push_tokens_android", gcmDevices}, + {"subscription", subscription}, + {"audit_log", auditLog}, + } + + var buf bytes.Buffer + zw := zip.NewWriter(&buf) + readme := fmt.Sprintf("honeyDue data export\nGenerated: %s UTC\nAccount: %s\n\n"+ + "One JSON file per data category. Residence-scoped data covers residences you own.\n"+ + "Document and photo files are referenced by URL in documents.json.\n", + time.Now().UTC().Format(time.RFC3339), user.Email) + if w, err := zw.Create("README.txt"); err == nil { + _, _ = w.Write([]byte(readme)) + } + for _, s := range sections { + w, err := zw.Create(s.name + ".json") + if err != nil { + _ = zw.Close() + return fmt.Errorf("data export: zip create %s: %w", s.name, err) + } + enc := json.NewEncoder(w) + enc.SetIndent("", " ") + if err := enc.Encode(s.data); err != nil { + _ = zw.Close() + return fmt.Errorf("data export: encode %s: %w", s.name, err) + } + } + if err := zw.Close(); err != nil { + return fmt.Errorf("data export: finalize zip: %w", err) + } + + subject := "Your honeyDue data export" + text := "Attached is a copy of your honeyDue data, as a zip of JSON files.\n" + + "If you didn't request this, you can ignore this email.\n" + html := "

Attached is a copy of your honeyDue data, as a zip of JSON files.

" + + "

If you didn't request this, you can ignore this email.

" + attach := &services.EmailAttachment{ + Filename: "honeydue-data-export.zip", + ContentType: "application/zip", + Data: buf.Bytes(), + } + if err := h.emailService.SendEmailWithAttachment(user.Email, subject, html, text, attach); err != nil { + log.Error().Err(err).Uint("user_id", payload.UserID).Msg("data export: email send failed") + return err + } + log.Info().Uint("user_id", payload.UserID).Int("zip_bytes", buf.Len()).Msg("data export emailed") + return nil +} diff --git a/internal/worker/jobs/handler_test.go b/internal/worker/jobs/handler_test.go index 043b936..e85d21c 100644 --- a/internal/worker/jobs/handler_test.go +++ b/internal/worker/jobs/handler_test.go @@ -12,6 +12,7 @@ import ( "github.com/treytartt/honeydue-api/internal/config" "github.com/treytartt/honeydue-api/internal/models" "github.com/treytartt/honeydue-api/internal/repositories" + "github.com/treytartt/honeydue-api/internal/services" ) // --- Mock implementations --- @@ -27,6 +28,13 @@ func (m *mockEmailSender) SendEmail(to, subject, htmlBody, textBody string) erro return nil } +func (m *mockEmailSender) SendEmailWithAttachment(to, subject, htmlBody, textBody string, _ *services.EmailAttachment) error { + if m.sendFn != nil { + return m.sendFn(to, subject, htmlBody, textBody) + } + return nil +} + type mockPushSender struct { sendFn func(ctx context.Context, iosTokens, androidTokens []string, title, message string, data map[string]string) error } diff --git a/internal/worker/jobs/interfaces.go b/internal/worker/jobs/interfaces.go index 0c040c1..85c5e4c 100644 --- a/internal/worker/jobs/interfaces.go +++ b/internal/worker/jobs/interfaces.go @@ -6,6 +6,7 @@ import ( "github.com/treytartt/honeydue-api/internal/models" "github.com/treytartt/honeydue-api/internal/repositories" + "github.com/treytartt/honeydue-api/internal/services" ) // TaskRepo defines task query operations needed by job handlers. @@ -46,6 +47,7 @@ type PushSender interface { // EmailSender sends emails. type EmailSender interface { SendEmail(to, subject, htmlBody, textBody string) error + SendEmailWithAttachment(to, subject, htmlBody, textBody string, attachment *services.EmailAttachment) error } // OnboardingEmailSender sends onboarding campaign emails. diff --git a/internal/worker/scheduler.go b/internal/worker/scheduler.go index a563b3a..9a5cc7c 100644 --- a/internal/worker/scheduler.go +++ b/internal/worker/scheduler.go @@ -21,8 +21,17 @@ const ( // Moves the ~1-1.5s of synchronous APNs+SMTP+B2-fetch work out of the // POST /api/task-completions/ request path. TypeTaskCompletedNotification = "notification:task_completed" + + // TypeDataExport is emitted by POST /api/auth/export/. The worker gathers + // all of the user's data into a zip and emails it (GDPR data portability). + TypeDataExport = "user:data_export" ) +// DataExportPayload carries just the user id; the worker re-fetches all rows. +type DataExportPayload struct { + UserID uint `json:"user_id"` +} + // TaskCompletedNotificationPayload carries only the IDs needed for the // worker to re-fetch the canonical Task + TaskCompletion rows. Keeping the // payload to IDs (vs. full model graphs) keeps the Redis queue cheap and @@ -93,6 +102,26 @@ func (c *TaskClient) EnqueueWelcomeEmail(to, firstName, code string) error { return nil } +// EnqueueDataExport enqueues a GDPR data-export task for a user. The worker +// gathers the user's data, zips it, and emails it. Low priority — there's no +// rush, and it shouldn't compete with notifications for the critical queue. +func (c *TaskClient) EnqueueDataExport(userID uint) error { + payload, err := BuildDataExportPayload(userID) + if err != nil { + return err + } + + task := asynq.NewTask(TypeDataExport, payload) + _, err = c.client.Enqueue(task, asynq.Queue("low"), asynq.MaxRetry(3)) + if err != nil { + log.Error().Err(err).Uint("user_id", userID).Msg("Failed to enqueue data export") + return err + } + + log.Info().Uint("user_id", userID).Msg("Data export task enqueued") + return nil +} + // EnqueueVerificationEmail enqueues a verification email task func (c *TaskClient) EnqueueVerificationEmail(to, firstName, code string) error { payload, err := BuildVerificationEmailPayload(to, firstName, code)