package scheduler import ( "context" "encoding/json" "fmt" "log" "github.com/calendarapi/internal/repository" "github.com/calendarapi/internal/utils" "github.com/hibiken/asynq" ) // SubscriptionSyncer performs a subscription sync (used by background worker). type SubscriptionSyncer interface { SyncSubscriptionBackground(ctx context.Context, subscriptionID string) error } type ReminderWorker struct { queries *repository.Queries } func NewReminderWorker(queries *repository.Queries) *ReminderWorker { return &ReminderWorker{queries: queries} } func (w *ReminderWorker) HandleReminderTask(ctx context.Context, t *asynq.Task) error { var payload ReminderPayload if err := json.Unmarshal(t.Payload(), &payload); err != nil { return fmt.Errorf("unmarshal payload: %w", err) } ev, err := w.queries.GetEventByID(ctx, utils.ToPgUUID(payload.EventID)) if err != nil { return fmt.Errorf("get event: %w", err) } if ev.DeletedAt.Valid { log.Printf("reminder skipped: event %s deleted", payload.EventID) return nil } log.Printf("reminder triggered: event=%s user=%s title=%s", payload.EventID, payload.UserID, ev.Title) return nil } func (w *ReminderWorker) HandleTaskReminder(ctx context.Context, t *asynq.Task) error { var payload TaskReminderPayload if err := json.Unmarshal(t.Payload(), &payload); err != nil { return fmt.Errorf("unmarshal task reminder payload: %w", err) } task, err := w.queries.GetTaskByID(ctx, repository.GetTaskByIDParams{ ID: utils.ToPgUUID(payload.TaskID), OwnerID: utils.ToPgUUID(payload.OwnerID), }) if err != nil { return fmt.Errorf("get task: %w", err) } if task.DeletedAt.Valid { return nil } log.Printf("task reminder triggered: task=%s title=%s", payload.TaskID, task.Title) return nil } func (w *ReminderWorker) HandleRecurringTask(ctx context.Context, t *asynq.Task) error { var payload RecurringTaskPayload if err := json.Unmarshal(t.Payload(), &payload); err != nil { return fmt.Errorf("unmarshal recurring task payload: %w", err) } task, err := w.queries.GetTaskByID(ctx, repository.GetTaskByIDParams{ ID: utils.ToPgUUID(payload.TaskID), OwnerID: utils.ToPgUUID(payload.OwnerID), }) if err != nil { return nil } if !task.RecurrenceRule.Valid || task.RecurrenceRule.String == "" { return nil } log.Printf("recurring task: task=%s (generation stub)", payload.TaskID) return nil } type SubscriptionSyncWorker struct { syncer SubscriptionSyncer } func NewSubscriptionSyncWorker(syncer SubscriptionSyncer) *SubscriptionSyncWorker { return &SubscriptionSyncWorker{syncer: syncer} } func (w *SubscriptionSyncWorker) HandleSubscriptionSync(ctx context.Context, t *asynq.Task) error { var payload SubscriptionSyncPayload if err := json.Unmarshal(t.Payload(), &payload); err != nil { return fmt.Errorf("unmarshal subscription sync payload: %w", err) } if err := w.syncer.SyncSubscriptionBackground(ctx, payload.SubscriptionID); err != nil { return fmt.Errorf("sync subscription %s: %w", payload.SubscriptionID, err) } log.Printf("subscription sync completed: %s", payload.SubscriptionID) return nil } func StartWorker(redisAddr string, worker *ReminderWorker, subSyncWorker *SubscriptionSyncWorker) *asynq.Server { srv := asynq.NewServer( asynq.RedisClientOpt{Addr: redisAddr}, asynq.Config{ Concurrency: 10, Queues: map[string]int{ "reminders": 6, "subscriptions": 2, "default": 3, }, RetryDelayFunc: asynq.DefaultRetryDelayFunc, }, ) mux := asynq.NewServeMux() mux.HandleFunc(TypeReminder, worker.HandleReminderTask) mux.HandleFunc(TypeTaskReminder, worker.HandleTaskReminder) mux.HandleFunc(TypeRecurringTask, worker.HandleRecurringTask) if subSyncWorker != nil { mux.HandleFunc(TypeSubscriptionSync, subSyncWorker.HandleSubscriptionSync) } go func() { if err := srv.Run(mux); err != nil { log.Printf("asynq worker error: %v", err) } }() return srv }