174 lines
4.2 KiB
Go
174 lines
4.2 KiB
Go
package payments
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"log/slog"
|
|
"time"
|
|
|
|
"github.com/noderunners/nip05api/internal/audit"
|
|
"github.com/noderunners/nip05api/internal/dm"
|
|
"github.com/noderunners/nip05api/internal/invoice"
|
|
"github.com/noderunners/nip05api/internal/user"
|
|
"github.com/noderunners/nip05api/internal/webhook"
|
|
)
|
|
|
|
type Worker struct {
|
|
invoices *invoice.Service
|
|
users *user.Service
|
|
lnbits *invoice.LNbitsClient
|
|
dms *dm.Service
|
|
hooks *webhook.Service
|
|
audit *audit.Logger
|
|
domain string
|
|
frontend string
|
|
interval time.Duration
|
|
enabled bool
|
|
}
|
|
|
|
func NewWorker(inv *invoice.Service, u *user.Service, ln *invoice.LNbitsClient, dms *dm.Service, hooks *webhook.Service, aud *audit.Logger, domain, frontend string, enabled bool) *Worker {
|
|
return &Worker{
|
|
invoices: inv,
|
|
users: u,
|
|
lnbits: ln,
|
|
dms: dms,
|
|
hooks: hooks,
|
|
audit: aud,
|
|
domain: domain,
|
|
frontend: frontend,
|
|
interval: 5 * time.Second,
|
|
enabled: enabled,
|
|
}
|
|
}
|
|
|
|
func (w *Worker) Run(ctx context.Context) {
|
|
if !w.enabled {
|
|
<-ctx.Done()
|
|
return
|
|
}
|
|
t := time.NewTicker(w.interval)
|
|
defer t.Stop()
|
|
cleanup := time.NewTicker(15 * time.Minute)
|
|
defer cleanup.Stop()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-t.C:
|
|
w.tick(ctx)
|
|
case <-cleanup.C:
|
|
_ = w.invoices.Repo().PurgeOldUnpaid(ctx)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (w *Worker) tick(ctx context.Context) {
|
|
pending, err := w.invoices.Repo().ListUnpaid(ctx)
|
|
if err != nil {
|
|
slog.Error("payments list", "err", err)
|
|
return
|
|
}
|
|
for _, p := range pending {
|
|
paid, err := w.lnbits.Status(ctx, p.PaymentHash)
|
|
if err != nil {
|
|
slog.Warn("lnbits status", "hash", p.PaymentHash, "err", err)
|
|
continue
|
|
}
|
|
if !paid {
|
|
continue
|
|
}
|
|
if err := w.confirm(ctx, p); err != nil {
|
|
slog.Error("confirm payment", "hash", p.PaymentHash, "err", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// confirm completes a paid invoice idempotently. Crash recovery is safe at any
|
|
// point: we capture the target expiry once, then apply absolute updates.
|
|
func (w *Worker) confirm(ctx context.Context, p *invoice.PendingInvoice) error {
|
|
existing, getErr := w.users.Repo().GetByPubkey(ctx, p.Pubkey)
|
|
if getErr != nil && !errors.Is(getErr, user.ErrUserNotFound) {
|
|
return getErr
|
|
}
|
|
|
|
target, err := w.resolveTarget(ctx, p, existing)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
wasNew := errors.Is(getErr, user.ErrUserNotFound)
|
|
if wasNew {
|
|
u := &user.User{
|
|
Pubkey: p.Pubkey,
|
|
Username: p.Username,
|
|
SubscriptionType: p.SubscriptionType,
|
|
ExpiresAt: target,
|
|
IsActive: true,
|
|
}
|
|
if err := w.users.Repo().Insert(ctx, u); err != nil {
|
|
// Likely UNIQUE constraint from a concurrent recovery attempt;
|
|
// re-fetch and treat as existing.
|
|
existing2, err2 := w.users.Repo().GetByPubkey(ctx, p.Pubkey)
|
|
if err2 != nil {
|
|
return err
|
|
}
|
|
existing = existing2
|
|
wasNew = false
|
|
}
|
|
}
|
|
if !wasNew {
|
|
if err := w.users.Repo().SetActiveExpiry(ctx, p.Pubkey, p.SubscriptionType, target); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
claimed, err := w.invoices.Repo().ClaimPaid(ctx, p.PaymentHash)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !claimed {
|
|
return nil // another tick already dispatched events
|
|
}
|
|
|
|
final, err := w.users.Repo().GetByPubkey(ctx, p.Pubkey)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
dmEvent := dm.EventWelcome
|
|
if !wasNew && p.IsRenewal {
|
|
dmEvent = dm.EventExtended
|
|
}
|
|
w.dispatchEvents(ctx, final, p, dmEvent)
|
|
return nil
|
|
}
|
|
|
|
// resolveTarget returns the canonical expiry to apply to the user. Persisted
|
|
// on first call so retries see the same value.
|
|
func (w *Worker) resolveTarget(ctx context.Context, p *invoice.PendingInvoice, existing *user.User) (*time.Time, error) {
|
|
if p.TargetSet {
|
|
return p.TargetExpiresAt, nil
|
|
}
|
|
target := computeTarget(p, existing, time.Now().UTC())
|
|
if _, err := w.invoices.Repo().SetTargetIfUnset(ctx, p.PaymentHash, target); err != nil {
|
|
return nil, err
|
|
}
|
|
fresh, err := w.invoices.Repo().Get(ctx, p.PaymentHash)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return fresh.TargetExpiresAt, nil
|
|
}
|
|
|
|
func computeTarget(p *invoice.PendingInvoice, existing *user.User, now time.Time) *time.Time {
|
|
if p.SubscriptionType == user.SubLifetime {
|
|
return nil
|
|
}
|
|
var current *time.Time
|
|
if existing != nil {
|
|
current = existing.ExpiresAt
|
|
}
|
|
return user.YearlyTargetExpiry(current, p.Years, now)
|
|
}
|
|
|