// Package scheduler enqueues background tasks (event reminders and
// subscription syncs) onto asynq queues.
package scheduler

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/calendarapi/internal/repository"
	"github.com/calendarapi/internal/utils"
	"github.com/google/uuid"
	"github.com/hibiken/asynq"
)

// Task type names attached to the asynq tasks created by this package.
const (
	// TypeReminder is the task type of tasks created by ScheduleReminder.
	TypeReminder = "reminder:send"
	// TypeSubscriptionSync is the task type of tasks created by
	// EnqueueSubscriptionSync.
	TypeSubscriptionSync = "subscription:sync"
)

// ReminderPayload is the JSON payload carried by a TypeReminder task.
type ReminderPayload struct {
	EventID    uuid.UUID `json:"event_id"`
	ReminderID uuid.UUID `json:"reminder_id"`
	UserID     uuid.UUID `json:"user_id"`
}

// Scheduler enqueues tasks through an asynq client.
type Scheduler struct {
	client *asynq.Client
}

// NewScheduler returns a Scheduler whose asynq client targets the Redis
// instance at redisAddr. Call Close when the Scheduler is no longer needed.
func NewScheduler(redisAddr string) *Scheduler {
	client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr})
	return &Scheduler{client: client}
}

// enqueue submits task with opts, binding the enqueue operation to ctx.
//
// A nil ctx falls back to context.Background(): ScheduleReminder historically
// ignored its context argument, so callers that passed nil must keep working.
func (s *Scheduler) enqueue(ctx context.Context, task *asynq.Task, opts ...asynq.Option) error {
	if ctx == nil {
		ctx = context.Background()
	}
	_, err := s.client.EnqueueContext(ctx, task, opts...)
	return err
}

// ScheduleReminder enqueues a TypeReminder task on the "reminders" queue, to
// be processed at triggerAt and retried at most 5 times.
//
// ctx applies to the enqueue operation only (cancellation or a deadline aborts
// the call to Redis); it does not limit the execution of the task itself.
// It returns an error when the payload cannot be marshalled or the task cannot
// be enqueued.
func (s *Scheduler) ScheduleReminder(ctx context.Context, eventID, reminderID, userID uuid.UUID, triggerAt time.Time) error {
	payload, err := json.Marshal(ReminderPayload{
		EventID:    eventID,
		ReminderID: reminderID,
		UserID:     userID,
	})
	if err != nil {
		return fmt.Errorf("marshal reminder payload: %w", err)
	}

	task := asynq.NewTask(TypeReminder, payload)
	err = s.enqueue(ctx, task,
		asynq.ProcessAt(triggerAt),
		asynq.MaxRetry(5),
		asynq.Queue("reminders"),
	)
	if err != nil {
		return fmt.Errorf("enqueue reminder: %w", err)
	}
	return nil
}

// SubscriptionSyncPayload is the JSON payload carried by a
// TypeSubscriptionSync task.
type SubscriptionSyncPayload struct {
	SubscriptionID string `json:"subscription_id"`
}

// EnqueueSubscriptionSync enqueues a TypeSubscriptionSync task for
// subscriptionID on the "subscriptions" queue, retried at most 3 times.
//
// ctx applies to the enqueue operation only. It returns an error when the
// payload cannot be marshalled or the task cannot be enqueued.
func (s *Scheduler) EnqueueSubscriptionSync(ctx context.Context, subscriptionID string) error {
	payload, err := json.Marshal(SubscriptionSyncPayload{SubscriptionID: subscriptionID})
	if err != nil {
		return fmt.Errorf("marshal subscription sync payload: %w", err)
	}

	task := asynq.NewTask(TypeSubscriptionSync, payload)
	err = s.enqueue(ctx, task,
		asynq.MaxRetry(3),
		asynq.Queue("subscriptions"),
	)
	if err != nil {
		return fmt.Errorf("enqueue subscription sync: %w", err)
	}
	return nil
}

// Close closes the underlying asynq client.
func (s *Scheduler) Close() error {
	return s.client.Close()
}

// StartSubscriptionEnqueuer lists the subscriptions due for sync on every tick
// of interval and enqueues a sync task for each one. It blocks until ctx is
// cancelled, so run it in its own goroutine and cancel ctx to stop it.
func StartSubscriptionEnqueuer(ctx context.Context, queries *repository.Queries, sched *Scheduler, interval time.Duration) { ticker := time.NewTicker(interval) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: subs, err := queries.ListSubscriptionsDueForSync(ctx) if err != nil { log.Printf("subscription enqueuer: list due for sync: %v", err) continue } for _, sub := range subs { subID := utils.FromPgUUID(sub.ID) if err := sched.EnqueueSubscriptionSync(ctx, subID.String()); err != nil { log.Printf("subscription enqueuer: enqueue %s: %v", subID, err) } } if len(subs) > 0 { log.Printf("subscription enqueuer: enqueued %d subscriptions", len(subs)) } } } } type NoopScheduler struct{} func (NoopScheduler) ScheduleReminder(_ context.Context, _, _, _ uuid.UUID, _ time.Time) error { return nil } func (NoopScheduler) Close() error { return nil }