natasks v0.1.1
Published: Mar 24, 2026 · License: MIT
NATaskS


NATaskS is a small Go task queue library built on top of NATS JetStream.

It focuses on two things:

  • dispatching tasks
  • processing tasks with workers

Features

  • task dispatch on top of NATS JetStream
  • immediate, delayed, and scheduled task delivery
  • publish deduplication via Nats-Msg-Id
  • worker-based task processing with configurable concurrency
  • automatic stream and consumer provisioning
  • retries with backoff and dead-letter queues
  • graceful worker shutdown
  • lease renewal for long-running handlers via InProgress
  • dispatch and processing middleware
  • OpenTelemetry context propagation
  • Prometheus metrics middleware

Installation

go get github.com/hexpande/natasks

Requirements

  • Go 1.24.0+
  • NATS with JetStream enabled

Quick Start

package main

import (
	"context"
	"encoding/json"
	"log"
	"time"

	"github.com/hexpande/natasks"
	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)

type SendEmailPayload struct {
	UserID int    `json:"user_id"`
	Email  string `json:"email"`
}

func main() {
	nc, err := nats.Connect(
		"nats://127.0.0.1:4222",
		nats.Name("natasks-example"),
		nats.RetryOnFailedConnect(true),
		nats.MaxReconnects(-1),
		nats.ReconnectWait(2*time.Second),
		nats.ReconnectJitter(250*time.Millisecond, 2*time.Second),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	js, err := jetstream.New(nc)
	if err != nil {
		log.Fatal(err)
	}

	client, err := natasks.NewClient(js)
	if err != nil {
		log.Fatal(err)
	}

	worker, err := natasks.NewWorker(
		js,
		"emails",
		natasks.WithConcurrency(8),
		natasks.WithMaxRetries(3),
		natasks.WithRetryBackoff(500*time.Millisecond, time.Second, 2*time.Second),
	)
	if err != nil {
		log.Fatal(err)
	}

	worker.Handle("emails.send", func(ctx context.Context, task *natasks.Task) error {
		var payload SendEmailPayload
		if err := task.Unmarshal(&payload); err != nil {
			return natasks.NoRetry(err)
		}

		log.Printf("send email to %s for user %d", payload.Email, payload.UserID)
		return nil
	})

	payload := SendEmailPayload{
		UserID: 42,
		Email:  "user@example.com",
	}

	body, err := json.Marshal(payload)
	if err != nil {
		log.Fatal(err)
	}

	task, err := natasks.NewTask("emails.send", body)
	if err != nil {
		log.Fatal(err)
	}

	if err := client.Dispatch(context.Background(), task, "emails"); err != nil {
		log.Fatal(err)
	}

	if err := worker.Run(context.Background()); err != nil {
		log.Fatal(err)
	}
}

For production workers and dispatchers, prefer explicit reconnect settings and callbacks instead of bare nats.Connect(url):

  • RetryOnFailedConnect(true) so process startup can wait for NATS instead of failing immediately.
  • MaxReconnects(-1) so a long-lived worker keeps trying until your process decides to exit.
  • ReconnectWait(...) and ReconnectJitter(...) to avoid reconnect storms.
  • DisconnectErrHandler, ReconnectHandler, ClosedHandler, and ErrorHandler for observability.

natasks uses the connection you provide. It does not create or own nats.Conn, so reconnect policy should be defined at the nats.Connect(...) layer.
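One way to wire these options together is sketched below; the durations, URL, and client name are illustrative, and the handler bodies only log, so adapt them to your own observability stack:

```go
package main

import (
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(
		"nats://127.0.0.1:4222",
		nats.Name("natasks-worker"),
		// Wait for NATS at startup instead of failing immediately.
		nats.RetryOnFailedConnect(true),
		// Keep reconnecting for the lifetime of the process.
		nats.MaxReconnects(-1),
		// Spread reconnect attempts out to avoid reconnect storms.
		nats.ReconnectWait(2*time.Second),
		nats.ReconnectJitter(250*time.Millisecond, 2*time.Second),
		// Surface connection lifecycle events for observability.
		nats.DisconnectErrHandler(func(_ *nats.Conn, err error) {
			log.Printf("nats disconnected: %v", err)
		}),
		nats.ReconnectHandler(func(nc *nats.Conn) {
			log.Printf("nats reconnected to %s", nc.ConnectedUrl())
		}),
		nats.ClosedHandler(func(_ *nats.Conn) {
			log.Print("nats connection closed")
		}),
		nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) {
			log.Printf("nats async error: %v", err)
		}),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()
}
```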

Examples

Runnable examples live in examples/. See examples/README.md for a short index and usage notes.

API

Main constructors:

  • NewClient(js jetstream.JetStream, opts ...Option)
  • NewWorker(js jetstream.JetStream, queue string, opts ...WorkerOption)

Main methods:

  • client.Dispatch(ctx, task, queue)
  • client.DispatchIn(ctx, task, queue, delay)
  • client.DispatchAt(ctx, task, queue, at)
  • client.IsReady()
  • worker.Handle(name, handler)
  • worker.Run(ctx)
  • worker.IsReady()
  • task.WithMessageID(id)

Connection Loss Behavior

worker.Run(ctx) treats temporary NATS disconnects as recoverable runtime events.

  • While the underlying connection is reconnecting, the worker pauses fetches.
  • After reconnect, it ensures the stream and consumer still exist and then resumes processing.
  • Run returns only when ctx is canceled, the NATS connection is permanently closed, or a non-recoverable fetch error occurs.

Use worker.IsReady() or client.IsReady() when you need a simple readiness check.
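For example, IsReady can back a Kubernetes-style readiness endpoint. The helpers below are hypothetical (not part of natasks); in a real service you would pass the method value worker.IsReady or client.IsReady instead of the stub:

```go
package main

import (
	"log"
	"net/http"
)

// readinessStatus maps an IsReady-style check to an HTTP status code:
// 200 when the connection is ready, 503 otherwise.
func readinessStatus(isReady func() bool) int {
	if isReady() {
		return http.StatusOK
	}
	return http.StatusServiceUnavailable
}

// readinessHandler adapts any func() bool readiness check (for example the
// method value worker.IsReady) into an HTTP probe handler.
func readinessHandler(isReady func() bool) http.HandlerFunc {
	return func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(readinessStatus(isReady))
	}
}

func main() {
	// In a real service: http.Handle("/readyz", readinessHandler(worker.IsReady))
	http.Handle("/readyz", readinessHandler(func() bool { return true }))
	log.Println("readiness probe registered at /readyz")
}
```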

Retry and DLQ

If a handler returns an error, the worker can retry the task and eventually move it to a dead-letter queue.

If a handler should fail without retries and without DLQ publication, return natasks.NoRetry(err).

worker.Handle("emails.send", func(ctx context.Context, task *natasks.Task) error {
	if err := validate(task); err != nil {
		return natasks.NoRetry(err)
	}

	return sendEmail(ctx, task)
})

Worker options:

  • WithConcurrency(n)
  • WithMaxRetries(n)
  • WithRetryBackoff(delays...)
  • WithDLQSuffix(suffix)

Defaults:

  • concurrency: 1
  • max retries: -1 (unlimited)
  • retry backoff: none
  • dlq suffix: -dlq

DLQ messages keep the original payload and include these headers:

  • Natasks-Original-Queue
  • Natasks-Attempts
  • Natasks-Last-Error

Middleware

Core middleware types:

  • DispatchMiddleware
  • ProcessMiddleware

Task headers are available directly in middleware through Task:

natasks.WithDispatchMiddleware(func(next natasks.DispatchFunc) natasks.DispatchFunc {
	return func(ctx context.Context, task *natasks.Task, queue string) error {
		task.SetHeader("X-Request-ID", "req-42")
		return next(ctx, task, queue)
	}
})

natasks.WithProcessMiddleware(func(next natasks.Handler) natasks.Handler {
	return func(ctx context.Context, task *natasks.Task) error {
		requestID := task.Header("X-Request-ID")
		_ = requestID
		return next(ctx, task)
	}
})

Use WithPropagator(...) when you want to map values between context.Context and headers automatically.

Observability packages:

  • github.com/hexpande/natasks/middleware/otel
  • github.com/hexpande/natasks/middleware/prometheus

OpenTelemetry requires two pieces:

  • otel.Middleware for dispatch and process spans
  • natasks.WithPropagator(otelMiddleware) for trace-context propagation through message headers

Example:

otelMiddleware := otel.New(otel.Options{})

client, err := natasks.NewClient(
	js,
	natasks.WithPropagator(otelMiddleware),
	natasks.WithDispatchMiddleware(otelMiddleware.DispatchMiddleware()),
)
if err != nil {
	log.Fatal(err)
}

worker, err := natasks.NewWorker(
	js,
	"emails",
	natasks.WithPropagator(otelMiddleware),
	natasks.WithProcessMiddleware(otelMiddleware.ProcessMiddleware("emails")),
)
if err != nil {
	log.Fatal(err)
}

Configuration

Shared options:

  • WithStreamName(name)
  • WithSubjectPrefix(prefix)
  • WithDispatchMiddleware(middleware...)
  • WithPropagator(propagator)

Worker options:

  • WithConsumerPrefix(prefix)
  • WithDurable(name)
  • WithConcurrency(n)
  • WithFetchBatch(size)
  • WithFetchTimeout(timeout)
  • WithIdleWait(wait)
  • WithTaskTimeout(timeout)
  • WithAckWait(wait)
  • WithProgressInterval(interval)
  • WithMaxAckPending(n)
  • WithMaxRetries(n)
  • WithRetryBackoff(delays...)
  • WithDLQSuffix(suffix)
  • WithProcessMiddleware(middleware...)

Testing

go test ./...
make docker-test

Integration tests use a real NATS JetStream instance via Docker Compose.

Benchmarking

Run in-process microbenchmarks:

go test -run '^$' -bench . -benchmem ./...

For more stable numbers, prefer a longer bench time and multiple runs:

go test -run '^$' -bench . -benchmem -benchtime=2s -count=5 ./...

Live NATS integration/perf benchmarks are available for real JetStream dispatch and end-to-end worker processing. They require a reachable NATS server and the NATASKS_NATS_URL environment variable:

NATASKS_NATS_URL=nats://127.0.0.1:4222 go test -run '^$' -bench 'Integration' -benchmem ./...

For more reliable integration numbers, vary CPU and run multiple samples:

NATASKS_NATS_URL=nats://127.0.0.1:4222 go test -run '^$' -bench 'Integration' -benchmem -benchtime=2s -count=5 -cpu=1,8 ./...

Example live integration results on Apple M2 with local NATS:

Benchmark                            ns/op    B/op   allocs/op
IntegrationDispatch-8                48849    2217   30
IntegrationDispatchParallel-8        12352    2249   30
IntegrationEndToEnd/serial-8         136070   7107   94
IntegrationEndToEnd/parallel_8-8     101821   4982   60

These numbers are environment-specific, but they show the expected shape: parallel dispatch improves throughput, and a worker with WithConcurrency(8) outperforms serial end-to-end processing.

To compare changes between revisions, save results and use benchstat:

go test -run '^$' -bench . -benchmem -benchtime=3s -count=10 ./... > before.txt
go test -run '^$' -bench . -benchmem -benchtime=3s -count=10 ./... > after.txt
benchstat before.txt after.txt

Documentation

Constants

This section is empty.

Variables

var ErrEmptyTaskName = errors.New("natasks: task name is required")
var ErrHandlerNotFound = errors.New("natasks: handler not found")
var ErrNoRetry = errors.New("natasks: no retry")

ErrNoRetry marks a handler error as non-retriable.

Functions

func NoRetry

func NoRetry(err error) error

NoRetry wraps err so the worker acknowledges the message without retries or DLQ.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client dispatches tasks into JetStream.

func NewClient

func NewClient(js jetstream.JetStream, opts ...Option) (*Client, error)

NewClient constructs a dispatch client and ensures the stream exists.

func (*Client) Dispatch

func (c *Client) Dispatch(ctx context.Context, task *Task, queue string) error

Dispatch publishes a task to the queue.

func (*Client) DispatchAt

func (c *Client) DispatchAt(ctx context.Context, task *Task, queue string, at time.Time) error

DispatchAt publishes a task that should become visible at the given time.

func (*Client) DispatchIn

func (c *Client) DispatchIn(ctx context.Context, task *Task, queue string, delay time.Duration) error

DispatchIn publishes a task that should become visible after the given delay.

func (*Client) IsReady

func (c *Client) IsReady() bool

IsReady reports whether the underlying NATS connection is ready to accept new work. Ready means the connection is currently in CONNECTED state.

func (*Client) WithLogger

func (c *Client) WithLogger(logger *slog.Logger) *Client

WithLogger replaces the client logger.

type DispatchFunc

type DispatchFunc func(context.Context, *Task, string) error

DispatchFunc publishes a task to a queue.

type DispatchMiddleware

type DispatchMiddleware func(DispatchFunc) DispatchFunc

DispatchMiddleware wraps task publishing.

type Handler

type Handler func(context.Context, *Task) error

Handler processes an incoming task.

type MessagePropagator

type MessagePropagator interface {
	Inject(context.Context, TextMapCarrier)
	Extract(context.Context, TextMapCarrier) context.Context
}

MessagePropagator injects and extracts context values into message headers.

type Option

type Option interface {
	// contains filtered or unexported methods
}

Option configures a client or worker.

func WithDispatchMiddleware

func WithDispatchMiddleware(middlewares ...DispatchMiddleware) Option

WithDispatchMiddleware registers middleware for task publication.

type ProcessMiddleware

type ProcessMiddleware func(Handler) Handler

ProcessMiddleware wraps task processing.

type SharedOption

type SharedOption interface {
	Option
	WorkerOption
}

SharedOption can be passed to both client and worker constructors.

func WithPropagator

func WithPropagator(propagator MessagePropagator) SharedOption

WithPropagator configures message context propagation for both client and worker.

func WithStreamName

func WithStreamName(name string) SharedOption

WithStreamName overrides the JetStream stream name.

func WithSubjectPrefix

func WithSubjectPrefix(prefix string) SharedOption

WithSubjectPrefix overrides the publish subject prefix.

type Task

type Task struct {
	// contains filtered or unexported fields
}

Task is a serializable unit of work. It intentionally mirrors the simple "type + payload" model used by queue systems such as Laravel queue jobs and asynq tasks.

func NewTask

func NewTask(name string, payload []byte) (*Task, error)

NewTask constructs a task with a raw payload.

func (*Task) AddHeader

func (t *Task) AddHeader(key, value string) *Task

AddHeader appends a task header value.

func (*Task) Header

func (t *Task) Header(key string) string

Header returns the first value for the given task header key.

func (*Task) Headers

func (t *Task) Headers() nats.Header

Headers returns a copy of task headers.

func (*Task) MessageID

func (t *Task) MessageID() string

MessageID returns the configured JetStream message ID.

func (*Task) Name

func (t *Task) Name() string

Name returns the task name.

func (*Task) Payload

func (t *Task) Payload() []byte

Payload returns a copy of the raw payload.

func (*Task) SetHeader

func (t *Task) SetHeader(key, value string) *Task

SetHeader sets a task header value.

func (*Task) Unmarshal

func (t *Task) Unmarshal(dst any) error

Unmarshal decodes the raw payload into dst.

func (*Task) WithMessageID

func (t *Task) WithMessageID(id string) *Task

WithMessageID sets the JetStream message ID used for publish deduplication.
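A common way to pick such an ID is to derive it deterministically from the task's content, so re-dispatching the same logical task inside the stream's duplicate window is dropped by JetStream. DedupID below is a hypothetical helper, not part of natasks:

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// DedupID derives a stable message ID from the task name and payload.
// Publishing two tasks with the same ID inside the stream's duplicate
// window makes the second publish a no-op.
func DedupID(name string, payload []byte) string {
	h := sha256.New()
	h.Write([]byte(name))
	h.Write([]byte{0}) // separator so ("ab","c") and ("a","bc") hash differently
	h.Write(payload)
	return hex.EncodeToString(h.Sum(nil))
}

func main() {
	// In real code: task.WithMessageID(DedupID("emails.send", body))
	fmt.Println(DedupID("emails.send", []byte(`{"user_id":42}`)))
}
```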

type TextMapCarrier

type TextMapCarrier interface {
	Get(string) string
	Set(string, string)
	Keys() []string
}

TextMapCarrier is a minimal carrier interface for context propagation.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker consumes tasks from a queue and dispatches them to registered handlers.

func NewWorker

func NewWorker(js jetstream.JetStream, queue string, opts ...WorkerOption) (*Worker, error)

NewWorker constructs a worker for a single queue and ensures the required stream and consumer exist.

func (*Worker) Handle

func (w *Worker) Handle(name string, handler Handler)

Handle registers a handler for a task name.

func (*Worker) IsReady

func (w *Worker) IsReady() bool

IsReady reports whether the underlying NATS connection is ready to accept new work. Ready means the connection is currently in CONNECTED state.

func (*Worker) Run

func (w *Worker) Run(ctx context.Context) error

Run starts the fetch loop and blocks until ctx is canceled or the underlying NATS connection is permanently closed. Temporary disconnects are treated as recoverable: the worker waits for the connection to return, ensures the stream and consumer exist again, and then resumes fetching.

func (*Worker) WithLogger

func (w *Worker) WithLogger(logger *slog.Logger) *Worker

WithLogger replaces the worker logger.

type WorkerOption

type WorkerOption interface {
	// contains filtered or unexported methods
}

WorkerOption configures a worker.

func WithAckWait

func WithAckWait(wait time.Duration) WorkerOption

WithAckWait overrides the consumer AckWait setting.

func WithConcurrency

func WithConcurrency(n int) WorkerOption

WithConcurrency overrides the number of tasks processed in parallel by the worker.

func WithConsumerPrefix

func WithConsumerPrefix(prefix string) WorkerOption

WithConsumerPrefix overrides the consumer name prefix.

func WithDLQSuffix

func WithDLQSuffix(suffix string) WorkerOption

WithDLQSuffix overrides the suffix used for dead-letter queues.

func WithDurable

func WithDurable(name string) WorkerOption

WithDurable overrides the durable consumer name.

func WithFetchBatch

func WithFetchBatch(size int) WorkerOption

WithFetchBatch overrides the worker fetch batch size.

func WithFetchTimeout

func WithFetchTimeout(timeout time.Duration) WorkerOption

WithFetchTimeout overrides the worker fetch timeout.

func WithIdleWait

func WithIdleWait(wait time.Duration) WorkerOption

WithIdleWait overrides the delay used after an empty poll.

func WithMaxAckPending

func WithMaxAckPending(n int) WorkerOption

WithMaxAckPending overrides the consumer MaxAckPending setting.

func WithMaxRetries

func WithMaxRetries(n int) WorkerOption

WithMaxRetries overrides the maximum number of retries after the first failed processing attempt. -1 means unlimited retries.

func WithProcessMiddleware

func WithProcessMiddleware(middlewares ...ProcessMiddleware) WorkerOption

WithProcessMiddleware registers middleware for task processing.

func WithProgressInterval

func WithProgressInterval(interval time.Duration) WorkerOption

WithProgressInterval overrides how often the worker sends InProgress while a handler is still running.

func WithRetryBackoff

func WithRetryBackoff(delays ...time.Duration) WorkerOption

WithRetryBackoff configures retry delays. When the number of retries exceeds the provided delays, the last delay is reused.

func WithTaskTimeout

func WithTaskTimeout(timeout time.Duration) WorkerOption

WithTaskTimeout sets the maximum time allowed for a single handler execution. A zero value disables the timeout.

Directories

Path           Synopsis
examples
  basic        command
  delayed      command
  otel         command
  prometheus   command
  retries-dlq  command
middleware
