webhook

package
v1.0.14 Latest
Published: May 3, 2026 License: MIT Imports: 11 Imported by: 0

Documentation

Overview

Package webhook provides HMAC-SHA256 signing and dispatch for nSelf webhook deliveries.

Constants

This section is empty.

Variables

var ErrDLQEntryNotFound = fmt.Errorf("webhook: DLQ entry not found")

ErrDLQEntryNotFound is returned when a DLQ entry ID does not exist.

Functions

func GenerateSecret

func GenerateSecret() (string, error)

GenerateSecret returns a 32-byte hex-encoded HMAC signing secret.
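
A minimal sketch of how such a secret could be produced with the standard library. It assumes "32-byte" means 32 random bytes, which hex-encode to a 64-character string; the documentation does not confirm this reading. The name generateSecret is a hypothetical stand-in, not the package's code.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// generateSecret is a hypothetical stand-in for GenerateSecret.
// Assumption: the secret is 32 random bytes, hex-encoded to 64 characters.
func generateSecret() (string, error) {
	buf := make([]byte, 32)
	// crypto/rand reads from the operating system's CSPRNG.
	if _, err := rand.Read(buf); err != nil {
		return "", fmt.Errorf("webhook: generate secret: %w", err)
	}
	return hex.EncodeToString(buf), nil
}

func main() {
	secret, err := generateSecret()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(len(secret)) // 64 under the assumption above
}
```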

func InjectHeaders

func InjectHeaders(r *http.Request, secret, deliveryID, eventName string, ts time.Time, body []byte)

InjectHeaders adds the nSelf webhook authentication headers to r.

  • X-Nself-Signature-256: sha256=<hmac>
  • X-Nself-Delivery: <delivery_uuid>
  • X-Nself-Event: <event_name>
  • X-Nself-Timestamp: <unix_seconds>

If secret is empty, the signature header is omitted (unsigned delivery).
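
The header behaviour described above can be sketched with the standard library alone. The function injectHeaders below is a hypothetical re-implementation for illustration. It assumes the secret string's raw bytes are used as the HMAC key and that the signature covers "<unix_seconds>.<body>", as documented for Sign. The URL, delivery ID, and event name are made-up values.

```go
package main

import (
	"bytes"
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"net/http"
	"strconv"
	"time"
)

// injectHeaders is a hypothetical sketch of the documented behaviour,
// not the package's actual implementation.
func injectHeaders(r *http.Request, secret, deliveryID, eventName string, ts time.Time, body []byte) {
	unix := strconv.FormatInt(ts.Unix(), 10)
	r.Header.Set("X-Nself-Delivery", deliveryID)
	r.Header.Set("X-Nself-Event", eventName)
	r.Header.Set("X-Nself-Timestamp", unix)
	if secret == "" {
		return // unsigned delivery: the signature header is omitted
	}
	// Assumption: the key is the raw bytes of the secret string.
	mac := hmac.New(sha256.New, []byte(secret))
	mac.Write([]byte(unix + "."))
	mac.Write(body)
	r.Header.Set("X-Nself-Signature-256", "sha256="+hex.EncodeToString(mac.Sum(nil)))
}

// mustRequest builds an example request; all literal values are hypothetical.
func mustRequest(secret string) *http.Request {
	body := []byte(`{"ok":true}`)
	req, err := http.NewRequest(http.MethodPost, "https://example.invalid/hook", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	injectHeaders(req, secret, "delivery-123", "user.created", time.Unix(1700000000, 0), body)
	return req
}

func main() {
	signed := mustRequest("s3cret")
	fmt.Println(signed.Header.Get("X-Nself-Timestamp"))              // 1700000000
	fmt.Println(signed.Header.Get("X-Nself-Signature-256") != "")    // true
	fmt.Println(mustRequest("").Header.Get("X-Nself-Signature-256")) // empty line
}
```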

func NextRetryAt

func NextRetryAt(attempt int) time.Time

NextRetryAt computes the next_retry_at timestamp for the given attempt (1-indexed). It returns the zero time.Time if attempt exceeds maxAttempts, in which case the delivery should be moved to the DLQ instead.
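
The documentation does not state the retry schedule or the value of maxAttempts. The sketch below shows one common shape for such a function, exponential backoff, purely as an illustration. The constants maxAttempts = 5 and baseDelay = 30s are hypothetical, and so is the choice to return the zero time for attempts below 1.

```go
package main

import (
	"fmt"
	"time"
)

const (
	maxAttempts = 5                // hypothetical; the real limit is not documented
	baseDelay   = 30 * time.Second // hypothetical; the real schedule is not documented
)

// backoff returns the delay before the given 1-indexed attempt.
// ok is false when the attempt is out of range and the delivery
// should be quarantined instead of retried.
func backoff(attempt int) (delay time.Duration, ok bool) {
	if attempt < 1 || attempt > maxAttempts {
		return 0, false
	}
	return baseDelay << (attempt - 1), true // 30s, 1m, 2m, 4m, 8m
}

// nextRetryAt mirrors the documented signature: it returns the zero
// time when the attempt exceeds maxAttempts.
func nextRetryAt(attempt int) time.Time {
	delay, ok := backoff(attempt)
	if !ok {
		return time.Time{}
	}
	return time.Now().Add(delay)
}

func main() {
	for attempt := 1; attempt <= maxAttempts+1; attempt++ {
		if at := nextRetryAt(attempt); at.IsZero() {
			fmt.Printf("attempt %d: move to DLQ\n", attempt)
		} else {
			delay, _ := backoff(attempt)
			fmt.Printf("attempt %d: retry in %s\n", attempt, delay)
		}
	}
}
```

A caller would typically check IsZero on the result and quarantine the delivery when it is true.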

func Sign

func Sign(secret string, unixSeconds int64, body []byte) string

Sign computes the HMAC-SHA256 signature for an outbound webhook payload.

Signature input: "<unix_seconds>.<body_bytes>"
Header: X-Nself-Signature-256: sha256=<hex_hmac>

func Verify

func Verify(secret string, unixSeconds int64, body []byte, headerValue string) bool

Verify checks whether the X-Nself-Signature-256 header value matches the payload. It uses a constant-time comparison to prevent timing attacks.
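
A receiver that does not import this package can reproduce the scheme with crypto/hmac. The sketch below assumes the key is the raw bytes of the secret string and that the header value carries the "sha256=" prefix followed by lowercase hex. The functions sign and verify are hypothetical equivalents written for illustration, not the package's source.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"strconv"
	"strings"
)

const prefix = "sha256="

// computeMAC returns HMAC-SHA256 over "<unix_seconds>.<body>".
// Assumption: the key is the raw bytes of the secret string.
func computeMAC(secret string, unixSeconds int64, body []byte) []byte {
	mac := hmac.New(sha256.New, []byte(secret))
	mac.Write([]byte(strconv.FormatInt(unixSeconds, 10) + "."))
	mac.Write(body)
	return mac.Sum(nil)
}

// sign produces a header value of the form "sha256=<hex_hmac>".
func sign(secret string, unixSeconds int64, body []byte) string {
	return prefix + hex.EncodeToString(computeMAC(secret, unixSeconds, body))
}

// verify recomputes the MAC and compares it in constant time.
func verify(secret string, unixSeconds int64, body []byte, headerValue string) bool {
	if !strings.HasPrefix(headerValue, prefix) {
		return false
	}
	got, err := hex.DecodeString(strings.TrimPrefix(headerValue, prefix))
	if err != nil {
		return false
	}
	// hmac.Equal does not leak timing information about the contents.
	return hmac.Equal(got, computeMAC(secret, unixSeconds, body))
}

func main() {
	body := []byte(`{"event":"user.created"}`)
	header := sign("s3cret", 1700000000, body)
	fmt.Println(verify("s3cret", 1700000000, body, header))             // true
	fmt.Println(verify("s3cret", 1700000001, body, header))             // false
	fmt.Println(verify("s3cret", 1700000000, []byte("changed"), header)) // false
}
```

Because the timestamp is part of the signed input, a receiver can also reject deliveries whose X-Nself-Timestamp is too old, which limits replay of captured requests.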

Types

type Circuit

type Circuit struct {
	// contains filtered or unexported fields
}

Circuit is a per-endpoint circuit breaker. All methods are safe for concurrent use.

func NewCircuit

func NewCircuit() *Circuit

NewCircuit returns a circuit in the Closed state.

func (*Circuit) Allow

func (c *Circuit) Allow() bool

Allow reports whether a delivery attempt should proceed. Open circuits only allow attempts after the probe interval elapses (half-open transition).

func (*Circuit) RecordFailure

func (c *Circuit) RecordFailure()

RecordFailure records a failed delivery. After openThreshold consecutive failures the circuit trips to Open.

func (*Circuit) RecordSuccess

func (c *Circuit) RecordSuccess()

RecordSuccess records a successful delivery.

func (*Circuit) State

func (c *Circuit) State() CircuitState

State returns the current circuit state.

type CircuitState

type CircuitState string

CircuitState represents the state of an endpoint's circuit breaker.

const (
	// CircuitClosed is the normal operating state — deliveries proceed.
	CircuitClosed CircuitState = "closed"
	// CircuitOpen means the endpoint is failing; deliveries are skipped.
	CircuitOpen CircuitState = "open"
	// CircuitHalfOpen means a single probe is allowed to test recovery.
	CircuitHalfOpen CircuitState = "half-open"
)
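
The three states and the Allow, RecordFailure, and RecordSuccess transitions can be illustrated with a small stand-alone breaker. This is a sketch of the general pattern, not the package's implementation: the threshold, the probe interval, the injected clock, and the rule that a failed probe reopens the circuit are all assumptions.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type circuitState string

const (
	closed   circuitState = "closed"
	open     circuitState = "open"
	halfOpen circuitState = "half-open"
)

// circuit is a hypothetical breaker; openThreshold and probeInterval
// are illustrative, since the package keeps its values unexported.
type circuit struct {
	mu            sync.Mutex
	state         circuitState
	failures      int
	openedAt      time.Time
	openThreshold int
	probeInterval time.Duration
	now           func() time.Time // injected clock, for deterministic demos
}

func newCircuit(threshold int, probe time.Duration, now func() time.Time) *circuit {
	return &circuit{state: closed, openThreshold: threshold, probeInterval: probe, now: now}
}

func (c *circuit) allow() bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	switch c.state {
	case closed:
		return true
	case open:
		if c.now().Sub(c.openedAt) >= c.probeInterval {
			c.state = halfOpen // let exactly one probe through
			return true
		}
		return false
	default: // half-open: a probe is already in flight
		return false
	}
}

func (c *circuit) recordFailure() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.failures++
	if c.state == halfOpen || c.failures >= c.openThreshold {
		c.state, c.openedAt = open, c.now()
	}
}

func (c *circuit) recordSuccess() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.failures, c.state = 0, closed
}

func (c *circuit) currentState() circuitState {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.state
}

// demo walks the breaker through open, half-open, and closed.
func demo() []string {
	clock := time.Unix(0, 0)
	c := newCircuit(3, time.Minute, func() time.Time { return clock })
	var out []string
	observe := func() {
		ok := c.allow()
		out = append(out, fmt.Sprintf("%s %t", c.currentState(), ok))
	}
	for i := 0; i < 3; i++ {
		c.recordFailure()
	}
	observe() // open false
	clock = clock.Add(time.Minute)
	observe() // half-open true
	observe() // half-open false
	c.recordSuccess()
	observe() // closed true
	return out
}

func main() {
	for _, line := range demo() {
		fmt.Println(line)
	}
}
```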

type DLQEntry

type DLQEntry struct {
	ID            string     `json:"id"`
	EndpointID    string     `json:"endpoint_id"`
	DeliveryID    string     `json:"delivery_id"`
	Payload       []byte     `json:"payload"`
	FinalError    string     `json:"final_error"`
	QuarantinedAt time.Time  `json:"quarantined_at"`
	ReEnqueuedAt  *time.Time `json:"re_enqueued_at,omitempty"`
}

DLQEntry represents a row in the np_webhook_dlq table.

type DLQManager

type DLQManager struct {
	// contains filtered or unexported fields
}

DLQManager wraps a DLQStore and exposes higher-level operations.

func NewDLQManager

func NewDLQManager(store DLQStore) *DLQManager

NewDLQManager returns a DLQManager backed by store.

func (*DLQManager) List

func (m *DLQManager) List(ctx context.Context, limit, offset int) ([]DLQEntry, error)

List returns a paginated slice of DLQ entries.

func (*DLQManager) Quarantine

func (m *DLQManager) Quarantine(ctx context.Context, endpointID, deliveryID string, payload []byte, finalErr string) error

Quarantine moves a delivery to the DLQ after exhausting all retries.

func (*DLQManager) ReEnqueue

func (m *DLQManager) ReEnqueue(ctx context.Context, id string) error

ReEnqueue re-queues a DLQ entry for immediate retry. The entry's parent delivery has its attempt_count reset to 0 and next_retry_at set to now. Re-enqueue completes within 5 seconds (enforced by the store implementation).

type DLQStore

type DLQStore interface {
	// Save persists a new DLQ entry (quarantine).
	Save(ctx context.Context, entry DLQEntry) error
	// List returns paginated DLQ entries, most recent first.
	List(ctx context.Context, limit, offset int) ([]DLQEntry, error)
	// ReEnqueue marks the entry as re-enqueued and resets its parent delivery for retry.
	// Returns ErrDLQEntryNotFound if id does not exist.
	ReEnqueue(ctx context.Context, id string) error
}

DLQStore defines the persistence contract for DLQ operations. Implementations provide the DB-specific logic (Postgres, mock, etc.).
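
For tests, the contract above can be satisfied by a small in-memory store. The sketch below redeclares lowercase copies of the documented types so that it compiles on its own; memStore and its ordering logic are hypothetical. Resetting the parent delivery, which the real ReEnqueue contract requires, is left out because this store has no deliveries table.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sort"
	"sync"
	"time"
)

// Local copies of the documented declarations, for a self-contained example.
var errDLQEntryNotFound = errors.New("webhook: DLQ entry not found")

type dlqEntry struct {
	ID            string
	QuarantinedAt time.Time
	ReEnqueuedAt  *time.Time
}

type dlqStore interface {
	Save(ctx context.Context, entry dlqEntry) error
	List(ctx context.Context, limit, offset int) ([]dlqEntry, error)
	ReEnqueue(ctx context.Context, id string) error
}

// memStore is a hypothetical in-memory implementation for tests.
type memStore struct {
	mu      sync.Mutex
	entries map[string]dlqEntry
}

var _ dlqStore = (*memStore)(nil) // compile-time contract check

func newMemStore() *memStore { return &memStore{entries: map[string]dlqEntry{}} }

func (s *memStore) Save(ctx context.Context, e dlqEntry) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.entries[e.ID] = e
	return nil
}

func (s *memStore) List(ctx context.Context, limit, offset int) ([]dlqEntry, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	all := make([]dlqEntry, 0, len(s.entries))
	for _, e := range s.entries {
		all = append(all, e)
	}
	// Most recent first, as the interface comment requires.
	sort.Slice(all, func(i, j int) bool { return all[i].QuarantinedAt.After(all[j].QuarantinedAt) })
	if offset < 0 || offset >= len(all) || limit <= 0 {
		return nil, nil
	}
	end := offset + limit
	if end > len(all) {
		end = len(all)
	}
	return all[offset:end], nil
}

func (s *memStore) ReEnqueue(ctx context.Context, id string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	e, ok := s.entries[id]
	if !ok {
		return errDLQEntryNotFound
	}
	now := time.Now()
	e.ReEnqueuedAt = &now // the parent delivery reset is omitted in this sketch
	s.entries[id] = e
	return nil
}

// demo saves two entries and returns the listed IDs plus two ReEnqueue results.
func demo() (ids []string, knownErr, missingErr error) {
	ctx, s := context.Background(), newMemStore()
	base := time.Unix(1700000000, 0)
	for _, e := range []dlqEntry{{ID: "old", QuarantinedAt: base}, {ID: "new", QuarantinedAt: base.Add(time.Hour)}} {
		if err := s.Save(ctx, e); err != nil {
			return nil, err, nil
		}
	}
	page, _ := s.List(ctx, 10, 0)
	for _, e := range page {
		ids = append(ids, e.ID)
	}
	return ids, s.ReEnqueue(ctx, "old"), s.ReEnqueue(ctx, "missing")
}

func main() {
	ids, knownErr, missingErr := demo()
	fmt.Println(ids, knownErr, errors.Is(missingErr, errDLQEntryNotFound)) // [new old] <nil> true
}
```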

type Delivery

type Delivery struct {
	ID            string
	EndpointID    string
	EndpointURL   string
	SigningSecret string // may be empty for unsigned endpoints
	EventName     string
	Payload       []byte
	AttemptCount  int
	// Delivered is set by the store when a prior attempt succeeded.
	// Dispatcher skips any delivery where Delivered is true (idempotency guard).
	Delivered bool
}

Delivery represents a queued outbound webhook delivery.

type DeliveryStore

type DeliveryStore interface {
	// NextPending returns up to n deliveries whose next_retry_at <= now and delivered = false.
	NextPending(ctx context.Context, n int) ([]Delivery, error)
	// MarkDelivered marks a delivery as successfully delivered.
	MarkDelivered(ctx context.Context, id string, responseMS int) error
	// ScheduleRetry increments attempt_count and sets next_retry_at.
	ScheduleRetry(ctx context.Context, id string, nextAttempt int, at time.Time) error
	// UpdateHealth updates endpoint health_score and circuit_state.
	UpdateHealth(ctx context.Context, endpointID string, score int, state CircuitState) error
}

DeliveryStore is the persistence contract used by the Dispatcher.

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher is the goroutine pool that picks up pending deliveries and sends them.

func NewDispatcher

func NewDispatcher(cfg DispatcherConfig, store DeliveryStore, dlq *DLQManager) *Dispatcher

NewDispatcher creates a Dispatcher. Call Run to start the worker pool.

func (*Dispatcher) Run

func (d *Dispatcher) Run(ctx context.Context)

Run starts the dispatcher and blocks until ctx is cancelled.
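
The Dispatcher's internals are unexported, so the following is only a generic sketch of the pattern the documentation describes: poll on a ticker, skip deliveries already marked Delivered, fan the rest out to a fixed pool of workers, and return once ctx is cancelled. The names runPool, fetch, and send are hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"sort"
	"sync"
	"time"
)

type delivery struct {
	ID        string
	Delivered bool
}

// runPool blocks until ctx is cancelled. It is a hypothetical sketch,
// not the package's Dispatcher.Run.
func runPool(ctx context.Context, workers, batch int, tick time.Duration,
	fetch func(n int) []delivery, send func(delivery)) {
	jobs := make(chan delivery)
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for d := range jobs {
				send(d)
			}
		}()
	}
	ticker := time.NewTicker(tick)
	// Deferred calls run last-in first-out: close the channel, then
	// wait for the workers to drain, then stop the ticker.
	defer ticker.Stop()
	defer wg.Wait()
	defer close(jobs)
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			for _, d := range fetch(batch) {
				if d.Delivered {
					continue // idempotency guard
				}
				select {
				case jobs <- d:
				case <-ctx.Done():
					return
				}
			}
		}
	}
}

// demo feeds one batch through the pool and returns the IDs that were sent.
func demo() []string {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	var mu sync.Mutex
	var sent []string
	fetched := false
	fetch := func(n int) []delivery {
		if fetched {
			return nil
		}
		fetched = true
		return []delivery{{ID: "a"}, {ID: "b", Delivered: true}, {ID: "c"}}
	}
	send := func(d delivery) {
		mu.Lock()
		sent = append(sent, d.ID)
		done := len(sent) == 2
		mu.Unlock()
		if done {
			cancel() // stop once both pending deliveries were handled
		}
	}
	runPool(ctx, 2, 32, 10*time.Millisecond, fetch, send)
	sort.Strings(sent)
	return sent
}

func main() {
	fmt.Println(demo()) // [a c]
}
```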

type DispatcherConfig

type DispatcherConfig struct {
	Workers   int           // goroutine pool size (default 8)
	BatchSize int           // max deliveries fetched per tick (default 32)
	PollTick  time.Duration // how often to poll for pending deliveries (default 5s)
}

DispatcherConfig controls Dispatcher concurrency and polling.
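
The field comments list defaults of 8 workers, a batch size of 32, and a 5s poll tick. The documentation does not say how they are applied. One plausible approach, shown here as an assumption, is to treat zero or negative values as "unset". The helper withDefaults and the local dispatcherConfig copy are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// dispatcherConfig is a local copy of the documented struct.
type dispatcherConfig struct {
	Workers   int
	BatchSize int
	PollTick  time.Duration
}

// withDefaults fills unset fields with the documented defaults.
// Assumption: zero or negative values mean "use the default".
func withDefaults(cfg dispatcherConfig) dispatcherConfig {
	if cfg.Workers <= 0 {
		cfg.Workers = 8
	}
	if cfg.BatchSize <= 0 {
		cfg.BatchSize = 32
	}
	if cfg.PollTick <= 0 {
		cfg.PollTick = 5 * time.Second
	}
	return cfg
}

func main() {
	fmt.Printf("%+v\n", withDefaults(dispatcherConfig{}))
	fmt.Printf("%+v\n", withDefaults(dispatcherConfig{Workers: 2, PollTick: time.Second}))
}
```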
