Files
Nip-05-api/internal/webhook/worker_test.go
2026-04-29 02:35:00 +00:00

180 lines
4.5 KiB
Go

package webhook
import (
"context"
"io"
"net/http"
"net/http/httptest"
"path/filepath"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/noderunners/nip05api/internal/db"
)
// setupRepo opens a fresh database file inside the test's temporary
// directory, runs the migrations on it, and returns a Repo built on top of
// that handle. The handle is closed automatically when the test finishes.
func setupRepo(t *testing.T) *Repo {
	t.Helper()
	d, err := db.Open(filepath.Join(t.TempDir(), "test.db"))
	if err != nil {
		t.Fatal(err)
	}
	// Register the close before migrating so the handle is released even
	// when Migrate fails and t.Fatal aborts this helper. Cleanups run in
	// last-in-first-out order, so this still fires before the TempDir removal.
	t.Cleanup(func() { d.Close() })
	if err := d.Migrate(context.Background()); err != nil {
		t.Fatal(err)
	}
	return NewRepo(d)
}
// TestWorker_DeliversAndMarksDelivered enqueues a single EventUserPaid event,
// runs one worker tick against a stub endpoint that replies 204 No Content,
// and then checks the headers the endpoint saw as well as the status stored
// on the outbox row.
func TestWorker_DeliversAndMarksDelivered(t *testing.T) {
	ctx := context.Background()
	repo := setupRepo(t)

	// State recorded by the stub endpoint. The handler runs on a server
	// goroutine, so the header values are guarded by mu and the hit counter
	// is updated atomically.
	var (
		mu       sync.Mutex
		hits     int32
		gotSig   string
		gotEvent string
	)
	handler := func(rw http.ResponseWriter, req *http.Request) {
		atomic.AddInt32(&hits, 1)
		sig := req.Header.Get("X-Webhook-Signature")
		event := req.Header.Get("X-Webhook-Event")
		mu.Lock()
		gotSig, gotEvent = sig, event
		mu.Unlock()
		// Drain the body; a read error here is irrelevant to the test.
		_, _ = io.Copy(io.Discard, req.Body)
		rw.WriteHeader(http.StatusNoContent)
	}
	srv := httptest.NewServer(http.HandlerFunc(handler))
	t.Cleanup(srv.Close)

	payload := map[string]any{"username": "alice"}
	if err := NewService(repo, "test.local", true).Enqueue(ctx, EventUserPaid, payload); err != nil {
		t.Fatal(err)
	}

	const limit = 2 * time.Second
	worker := &Worker{
		repo:       repo,
		url:        srv.URL,
		secret:     "topsecret",
		timeout:    limit,
		maxRetries: 5,
		hc:         &http.Client{Timeout: limit},
	}
	worker.tick(ctx)

	if n := atomic.LoadInt32(&hits); n == 0 {
		t.Fatal("webhook never received")
	}

	// Copy the captured headers out under the lock, then assert on the copies.
	mu.Lock()
	event, sig := gotEvent, gotSig
	mu.Unlock()
	if event != "user.paid" {
		t.Errorf("event header: %q", event)
	}
	// NOTE(review): 64 characters presumably corresponds to a hex-encoded
	// 32-byte digest — confirm against the Worker's signing code.
	if len(sig) != 64 {
		t.Errorf("signature header: %q", sig)
	}

	// The outbox row must now carry the delivered status.
	var status string
	row := repo.db.QueryRowContext(ctx, `SELECT status FROM webhook_outbox LIMIT 1`)
	if err := row.Scan(&status); err != nil {
		t.Fatal(err)
	}
	if want := string(StatusDelivered); status != want {
		t.Errorf("expected delivered, got %s", status)
	}
}
// TestWorker_RetriesOn5xx checks what a single worker tick does when the
// endpoint answers 500: exactly one request is made, the row is not
// immediately claimable again, its attempts counter is 1, and it has not
// reached a terminal (delivered or dead) status.
func TestWorker_RetriesOn5xx(t *testing.T) {
	repo := setupRepo(t)
	var hits int32
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		atomic.AddInt32(&hits, 1)
		// Drain the body; a read error here is irrelevant to the test.
		_, _ = io.Copy(io.Discard, r.Body)
		w.WriteHeader(http.StatusInternalServerError)
	}))
	t.Cleanup(srv.Close)
	svc := NewService(repo, "test.local", true)
	if err := svc.Enqueue(context.Background(), EventUserAdded, map[string]any{}); err != nil {
		t.Fatal(err)
	}
	// First attempt fires immediately (next_attempt_at = now).
	w := &Worker{
		repo:       repo,
		url:        srv.URL,
		timeout:    1 * time.Second,
		maxRetries: 5,
		hc:         &http.Client{Timeout: 1 * time.Second},
	}
	w.tick(context.Background())
	// Take one atomic snapshot and reuse it in the message; hits is only
	// ever accessed through sync/atomic elsewhere in this test.
	if got := atomic.LoadInt32(&hits); got != 1 {
		t.Fatalf("expected 1 attempt, got %d", got)
	}
	// The failed row must not be handed out again right away.
	rows, err := repo.Claim(context.Background(), 10)
	if err != nil {
		t.Fatal(err)
	}
	if len(rows) != 0 {
		t.Errorf("row should not be claimable yet (next_attempt_at in future), got %d", len(rows))
	}
	// Verify the row recorded one attempt and is still awaiting a retry.
	var (
		attempts int
		status   string
	)
	if err := repo.db.QueryRowContext(context.Background(),
		`SELECT attempts, status FROM webhook_outbox LIMIT 1`).Scan(&attempts, &status); err != nil {
		t.Fatal(err)
	}
	if attempts != 1 {
		t.Errorf("expected attempts=1, got %d", attempts)
	}
	// With maxRetries=5 a single failure must leave the row in a
	// non-terminal state.
	if status == string(StatusDelivered) || status == string(StatusDead) {
		t.Errorf("expected row to remain retryable, got status %s", status)
	}
}
// TestWorker_DeadAfterMaxRetries drives a worker configured with
// maxRetries=2 through two failing ticks against an endpoint that always
// answers 500, and expects the outbox row to end up with the dead status.
func TestWorker_DeadAfterMaxRetries(t *testing.T) {
	repo := setupRepo(t)
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusInternalServerError)
	}))
	t.Cleanup(srv.Close)
	svc := NewService(repo, "test.local", true)
	// Fail fast if nothing was queued; otherwise the status query below
	// would fail with a much less helpful error.
	if err := svc.Enqueue(context.Background(), EventUserAdded, map[string]any{}); err != nil {
		t.Fatal(err)
	}
	w := &Worker{
		repo:       repo,
		url:        srv.URL,
		timeout:    1 * time.Second,
		maxRetries: 2,
		hc:         &http.Client{Timeout: 1 * time.Second},
	}
	// First failure → retry scheduled.
	w.tick(context.Background())
	// Force the row eligible again by rewinding next_attempt_at.
	if _, err := repo.db.ExecContext(context.Background(),
		`UPDATE webhook_outbox SET next_attempt_at = ?`,
		time.Now().UTC().Format(time.RFC3339)); err != nil {
		t.Fatal(err)
	}
	// Second failure reaches maxRetries.
	w.tick(context.Background())
	var status string
	if err := repo.db.QueryRowContext(context.Background(),
		`SELECT status FROM webhook_outbox LIMIT 1`).Scan(&status); err != nil {
		t.Fatal(err)
	}
	if status != string(StatusDead) {
		t.Errorf("expected dead after 2 attempts, got %s", status)
	}
}
// TestWorker_NoURL_NoOp builds a worker through NewWorker with empty string
// arguments (no URL, per the test name) and checks that Run returns once its
// context has expired instead of blocking.
func TestWorker_NoURL_NoOp(t *testing.T) {
	repo := setupRepo(t)
	w := NewWorker(repo, "", "", 5, 5)
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	// Run on a separate goroutine so a hang fails this test promptly rather
	// than stalling until the global `go test` timeout.
	done := make(chan struct{})
	go func() {
		defer close(done)
		w.Run(ctx) // returns when ctx fires; should not hang.
	}()

	select {
	case <-done:
	case <-time.After(5 * time.Second):
		t.Fatal("Run did not return after its context expired")
	}
}