first commit
This commit is contained in:
39
internal/webhook/model.go
Normal file
39
internal/webhook/model.go
Normal file
@@ -0,0 +1,39 @@
|
||||
package webhook
|
||||
|
||||
import "time"
|
||||
|
||||
type EventType string
|
||||
|
||||
const (
|
||||
EventUserPaid EventType = "user.paid"
|
||||
EventUserAdded EventType = "user.added"
|
||||
EventUserExtended EventType = "user.extended"
|
||||
EventUserRemoved EventType = "user.removed"
|
||||
)
|
||||
|
||||
type Status string
|
||||
|
||||
const (
|
||||
StatusPending Status = "pending"
|
||||
StatusDelivered Status = "delivered"
|
||||
StatusDead Status = "dead"
|
||||
)
|
||||
|
||||
type Payload struct {
|
||||
Event EventType `json:"event"`
|
||||
Timestamp string `json:"timestamp"`
|
||||
Domain string `json:"domain"`
|
||||
Data map[string]any `json:"data"`
|
||||
}
|
||||
|
||||
type OutboxItem struct {
|
||||
ID int64
|
||||
EventType EventType
|
||||
Payload string
|
||||
Attempts int
|
||||
LastAttemptAt *time.Time
|
||||
NextAttemptAt time.Time
|
||||
Status Status
|
||||
LastError string
|
||||
CreatedAt time.Time
|
||||
}
|
||||
86
internal/webhook/repo.go
Normal file
86
internal/webhook/repo.go
Normal file
@@ -0,0 +1,86 @@
|
||||
package webhook
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"time"
|
||||
|
||||
"github.com/noderunners/nip05api/internal/db"
|
||||
)
|
||||
|
||||
type Repo struct{ db *db.DB }
|
||||
|
||||
func NewRepo(d *db.DB) *Repo { return &Repo{db: d} }
|
||||
|
||||
func (r *Repo) Insert(ctx context.Context, eventType EventType, payload string) error {
|
||||
_, err := r.db.ExecContext(ctx, `INSERT INTO webhook_outbox
|
||||
(event_type, payload, next_attempt_at) VALUES (?, ?, ?)`,
|
||||
string(eventType), payload, time.Now().UTC().Format(time.RFC3339))
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *Repo) Claim(ctx context.Context, limit int) ([]*OutboxItem, error) {
|
||||
rows, err := r.db.QueryContext(ctx, `SELECT id, event_type, payload, attempts,
|
||||
last_attempt_at, next_attempt_at, status, COALESCE(last_error, ''), created_at
|
||||
FROM webhook_outbox
|
||||
WHERE status = 'pending' AND next_attempt_at <= ?
|
||||
ORDER BY next_attempt_at ASC LIMIT ?`,
|
||||
time.Now().UTC().Format(time.RFC3339), limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
out := []*OutboxItem{}
|
||||
for rows.Next() {
|
||||
var it OutboxItem
|
||||
var status, eventType string
|
||||
var lastAttempt, nextAttempt, created sql.NullString
|
||||
if err := rows.Scan(&it.ID, &eventType, &it.Payload, &it.Attempts,
|
||||
&lastAttempt, &nextAttempt, &status, &it.LastError, &created); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
it.EventType = EventType(eventType)
|
||||
it.Status = Status(status)
|
||||
if lastAttempt.Valid {
|
||||
if t, err := time.Parse(time.RFC3339, lastAttempt.String); err == nil {
|
||||
it.LastAttemptAt = &t
|
||||
}
|
||||
}
|
||||
if nextAttempt.Valid {
|
||||
if t, err := time.Parse(time.RFC3339, nextAttempt.String); err == nil {
|
||||
it.NextAttemptAt = t
|
||||
}
|
||||
}
|
||||
if created.Valid {
|
||||
if t, err := time.Parse(time.RFC3339, created.String); err == nil {
|
||||
it.CreatedAt = t
|
||||
} else if t, err := time.Parse("2006-01-02 15:04:05", created.String); err == nil {
|
||||
it.CreatedAt = t
|
||||
}
|
||||
}
|
||||
out = append(out, &it)
|
||||
}
|
||||
return out, rows.Err()
|
||||
}
|
||||
|
||||
func (r *Repo) MarkDelivered(ctx context.Context, id int64) error {
|
||||
_, err := r.db.ExecContext(ctx, `UPDATE webhook_outbox SET status = 'delivered',
|
||||
last_attempt_at = ?, last_error = '' WHERE id = ?`,
|
||||
time.Now().UTC().Format(time.RFC3339), id)
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *Repo) MarkRetry(ctx context.Context, id int64, attempts int, nextAt time.Time, errMsg string) error {
|
||||
_, err := r.db.ExecContext(ctx, `UPDATE webhook_outbox SET attempts = ?,
|
||||
last_attempt_at = ?, next_attempt_at = ?, last_error = ? WHERE id = ?`,
|
||||
attempts, time.Now().UTC().Format(time.RFC3339),
|
||||
nextAt.UTC().Format(time.RFC3339), errMsg, id)
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *Repo) MarkDead(ctx context.Context, id int64, errMsg string) error {
|
||||
_, err := r.db.ExecContext(ctx, `UPDATE webhook_outbox SET status = 'dead',
|
||||
last_attempt_at = ?, last_error = ? WHERE id = ?`,
|
||||
time.Now().UTC().Format(time.RFC3339), errMsg, id)
|
||||
return err
|
||||
}
|
||||
36
internal/webhook/service.go
Normal file
36
internal/webhook/service.go
Normal file
@@ -0,0 +1,36 @@
|
||||
package webhook
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Service struct {
|
||||
repo *Repo
|
||||
domain string
|
||||
enabled bool
|
||||
}
|
||||
|
||||
func NewService(repo *Repo, domain string, enabled bool) *Service {
|
||||
return &Service{repo: repo, domain: domain, enabled: enabled}
|
||||
}
|
||||
|
||||
func (s *Service) Enabled() bool { return s.enabled }
|
||||
|
||||
func (s *Service) Enqueue(ctx context.Context, event EventType, data map[string]any) error {
|
||||
if !s.enabled {
|
||||
return nil
|
||||
}
|
||||
p := Payload{
|
||||
Event: event,
|
||||
Timestamp: time.Now().UTC().Format(time.RFC3339),
|
||||
Domain: s.domain,
|
||||
Data: data,
|
||||
}
|
||||
b, err := json.Marshal(p)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return s.repo.Insert(ctx, event, string(b))
|
||||
}
|
||||
16
internal/webhook/signer.go
Normal file
16
internal/webhook/signer.go
Normal file
@@ -0,0 +1,16 @@
|
||||
package webhook
|
||||
|
||||
import (
|
||||
"crypto/hmac"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
)
|
||||
|
||||
func Sign(secret string, body []byte) string {
|
||||
if secret == "" {
|
||||
return ""
|
||||
}
|
||||
mac := hmac.New(sha256.New, []byte(secret))
|
||||
mac.Write(body)
|
||||
return hex.EncodeToString(mac.Sum(nil))
|
||||
}
|
||||
21
internal/webhook/signer_test.go
Normal file
21
internal/webhook/signer_test.go
Normal file
@@ -0,0 +1,21 @@
|
||||
package webhook
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestSign(t *testing.T) {
|
||||
if Sign("", []byte("hello")) != "" {
|
||||
t.Error("empty secret should return empty signature")
|
||||
}
|
||||
sig := Sign("supersecret", []byte("hello"))
|
||||
if len(sig) != 64 {
|
||||
t.Errorf("expected 64-char hex hmac, got %d: %q", len(sig), sig)
|
||||
}
|
||||
again := Sign("supersecret", []byte("hello"))
|
||||
if sig != again {
|
||||
t.Error("signature should be deterministic")
|
||||
}
|
||||
other := Sign("different", []byte("hello"))
|
||||
if sig == other {
|
||||
t.Error("different secret should produce different signature")
|
||||
}
|
||||
}
|
||||
123
internal/webhook/worker.go
Normal file
123
internal/webhook/worker.go
Normal file
@@ -0,0 +1,123 @@
|
||||
package webhook
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
var retrySchedule = []time.Duration{
|
||||
30 * time.Second,
|
||||
2 * time.Minute,
|
||||
10 * time.Minute,
|
||||
1 * time.Hour,
|
||||
6 * time.Hour,
|
||||
}
|
||||
|
||||
type Worker struct {
|
||||
repo *Repo
|
||||
url string
|
||||
secret string
|
||||
timeout time.Duration
|
||||
maxRetries int
|
||||
hc *http.Client
|
||||
}
|
||||
|
||||
func NewWorker(repo *Repo, url, secret string, timeoutSecs, maxRetries int) *Worker {
|
||||
if maxRetries <= 0 {
|
||||
maxRetries = len(retrySchedule)
|
||||
}
|
||||
return &Worker{
|
||||
repo: repo,
|
||||
url: url,
|
||||
secret: secret,
|
||||
timeout: time.Duration(timeoutSecs) * time.Second,
|
||||
maxRetries: maxRetries,
|
||||
hc: &http.Client{Timeout: time.Duration(timeoutSecs) * time.Second},
|
||||
}
|
||||
}
|
||||
|
||||
func (w *Worker) Run(ctx context.Context) {
|
||||
if w.url == "" {
|
||||
<-ctx.Done()
|
||||
return
|
||||
}
|
||||
t := time.NewTicker(1 * time.Second)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-t.C:
|
||||
w.tick(ctx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (w *Worker) tick(ctx context.Context) {
|
||||
items, err := w.repo.Claim(ctx, 5)
|
||||
if err != nil {
|
||||
slog.Error("webhook claim", "err", err)
|
||||
return
|
||||
}
|
||||
if len(items) == 0 {
|
||||
return
|
||||
}
|
||||
var wg sync.WaitGroup
|
||||
for _, it := range items {
|
||||
wg.Add(1)
|
||||
go func(it *OutboxItem) {
|
||||
defer wg.Done()
|
||||
w.deliver(ctx, it)
|
||||
}(it)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func (w *Worker) deliver(ctx context.Context, it *OutboxItem) {
|
||||
body := []byte(it.Payload)
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, w.url, bytes.NewReader(body))
|
||||
if err != nil {
|
||||
w.handleErr(ctx, it, err)
|
||||
return
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("User-Agent", "nip05api/1.0")
|
||||
req.Header.Set("X-Webhook-Event", string(it.EventType))
|
||||
if sig := Sign(w.secret, body); sig != "" {
|
||||
req.Header.Set("X-Webhook-Signature", sig)
|
||||
}
|
||||
|
||||
resp, err := w.hc.Do(req)
|
||||
if err != nil {
|
||||
w.handleErr(ctx, it, err)
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode/100 == 2 {
|
||||
_ = w.repo.MarkDelivered(ctx, it.ID)
|
||||
slog.Info("webhook delivered", "event", it.EventType, "id", it.ID)
|
||||
return
|
||||
}
|
||||
w.handleErr(ctx, it, fmt.Errorf("status %d", resp.StatusCode))
|
||||
}
|
||||
|
||||
func (w *Worker) handleErr(ctx context.Context, it *OutboxItem, err error) {
|
||||
attempts := it.Attempts + 1
|
||||
if attempts >= w.maxRetries {
|
||||
_ = w.repo.MarkDead(ctx, it.ID, err.Error())
|
||||
slog.Error("webhook dead", "id", it.ID, "err", err)
|
||||
return
|
||||
}
|
||||
idx := attempts - 1
|
||||
if idx >= len(retrySchedule) {
|
||||
idx = len(retrySchedule) - 1
|
||||
}
|
||||
next := time.Now().UTC().Add(retrySchedule[idx])
|
||||
_ = w.repo.MarkRetry(ctx, it.ID, attempts, next, err.Error())
|
||||
slog.Warn("webhook retry", "id", it.ID, "attempts", attempts, "next", next, "err", err)
|
||||
}
|
||||
179
internal/webhook/worker_test.go
Normal file
179
internal/webhook/worker_test.go
Normal file
@@ -0,0 +1,179 @@
|
||||
package webhook
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/noderunners/nip05api/internal/db"
|
||||
)
|
||||
|
||||
func setupRepo(t *testing.T) *Repo {
|
||||
t.Helper()
|
||||
d, err := db.Open(filepath.Join(t.TempDir(), "test.db"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := d.Migrate(context.Background()); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Cleanup(func() { d.Close() })
|
||||
return NewRepo(d)
|
||||
}
|
||||
|
||||
func TestWorker_DeliversAndMarksDelivered(t *testing.T) {
|
||||
repo := setupRepo(t)
|
||||
var (
|
||||
mu sync.Mutex
|
||||
hits int32
|
||||
gotSig string
|
||||
gotEvent string
|
||||
)
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
atomic.AddInt32(&hits, 1)
|
||||
mu.Lock()
|
||||
gotSig = r.Header.Get("X-Webhook-Signature")
|
||||
gotEvent = r.Header.Get("X-Webhook-Event")
|
||||
mu.Unlock()
|
||||
_, _ = io.Copy(io.Discard, r.Body)
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}))
|
||||
t.Cleanup(srv.Close)
|
||||
|
||||
svc := NewService(repo, "test.local", true)
|
||||
if err := svc.Enqueue(context.Background(), EventUserPaid, map[string]any{"username": "alice"}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
w := &Worker{
|
||||
repo: repo,
|
||||
url: srv.URL,
|
||||
secret: "topsecret",
|
||||
timeout: 2 * time.Second,
|
||||
maxRetries: 5,
|
||||
hc: &http.Client{Timeout: 2 * time.Second},
|
||||
}
|
||||
w.tick(context.Background())
|
||||
|
||||
if atomic.LoadInt32(&hits) == 0 {
|
||||
t.Fatal("webhook never received")
|
||||
}
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
if gotEvent != "user.paid" {
|
||||
t.Errorf("event header: %q", gotEvent)
|
||||
}
|
||||
if len(gotSig) != 64 {
|
||||
t.Errorf("signature header: %q", gotSig)
|
||||
}
|
||||
|
||||
// Row should be marked delivered.
|
||||
var status string
|
||||
if err := repo.db.QueryRowContext(context.Background(),
|
||||
`SELECT status FROM webhook_outbox LIMIT 1`).Scan(&status); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if status != string(StatusDelivered) {
|
||||
t.Errorf("expected delivered, got %s", status)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWorker_RetriesOn5xx(t *testing.T) {
|
||||
repo := setupRepo(t)
|
||||
var hits int32
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
atomic.AddInt32(&hits, 1)
|
||||
_, _ = io.Copy(io.Discard, r.Body)
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
}))
|
||||
t.Cleanup(srv.Close)
|
||||
|
||||
svc := NewService(repo, "test.local", true)
|
||||
if err := svc.Enqueue(context.Background(), EventUserAdded, map[string]any{}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// First attempt fires immediately (next_attempt_at = now).
|
||||
w := &Worker{
|
||||
repo: repo,
|
||||
url: srv.URL,
|
||||
timeout: 1 * time.Second,
|
||||
maxRetries: 5,
|
||||
hc: &http.Client{Timeout: 1 * time.Second},
|
||||
}
|
||||
|
||||
w.tick(context.Background())
|
||||
|
||||
if atomic.LoadInt32(&hits) != 1 {
|
||||
t.Fatalf("expected 1 attempt, got %d", hits)
|
||||
}
|
||||
|
||||
// Verify row updated with attempts=1, status still pending.
|
||||
rows, err := repo.Claim(context.Background(), 10)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(rows) != 0 {
|
||||
t.Errorf("row should not be claimable yet (next_attempt_at in future), got %d", len(rows))
|
||||
}
|
||||
|
||||
var attempts int
|
||||
if err := repo.db.QueryRowContext(context.Background(),
|
||||
`SELECT attempts FROM webhook_outbox LIMIT 1`).Scan(&attempts); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if attempts != 1 {
|
||||
t.Errorf("expected attempts=1, got %d", attempts)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWorker_DeadAfterMaxRetries(t *testing.T) {
|
||||
repo := setupRepo(t)
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
}))
|
||||
t.Cleanup(srv.Close)
|
||||
|
||||
svc := NewService(repo, "test.local", true)
|
||||
_ = svc.Enqueue(context.Background(), EventUserAdded, map[string]any{})
|
||||
|
||||
w := &Worker{
|
||||
repo: repo,
|
||||
url: srv.URL,
|
||||
timeout: 1 * time.Second,
|
||||
maxRetries: 2,
|
||||
hc: &http.Client{Timeout: 1 * time.Second},
|
||||
}
|
||||
// First failure → retry scheduled.
|
||||
w.tick(context.Background())
|
||||
// Force the row eligible again by rewinding next_attempt_at.
|
||||
if _, err := repo.db.ExecContext(context.Background(),
|
||||
`UPDATE webhook_outbox SET next_attempt_at = ?`,
|
||||
time.Now().UTC().Format(time.RFC3339)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
w.tick(context.Background())
|
||||
|
||||
var status string
|
||||
if err := repo.db.QueryRowContext(context.Background(),
|
||||
`SELECT status FROM webhook_outbox LIMIT 1`).Scan(&status); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if status != string(StatusDead) {
|
||||
t.Errorf("expected dead after 2 attempts, got %s", status)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWorker_NoURL_NoOp(t *testing.T) {
|
||||
repo := setupRepo(t)
|
||||
w := NewWorker(repo, "", "", 5, 5)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
|
||||
defer cancel()
|
||||
w.Run(ctx) // returns when ctx fires; should not hang.
|
||||
}
|
||||
Reference in New Issue
Block a user