package scheduler import ( "context" "encoding/json" "fmt" "log" "github.com/calendarapi/internal/repository" "github.com/calendarapi/internal/utils" "github.com/hibiken/asynq" ) type ReminderWorker struct { queries *repository.Queries } func NewReminderWorker(queries *repository.Queries) *ReminderWorker { return &ReminderWorker{queries: queries} } func (w *ReminderWorker) HandleReminderTask(ctx context.Context, t *asynq.Task) error { var payload ReminderPayload if err := json.Unmarshal(t.Payload(), &payload); err != nil { return fmt.Errorf("unmarshal payload: %w", err) } ev, err := w.queries.GetEventByID(ctx, utils.ToPgUUID(payload.EventID)) if err != nil { return fmt.Errorf("get event: %w", err) } if ev.DeletedAt.Valid { log.Printf("reminder skipped: event %s deleted", payload.EventID) return nil } log.Printf("reminder triggered: event=%s user=%s title=%s", payload.EventID, payload.UserID, ev.Title) return nil } func StartWorker(redisAddr string, worker *ReminderWorker) *asynq.Server { srv := asynq.NewServer( asynq.RedisClientOpt{Addr: redisAddr}, asynq.Config{ Concurrency: 10, Queues: map[string]int{ "reminders": 6, "default": 3, }, RetryDelayFunc: asynq.DefaultRetryDelayFunc, }, ) mux := asynq.NewServeMux() mux.HandleFunc(TypeReminder, worker.HandleReminderTask) go func() { if err := srv.Run(mux); err != nil { log.Printf("asynq worker error: %v", err) } }() return srv }