tickr

Version: v1.0.1
Published: May 26, 2026 License: MIT Imports: 11 Imported by: 0

README


tickr


Reliable async messaging for Go microservices. Implements the transactional outbox pattern without an external broker: the storage table is the queue, and registered handlers run in-process via a horizontally scalable worker pool.

  • At-least-once delivery with idempotent handlers
  • Transactional outbox: enqueue inside your own DB transaction
  • Exponential backoff with jitter, configurable per handler
  • Status machine (CREATED → HANDLING → SUCCESS, or HANDLING → FAILED → RETRYING / DEAD) with per-message history
  • Delayed messages, dead-letter queue, idempotency-key dedup, per-event-type concurrency limits
  • Configurable poll interval and retention
  • First-class observability: Prometheus metrics, OpenTelemetry tracing with W3C propagation, ready-to-import Grafana dashboard
  • Pluggable storage — v1 ships a PostgreSQL adapter (pgx/v5, SELECT … FOR UPDATE SKIP LOCKED)

Target throughput: 1M messages/minute across a horizontally scaled fleet.

Layout

tickr/                       core API (Client, Worker, types, Storage interface)
  storage/postgres/          PostgreSQL adapter + embedded migrations
  codec/json/                low-level JSON codec (most users want tickr.On[T])
  metrics/prom/              Prometheus implementation of tickr.Metrics
  tracing/otel/              OpenTelemetry implementation of tickr.Tracer
  grafana/                   ready-to-import dashboard JSON
  examples/orders/           HTTP service + outbox + worker demo

Producer (write side)

pool, err := pgxpool.New(ctx, dsn)
if err != nil { return err }
store := pgstore.New(pool)
client, err := tickr.NewClient(tickr.ClientConfig{Storage: store})
if err != nil { return err }

tx, err := pool.Begin(ctx)
if err != nil { return err }
defer tx.Rollback(ctx) // harmless once Commit has succeeded

if _, err := tx.Exec(ctx, `INSERT INTO orders …`); err != nil { return err }

payload, err := tickr.Encode(order)
if err != nil { return err }
_, err = client.Enqueue(ctx, pgstore.WrapTx(tx), tickr.Message{
    Type:           "order.created",
    Payload:        payload,
    IdempotencyKey: order.ID,
})
// A duplicate key means this order's message is already in the outbox.
if err != nil && !tickr.IsDuplicate(err) { return err }

return tx.Commit(ctx)

Consumer (worker side)

tickr.On[T] registers a typed handler — the payload is JSON-decoded into T before your function runs. A malformed payload is dead-lettered immediately (it won't decode on retry either).

reg := tickr.NewRegistry()
// MustOn panics on a registration error (e.g. a duplicate event type).
tickr.MustOn(reg, "order.created",
    func(ctx context.Context, msg *tickr.InboundMessage, body OrderCreated) error {
        return chargeCustomer(ctx, body)
    },
    tickr.WithMaxAttempts(5),
    tickr.WithAttemptTimeout(10*time.Second),
)

w, err := tickr.NewWorker(tickr.WorkerConfig{
    Storage:  store,
    Registry: reg,
    Stats:    tickr.StatsPolicy{Interval: 10 * time.Second},
})
if err != nil { return err }
return w.Start(ctx) // blocks until ctx is cancelled

Drop down to reg.On(eventType, tickr.Handler, …) when you need raw []byte access or a non-JSON codec.

Batch handlers

Process same-type messages in groups when the downstream work batches naturally (bulk inserts, bulk-API calls). tickr.OnBatch[T] registers an all-or-nothing handler: returning nil marks every message in the batch SUCCESS; a non-nil error fails the whole batch with the same error (each message retries on its own attempt count).

_ = tickr.OnBatch(reg, "order.created",
    func(ctx context.Context, batch []tickr.BatchItem[OrderCreated]) error {
        rows := make([]OrderCreated, len(batch))
        for i, it := range batch {
            rows[i] = it.Body
        }
        return db.BulkInsert(ctx, rows)
    },
    tickr.WithMaxBatchSize(100),
    tickr.WithAttemptTimeout(30*time.Second),
)

The worker groups same-type messages from each claim cycle and chunks them by WithMaxBatchSize (zero = the whole group in one call). Use reg.OnBatch(eventType, tickr.BatchHandler, …) for raw []byte access.
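The grouping step can be sketched as follows: claimed messages are bucketed by event type in claim order, then each bucket is cut into chunks of at most WithMaxBatchSize messages, with zero meaning one chunk per bucket. The msg type and chunkByType function are hypothetical; tickr's internal grouping may differ, and treating a negative size like zero is an assumption.

```go
package main

import "fmt"

// msg is a hypothetical, trimmed-down claimed message.
type msg struct {
	ID   string
	Type string
}

// chunkByType groups claimed messages by event type, preserving claim
// order within each type, then splits each group into chunks of at most
// maxBatch messages. maxBatch <= 0 keeps each whole group in one chunk.
func chunkByType(claimed []msg, maxBatch int) map[string][][]msg {
	groups := make(map[string][]msg)
	for _, m := range claimed {
		groups[m.Type] = append(groups[m.Type], m)
	}
	out := make(map[string][][]msg, len(groups))
	for typ, group := range groups {
		if maxBatch <= 0 {
			out[typ] = [][]msg{group}
			continue
		}
		for start := 0; start < len(group); start += maxBatch {
			end := start + maxBatch
			if end > len(group) {
				end = len(group)
			}
			out[typ] = append(out[typ], group[start:end])
		}
	}
	return out
}

func main() {
	claimed := []msg{
		{"1", "order.created"}, {"2", "email.send"}, {"3", "order.created"},
		{"4", "order.created"}, {"5", "order.created"}, {"6", "order.created"},
	}
	// Map iteration order is random, so the printed order varies.
	for typ, chunks := range chunkByType(claimed, 2) {
		for _, c := range chunks {
			fmt.Println(typ, len(c))
		}
	}
}
```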

Built-in transport handlers

Two subpackages skip the boilerplate when the handler's only job is to forward the message to a downstream service. Each lives in its own Go module so its transport-specific deps stay out of the core go.mod.

HTTP webhook — handlers/http
import httphandler "github.com/ndmt1at21/tickr/handlers/http"

_ = tickr.On(reg, "email.send",
    httphandler.PostJSON[Email](http.DefaultClient, httphandler.Config[Email]{
        URLFunc: func(_ context.Context, _ *tickr.InboundMessage, e Email) string {
            return e.HookURL
        },
    }),
    tickr.WithMaxAttempts(8),
    tickr.WithAttemptTimeout(15*time.Second),
)

Defaults:

  • 2xx → success
  • 4xx (except 408, 425, 429) → DeadLetter
  • 408, 425, 429 and 5xx → retry (Retry-After honored)
  • transport errors → retry

The message's Headers (W3C traceparent included) and IdempotencyKey are forwarded as HTTP headers automatically. Override classification or header forwarding via Config.

gRPC unary — handlers/grpc
import grpchandler "github.com/ndmt1at21/tickr/handlers/grpc"

client := userpb.NewUserServiceClient(conn)

_ = tickr.On(reg, "user.signup",
    grpchandler.Unary(client.Notify, grpchandler.Config[*userpb.NotifyRequest]{}),
    tickr.WithMaxAttempts(5),
    tickr.WithAttemptTimeout(10*time.Second),
)

Defaults follow gRPC retry semantics:

  • Retry: UNAVAILABLE, DEADLINE_EXCEEDED, RESOURCE_EXHAUSTED, ABORTED
  • DeadLetter: INVALID_ARGUMENT, NOT_FOUND, PERMISSION_DENIED, UNAUTHENTICATED, ALREADY_EXISTS, FAILED_PRECONDITION, OUT_OF_RANGE, UNIMPLEMENTED
  • Retry by default (override via Config.Classifier): INTERNAL, UNKNOWN, DATA_LOSS

Trace context and idempotency key are attached as outgoing gRPC metadata.

Migrations

m, err := tickr.NewMigrator(store)
if err != nil { return err }
if err := m.Up(ctx); err != nil { return err }

The Postgres adapter embeds its SQL via embed.FS; no external migration tool required.

Status machine

CREATED  ──claim──▶  HANDLING ──nil───▶  SUCCESS
                       │
                       ├─ err (attempt<max) ─▶  FAILED ─▶ RETRYING ─▶ HANDLING
                       │
                       ├─ err (attempt==max) ▶  FAILED ─▶ DEAD
                       │
                       ├─ DeadLetter() ──────▶  DEAD
                       │
                       └─ ctx.Canceled (shutdown) ▶ CREATED|RETRYING  (no attempt burn)

DEAD ── admin Requeue ──▶ CREATED                  (manual recovery)

Every transition appends a row to tickr_history.
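Because every transition is recorded, a message's history can be checked against the diagram. The sketch encodes the drawn edges as an adjacency map; allowed and valid are hypothetical helpers, it assumes each arrow corresponds to one tickr_history row, and the HANDLING → RETRYING edge covers both shutdown release and lease reclaim.

```go
package main

import "fmt"

type Status string

const (
	Created  Status = "CREATED"
	Handling Status = "HANDLING"
	Success  Status = "SUCCESS"
	Failed   Status = "FAILED"
	Retrying Status = "RETRYING"
	Dead     Status = "DEAD"
)

// allowed lists the edges drawn in the status diagram.
var allowed = map[Status][]Status{
	Created:  {Handling},
	Handling: {Success, Failed, Dead, Created, Retrying}, // Created/Retrying: shutdown release or lease reclaim
	Failed:   {Retrying, Dead},
	Retrying: {Handling},
	Dead:     {Created}, // admin Requeue
}

// valid reports whether a recorded history only follows allowed edges.
func valid(history []Status) bool {
	for i := 1; i < len(history); i++ {
		ok := false
		for _, next := range allowed[history[i-1]] {
			if next == history[i] {
				ok = true
				break
			}
		}
		if !ok {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(valid([]Status{Created, Handling, Failed, Retrying, Handling, Success})) // true
	fmt.Println(valid([]Status{Created, Success}))                                      // false
}
```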

Failure handling

Scenario                                    Mechanism
Handler returns error (attempt < max)       scheduled RETRYING with exponential backoff
Handler returns error (attempt == max)      terminates in DEAD
Handler returns tickr.DeadLetter(err)       jumps to DEAD without retry
Handler returns tickr.RetryAfter(d, err)    overrides RetryPolicy with explicit delay
Handler returns tickr.Skip(reason)          SUCCESS without side effects
Handler panics                              recovered, treated as error, normal retry path
Handler timeout (WithAttemptTimeout)        normal retry path
Worker graceful shutdown (SIGTERM)          inflight handlers get ctx.Done(), claim released without burning attempt
Worker hard crash (SIGKILL)                 lease expires → reclaimer transitions back to RETRYING, attempt stays incremented (poison-pill protection)

Lease auto-extension

WithAttemptTimeout can safely exceed the worker's Lease: the engine extends the storage lease every Lease/3 while the handler runs. If the lease is nevertheless lost to another worker, the in-flight attempt's ctx is cancelled so the stale attempt stops early. This limits duplicate work but cannot rule it out, which is why handlers must be idempotent.

Observability

Prometheus
import (
    "net/http"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promhttp"
    tprom "github.com/ndmt1at21/tickr/metrics/prom"
)

promReg := prometheus.NewRegistry() // distinct from the tickr handler registry
m := tprom.New(promReg)

// ... pass m as ClientConfig.Metrics and WorkerConfig.Metrics
http.Handle("/metrics", promhttp.HandlerFor(promReg, promhttp.HandlerOpts{}))

Metrics emitted:

Metric                              Labels
tickr_messages_enqueued_total       event_type
tickr_handler_attempts_total        event_type, outcome
tickr_handler_duration_seconds      event_type, outcome
tickr_messages_dead_total           event_type
tickr_queue_depth                   event_type, status
tickr_claim_batch_size              (none)
tickr_claim_duration_seconds        (none)
tickr_leases_reclaimed_total        event_type
tickr_inflight_handlers             event_type

OpenTelemetry tracing
import totel "github.com/ndmt1at21/tickr/tracing/otel"

tracer := totel.New() // picks up the global TracerProvider + propagator

The tracer injects W3C traceparent and tracestate into Message.Headers at enqueue time, and extracts them at handler dispatch — so a single trace spans producer → outbox → consumer across processes, even if the consumer runs hours later.

Span attributes follow the OpenTelemetry messaging semantic conventions.

Grafana

Import grafana/tickr-dashboard.json. See grafana/README.md for scrape config, alert rules, and Tempo/Jaeger deep-links by messaging.message.id.

Throughput

For head-to-head numbers against River, Gue, Watermill SQL, and Asynq on identical workloads, see BENCHMARKS.md. The bench code lives in benchmarks/ as a separate Go module.

For 1M msg/min (16.7k/sec) sustained, the recommended baseline configuration is:

Knob                          Value
Fleet × claim goroutines      30 × 4 = 120
BatchSize                     200
PollInterval                  100 ms, with adaptive backoff to 2 s when the queue is empty
Lease                         30 s, with handler auto-extension
PG pool per instance          8 connections (240 total; front with PgBouncer)
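The arithmetic behind this baseline is worth redoing whenever a knob changes. The sketch derives the load implied by the table; capacityPlan and derive are hypothetical, and it assumes every claim returns a full batch, which is optimistic.

```go
package main

import "fmt"

// capacityPlan holds the knobs from the baseline table.
type capacityPlan struct {
	Instances, ClaimGoroutines, BatchSize, PoolPerInstance int
}

// derive converts a per-minute target into the load each part of the
// fleet must sustain, assuming every claim returns a full batch.
func derive(targetPerMinute float64, p capacityPlan) (perSec, perGoroutine, claimsPerSec float64, conns int) {
	perSec = targetPerMinute / 60
	perGoroutine = perSec / float64(p.Instances*p.ClaimGoroutines)
	claimsPerSec = perSec / float64(p.BatchSize) // fleet-wide claim queries per second
	conns = p.Instances * p.PoolPerInstance
	return perSec, perGoroutine, claimsPerSec, conns
}

func main() {
	perSec, perG, claims, conns := derive(1_000_000,
		capacityPlan{Instances: 30, ClaimGoroutines: 4, BatchSize: 200, PoolPerInstance: 8})
	fmt.Printf("%.0f msg/s, %.1f msg/s per claim goroutine, %.1f claims/s, %d PG connections\n",
		perSec, perG, claims, conns)
	// prints: 16667 msg/s, 138.9 msg/s per claim goroutine, 83.3 claims/s, 240 PG connections
}
```

Under that assumption the fleet issues about 83 claim queries per second; partially filled batches raise the claim rate proportionally.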

The bottleneck is not SKIP LOCKED itself but autovacuum keeping up with dead tuples on the hot table. Tune:

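ALTER TABLE tickr_messages SET (
    autovacuum_vacuum_scale_factor = 0.02
);

-- autovacuum_naptime is server-wide and cannot be set per table:
ALTER SYSTEM SET autovacuum_naptime = '10s';
SELECT pg_reload_conf();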

Run the example

cd examples/orders
docker compose up --build

This brings up Postgres, Prometheus, Grafana, Tempo, and the orders service. Then:

curl -X POST http://localhost:8080/orders \
  -H 'content-type: application/json' \
  -d '{"order_id":"o-1","customer_id":"c-1","total":42.50}'

Observe:

  • Logs: handler invocation in the orders container
  • Postgres: tickr_messages row transitions, tickr_history rows
  • Grafana (http://localhost:3000): throughput, latency, queue depth
  • Tempo: end-to-end span from HTTP request → enqueue → handler

Limitations

Before adopting, read ARCHITECTURE.md §11. Highlights:

  • Producer and outbox must share a database (the outbox guarantee requires a single transaction).
  • At-least-once delivery only — handlers must be idempotent.
  • Partitioned Postgres scopes idempotency per partition (per month by default).
  • MySQL and CockroachDB adapters do not implement LISTEN/NOTIFY and fall back to pure polling (~100 ms typical wake-up).
  • tickr_history retention is opt-in via RetentionPolicy.History (default off for backwards compatibility).

Section 11.3 lists planned follow-ups; contributions welcome.

Architecture

See ARCHITECTURE.md for the design deep-dive: status machine, claim query, lease auto-extension, partitioning trade-offs, leader-lock strategies per adapter, and the conformance suite.

License

Released under the MIT License.

Documentation

Overview

Package tickr is a reliable async messaging library for microservices implementing the outbox pattern. The storage table is the queue: callers enqueue messages inside their own DB transaction (transactional outbox), and registered handlers run in-process via the worker pool.


Constants

This section is empty.

Variables

This section is empty.

Functions

func DeadLetter

func DeadLetter(err error) error

DeadLetter wraps err and instructs the engine to move the message to DEAD without consuming further retry budget. Use this for permanent failures the handler can detect (malformed payload, business rule violations).

func Encode added in v1.0.0

func Encode(body any) ([]byte, error)

Encode JSON-encodes body for use as [Message.Payload]. It is the producer-side companion to On: encoding with it makes the symmetry with the JSON decoding on the consumer side explicit.

func ExtractRetryAfter

func ExtractRetryAfter(err error) (time.Duration, bool)

ExtractRetryAfter returns the explicit retry delay if err was produced by RetryAfter, else (0, false).

func IsDeadLetter

func IsDeadLetter(err error) bool

IsDeadLetter reports whether err was produced by DeadLetter.

func IsDuplicate

func IsDuplicate(err error) bool

IsDuplicate reports whether err (or anything it wraps) is *ErrDuplicate.

func IsSkip

func IsSkip(err error) bool

IsSkip reports whether err was produced by Skip.

func MustOn added in v1.0.0

func MustOn[T any](r *HandlerRegistry, eventType string, h TypedHandler[T], opts ...HandlerOption)

MustOn is like On but panics on error. Convenient for handler registration during program init.

func MustOnBatch added in v1.0.0

func MustOnBatch[T any](r *HandlerRegistry, eventType string, h TypedBatchHandler[T], opts ...HandlerOption)

MustOnBatch is like OnBatch but panics on error.

func On added in v1.0.0

func On[T any](r *HandlerRegistry, eventType string, h TypedHandler[T], opts ...HandlerOption) error

On registers a typed handler in one line, eliminating the jsoncodec.Wrap boilerplate. The payload is decoded with encoding/json before the handler runs; a decode failure is returned as a permanent dead-letter error (the same message will never decode) so the message goes straight to DEAD without burning the retry budget.

Example:

tickr.On(reg, "order.created",
    func(ctx context.Context, msg *tickr.InboundMessage, body OrderCreated) error {
        return chargeCustomer(ctx, body)
    },
    tickr.WithMaxAttempts(5),
)

Use the lower-level HandlerRegistry.On when you need a custom codec or raw []byte access.

func OnBatch added in v1.0.0

func OnBatch[T any](r *HandlerRegistry, eventType string, h TypedBatchHandler[T], opts ...HandlerOption) error

OnBatch registers a typed batch handler. If any message's payload fails to decode, the whole batch is dead-lettered with the decode error — matching the all-or-nothing contract. Use HandlerRegistry.OnBatch directly if you need a custom codec or to inspect raw payloads.

Example:

tickr.OnBatch(reg, "order.created",
    func(ctx context.Context, batch []tickr.BatchItem[OrderCreated]) error {
        rows := make([]OrderCreated, len(batch))
        for i, it := range batch { rows[i] = it.Body }
        return db.BulkInsert(ctx, rows)
    },
    tickr.WithMaxBatchSize(100),
)

func RetryAfter

func RetryAfter(delay time.Duration, err error) error

RetryAfter wraps err and instructs the engine to delay the next attempt by exactly delay (bypassing the configured RetryPolicy). If attempts are exhausted, the message still moves to DEAD.

func Skip

func Skip(reason string) error

Skip marks the current attempt as SUCCESS without running side effects. reason is recorded in history for auditability.

Types

type Alerter added in v1.0.0

type Alerter interface {
	OnError(ctx context.Context, evt ErrorEvent)
}

Alerter is the optional hook for asynchronous error notifications — e.g. post to Slack, send an email, or page via PagerDuty.

OnError is invoked in a fresh goroutine detached from the caller's context, so the call neither blocks the engine nor is cancelled by shutdown. The supplied ctx carries a 30s timeout. The library recovers from panics inside OnError.

Implementations should still be cheap to invoke (or hand off to their own buffered worker): a sustained error burst means one goroutine per event. Throttle, batch, or coalesce inside OnError if you call out to rate-limited services.

type BatchHandler added in v1.0.0

type BatchHandler func(ctx context.Context, msgs []*InboundMessage) error

BatchHandler processes a slice of InboundMessage of the same event type as a single all-or-nothing attempt: returning nil marks every message in the batch SUCCESS, while a non-nil error fails every message with the same error (each one retries or dead-letters on its own attempt count).

The worker groups same-type messages from one claim cycle and chunks them by WithMaxBatchSize; each chunk runs in a single goroutine, so a BatchHandler must be safe for concurrent use across event types but is invoked serially within one batch. Outcome modifiers (DeadLetter, RetryAfter, Skip) apply uniformly to the whole batch.

type BatchItem added in v1.0.0

type BatchItem[T any] struct {
	Msg  *InboundMessage
	Body T
}

BatchItem pairs a raw InboundMessage with its JSON-decoded body, as passed to a TypedBatchHandler.

type ClaimParams

type ClaimParams struct {
	WorkerID   string
	Batch      int
	Lease      time.Duration
	EventTypes []string
}

ClaimParams controls one poll cycle.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is the producer-side entry point. It writes outbox rows through the configured Storage adapter, optionally participating in the caller's transaction.

func NewClient

func NewClient(cfg ClientConfig) (*Client, error)

NewClient builds a Client. Storage is required.

func (*Client) Enqueue

func (c *Client) Enqueue(ctx context.Context, tx Tx, msg Message) (MessageID, error)

Enqueue writes a single message. tx may be the zero value (no caller transaction) — in that case the adapter performs a single auto-commit.

Returns *ErrDuplicate, whose ExistingID identifies the already-stored message, when the (Type, IdempotencyKey) pair already exists.

func (*Client) EnqueueBatch

func (c *Client) EnqueueBatch(ctx context.Context, tx Tx, msgs []Message) ([]MessageID, error)

EnqueueBatch writes many messages in one round-trip. The returned slice is parallel to msgs; an entry is empty ("") only when an error occurred for that index.

type ClientConfig

type ClientConfig struct {
	Storage Storage

	// DefaultMaxAttempts is the value used for messages whose
	// Message.MaxAttempts is zero. Defaults to 10.
	DefaultMaxAttempts int

	Logger  Logger
	Metrics Metrics
	Tracer  Tracer
}

ClientConfig configures the producer-side Client.

type EndSpanFunc

type EndSpanFunc func(err error)

EndSpanFunc finishes a span started by a Tracer call. Pass the resulting error (or nil on success) so the span can record exception attributes.

type EnqueueParams

type EnqueueParams struct {
	EventType      string
	Payload        []byte
	Headers        map[string]string
	IdempotencyKey string
	MaxAttempts    int
	ProcessAt      time.Time
}

EnqueueParams is the input to Storage.Enqueue.

type ErrDuplicate

type ErrDuplicate struct {
	ExistingID MessageID
	EventType  string
	Key        string
}

ErrDuplicate is returned by Client.Enqueue when a message with the same (Type, IdempotencyKey) already exists. The caller can treat duplicates as success by checking errors.As.

func (*ErrDuplicate) Error

func (e *ErrDuplicate) Error() string
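A common pattern is to treat a duplicate as success and continue with the existing ID. The sketch mirrors ErrDuplicate locally so it compiles on its own; enqueueStub is a hypothetical stand-in for Client.Enqueue that always reports a duplicate, and the ID value is made up.

```go
package main

import (
	"errors"
	"fmt"
)

type MessageID string

// ErrDuplicate mirrors the documented tickr type.
type ErrDuplicate struct {
	ExistingID MessageID
	EventType  string
	Key        string
}

func (e *ErrDuplicate) Error() string {
	return fmt.Sprintf("duplicate %s/%s (existing %s)", e.EventType, e.Key, e.ExistingID)
}

// enqueueStub is hypothetical: it simulates an Enqueue that hits an existing key.
func enqueueStub(eventType, key string) (MessageID, error) {
	return "", fmt.Errorf("enqueue: %w",
		&ErrDuplicate{ExistingID: "0190-abc", EventType: eventType, Key: key})
}

// enqueueOnce treats a duplicate as success and returns the existing ID.
func enqueueOnce(eventType, key string) (MessageID, error) {
	id, err := enqueueStub(eventType, key)
	var dup *ErrDuplicate
	if errors.As(err, &dup) {
		return dup.ExistingID, nil
	}
	return id, err
}

func main() {
	id, err := enqueueOnce("order.created", "o-1")
	fmt.Println(id, err) // 0190-abc <nil>
}
```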

type ErrorEvent added in v1.0.0

type ErrorEvent struct {
	Kind      ErrorKind
	EventType string
	MessageID MessageID
	Attempt   int
	Err       error
}

ErrorEvent describes a single error worth alerting on. EventType, MessageID, and Attempt are zero/empty for core errors not tied to a specific message.

type ErrorKind added in v1.0.0

type ErrorKind string

ErrorKind classifies where an ErrorEvent originated.

const (
	// ErrorKindHandler — a handler attempt returned a non-nil error and the
	// message is being retried.
	ErrorKindHandler ErrorKind = "handler"
	// ErrorKindDeadLetter — a message moved to DEAD (max attempts exhausted,
	// DeadLetter sentinel, or WithDeadLetterIf predicate matched).
	ErrorKindDeadLetter ErrorKind = "dead_letter"
	// ErrorKindNoHandler — a claimed message has no registered handler and
	// was dead-lettered.
	ErrorKindNoHandler ErrorKind = "no_handler"
	// ErrorKindClaim — storage.Claim failed.
	ErrorKindClaim ErrorKind = "claim"
	// ErrorKindStorage — a Succeed/Fail/Release write failed after a handler
	// attempt completed.
	ErrorKindStorage ErrorKind = "storage"
	// ErrorKindLease — lease extension errored or was lost mid-attempt.
	ErrorKindLease ErrorKind = "lease"
	// ErrorKindJanitor — the retention janitor failed (leader lock or purge).
	ErrorKindJanitor ErrorKind = "janitor"
	// ErrorKindReclaimer — the lease reclaimer failed (leader lock or reclaim).
	ErrorKindReclaimer ErrorKind = "reclaimer"
)

type ExponentialBackoff

type ExponentialBackoff struct {
	// Base is the delay before the second attempt. Default 1s when zero.
	Base time.Duration
	// Max caps the delay. Default 1h when zero.
	Max time.Duration
	// JitterFraction is the random spread applied to the computed delay,
	// clamped to [0, 1]. Default 0.2.
	JitterFraction float64
	// Rand is an optional random source for deterministic tests. Default
	// uses math/rand/v2's package-level functions.
	Rand func() float64
}

ExponentialBackoff is the default RetryPolicy: delay = min(Max, Base * 2^(attempt-1)), multiplied by a random jitter factor in the range [1-JitterFraction, 1+JitterFraction].

func (ExponentialBackoff) NextDelay

func (e ExponentialBackoff) NextDelay(attempt int, _ error) time.Duration

NextDelay implements RetryPolicy.

type FailParams

type FailParams struct {
	MessageID   MessageID
	Attempt     int
	WorkerID    string
	Err         string
	NextRetryAt time.Time
	Dead        bool
}

FailParams describes a single failed attempt.

type Handler

type Handler func(ctx context.Context, msg *InboundMessage) error

Handler processes a single InboundMessage attempt. Returning nil marks the attempt SUCCESS. Returning a non-nil error triggers the retry path or DLQ depending on attempt count and the error's nature (see DeadLetter, RetryAfter, Skip).

Handlers must be safe for concurrent use; the worker may dispatch many invocations of the same Handler in parallel up to the configured concurrency cap.

type HandlerOption

type HandlerOption func(*handlerConfig)

HandlerOption tunes the per-event-type handler behaviour.

func WithAttemptTimeout

func WithAttemptTimeout(d time.Duration) HandlerOption

WithAttemptTimeout sets the per-attempt deadline for a handler. Long timeouts are safe — the engine auto-extends the storage lease for the duration of each attempt.

func WithDeadLetterIf

func WithDeadLetterIf(pred func(error) bool) HandlerOption

WithDeadLetterIf sends the message straight to DEAD whenever the predicate returns true for the handler's returned error. Useful for permanent failures (malformed payloads, business-rule violations) that shouldn't burn the retry budget.

func WithMaxAttempts

func WithMaxAttempts(n int) HandlerOption

WithMaxAttempts overrides the global default for one event type.

func WithMaxBatchSize added in v1.0.0

func WithMaxBatchSize(n int) HandlerOption

WithMaxBatchSize caps the number of messages passed to a BatchHandler in a single invocation. Zero (the default) means "use the whole group claimed for this event type in one poll cycle" — typically up to [WorkerConfig.BatchSize]. Has no effect on single-message handlers.

func WithMaxInflight

func WithMaxInflight(n int) HandlerOption

WithMaxInflight caps the number of concurrently-running attempts of one event type within a single worker process. Fleet-wide limit ≈ this value times the number of worker instances.

func WithRetryPolicy

func WithRetryPolicy(p RetryPolicy) HandlerOption

WithRetryPolicy sets a custom RetryPolicy for one event type.

type HandlerRegistry

type HandlerRegistry struct {
	// contains filtered or unexported fields
}

HandlerRegistry maps event types to Handlers and their options. It is safe for concurrent use; new handlers may be registered before the Worker starts, but mutation after Start is not supported.

func NewRegistry

func NewRegistry() *HandlerRegistry

NewRegistry creates an empty HandlerRegistry.

func (*HandlerRegistry) EventTypes

func (r *HandlerRegistry) EventTypes() []string

EventTypes returns the registered event-type names in unspecified order. Used by the worker to scope the claim query.

func (*HandlerRegistry) On

func (r *HandlerRegistry) On(eventType string, h Handler, opts ...HandlerOption) error

On registers a handler for one event type. Registering the same type twice is an error.

func (*HandlerRegistry) OnBatch added in v1.0.0

func (r *HandlerRegistry) OnBatch(eventType string, h BatchHandler, opts ...HandlerOption) error

OnBatch registers a BatchHandler for one event type. The worker groups same-type messages from each claim cycle and invokes the handler once per chunk (see WithMaxBatchSize). Registering the same event type twice — as single or batch — is an error.

type HistoryPurger added in v1.0.1

type HistoryPurger interface {
	PurgeHistory(ctx context.Context, before time.Time, limit int) (int64, error)
}

HistoryPurger is an optional Storage extension that deletes old tickr_history rows. Adapters that implement it are picked up by the Worker's janitor when RetentionPolicy.History > 0. Adapters that do not implement it silently skip history purge — history grows unbounded until an operator runs DELETE manually.

type InboundMessage

type InboundMessage struct {
	ID             MessageID
	Type           string
	Payload        []byte
	Headers        map[string]string
	IdempotencyKey string

	// Attempt is 1-indexed; it is incremented at claim time before dispatch.
	Attempt     int
	MaxAttempts int

	EnqueuedAt  time.Time
	ScheduledAt time.Time

	// LastError is the error string from the previous attempt, or "" on the
	// first attempt.
	LastError string

	// Status is the row's lifecycle status at the time it was read. On the
	// [Handler] hot path it is always [StatusHandling] (set by the claim)
	// and handlers can safely ignore it.
	Status Status
}

InboundMessage is the read-side view of a message handed to a Handler.

type Logger

type Logger interface {
	Debug(ctx context.Context, msg string, kv ...any)
	Info(ctx context.Context, msg string, kv ...any)
	Warn(ctx context.Context, msg string, kv ...any)
	Error(ctx context.Context, msg string, err error, kv ...any)
}

Logger is the optional structured-logging hook. kv pairs follow slog conventions: alternating keys (strings) and values.

type Message

type Message struct {
	// Type is the routing key matched against the HandlerRegistry.
	Type string

	// Payload is the opaque message body. Encoding is the caller's choice.
	Payload []byte

	// Headers carries optional metadata. Trace propagation headers
	// (W3C traceparent / tracestate) are injected here automatically
	// when a Tracer is configured on the Client.
	Headers map[string]string

	// IdempotencyKey, when set, makes the (Type, IdempotencyKey) pair
	// unique across the outbox. A second Enqueue with the same key returns
	// *ErrDuplicate carrying the existing MessageID.
	IdempotencyKey string

	// ScheduledAt makes the message eligible only after this instant.
	// Zero value means eligible immediately.
	ScheduledAt time.Time

	// MaxAttempts overrides the handler default for this single message.
	// Zero means "inherit the handler's configured MaxAttempts".
	MaxAttempts int
}

Message is what the caller supplies at enqueue time.

type MessageID

type MessageID string

MessageID identifies a tickr message. Storage adapters use UUIDv7 strings so that IDs are lexicographically time-ordered and shard-portable.

type Metrics

type Metrics interface {
	MessageEnqueued(eventType string)
	HandlerStarted(eventType string, attempt int)
	HandlerCompleted(eventType string, attempt int, duration time.Duration, outcome Outcome)
	MessageDead(eventType string)
	QueueDepth(eventType string, status Status, depth int)
	ClaimBatch(claimed, requested int, duration time.Duration)
	LeaseReclaimed(eventType string, count int)
}

Metrics is the optional metrics hook. Implementations must be safe for concurrent use. The Prometheus implementation lives in tickr/metrics/prom.

type Migrator

type Migrator struct {
	// contains filtered or unexported fields
}

Migrator is the schema-management façade. It delegates to the configured Storage adapter, which embeds its own migrations.

func NewMigrator

func NewMigrator(s Storage) (*Migrator, error)

NewMigrator constructs a Migrator that drives the given Storage.

func (*Migrator) Up

func (m *Migrator) Up(ctx context.Context) error

Up applies all pending migrations.

type Notifier added in v1.0.0

type Notifier interface {
	Notify(ctx context.Context, tx Tx, eventType string) error
	Listen(ctx context.Context) (<-chan struct{}, func() error, error)
}

Notifier is an optional capability for Storage adapters. When the configured Storage also implements Notifier, the Client publishes an enqueue notification on every successful insert, and the Worker subscribes so it can wake up from its poll backoff immediately.

Adapters that do not implement Notifier degrade gracefully to pure polling — there is no behavioural change beyond latency.

Semantics:

  • Notify must be called inside the caller's transaction when present, so that the notification is only observable to subscribers if (and when) that transaction commits. This preserves the transactional outbox guarantee.
  • Listen returns a channel that fires (coalesced, non-blocking) every time at least one new message has been enqueued since the last read. Receivers must treat the signal as a hint, not as a count.
  • The cleanup function returned by Listen releases the underlying connection/subscription and must be safe to call multiple times.
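The coalescing, non-blocking signal described in the second bullet is typically a channel with a buffer of one plus a select with a default branch. The sketch is a hypothetical helper an adapter's Listen could return, not tickr code; closing the channel on cleanup is an assumption about how receivers learn that the subscription ended.

```go
package main

import (
	"fmt"
	"sync"
)

// wakeSignal is a coalescing, non-blocking notification channel: any number
// of notify calls between two reads collapse into one pending signal.
type wakeSignal struct {
	mu     sync.Mutex
	closed bool
	ch     chan struct{}
}

func newWakeSignal() *wakeSignal { return &wakeSignal{ch: make(chan struct{}, 1)} }

// notify never blocks; if a signal is already pending, it is a no-op.
func (w *wakeSignal) notify() {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.closed {
		return
	}
	select {
	case w.ch <- struct{}{}:
	default:
	}
}

// C is the receive side a Listen implementation would hand to the worker.
func (w *wakeSignal) C() <-chan struct{} { return w.ch }

// close is safe to call multiple times, as the cleanup contract requires.
func (w *wakeSignal) close() error {
	w.mu.Lock()
	defer w.mu.Unlock()
	if !w.closed {
		w.closed = true
		close(w.ch)
	}
	return nil
}

func main() {
	w := newWakeSignal()
	for i := 0; i < 5; i++ {
		w.notify() // five enqueues...
	}
	<-w.C() // ...wake the worker once
	select {
	case <-w.C():
		fmt.Println("extra wake-up")
	default:
		fmt.Println("coalesced")
	}
	_ = w.close()
	_ = w.close()
}
```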

type Outcome

type Outcome string

Outcome categorises the result of a single handler attempt for metrics.

const (
	OutcomeSuccess  Outcome = "success"
	OutcomeRetry    Outcome = "retry"
	OutcomeDead     Outcome = "dead"
	OutcomeCanceled Outcome = "canceled"
)

Handler-attempt outcomes used by the Metrics hook.

type RetentionPolicy

type RetentionPolicy struct {
	Success time.Duration // 0 => 24h
	Dead    time.Duration // 0 => 30d; set negative for "never purge DEAD"
	// History is the maximum age of tickr_history rows. 0 => default
	// (max of Success, Dead, 30d) — once the janitor is running there
	// is no reason to let history grow unbounded. Set negative to opt
	// out explicitly. Requires the storage adapter to implement
	// tickr.HistoryPurger; adapters that do not implement it silently
	// skip this phase.
	History    time.Duration
	PurgeBatch int           // 0 => 5000
	PurgeEvery time.Duration // 0 => 1m
}

RetentionPolicy controls how long terminal-state (SUCCESS and DEAD) rows and tickr_history rows are kept, and how the janitor batches its purges.

type RetryPolicy

type RetryPolicy interface {
	// NextDelay returns the delay before the (attempt+1)-th attempt. attempt
	// is the 1-indexed number of attempts already made. err is the failure
	// from the most recent attempt and may be nil.
	NextDelay(attempt int, err error) time.Duration
}

RetryPolicy computes the delay before the next attempt after a failure. Implementations must be safe for concurrent use.

func DefaultRetryPolicy

func DefaultRetryPolicy() RetryPolicy

DefaultRetryPolicy returns the default ExponentialBackoff used when a handler is registered without WithRetryPolicy.

type Stats

type Stats struct {
	ByStatus    map[Status]int
	ByEventType map[string]map[Status]int
	SampledAt   time.Time
}

Stats is a point-in-time snapshot of queue depth.

type StatsPolicy

type StatsPolicy struct {
	Interval time.Duration // 0 disables sampling
}

StatsPolicy controls the QueueDepth sampler used by the metrics hook.

type Status

type Status string

Status is the lifecycle state of a message.

const (
	StatusCreated  Status = "CREATED"
	StatusHandling Status = "HANDLING"
	StatusSuccess  Status = "SUCCESS"
	StatusFailed   Status = "FAILED"
	StatusRetrying Status = "RETRYING"
	StatusDead     Status = "DEAD"
)

Lifecycle states. SUCCESS and DEAD are terminal; the others are transient.

func (Status) Terminal

func (s Status) Terminal() bool

Terminal reports whether a status is terminal (no further transitions except via admin Requeue).

type Storage

type Storage interface {
	Enqueue(ctx context.Context, tx Tx, p EnqueueParams) (*InboundMessage, error)
	EnqueueBatch(ctx context.Context, tx Tx, ps []EnqueueParams) ([]*InboundMessage, error)

	Claim(ctx context.Context, p ClaimParams) ([]*InboundMessage, error)
	Succeed(ctx context.Context, id MessageID, attempt int, workerID string) error
	Fail(ctx context.Context, p FailParams) error
	ReleaseShutdown(ctx context.Context, id MessageID, attempt int, workerID, reason string) error
	Extend(ctx context.Context, id MessageID, workerID string, until time.Time) (bool, error)

	History(ctx context.Context, id MessageID) ([]Transition, error)
	ListDead(ctx context.Context, eventType string, after time.Time, limit int) ([]*InboundMessage, error)
	Requeue(ctx context.Context, id MessageID, processAt time.Time) error

	ReclaimExpired(ctx context.Context, limit int) (int64, error)
	PurgeTerminal(ctx context.Context, before time.Time, limit int) (int64, error)

	Stats(ctx context.Context) (Stats, error)

	ApplyMigrations(ctx context.Context) error
	TryLeaderLock(ctx context.Context, key string) (bool, func(), error)
}

Storage is the backend-agnostic outbox persistence interface. All methods must be safe for concurrent use across goroutines and processes.

type Tracer

type Tracer interface {
	// StartEnqueueSpan wraps a producer-side Enqueue and lets the impl
	// inject trace propagation into the returned context. The Client also
	// reads back the propagation headers via InjectHeaders so they can be
	// written to the outbox row.
	StartEnqueueSpan(ctx context.Context, eventType string) (context.Context, EndSpanFunc)

	// StartClaimSpan wraps a worker-side claim batch.
	StartClaimSpan(ctx context.Context) (context.Context, EndSpanFunc)

	// StartHandlerSpan wraps a single handler attempt. Implementations
	// extract the parent span from msg.Headers (W3C traceparent) so a
	// single trace links producer → outbox → consumer across processes.
	StartHandlerSpan(ctx context.Context, msg *InboundMessage) (context.Context, EndSpanFunc)

	// InjectHeaders writes the current trace context from ctx into the
	// given headers map (creating it if nil). The Client calls this before
	// writing the outbox row so downstream handlers can re-attach.
	InjectHeaders(ctx context.Context, headers map[string]string) map[string]string
}

Tracer is the optional tracing hook. The OpenTelemetry implementation lives in tickr/tracing/otel and handles W3C traceparent propagation through Message.Headers automatically.

type Transition

type Transition struct {
	MessageID MessageID
	Seq       int
	From      Status
	To        Status
	Attempt   int
	Error     string
	WorkerID  string
	At        time.Time
}

Transition is one row in the append-only message history.
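One way to render the slice returned by Storage.History for a log line or CLI is sketched below. It uses a local copy of Transition. Declaring MessageID as a string is an assumption, because its definition is not shown in this section. The sketch sorts by Seq explicitly rather than relying on the order in which the adapter returns rows.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"time"
)

type Status string

const (
	StatusCreated  Status = "CREATED"
	StatusHandling Status = "HANDLING"
	StatusFailed   Status = "FAILED"
)

// MessageID as string is an assumption for this sketch.
type MessageID string

// Local copy of tickr.Transition.
type Transition struct {
	MessageID MessageID
	Seq       int
	From      Status
	To        Status
	Attempt   int
	Error     string
	WorkerID  string
	At        time.Time
}

// formatHistory renders transitions ordered by Seq, one per line, without
// mutating the caller's slice.
func formatHistory(ts []Transition) string {
	sorted := append([]Transition(nil), ts...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].Seq < sorted[j].Seq })
	var b strings.Builder
	for _, t := range sorted {
		fmt.Fprintf(&b, "#%d %s -> %s attempt=%d", t.Seq, t.From, t.To, t.Attempt)
		if t.Error != "" {
			fmt.Fprintf(&b, " err=%q", t.Error)
		}
		b.WriteByte('\n')
	}
	return b.String()
}

func main() {
	now := time.Now()
	history := []Transition{
		{MessageID: "m-1", Seq: 2, From: StatusHandling, To: StatusFailed, Attempt: 1, Error: "timeout", WorkerID: "w-1", At: now},
		{MessageID: "m-1", Seq: 1, From: StatusCreated, To: StatusHandling, Attempt: 1, WorkerID: "w-1", At: now},
	}
	fmt.Print(formatHistory(history))
}
```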

type Tx

type Tx struct {
	// contains filtered or unexported fields
}

Tx is the caller's database transaction handed to Storage.Enqueue and Storage.EnqueueBatch. It is produced by the concrete adapter's WrapTx helper (e.g. postgres.WrapTx(pgx.Tx)). A zero Tx means "no caller transaction": the adapter opens its own short-lived auto-commit transaction for the insert.

func MakeTx

func MakeTx(inner any) Tx

MakeTx wraps the adapter's native transaction handle. Storage adapter implementations call this from their WrapTx helper.

func (Tx) Inner

func (t Tx) Inner() any

Inner returns the adapter-specific transaction handle for the adapter to type-assert.

func (Tx) IsZero

func (t Tx) IsZero() bool

IsZero reports whether Tx is the zero value (no caller transaction).
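The following sketch mirrors the Tx contract end to end. An adapter wraps its native handle with MakeTx, then unwraps it with Inner and a type assertion. A zero Tx falls back to auto-commit. The unexported field name, fakePgTx, WrapTx and insert are all hypothetical stand-ins; a real adapter would wrap something like pgx.Tx.

```go
package main

import "fmt"

// Tx mirrors the documented shape of tickr.Tx; the field name is assumed.
type Tx struct {
	inner any
}

func MakeTx(inner any) Tx   { return Tx{inner: inner} }
func (t Tx) Inner() any     { return t.inner }
func (t Tx) IsZero() bool   { return t.inner == nil }

// fakePgTx stands in for a driver transaction (hypothetical).
type fakePgTx struct{ id int }

// WrapTx is the kind of helper an adapter package exposes (hypothetical).
func WrapTx(tx *fakePgTx) Tx { return MakeTx(tx) }

// insert shows the adapter side: auto-commit for a zero Tx, otherwise
// type-assert the handle back to the adapter's own transaction type.
func insert(tx Tx) string {
	if tx.IsZero() {
		return "auto-commit insert"
	}
	native, ok := tx.Inner().(*fakePgTx)
	if !ok {
		return "error: foreign transaction type"
	}
	return fmt.Sprintf("insert inside caller tx %d", native.id)
}

func main() {
	fmt.Println(insert(Tx{}))
	fmt.Println(insert(WrapTx(&fakePgTx{id: 42})))
}
```

The type assertion is what lets one interface serve several adapters. Each adapter recognises only its own handle and can reject a Tx produced by a different one.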

type TypedBatchHandler added in v1.0.0

type TypedBatchHandler[T any] func(ctx context.Context, batch []BatchItem[T]) error

TypedBatchHandler is the typed equivalent of BatchHandler — each item has its payload already JSON-decoded into T. Returning nil marks the whole batch SUCCESS; a non-nil error fails every item (each retries on its own attempt count).

type TypedHandler added in v1.0.0

type TypedHandler[T any] func(ctx context.Context, msg *InboundMessage, body T) error

TypedHandler is a handler whose payload has already been decoded to T. It's the ergonomic equivalent of Handler when payloads are JSON-encoded Go structs, which is the common case.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker runs the claim loop, dispatches handlers, and (when leader) runs the reclaimer and janitor background loops.

func NewWorker

func NewWorker(cfg WorkerConfig) (*Worker, error)

NewWorker constructs a Worker. Storage and Registry are required.

func (*Worker) Start

func (w *Worker) Start(ctx context.Context) error

Start blocks until ctx is cancelled or a fatal error occurs. On cancellation it drains in-flight work for at most ShutdownGrace before returning.

func (*Worker) Stop

func (w *Worker) Stop(_ context.Context) error

Stop cancels the worker's context and returns without waiting; the blocking drain happens inside Start. Stop is idempotent.

type WorkerConfig

type WorkerConfig struct {
	Storage  Storage
	Registry *HandlerRegistry

	WorkerID string // defaults to hostname-pid

	PollInterval   time.Duration // 0 => 100ms
	PollMaxBackoff time.Duration // 0 => 2s
	BatchSize      int           // 0 => 100
	PoolSize       int           // 0 => 32
	Lease          time.Duration // 0 => 30s
	ShutdownGrace  time.Duration // 0 => 30s

	Retention RetentionPolicy
	Stats     StatsPolicy

	Logger  Logger
	Metrics Metrics
	Tracer  Tracer
	Alerter Alerter

	ReclaimInterval time.Duration // 0 => 5s

	DisableReclaimer bool
	DisableJanitor   bool
	DisableNotifier  bool
}

WorkerConfig configures a Worker. Storage and Registry are required. Fields annotated "0 => X" use X when left at their zero value.

Directories

Path Synopsis
cmd
tickrctl command
tickrctl is the admin CLI for tickr.
codec
json
Package json provides typed-handler wrappers that automatically encode and decode message payloads using encoding/json.
proto
Package proto provides typed-handler wrappers that encode and decode message payloads using google.golang.org/protobuf.
examples
email-fanout command
email-fanout demonstrates outbox-driven webhook delivery: a producer inserts notify rows inside its own business tx, and a worker dispatches them to a downstream HTTP endpoint with exponential backoff, treating 4xx as permanent (dead-letter) and 5xx/network errors as retryable.
orders command
orders is an end-to-end demo of the tickr library: an HTTP service accepts order POSTs, writes them transactionally to an orders table together with an outbox row via tickr.Client.Enqueue, and a worker pool in the same binary picks up those outbox rows and runs the "order.created" handler.
saga command
saga shows a three-step orchestration on top of tickr: order.created triggers a payment.charge, whose success triggers a shipment.create, and whose failure triggers an order.refund.
metrics
otel
Package otel provides an OpenTelemetry implementation of tickr.Metrics.
prom
Package prom provides a Prometheus implementation of tickr.Metrics.
storage
cockroach
Package cockroach is the CockroachDB adapter for tickr.
internal/storagetest
Package storagetest provides a portable conformance suite that exercises every method of the tickr.Storage interface.
mysql
Package mysql is the MySQL 8.0+ adapter for tickr.
postgres
Package postgres is the PostgreSQL adapter for tickr.
tracing
otel
Package otel provides an OpenTelemetry implementation of tickr.Tracer.
