Documentation ¶
Index ¶
- Variables
- func NoRetry(err error) error
- type Client
- func (c *Client) Dispatch(ctx context.Context, task *Task, queue string) error
- func (c *Client) DispatchAt(ctx context.Context, task *Task, queue string, at time.Time) error
- func (c *Client) DispatchIn(ctx context.Context, task *Task, queue string, delay time.Duration) error
- func (c *Client) IsReady() bool
- func (c *Client) WithLogger(logger *slog.Logger) *Client
- type DispatchFunc
- type DispatchMiddleware
- type Handler
- type MessagePropagator
- type Option
- type ProcessMiddleware
- type SharedOption
- type Task
- func (t *Task) AddHeader(key, value string) *Task
- func (t *Task) Header(key string) string
- func (t *Task) Headers() nats.Header
- func (t *Task) MessageID() string
- func (t *Task) Name() string
- func (t *Task) Payload() []byte
- func (t *Task) SetHeader(key, value string) *Task
- func (t *Task) Unmarshal(dst any) error
- func (t *Task) WithMessageID(id string) *Task
- type TextMapCarrier
- type Worker
- type WorkerOption
- func WithAckWait(wait time.Duration) WorkerOption
- func WithConcurrency(n int) WorkerOption
- func WithConsumerPrefix(prefix string) WorkerOption
- func WithDLQSuffix(suffix string) WorkerOption
- func WithDurable(name string) WorkerOption
- func WithFetchBatch(size int) WorkerOption
- func WithFetchTimeout(timeout time.Duration) WorkerOption
- func WithIdleWait(wait time.Duration) WorkerOption
- func WithMaxAckPending(n int) WorkerOption
- func WithMaxRetries(n int) WorkerOption
- func WithProcessMiddleware(middlewares ...ProcessMiddleware) WorkerOption
- func WithProgressInterval(interval time.Duration) WorkerOption
- func WithRetryBackoff(delays ...time.Duration) WorkerOption
- func WithTaskTimeout(timeout time.Duration) WorkerOption
Variables ¶
var ErrEmptyTaskName = errors.New("natasks: task name is required")
var ErrHandlerNotFound = errors.New("natasks: handler not found")
var ErrNoRetry = errors.New("natasks: no retry")
ErrNoRetry marks a handler error as non-retriable.
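As a rough illustration of how a sentinel such as ErrNoRetry is usually checked, the sketch below wraps an error so that errors.Is can detect the marker. The names errNoRetry, noRetry, and shouldRetry are local stand-ins invented for this example; how the package's own NoRetry wraps errors is not shown on this page and may differ.

```go
package main

import (
	"errors"
	"fmt"
)

// errNoRetry is a local stand-in for the package's ErrNoRetry sentinel.
var errNoRetry = errors.New("no retry")

// errInvalid is an example handler error used only by this sketch.
var errInvalid = errors.New("invalid payload")

// noRetry is a hypothetical helper: it wraps err so that both the sentinel
// and the original error stay discoverable through errors.Is.
func noRetry(err error) error {
	if err == nil {
		return nil
	}
	return fmt.Errorf("%w: %w", errNoRetry, err)
}

// shouldRetry reports whether a failed attempt is worth retrying.
func shouldRetry(err error) bool {
	return err != nil && !errors.Is(err, errNoRetry)
}

func main() {
	fmt.Println(shouldRetry(errInvalid))                    // true
	fmt.Println(shouldRetry(noRetry(errInvalid)))           // false
	fmt.Println(errors.Is(noRetry(errInvalid), errInvalid)) // true
}
```

Wrapping with two %w verbs requires Go 1.20 or later.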
Functions ¶
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client dispatches tasks into JetStream.
func (*Client) DispatchAt ¶
func (c *Client) DispatchAt(ctx context.Context, task *Task, queue string, at time.Time) error
DispatchAt publishes a task that should become visible at the given time.
func (*Client) DispatchIn ¶
func (c *Client) DispatchIn(ctx context.Context, task *Task, queue string, delay time.Duration) error
DispatchIn publishes a task that should become visible after the given delay.
type DispatchFunc ¶
DispatchFunc publishes a task to a queue.
type DispatchMiddleware ¶
type DispatchMiddleware func(DispatchFunc) DispatchFunc
DispatchMiddleware wraps task publishing.
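The func(DispatchFunc) DispatchFunc shape is the usual decorator pattern, and a minimal sketch of composing such middleware follows. The dispatchFunc signature here is assumed to mirror Client.Dispatch, since the page does not show it, and task, chain, and record are hypothetical names. The ordering (first listed runs outermost) is a choice made for this sketch, not a documented guarantee of the package.

```go
package main

import (
	"context"
	"fmt"
)

// task and dispatchFunc are local stand-ins; the real DispatchFunc
// signature is assumed to mirror Client.Dispatch.
type task struct{ name string }

type dispatchFunc func(ctx context.Context, t *task, queue string) error

type dispatchMiddleware func(dispatchFunc) dispatchFunc

// chain wraps final so that the first middleware listed runs outermost.
func chain(final dispatchFunc, mws ...dispatchMiddleware) dispatchFunc {
	for i := len(mws) - 1; i >= 0; i-- {
		final = mws[i](final)
	}
	return final
}

// record returns a middleware that appends label to log before calling next.
func record(log *[]string, label string) dispatchMiddleware {
	return func(next dispatchFunc) dispatchFunc {
		return func(ctx context.Context, t *task, queue string) error {
			*log = append(*log, label)
			return next(ctx, t, queue)
		}
	}
}

// run dispatches one task through two middlewares and returns the call order.
func run() []string {
	var log []string
	publish := func(ctx context.Context, t *task, queue string) error {
		log = append(log, "publish "+t.name+" to "+queue)
		return nil
	}
	d := chain(publish, record(&log, "first"), record(&log, "second"))
	_ = d(context.Background(), &task{name: "email:send"}, "default")
	return log
}

func main() {
	fmt.Println(run()) // [first second publish email:send to default]
}
```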
type MessagePropagator ¶
type MessagePropagator interface {
Inject(context.Context, TextMapCarrier)
Extract(context.Context, TextMapCarrier) context.Context
}
MessagePropagator injects context values into message headers and extracts them from message headers back into a context.
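To make the Inject/Extract round trip concrete, here is a small sketch that carries a request ID across a header map. It assumes a carrier with Get and Set methods; the actual TextMapCarrier method set is not shown on this page, so textMapCarrier, mapCarrier, requestIDPropagator, and the X-Request-ID header are all hypothetical.

```go
package main

import (
	"context"
	"fmt"
)

// textMapCarrier is a local stand-in; the Get/Set methods are assumed.
type textMapCarrier interface {
	Get(key string) string
	Set(key, value string)
}

// mapCarrier backs the carrier with a plain map, standing in for headers.
type mapCarrier map[string]string

func (m mapCarrier) Get(key string) string { return m[key] }
func (m mapCarrier) Set(key, value string) { m[key] = value }

// requestIDKey is the private context key used by this sketch.
type requestIDKey struct{}

// requestIDPropagator copies a request ID between a context and headers.
type requestIDPropagator struct{}

// Inject writes the request ID from ctx into the carrier, if one is set.
func (requestIDPropagator) Inject(ctx context.Context, c textMapCarrier) {
	if id, ok := ctx.Value(requestIDKey{}).(string); ok {
		c.Set("X-Request-ID", id)
	}
}

// Extract returns a context carrying the request ID found in the carrier.
func (requestIDPropagator) Extract(ctx context.Context, c textMapCarrier) context.Context {
	if id := c.Get("X-Request-ID"); id != "" {
		return context.WithValue(ctx, requestIDKey{}, id)
	}
	return ctx
}

// roundTrip injects id on the publishing side and extracts it on the
// consuming side, returning what the consumer would see.
func roundTrip(id string) string {
	p := requestIDPropagator{}
	headers := mapCarrier{}
	p.Inject(context.WithValue(context.Background(), requestIDKey{}, id), headers)
	got, _ := p.Extract(context.Background(), headers).Value(requestIDKey{}).(string)
	return got
}

func main() {
	fmt.Println(roundTrip("req-42")) // req-42
}
```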
type Option ¶
type Option interface {
// contains filtered or unexported methods
}
Option configures a client or worker.
func WithDispatchMiddleware ¶
func WithDispatchMiddleware(middlewares ...DispatchMiddleware) Option
WithDispatchMiddleware registers middleware for task publication.
type ProcessMiddleware ¶
ProcessMiddleware wraps task processing.
type SharedOption ¶
type SharedOption interface {
Option
WorkerOption
}
SharedOption can be passed to both client and worker constructors.
func WithPropagator ¶
func WithPropagator(propagator MessagePropagator) SharedOption
WithPropagator configures message context propagation for both client and worker.
func WithStreamName ¶
func WithStreamName(name string) SharedOption
WithStreamName overrides the JetStream stream name.
func WithSubjectPrefix ¶
func WithSubjectPrefix(prefix string) SharedOption
WithSubjectPrefix overrides the publish subject prefix.
type Task ¶
type Task struct {
// contains filtered or unexported fields
}
Task is a serializable unit of work. It intentionally mirrors the simple "type + payload" model used by queue systems such as Laravel queue jobs and asynq tasks.
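The "type + payload" model can be sketched with a name plus an encoded body. The example below uses a local task type, a hypothetical newTask constructor, and JSON encoding; the package's real constructor and payload encoding are not shown on this page, so treat all three as assumptions.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// task is a local stand-in for the "type + payload" shape.
type task struct {
	name    string
	payload []byte
}

// newTask is a hypothetical constructor that JSON-encodes v as the payload
// and rejects an empty task name.
func newTask(name string, v any) (*task, error) {
	if name == "" {
		return nil, errors.New("task name is required")
	}
	payload, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	return &task{name: name, payload: payload}, nil
}

// unmarshal decodes the payload into dst.
func (t *task) unmarshal(dst any) error {
	return json.Unmarshal(t.payload, dst)
}

// welcomeEmail is an example payload type.
type welcomeEmail struct {
	UserID int    `json:"user_id"`
	Locale string `json:"locale"`
}

// roundTrip encodes a payload into a task and decodes it again.
func roundTrip() (string, welcomeEmail, error) {
	t, err := newTask("email:welcome", welcomeEmail{UserID: 7, Locale: "en"})
	if err != nil {
		return "", welcomeEmail{}, err
	}
	var out welcomeEmail
	err = t.unmarshal(&out)
	return t.name, out, err
}

func main() {
	name, msg, err := roundTrip()
	fmt.Println(name, msg.UserID, msg.Locale, err)
}
```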
func (*Task) WithMessageID ¶
func (t *Task) WithMessageID(id string) *Task
WithMessageID sets the JetStream message ID used for publish deduplication.
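JetStream drops publishes that repeat a message ID within the stream's duplicate window, so deduplication only helps when a retried publish carries the same ID as the original. One possible way to get a stable ID is to derive it from the task contents, as in the sketch below. The messageID helper is hypothetical and not part of this package; an ID taken from the business domain (an order number, for instance) is often a better choice.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// messageID derives a deterministic ID from the task name and payload, so
// publishing the same task twice yields the same ID.
func messageID(name string, payload []byte) string {
	h := sha256.New()
	h.Write([]byte(name))
	h.Write([]byte{0}) // separator so ("ab", "c") and ("a", "bc") differ
	h.Write(payload)
	return hex.EncodeToString(h.Sum(nil))
}

func main() {
	payload := []byte(`{"user_id":7}`)
	first := messageID("email:welcome", payload)
	second := messageID("email:welcome", payload)
	fmt.Println(first == second) // true
}
```

The resulting string could then be passed to WithMessageID before dispatching.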
type TextMapCarrier ¶
TextMapCarrier is a minimal carrier interface for context propagation.
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker consumes tasks from a queue and dispatches them to registered handlers.
func NewWorker ¶
NewWorker constructs a worker for a single queue and ensures the required stream and consumer exist.
func (*Worker) IsReady ¶
IsReady reports whether the underlying NATS connection is ready to accept new work. Ready means the connection is currently in the CONNECTED state.
func (*Worker) Run ¶
Run starts the fetch loop and blocks until ctx is canceled or the underlying NATS connection is permanently closed. Temporary disconnects are treated as recoverable: the worker waits for the connection to return, ensures the stream and consumer exist again, and then resumes fetching.
type WorkerOption ¶
type WorkerOption interface {
// contains filtered or unexported methods
}
WorkerOption configures a worker.
func WithAckWait ¶
func WithAckWait(wait time.Duration) WorkerOption
WithAckWait overrides the consumer AckWait setting.
func WithConcurrency ¶
func WithConcurrency(n int) WorkerOption
WithConcurrency overrides the number of tasks processed in parallel by the worker.
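Bounded parallelism of this kind is commonly built on a buffered channel used as a semaphore. The following is a generic sketch of that technique, not the worker's actual implementation; processAll and demo are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// processAll runs handle for every job with at most n running at once and
// returns the highest concurrency level it observed.
func processAll(jobs []int, n int, handle func(int)) int {
	sem := make(chan struct{}, n)
	var wg sync.WaitGroup
	var mu sync.Mutex
	running, peak := 0, 0

	for _, j := range jobs {
		sem <- struct{}{} // blocks while n handlers are already running
		wg.Add(1)
		go func(j int) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot once the handler is done

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			handle(j)

			mu.Lock()
			running--
			mu.Unlock()
		}(j)
	}
	wg.Wait()
	return peak
}

// demo processes 20 short jobs with a limit of 3 and returns the peak.
func demo() int {
	return processAll(make([]int, 20), 3, func(int) {
		time.Sleep(time.Millisecond)
	})
}

func main() {
	fmt.Println(demo() <= 3) // true
}
```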
func WithConsumerPrefix ¶
func WithConsumerPrefix(prefix string) WorkerOption
WithConsumerPrefix overrides the consumer name prefix.
func WithDLQSuffix ¶
func WithDLQSuffix(suffix string) WorkerOption
WithDLQSuffix overrides the suffix used for dead-letter queues.
func WithDurable ¶
func WithDurable(name string) WorkerOption
WithDurable overrides the durable consumer name.
func WithFetchBatch ¶
func WithFetchBatch(size int) WorkerOption
WithFetchBatch overrides the worker fetch batch size.
func WithFetchTimeout ¶
func WithFetchTimeout(timeout time.Duration) WorkerOption
WithFetchTimeout overrides the worker fetch timeout.
func WithIdleWait ¶
func WithIdleWait(wait time.Duration) WorkerOption
WithIdleWait overrides the delay used after an empty poll.
func WithMaxAckPending ¶
func WithMaxAckPending(n int) WorkerOption
WithMaxAckPending overrides the consumer MaxAckPending setting.
func WithMaxRetries ¶
func WithMaxRetries(n int) WorkerOption
WithMaxRetries overrides the maximum number of retries after the first failed processing attempt. A value of -1 means unlimited retries.
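Because the limit counts retries rather than attempts, a setting of n allows up to n+1 handler executions. The sketch below spells out that arithmetic; canRetry and attempts are hypothetical helpers, and what the worker does once retries are exhausted is not covered here.

```go
package main

import "fmt"

// canRetry reports whether another retry is allowed after retriesDone
// retries have already been made. A maxRetries of -1 means unlimited.
func canRetry(retriesDone, maxRetries int) bool {
	if maxRetries == -1 {
		return true
	}
	return retriesDone < maxRetries
}

// attempts counts how many times an always-failing handler would run,
// capped at limit so that the unlimited case terminates.
func attempts(maxRetries, limit int) int {
	n := 1 // the first attempt is not a retry
	for n < limit && canRetry(n-1, maxRetries) {
		n++
	}
	return n
}

func main() {
	fmt.Println(attempts(3, 100))  // 4: one attempt plus three retries
	fmt.Println(attempts(0, 100))  // 1: no retries at all
	fmt.Println(attempts(-1, 100)) // 100: stopped only by the cap
}
```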
func WithProcessMiddleware ¶
func WithProcessMiddleware(middlewares ...ProcessMiddleware) WorkerOption
WithProcessMiddleware registers middleware for task processing.
func WithProgressInterval ¶
func WithProgressInterval(interval time.Duration) WorkerOption
WithProgressInterval overrides how often the worker sends InProgress while a handler is still running.
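In JetStream, an in-progress acknowledgement resets the AckWait timer, which keeps a long-running message from being redelivered while its handler is still working. A minimal sketch of such a heartbeat loop is shown below, with a plain callback standing in for the InProgress call. runWithProgress and demo are hypothetical names, and the worker's real loop may be structured differently.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runWithProgress calls handle and, while it is running, calls progress
// once per interval. Signalling stops as soon as handle returns.
func runWithProgress(interval time.Duration, progress func(), handle func() error) error {
	done := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-done:
				return
			case <-ticker.C:
				progress()
			}
		}
	}()

	err := handle()
	close(done)
	wg.Wait() // no progress call can happen after this returns
	return err
}

// demo runs a 100ms handler with a 10ms interval and counts the signals.
func demo() int {
	pings := 0
	_ = runWithProgress(10*time.Millisecond, func() { pings++ }, func() error {
		time.Sleep(100 * time.Millisecond)
		return nil
	})
	return pings
}

func main() {
	fmt.Println(demo() > 0) // true
}
```

In practice the interval would be chosen comfortably shorter than AckWait so that a signal always arrives before the acknowledgement deadline.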
func WithRetryBackoff ¶
func WithRetryBackoff(delays ...time.Duration) WorkerOption
WithRetryBackoff configures retry delays. When the number of retries exceeds the provided delays, the last delay is reused.
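The "last delay is reused" rule can be written as a small lookup. The sketch below assumes retries are numbered from 1; delayFor and schedule are hypothetical names, not part of the package.

```go
package main

import (
	"fmt"
	"time"
)

// schedule is an example list of delays, as might be passed to
// WithRetryBackoff.
var schedule = []time.Duration{time.Second, 5 * time.Second, 30 * time.Second}

// delayFor returns the delay before the given retry (1-based). Retries
// beyond the end of the list reuse the last delay.
func delayFor(retry int, delays []time.Duration) time.Duration {
	if len(delays) == 0 || retry < 1 {
		return 0
	}
	if retry > len(delays) {
		return delays[len(delays)-1]
	}
	return delays[retry-1]
}

func main() {
	for retry := 1; retry <= 5; retry++ {
		fmt.Println(retry, delayFor(retry, schedule))
	}
	// Prints 1s, 5s, 30s, 30s, 30s for retries 1 through 5.
}
```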
func WithTaskTimeout ¶
func WithTaskTimeout(timeout time.Duration) WorkerOption
WithTaskTimeout sets the maximum time allowed for a single handler execution. A zero value disables the timeout.
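A per-handler timeout of this kind is typically applied with context.WithTimeout, skipping the wrapper when the value is zero. The sketch below illustrates that pattern with hypothetical helpers (runWithTimeout, slow, demo); it only takes effect if the handler actually observes its context.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// runWithTimeout calls handle with a context that expires after timeout.
// A zero timeout leaves ctx untouched, so no deadline is applied.
func runWithTimeout(ctx context.Context, timeout time.Duration, handle func(context.Context) error) error {
	if timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, timeout)
		defer cancel()
	}
	return handle(ctx)
}

// slow returns a handler that waits for d or for ctx to end, whichever
// comes first.
func slow(d time.Duration) func(context.Context) error {
	return func(ctx context.Context) error {
		select {
		case <-time.After(d):
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// demo runs one handler that exceeds its timeout and one with the timeout
// disabled. It reports whether the first timed out and the second's error.
func demo() (timedOut bool, unlimitedErr error) {
	bg := context.Background()
	err := runWithTimeout(bg, 10*time.Millisecond, slow(500*time.Millisecond))
	timedOut = errors.Is(err, context.DeadlineExceeded)
	unlimitedErr = runWithTimeout(bg, 0, slow(10*time.Millisecond))
	return timedOut, unlimitedErr
}

func main() {
	fmt.Println(demo()) // true <nil>
}
```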
Directories ¶
| Path | Synopsis |
|---|---|
| examples | |
| examples/basic (command) | |
| examples/delayed (command) | |
| examples/middleware-headers (command) | |
| examples/otel (command) | |
| examples/prometheus (command) | |
| examples/retries-dlq (command) | |
| middleware | |
