Files
Nip-05-api/internal/sync/worker.go
2026-04-29 02:35:00 +00:00

140 lines
3.2 KiB
Go

package sync
import (
"context"
"errors"
"log/slog"
"time"
"github.com/noderunners/nip05api/internal/nostr"
"github.com/noderunners/nip05api/internal/user"
)
type Worker struct {
users *user.Service
pool *nostr.Pool
interval time.Duration
enabled bool
domain string
reserved []string
}
func NewWorker(users *user.Service, pool *nostr.Pool, intervalMins int, enabled bool, domain string, reserved []string) *Worker {
if intervalMins <= 0 {
intervalMins = 15
}
return &Worker{
users: users,
pool: pool,
interval: time.Duration(intervalMins) * time.Minute,
enabled: enabled,
domain: domain,
reserved: reserved,
}
}
func (w *Worker) Run(ctx context.Context) {
if !w.enabled {
<-ctx.Done()
return
}
t := time.NewTicker(w.interval)
defer t.Stop()
w.RunOnce(ctx)
for {
select {
case <-ctx.Done():
return
case <-t.C:
w.RunOnce(ctx)
}
}
}
func (w *Worker) RunOnce(ctx context.Context) {
runCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
defer cancel()
stale := time.Now().Add(-24 * time.Hour)
users, err := w.users.Repo().ListForSync(runCtx, stale)
if err != nil {
slog.Error("sync list", "err", err)
return
}
if len(users) == 0 {
return
}
slog.Info("profile sync starting", "count", len(users))
updated := 0
for _, u := range users {
if runCtx.Err() != nil {
break
}
if w.syncOne(runCtx, u) {
updated++
}
}
slog.Info("profile sync complete", "updated", updated, "checked", len(users))
}
// syncOne fetches the latest kind:0 profile for u and rewrites the username
// when a better candidate is available. Returns true when the username was
// actually changed in the database. The user's last_synced_at is always
// touched so the worker doesn't re-poll the same pubkey on every tick.
func (w *Worker) syncOne(ctx context.Context, u *user.User) bool {
md, err := nostr.FetchMetadata(ctx, w.pool, u.Pubkey)
now := time.Now().UTC()
u.LastSyncedAt = &now
touch := func(reason string, extra ...any) {
if err := w.users.Repo().Update(ctx, u); err != nil {
slog.Error("sync update", "pubkey", u.Pubkey, "err", err)
return
}
args := append([]any{"pubkey", u.Pubkey, "reason", reason}, extra...)
slog.Debug("profile sync skipped", args...)
}
if err != nil {
touch("fetch_error", "err", err)
return false
}
if md == nil {
touch("no_metadata")
return false
}
candidate := user.CandidateFromMetadata(
md.Name, md.DisplayName, md.DisplayNameAlt, md.Username,
md.NIP05, w.domain, w.reserved,
)
if candidate == "" {
touch("empty_candidate")
return false
}
if candidate == u.Username {
touch("unchanged")
return false
}
other, err := w.users.Repo().GetByUsername(ctx, candidate)
if err != nil && !errors.Is(err, user.ErrUserNotFound) {
touch("lookup_error", "err", err)
return false
}
if other != nil && other.Pubkey != u.Pubkey {
touch("taken", "candidate", candidate)
return false
}
previous := u.Username
u.Username = candidate
if err := w.users.Repo().Update(ctx, u); err != nil {
slog.Error("sync update", "pubkey", u.Pubkey, "err", err)
return false
}
slog.Info("profile sync username updated",
"pubkey", u.Pubkey, "from", previous, "to", candidate)
return true
}