Documentation
¶
Overview ¶
Package webhook provides HMAC-SHA256 signing and dispatch for nSelf webhook deliveries.
Index ¶
- Variables
- func GenerateSecret() (string, error)
- func InjectHeaders(r *http.Request, secret, deliveryID, eventName string, ts time.Time, ...)
- func NewWebhookClient(t time.Duration) *http.Client
- func NextRetryAt(attempt int) time.Time
- func Sign(secret string, unixSeconds int64, body []byte) string
- func ValidateWebhookURL(rawURL string) error
- func Verify(secret string, unixSeconds int64, body []byte, headerValue string) bool
- type Circuit
- type CircuitState
- type DLQEntry
- type DLQManager
- type DLQStore
- type Delivery
- type DeliveryStore
- type Dispatcher
- type DispatcherConfig
Constants ¶
This section is empty.
Variables ¶
var ErrDLQEntryNotFound = fmt.Errorf("webhook: DLQ entry not found")
ErrDLQEntryNotFound is returned when a DLQ entry ID does not exist.
Functions ¶
func GenerateSecret ¶
GenerateSecret returns a 32-byte hex-encoded HMAC signing secret.
func InjectHeaders ¶
func InjectHeaders(r *http.Request, secret, deliveryID, eventName string, ts time.Time, body []byte)
InjectHeaders adds the nSelf webhook authentication headers to r.
- X-Nself-Signature-256: sha256=<hmac>
- X-Nself-Delivery: <delivery_uuid>
- X-Nself-Event: <event_name>
- X-Nself-Timestamp: <unix_seconds>
If secret is empty, the signature header is omitted (unsigned delivery).
func NewWebhookClient ¶ added in v1.1.3
NewWebhookClient returns an *http.Client with SSRF-safe transport for webhook delivery. Timeout defaults to 30 s when t == 0. Use this instead of a plain &http.Client{} for all outbound webhook requests. Provides defence-in-depth: dial-time IP validation supplements the pre-flight ValidateWebhookURL check against DNS-rebinding attacks.
func NextRetryAt ¶
NextRetryAt computes the next_retry_at timestamp for attempt n (1-indexed). Returns zero time if n exceeds maxAttempts (should move to DLQ instead).
func Sign ¶
Sign computes the HMAC-SHA256 signature for an outbound webhook payload.
Signature input: "<unix_seconds>.<body_bytes>" Header: X-Nself-Signature-256: sha256=<hex_hmac>
func ValidateWebhookURL ¶ added in v1.1.3
ValidateWebhookURL returns an error when rawURL targets a private/loopback/ link-local or cloud-metadata address, or uses a non-http(s) scheme. Call this before dispatching any outbound webhook request.
Set SSRF_ALLOWED_HOSTS=host1,host2 to allowlist specific hostnames (bypasses the IP/DNS check; scheme check always applies).
Types ¶
type Circuit ¶
type Circuit struct {
// contains filtered or unexported fields
}
Circuit is a per-endpoint circuit breaker. All methods are safe for concurrent use.
func (*Circuit) Allow ¶
Allow reports whether a delivery attempt should proceed. Open circuits only allow attempts after the probe interval elapses (half-open transition).
func (*Circuit) RecordFailure ¶
func (c *Circuit) RecordFailure()
RecordFailure records a failed delivery. After openThreshold consecutive failures the circuit trips to Open.
func (*Circuit) RecordSuccess ¶
func (c *Circuit) RecordSuccess()
RecordSuccess records a successful delivery.
func (*Circuit) State ¶
func (c *Circuit) State() CircuitState
State returns the current circuit state.
type CircuitState ¶
type CircuitState string
CircuitState represents the state of an endpoint's circuit breaker.
const ( // CircuitClosed is the normal operating state — deliveries proceed. CircuitClosed CircuitState = "closed" // CircuitOpen means the endpoint is failing; deliveries are skipped. CircuitOpen CircuitState = "open" // CircuitHalfOpen means a single probe is allowed to test recovery. CircuitHalfOpen CircuitState = "half-open" )
type DLQEntry ¶
type DLQEntry struct {
ID string `json:"id"`
EndpointID string `json:"endpoint_id"`
DeliveryID string `json:"delivery_id"`
Payload []byte `json:"payload"`
FinalError string `json:"final_error"`
QuarantinedAt time.Time `json:"quarantined_at"`
ReEnqueuedAt *time.Time `json:"re_enqueued_at,omitempty"`
}
DLQEntry represents a row in the np_webhook_dlq table.
type DLQManager ¶
type DLQManager struct {
// contains filtered or unexported fields
}
DLQManager wraps a DLQStore and exposes higher-level operations.
func NewDLQManager ¶
func NewDLQManager(store DLQStore) *DLQManager
NewDLQManager returns a DLQManager backed by store.
func (*DLQManager) Quarantine ¶
func (m *DLQManager) Quarantine(ctx context.Context, endpointID, deliveryID string, payload []byte, finalErr string) error
Quarantine moves a delivery to the DLQ after exhausting all retries.
func (*DLQManager) ReEnqueue ¶
func (m *DLQManager) ReEnqueue(ctx context.Context, id string) error
ReEnqueue re-queues a DLQ entry for immediate retry. The entry's parent delivery has its attempt_count reset to 0 and next_retry_at set to now. Re-enqueue completes within 5 seconds (enforced by the store implementation).
type DLQStore ¶
type DLQStore interface {
// Save persists a new DLQ entry (quarantine).
Save(ctx context.Context, entry DLQEntry) error
// List returns paginated DLQ entries, most recent first.
List(ctx context.Context, limit, offset int) ([]DLQEntry, error)
// ReEnqueue marks the entry as re-enqueued and resets its parent delivery for retry.
// Returns ErrDLQEntryNotFound if id does not exist.
ReEnqueue(ctx context.Context, id string) error
}
DLQStore defines the persistence contract for DLQ operations. Implementations provide the DB-specific logic (Postgres, mock, etc.).
type Delivery ¶
type Delivery struct {
ID string
EndpointID string
EndpointURL string
SigningSecret string // may be empty for unsigned endpoints
EventName string
Payload []byte
AttemptCount int
// Delivered is set by the store when a prior attempt succeeded.
// Dispatcher skips any delivery where Delivered is true (idempotency guard).
Delivered bool
}
Delivery represents a queued outbound webhook delivery.
type DeliveryStore ¶
type DeliveryStore interface {
// NextPending returns up to n deliveries whose next_retry_at <= now and delivered = false.
NextPending(ctx context.Context, n int) ([]Delivery, error)
// MarkDelivered marks a delivery as successfully delivered.
MarkDelivered(ctx context.Context, id string, responseMS int) error
// ScheduleRetry increments attempt_count and sets next_retry_at.
ScheduleRetry(ctx context.Context, id string, nextAttempt int, at time.Time) error
// UpdateHealth updates endpoint health_score and circuit_state.
UpdateHealth(ctx context.Context, endpointID string, score int, state CircuitState) error
}
DeliveryStore is the persistence contract used by the Dispatcher.
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
Dispatcher is the goroutine pool that picks up pending deliveries and sends them.
func NewDispatcher ¶
func NewDispatcher(cfg DispatcherConfig, store DeliveryStore, dlq *DLQManager) *Dispatcher
NewDispatcher creates a Dispatcher. Call Run to start the worker pool.
func (*Dispatcher) Run ¶
func (d *Dispatcher) Run(ctx context.Context)
Run starts the dispatcher and blocks until ctx is cancelled.
type DispatcherConfig ¶
type DispatcherConfig struct {
Workers int // goroutine pool size (default 8)
BatchSize int // max deliveries fetched per tick (default 32)
PollTick time.Duration // how often to poll for pending deliveries (default 5s)
}
DispatcherConfig controls Dispatcher concurrency and polling.