Documentation ¶
Overview ¶
Package task distributes units of work across an agent swarm using NATS JetStream for durable delivery.
A Dispatcher publishes tasks for a named capability and retrieves the resulting Result. A Worker registers handlers for one or more capabilities and processes tasks as they arrive. Tasks are durable: a dispatched task persists in JetStream until a worker processes it, so dispatchers and workers do not need to be online simultaneously.
Dispatching ¶
disp, err := task.NewDispatcher(task.Config{
	NATS: messaging.NATSConfig{URL: "nats://localhost:4222"},
})
if err != nil {
	log.Fatal(err)
}
defer disp.Close()

// Synchronous: block until the result arrives.
inputs := map[string]string{"file": "main.go"}
result, err := disp.Run(ctx, "coder", task.NewMessage("", inputs))
if err != nil {
	log.Fatal(err)
}

// Asynchronous: start and wait later.
future, err := disp.Start(ctx, "coder", task.NewMessage("", inputs))
if err != nil {
	log.Fatal(err)
}
// ... do other work ...
result, err = future.Wait(ctx)
Handling ¶
worker, err := task.NewWorker(task.Config{
	NATS: messaging.NATSConfig{URL: "nats://localhost:4222"},
})
if err != nil {
	log.Fatal(err)
}
defer worker.Close()

err = worker.Handle("coder", func(ctx context.Context, msg *task.Message) (*task.Result, error) {
	// ... do the work ...
	return task.NewResult(msg.ID, "worker-1", task.StatusSuccess), nil
})
if err != nil {
	log.Fatal(err)
}
Capabilities and routing ¶
Tasks are routed by capability name. Dispatchers pass the capability to Dispatcher.Run or Dispatcher.Start; workers register a handler for the capability with Worker.Handle. Multiple workers handling the same capability share a JetStream pull consumer and compete for messages, so each message is delivered to one worker rather than broadcast to all of them.
The underlying NATS subjects (work.<capability>.<taskID> and done.<taskID>) and the JetStream stream name are implementation details hidden from the consumer.
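To make the subject layout concrete, here is a minimal sketch of how such subjects could be assembled. The helpers workSubject, doneSubject, and validCapability are hypothetical and not part of this package's API. The validation rule reflects only the general NATS constraint that a subject token may not contain whitespace, ".", "*", or ">"; the package's own rules for capability names may differ.

```go
package main

import (
	"fmt"
	"strings"
)

// workSubject builds the subject a task for capability would be published on.
// Hypothetical helper: the package keeps its real subject logic private.
func workSubject(capability, taskID string) string {
	return "work." + capability + "." + taskID
}

// doneSubject builds the subject a task's Result would be published on.
// Hypothetical helper, mirroring the done.<taskID> layout described above.
func doneSubject(taskID string) string {
	return "done." + taskID
}

// validCapability reports whether name can serve as a single NATS subject
// token: non-empty, with no whitespace, token separator, or wildcard.
func validCapability(name string) bool {
	return name != "" && !strings.ContainsAny(name, " \t\r\n.*>")
}

func main() {
	fmt.Println(workSubject("coder", "t-1")) // work.coder.t-1
	fmt.Println(doneSubject("t-1"))          // done.t-1
	fmt.Println(validCapability("coder.go")) // false
}
```

Because the capability occupies exactly one token, a name containing "." would shift the task ID into a different position in the subject, which is one plausible reason for rejecting such names.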
Versioning ¶
Every Message and Result carries a Version field set by NewMessage and NewResult. Consumers receiving an envelope with an unknown future version should log and skip rather than panic.
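The log-and-skip policy can be sketched as follows. This is an illustration only: the header type and shouldProcess function are hypothetical names, and Version is redeclared locally so that the example compiles on its own.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// Version mirrors the package constant of the same name (local stand-in).
const Version = 1

// header decodes only the field needed to make the version decision.
type header struct {
	Version int `json:"version"`
}

// shouldProcess reports whether raw carries a wire version this build
// understands. Unknown future versions are logged and skipped, never panicked on.
func shouldProcess(raw []byte) bool {
	var h header
	if err := json.Unmarshal(raw, &h); err != nil {
		log.Printf("skipping undecodable envelope: %v", err)
		return false
	}
	if h.Version > Version {
		log.Printf("skipping envelope with unknown version %d (supported: %d)", h.Version, Version)
		return false
	}
	return true
}

func main() {
	fmt.Println(shouldProcess([]byte(`{"version":1,"id":"a"}`))) // true
	fmt.Println(shouldProcess([]byte(`{"version":2,"id":"b"}`))) // false
}
```

Decoding just the version first keeps the check independent of any fields a later wire format might add or change.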
Constants ¶
const Version = 1
Version is the current wire format version for Message and Result. Consumers receiving an envelope with an unknown future version should log and skip rather than panic.
Variables ¶
var (
	// ErrClosed is returned when an operation is attempted on a closed
	// Dispatcher or Worker.
	ErrClosed = errors.New("task: closed")

	// ErrInvalidMessage is returned when a Message fails validation
	// (for example, an empty ID).
	ErrInvalidMessage = errors.New("task: invalid message")

	// ErrInvalidCapability is returned when a capability name is empty
	// or otherwise invalid.
	ErrInvalidCapability = errors.New("task: invalid capability")

	// ErrTimeout is returned when Run or Future.Wait exceeds the
	// caller's deadline before a Result arrives.
	ErrTimeout = errors.New("task: timeout waiting for result")
)
Sentinel errors returned by Dispatcher and Worker.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
	// NATS controls the NATS connection used to publish and consume
	// tasks. The dispatcher and worker dial their own connection using
	// this config.
	NATS messaging.NATSConfig
}
Config configures a task Dispatcher or Worker.
type Dispatcher ¶
type Dispatcher interface {
	// Run dispatches msg to a worker registered for capability and blocks
	// until a Result arrives or ctx is done. Returns [ErrTimeout] if the
	// caller's deadline passes before the result arrives.
	Run(ctx context.Context, capability string, msg *Message) (*Result, error)

	// Start dispatches msg to a worker registered for capability and
	// returns a [Future] that resolves when the Result arrives. Use
	// Start to dispatch without blocking and retrieve the Result later.
	Start(ctx context.Context, capability string, msg *Message) (Future, error)

	// Shutdown waits for outstanding futures to resolve, up to ctx's
	// deadline, then closes the dispatcher. Use this for graceful
	// shutdown so late-arriving results are not lost. If ctx expires
	// before all futures resolve, remaining ones are aborted with
	// [ErrClosed] and Shutdown returns ctx.Err().
	//
	// Shutdown's signature matches agentkit/shutdown.Handler so
	// instances can be registered with shutdown.Sequence directly.
	Shutdown(ctx context.Context) error

	// Close releases the underlying NATS connection and any open
	// subscriptions immediately. Outstanding futures are aborted
	// with [ErrClosed]. After Close, further operations return
	// [ErrClosed]. Use Shutdown for graceful drain.
	Close() error
}
Dispatcher publishes tasks and retrieves their results. Safe for concurrent use by multiple goroutines.
func NewDispatcher ¶
func NewDispatcher(cfg Config) (Dispatcher, error)
NewDispatcher creates a Dispatcher that dials NATS using cfg.NATS and ensures the TASKS JetStream stream exists.
type Future ¶
type Future interface {
	// ID returns the task ID this Future refers to.
	ID() string

	// Wait blocks until a Result is available or ctx is done. Returns
	// [ErrTimeout] if the wait exceeds the caller's deadline. Calling
	// Wait more than once is safe: subsequent calls return the same
	// Result.
	Wait(ctx context.Context) (*Result, error)
}
Future is a handle to an in-flight task dispatched via Dispatcher.Start. It is resolved when a Result is published on the task's reply subject, or when the caller's context is cancelled.
type Handler ¶
Handler processes a task and returns a Result. If the handler returns a non-nil error, the worker publishes a StatusFailed Result carrying the error message.
type Message ¶
type Message struct {
	// Wire format version. Set automatically by NewMessage.
	Version int `json:"version"`

	// ID uniquely identifies this task instance. NewMessage generates one
	// if empty.
	ID string `json:"id"`

	// Idempotency is an optional deduplication key. Consumers that want
	// to deduplicate retried submissions treat equal values as the same
	// logical task. This package does not enforce idempotency itself.
	Idempotency string `json:"idempotency,omitempty"`

	// CorrelationID spans an entire request chain for distributed
	// tracing. Propagate it unchanged through child tasks.
	CorrelationID string `json:"correlation_id,omitempty"`

	// Parent is the ID of the task that spawned this one, if any.
	Parent string `json:"parent,omitempty"`

	// Root is the ID of the original task that started a chain, if any.
	Root string `json:"root,omitempty"`

	// Timeout caps execution time in seconds. Zero means the consumer's
	// default.
	Timeout int `json:"timeout,omitempty"`

	// MaxAttempts limits retries. Zero means no retries.
	MaxAttempts int `json:"max_attempts,omitempty"`

	// Attempt is the current attempt number, one-indexed.
	Attempt int `json:"attempt,omitempty"`

	// Inputs is the task payload.
	Inputs map[string]string `json:"inputs"`

	// Prior carries outputs from upstream tasks in a multi-step workflow,
	// keyed by the upstream task's identifier.
	Prior map[string]json.RawMessage `json:"prior,omitempty"`

	// CreatedAt is when the message was created by the caller.
	CreatedAt time.Time `json:"created_at"`

	// SubmittedAt is when the message was submitted for dispatch.
	SubmittedAt time.Time `json:"submitted_at"`

	// SubmittedBy identifies the submitter (orchestrator, user, etc.).
	SubmittedBy string `json:"submitted_by,omitempty"`

	// Tags are free-form labels for filtering.
	Tags []string `json:"tags,omitempty"`

	// Metadata is arbitrary key-value context.
	Metadata map[string]string `json:"metadata,omitempty"`
}
Message is the payload envelope dispatched to a worker. It carries identity, correlation, execution control, inputs, and metadata.
Routing (which worker receives the message) is determined by the capability string passed to Dispatcher.Run or Dispatcher.Start — it is not a field on Message.
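The lineage fields (CorrelationID, Parent, Root) are set by whoever creates a child task. The sketch below shows one plausible way to fill them in. The newChild helper is hypothetical, Message is reduced to the relevant fields, and treating a parent with an empty Root as the start of the chain is an assumption rather than documented behaviour.

```go
package main

import "fmt"

// Message is a reduced stand-in with only the lineage-related fields.
type Message struct {
	ID            string
	CorrelationID string
	Parent        string
	Root          string
	Inputs        map[string]string
}

// newChild derives a child task from parent: the correlation ID is copied
// unchanged, Parent points at the spawning task, and Root points at the
// task that started the chain.
func newChild(parent *Message, id string, inputs map[string]string) *Message {
	root := parent.Root
	if root == "" {
		root = parent.ID // assumption: a parent without a Root started the chain
	}
	return &Message{
		ID:            id,
		CorrelationID: parent.CorrelationID,
		Parent:        parent.ID,
		Root:          root,
		Inputs:        inputs,
	}
}

func main() {
	root := &Message{ID: "t-1", CorrelationID: "req-42"}
	child := newChild(root, "t-2", map[string]string{"file": "main.go"})
	grandchild := newChild(child, "t-3", nil)
	fmt.Println(grandchild.Parent, grandchild.Root, grandchild.CorrelationID) // t-2 t-1 req-42
}
```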
func NewMessage ¶
NewMessage constructs a Message with the given ID and inputs and stamps the current wire version. If the ID is empty, NewMessage generates one.
func UnmarshalMessage ¶
UnmarshalMessage deserializes a message from JSON.
func (*Message) Validate ¶
Validate reports whether the message has the minimum required fields. Returns ErrInvalidMessage if ID is empty. Initializes a nil Inputs map to an empty one as a convenience.
type Result ¶
type Result struct {
	// Wire format version. Set automatically by NewResult.
	Version int `json:"version"`

	// ID is the task ID this Result corresponds to.
	ID string `json:"id"`

	// CorrelationID propagates the correlation from the originating
	// Message unchanged.
	CorrelationID string `json:"correlation_id,omitempty"`

	// Status is the outcome of the task.
	Status Status `json:"status"`

	// Outputs is the worker's structured result payload.
	Outputs any `json:"outputs,omitempty"`

	// Error carries the failure reason when Status is StatusFailed.
	Error string `json:"error,omitempty"`

	// Agent identifies the worker that produced this Result.
	Agent string `json:"agent"`

	// Attempt is the attempt number this Result was produced on,
	// one-indexed.
	Attempt int `json:"attempt"`

	// Duration is the execution time in milliseconds.
	Duration int64 `json:"duration"`

	// CompletedAt is when the Result was produced.
	CompletedAt time.Time `json:"completed_at"`

	// Metadata is arbitrary key-value context.
	Metadata map[string]string `json:"metadata,omitempty"`
}
Result is the response envelope returned after a Message is executed.
func NewResult ¶
NewResult constructs a Result with required fields set and the current wire version stamped.
func UnmarshalResult ¶
UnmarshalResult deserializes a result from JSON.
type Worker ¶
type Worker interface {
	// Handle registers fn as the worker for capability and starts
	// consuming tasks. Registration returns immediately; tasks are
	// processed asynchronously until Close or Shutdown. Multiple
	// Handle calls for different capabilities may coexist on the
	// same Worker.
	Handle(capability string, fn Handler) error

	// Shutdown stops accepting new tasks and waits for in-flight
	// handlers to finish, up to ctx's deadline. After Shutdown
	// returns, the underlying NATS connection is closed regardless
	// of whether handlers completed in time. If ctx expires before
	// handlers finish, Shutdown returns ctx.Err() and the connection
	// is closed immediately, killing any still-running handlers.
	//
	// Use Shutdown for graceful drain (e.g. SIGTERM with a 30s
	// deadline). Use Close for immediate stop.
	//
	// Shutdown's signature matches agentkit/shutdown.Handler so
	// instances can be registered with shutdown.Sequence directly.
	Shutdown(ctx context.Context) error

	// Close stops all consumers and releases the NATS connection
	// immediately. After Close, further calls to Handle return
	// [ErrClosed]. In-flight handlers are allowed to finish (Close
	// blocks until they do); use Shutdown to bound the wait.
	Close() error
}
Worker subscribes to a JetStream pull consumer for a given capability and invokes the registered Handler for each message delivered.