140 lines
3.2 KiB
Go
140 lines
3.2 KiB
Go
package sync
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"log/slog"
|
|
"time"
|
|
|
|
"github.com/noderunners/nip05api/internal/nostr"
|
|
"github.com/noderunners/nip05api/internal/user"
|
|
)
|
|
|
|
type Worker struct {
|
|
users *user.Service
|
|
pool *nostr.Pool
|
|
interval time.Duration
|
|
enabled bool
|
|
domain string
|
|
reserved []string
|
|
}
|
|
|
|
func NewWorker(users *user.Service, pool *nostr.Pool, intervalMins int, enabled bool, domain string, reserved []string) *Worker {
|
|
if intervalMins <= 0 {
|
|
intervalMins = 15
|
|
}
|
|
return &Worker{
|
|
users: users,
|
|
pool: pool,
|
|
interval: time.Duration(intervalMins) * time.Minute,
|
|
enabled: enabled,
|
|
domain: domain,
|
|
reserved: reserved,
|
|
}
|
|
}
|
|
|
|
func (w *Worker) Run(ctx context.Context) {
|
|
if !w.enabled {
|
|
<-ctx.Done()
|
|
return
|
|
}
|
|
t := time.NewTicker(w.interval)
|
|
defer t.Stop()
|
|
w.RunOnce(ctx)
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-t.C:
|
|
w.RunOnce(ctx)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (w *Worker) RunOnce(ctx context.Context) {
|
|
runCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
|
|
defer cancel()
|
|
|
|
stale := time.Now().Add(-24 * time.Hour)
|
|
users, err := w.users.Repo().ListForSync(runCtx, stale)
|
|
if err != nil {
|
|
slog.Error("sync list", "err", err)
|
|
return
|
|
}
|
|
if len(users) == 0 {
|
|
return
|
|
}
|
|
slog.Info("profile sync starting", "count", len(users))
|
|
updated := 0
|
|
for _, u := range users {
|
|
if runCtx.Err() != nil {
|
|
break
|
|
}
|
|
if w.syncOne(runCtx, u) {
|
|
updated++
|
|
}
|
|
}
|
|
slog.Info("profile sync complete", "updated", updated, "checked", len(users))
|
|
}
|
|
|
|
// syncOne fetches the latest kind:0 profile for u and rewrites the username
|
|
// when a better candidate is available. Returns true when the username was
|
|
// actually changed in the database. The user's last_synced_at is always
|
|
// touched so the worker doesn't re-poll the same pubkey on every tick.
|
|
func (w *Worker) syncOne(ctx context.Context, u *user.User) bool {
|
|
md, err := nostr.FetchMetadata(ctx, w.pool, u.Pubkey)
|
|
now := time.Now().UTC()
|
|
u.LastSyncedAt = &now
|
|
|
|
touch := func(reason string, extra ...any) {
|
|
if err := w.users.Repo().Update(ctx, u); err != nil {
|
|
slog.Error("sync update", "pubkey", u.Pubkey, "err", err)
|
|
return
|
|
}
|
|
args := append([]any{"pubkey", u.Pubkey, "reason", reason}, extra...)
|
|
slog.Debug("profile sync skipped", args...)
|
|
}
|
|
|
|
if err != nil {
|
|
touch("fetch_error", "err", err)
|
|
return false
|
|
}
|
|
if md == nil {
|
|
touch("no_metadata")
|
|
return false
|
|
}
|
|
|
|
candidate := user.CandidateFromMetadata(
|
|
md.Name, md.DisplayName, md.DisplayNameAlt, md.Username,
|
|
md.NIP05, w.domain, w.reserved,
|
|
)
|
|
if candidate == "" {
|
|
touch("empty_candidate")
|
|
return false
|
|
}
|
|
if candidate == u.Username {
|
|
touch("unchanged")
|
|
return false
|
|
}
|
|
|
|
other, err := w.users.Repo().GetByUsername(ctx, candidate)
|
|
if err != nil && !errors.Is(err, user.ErrUserNotFound) {
|
|
touch("lookup_error", "err", err)
|
|
return false
|
|
}
|
|
if other != nil && other.Pubkey != u.Pubkey {
|
|
touch("taken", "candidate", candidate)
|
|
return false
|
|
}
|
|
|
|
previous := u.Username
|
|
u.Username = candidate
|
|
if err := w.users.Repo().Update(ctx, u); err != nil {
|
|
slog.Error("sync update", "pubkey", u.Pubkey, "err", err)
|
|
return false
|
|
}
|
|
slog.Info("profile sync username updated",
|
|
"pubkey", u.Pubkey, "from", previous, "to", candidate)
|
|
return true
|
|
}
|