Files
Nip-05-api/internal/payments/worker.go
2026-04-29 02:35:00 +00:00

174 lines
4.2 KiB
Go

package payments
import (
"context"
"errors"
"log/slog"
"time"
"github.com/noderunners/nip05api/internal/audit"
"github.com/noderunners/nip05api/internal/dm"
"github.com/noderunners/nip05api/internal/invoice"
"github.com/noderunners/nip05api/internal/user"
"github.com/noderunners/nip05api/internal/webhook"
)
// Worker is the background payment poller: on every tick it lists unpaid
// invoices, asks LNbits whether each one has been paid, and confirms the paid
// ones against the user store.
type Worker struct {
	// Services consulted directly by the polling and confirmation path.
	invoices *invoice.Service
	users    *user.Service
	lnbits   *invoice.LNbitsClient

	// Remaining collaborators and site settings. Run, tick and confirm do not
	// read these directly; presumably they feed event dispatch — confirm
	// against dispatchEvents.
	dms              *dm.Service
	hooks            *webhook.Service
	audit            *audit.Logger
	domain, frontend string

	// Polling configuration: interval is the period between ticks, and a
	// false enabled makes Run idle until its context is cancelled.
	interval time.Duration
	enabled  bool
}
// NewWorker wires the given services and settings into a Worker. The polling
// interval is not a parameter; it is fixed at five seconds.
func NewWorker(inv *invoice.Service, u *user.Service, ln *invoice.LNbitsClient, dms *dm.Service, hooks *webhook.Service, aud *audit.Logger, domain, frontend string, enabled bool) *Worker {
	const pollEvery = 5 * time.Second

	w := new(Worker)

	// Collaborating services.
	w.invoices = inv
	w.users = u
	w.lnbits = ln
	w.dms = dms
	w.hooks = hooks
	w.audit = aud

	// Settings.
	w.domain = domain
	w.frontend = frontend
	w.interval = pollEvery
	w.enabled = enabled

	return w
}
// Run drives the worker until ctx is cancelled, then returns.
//
// A disabled worker does no work but still blocks until cancellation, so the
// call has the same lifetime whether or not payments are enabled. An enabled
// worker runs two periodic jobs from a single goroutine: a payment poll every
// w.interval and a purge of old unpaid invoices every 15 minutes.
func (w *Worker) Run(ctx context.Context) {
	if !w.enabled {
		<-ctx.Done()
		return
	}

	t := time.NewTicker(w.interval)
	defer t.Stop()
	cleanup := time.NewTicker(15 * time.Minute)
	defer cleanup.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			w.tick(ctx)
		case <-cleanup.C:
			// Cleanup is best-effort: a failure must not stop the poll
			// loop, but it is logged so a persistently failing purge does
			// not go unnoticed.
			if err := w.invoices.Repo().PurgeOldUnpaid(ctx); err != nil {
				slog.Warn("payments purge", "err", err)
			}
		}
	}
}
// tick performs one polling pass: it lists the unpaid invoices, asks LNbits
// for the status of each payment hash, and confirms every invoice reported as
// paid. Failures are logged and never returned; a failure on one invoice does
// not prevent the remaining invoices from being checked.
func (w *Worker) tick(ctx context.Context) {
	pending, err := w.invoices.Repo().ListUnpaid(ctx)
	if err != nil {
		slog.Error("payments list", "err", err)
		return
	}
	for _, p := range pending {
		// Stop promptly on shutdown instead of issuing (and logging) one
		// doomed status call per remaining invoice.
		if ctx.Err() != nil {
			return
		}
		paid, err := w.lnbits.Status(ctx, p.PaymentHash)
		if err != nil {
			slog.Warn("lnbits status", "hash", p.PaymentHash, "err", err)
			continue
		}
		if !paid {
			continue
		}
		if err := w.confirm(ctx, p); err != nil {
			slog.Error("confirm payment", "hash", p.PaymentHash, "err", err)
		}
	}
}
// confirm completes a paid invoice idempotently. Crash recovery is safe at any
// point: we capture the target expiry once, then apply absolute updates.
//
// Steps, in order:
//  1. Look up the user by pubkey; "not found" means a first-time subscriber.
//  2. Resolve the target expiry for this invoice (see resolveTarget).
//  3. Insert the new user, or set the existing user's subscription type and
//     expiry to the target.
//  4. Claim the invoice as paid. Only the caller that wins the claim goes on
//     to dispatch events.
//  5. Re-read the user and dispatch a welcome or extended event.
//
// It returns the first error hit in steps 1-5. Note that an error in step 5's
// re-read happens after the claim, so events are not dispatched for that
// invoice on a later tick.
func (w *Worker) confirm(ctx context.Context, p *invoice.PendingInvoice) error {
	existing, getErr := w.users.Repo().GetByPubkey(ctx, p.Pubkey)
	if getErr != nil && !errors.Is(getErr, user.ErrUserNotFound) {
		return getErr
	}
	target, err := w.resolveTarget(ctx, p, existing)
	if err != nil {
		return err
	}

	wasNew := errors.Is(getErr, user.ErrUserNotFound)
	if wasNew {
		u := &user.User{
			Pubkey:           p.Pubkey,
			Username:         p.Username,
			SubscriptionType: p.SubscriptionType,
			ExpiresAt:        target,
			IsActive:         true,
		}
		if err := w.users.Repo().Insert(ctx, u); err != nil {
			// Likely UNIQUE constraint from a concurrent recovery attempt.
			// If the user can now be read back, fall through and treat it
			// as existing; otherwise report both failures.
			if _, refetchErr := w.users.Repo().GetByPubkey(ctx, p.Pubkey); refetchErr != nil {
				return errors.Join(err, refetchErr)
			}
			wasNew = false
		}
	}
	if !wasNew {
		// Absolute update: writing the same target again on a retry is
		// harmless.
		if err := w.users.Repo().SetActiveExpiry(ctx, p.Pubkey, p.SubscriptionType, target); err != nil {
			return err
		}
	}

	claimed, err := w.invoices.Repo().ClaimPaid(ctx, p.PaymentHash)
	if err != nil {
		return err
	}
	if !claimed {
		return nil // another tick already dispatched events
	}

	final, err := w.users.Repo().GetByPubkey(ctx, p.Pubkey)
	if err != nil {
		return err
	}
	// Only a renewal by an already-known user counts as an extension;
	// everything else is announced as a welcome.
	dmEvent := dm.EventWelcome
	if !wasNew && p.IsRenewal {
		dmEvent = dm.EventExtended
	}
	w.dispatchEvents(ctx, final, p, dmEvent)
	return nil
}
// resolveTarget yields the expiry that should be written to the user for this
// invoice. When the invoice already has a target recorded (p.TargetSet), that
// value is returned as is. Otherwise a candidate is computed from the current
// UTC time, offered to the invoice repo through SetTargetIfUnset, and the
// invoice is read back so that the returned value is the one now stored;
// retries therefore agree on a single value.
//
// The result may be nil: computeTarget proposes nil for lifetime
// subscriptions.
func (w *Worker) resolveTarget(ctx context.Context, p *invoice.PendingInvoice, existing *user.User) (*time.Time, error) {
	if p.TargetSet {
		return p.TargetExpiresAt, nil
	}

	candidate := computeTarget(p, existing, time.Now().UTC())
	_, setErr := w.invoices.Repo().SetTargetIfUnset(ctx, p.PaymentHash, candidate)
	if setErr != nil {
		return nil, setErr
	}

	// Read back instead of trusting the candidate: the stored value is the
	// canonical one.
	stored, getErr := w.invoices.Repo().Get(ctx, p.PaymentHash)
	if getErr != nil {
		return nil, getErr
	}
	return stored.TargetExpiresAt, nil
}
// computeTarget proposes the expiry for invoice p as of now. Lifetime
// subscriptions get nil. Any other subscription type is delegated to
// user.YearlyTargetExpiry, which receives the existing user's current expiry
// (nil when existing is nil), the number of years on the invoice, and now.
func computeTarget(p *invoice.PendingInvoice, existing *user.User, now time.Time) *time.Time {
	if p.SubscriptionType == user.SubLifetime {
		return nil
	}

	// A first-time subscriber has no prior expiry to build on.
	var prior *time.Time
	if existing != nil {
		prior = existing.ExpiresAt
	}
	return user.YearlyTargetExpiry(prior, p.Years, now)
}