180 lines
4.5 KiB
Go
180 lines
4.5 KiB
Go
package webhook
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"path/filepath"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/noderunners/nip05api/internal/db"
|
|
)
|
|
|
|
func setupRepo(t *testing.T) *Repo {
|
|
t.Helper()
|
|
d, err := db.Open(filepath.Join(t.TempDir(), "test.db"))
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if err := d.Migrate(context.Background()); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
t.Cleanup(func() { d.Close() })
|
|
return NewRepo(d)
|
|
}
|
|
|
|
func TestWorker_DeliversAndMarksDelivered(t *testing.T) {
|
|
repo := setupRepo(t)
|
|
var (
|
|
mu sync.Mutex
|
|
hits int32
|
|
gotSig string
|
|
gotEvent string
|
|
)
|
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
atomic.AddInt32(&hits, 1)
|
|
mu.Lock()
|
|
gotSig = r.Header.Get("X-Webhook-Signature")
|
|
gotEvent = r.Header.Get("X-Webhook-Event")
|
|
mu.Unlock()
|
|
_, _ = io.Copy(io.Discard, r.Body)
|
|
w.WriteHeader(http.StatusNoContent)
|
|
}))
|
|
t.Cleanup(srv.Close)
|
|
|
|
svc := NewService(repo, "test.local", true)
|
|
if err := svc.Enqueue(context.Background(), EventUserPaid, map[string]any{"username": "alice"}); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
w := &Worker{
|
|
repo: repo,
|
|
url: srv.URL,
|
|
secret: "topsecret",
|
|
timeout: 2 * time.Second,
|
|
maxRetries: 5,
|
|
hc: &http.Client{Timeout: 2 * time.Second},
|
|
}
|
|
w.tick(context.Background())
|
|
|
|
if atomic.LoadInt32(&hits) == 0 {
|
|
t.Fatal("webhook never received")
|
|
}
|
|
mu.Lock()
|
|
defer mu.Unlock()
|
|
if gotEvent != "user.paid" {
|
|
t.Errorf("event header: %q", gotEvent)
|
|
}
|
|
if len(gotSig) != 64 {
|
|
t.Errorf("signature header: %q", gotSig)
|
|
}
|
|
|
|
// Row should be marked delivered.
|
|
var status string
|
|
if err := repo.db.QueryRowContext(context.Background(),
|
|
`SELECT status FROM webhook_outbox LIMIT 1`).Scan(&status); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if status != string(StatusDelivered) {
|
|
t.Errorf("expected delivered, got %s", status)
|
|
}
|
|
}
|
|
|
|
func TestWorker_RetriesOn5xx(t *testing.T) {
|
|
repo := setupRepo(t)
|
|
var hits int32
|
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
atomic.AddInt32(&hits, 1)
|
|
_, _ = io.Copy(io.Discard, r.Body)
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
}))
|
|
t.Cleanup(srv.Close)
|
|
|
|
svc := NewService(repo, "test.local", true)
|
|
if err := svc.Enqueue(context.Background(), EventUserAdded, map[string]any{}); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
// First attempt fires immediately (next_attempt_at = now).
|
|
w := &Worker{
|
|
repo: repo,
|
|
url: srv.URL,
|
|
timeout: 1 * time.Second,
|
|
maxRetries: 5,
|
|
hc: &http.Client{Timeout: 1 * time.Second},
|
|
}
|
|
|
|
w.tick(context.Background())
|
|
|
|
if atomic.LoadInt32(&hits) != 1 {
|
|
t.Fatalf("expected 1 attempt, got %d", hits)
|
|
}
|
|
|
|
// Verify row updated with attempts=1, status still pending.
|
|
rows, err := repo.Claim(context.Background(), 10)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if len(rows) != 0 {
|
|
t.Errorf("row should not be claimable yet (next_attempt_at in future), got %d", len(rows))
|
|
}
|
|
|
|
var attempts int
|
|
if err := repo.db.QueryRowContext(context.Background(),
|
|
`SELECT attempts FROM webhook_outbox LIMIT 1`).Scan(&attempts); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if attempts != 1 {
|
|
t.Errorf("expected attempts=1, got %d", attempts)
|
|
}
|
|
}
|
|
|
|
func TestWorker_DeadAfterMaxRetries(t *testing.T) {
|
|
repo := setupRepo(t)
|
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
}))
|
|
t.Cleanup(srv.Close)
|
|
|
|
svc := NewService(repo, "test.local", true)
|
|
_ = svc.Enqueue(context.Background(), EventUserAdded, map[string]any{})
|
|
|
|
w := &Worker{
|
|
repo: repo,
|
|
url: srv.URL,
|
|
timeout: 1 * time.Second,
|
|
maxRetries: 2,
|
|
hc: &http.Client{Timeout: 1 * time.Second},
|
|
}
|
|
// First failure → retry scheduled.
|
|
w.tick(context.Background())
|
|
// Force the row eligible again by rewinding next_attempt_at.
|
|
if _, err := repo.db.ExecContext(context.Background(),
|
|
`UPDATE webhook_outbox SET next_attempt_at = ?`,
|
|
time.Now().UTC().Format(time.RFC3339)); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
w.tick(context.Background())
|
|
|
|
var status string
|
|
if err := repo.db.QueryRowContext(context.Background(),
|
|
`SELECT status FROM webhook_outbox LIMIT 1`).Scan(&status); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if status != string(StatusDead) {
|
|
t.Errorf("expected dead after 2 attempts, got %s", status)
|
|
}
|
|
}
|
|
|
|
func TestWorker_NoURL_NoOp(t *testing.T) {
|
|
repo := setupRepo(t)
|
|
w := NewWorker(repo, "", "", 5, 5)
|
|
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
|
|
defer cancel()
|
|
w.Run(ctx) // returns when ctx fires; should not hang.
|
|
}
|