task

package
v1.0.0
Warning

This package is not in the latest version of its module.

Published: Apr 19, 2026 | License: Apache-2.0 | Imports: 16 | Imported by: 0

README

task

Distributed task dispatch for agent swarms, built on NATS JetStream.

task is the swarmkit primitive for "send a unit of work to an agent and get the result back." A Dispatcher publishes tasks; a Worker registers handlers for one or more capabilities and processes tasks as they arrive. Tasks are durable — once published, they persist in JetStream until a worker processes them, so dispatchers and workers do not need to be online at the same time.

The two roles

Dispatcher                               Worker
    │                                        │
    │─── Run("coder", msg) ─────────┐        │
    │                               ▼        │
    │                       JetStream stream │
    │                               │        │
    │                               └──────▶ │
    │                                        │ Handle("coder", fn)
    │                                        │   invokes fn
    │                                        │
    │◀──────── done.<taskID> ────────────────│
    │                                        │
 Result                                  Result
  • Dispatcher: publishes tasks and receives results. Use it when your code decides what work needs doing.
  • Worker: subscribes to a capability, processes tasks, and returns results. Use it when your code does the work.

Each role has its own independent constructor, so an application that both dispatches and handles tasks creates both a Dispatcher and a Worker.

Quickstart

Dispatcher
import (
    "context"
    "github.com/vinayprograms/swarmkit/messaging"
    "github.com/vinayprograms/swarmkit/task"
)

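ctx := context.Background()

disp, err := task.NewDispatcher(task.Config{
    NATS: messaging.NATSConfig{URL: "nats://localhost:4222"},
})
if err != nil { /* ... */ }
defer disp.Close()

// Synchronous: block until the result arrives or ctx is done.
result, err := disp.Run(ctx, "coder", task.NewMessage("", map[string]string{
    "file": "main.go",
}))
if err != nil { /* ... */ }
// A handler failure arrives as result.Status == task.StatusFailed, not as err.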
Worker
worker, err := task.NewWorker(task.Config{
    NATS: messaging.NATSConfig{URL: "nats://localhost:4222", Name: "worker-1"},
})
if err != nil { /* ... */ }
defer worker.Close()

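err = worker.Handle("coder", func(ctx context.Context, msg *task.Message) (*task.Result, error) {
    // ... do the work using msg.Inputs ...
    result := task.NewResult(msg.ID, "worker-1", task.StatusSuccess)
    result.Outputs = map[string]any{"diff": "..."}
    return result, nil
})
if err != nil { /* ... */ }

// Handlers run in the background until Close or Shutdown is called. Keep the
// process alive (for example, until a shutdown signal arrives) so the deferred
// Close does not stop the worker immediately.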

Dispatch modes

Synchronous — Run

Run blocks until a Result arrives or the caller's context is done. Use it when the caller has nothing useful to do until the task completes.

result, err := disp.Run(ctx, "coder", msg)
Asynchronous — Start + Future

Start publishes the task and returns a Future. The caller can do other work and later call Future.Wait(ctx) to retrieve the result. Use this when you need to dispatch multiple tasks concurrently or overlap dispatch with other work.

future, err := disp.Start(ctx, "coder", msg)
// ... do other work ...
result, err := future.Wait(ctx)

future.ID() // returns the task ID

The pair mirrors os/exec.Cmd.Run and os/exec.Cmd.Start/Wait.

Capabilities

A capability is a string naming a category of work — "coder", "reviewer", "encode", whatever your application finds meaningful. Dispatchers pass a capability to each Run/Start. Workers register handlers per capability via Handle.

Multiple workers that register for the same capability share a JetStream pull consumer — each message is delivered to exactly one of them (competing consumers). You can scale worker count up or down without any coordination.

// Three workers for "coder": they compete for tasks, and each task is
// delivered to exactly one of them.
w1, _ := task.NewWorker(cfg); w1.Handle("coder", fn)
w2, _ := task.NewWorker(cfg); w2.Handle("coder", fn)
w3, _ := task.NewWorker(cfg); w3.Handle("coder", fn)
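The competing-consumer behavior can be pictured with an in-process analogy: several goroutines receive from one shared Go channel, so each task ID is taken by exactly one of them. This is only an analogy using a channel in place of a JetStream pull consumer; the helper `distribute` is hypothetical and says nothing about how task is implemented internally.

```go
package main

import (
	"fmt"
	"sync"
)

// distribute is a hypothetical analogy for a shared pull consumer: `workers`
// goroutines compete to receive from one channel. It returns how many times
// each task ID was handled.
func distribute(taskIDs []string, workers int) map[string]int {
	tasks := make(chan string)
	var (
		mu     sync.Mutex
		counts = make(map[string]int) // task ID -> number of deliveries
		wg     sync.WaitGroup
	)
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for id := range tasks { // whichever worker is free takes the next task
				mu.Lock()
				counts[id]++
				mu.Unlock()
			}
		}()
	}
	for _, id := range taskIDs {
		tasks <- id
	}
	close(tasks)
	wg.Wait()
	return counts
}

func main() {
	counts := distribute([]string{"t1", "t2", "t3", "t4", "t5"}, 3)
	fmt.Println(len(counts)) // 5
}
```

Adding or removing goroutines changes throughput but not the one-delivery-per-task property, which is the same reason worker count can change without coordination.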

A single worker can register for multiple capabilities:

worker.Handle("coder", coderFn)
worker.Handle("reviewer", reviewerFn)

Handler semantics

A Handler returns either (*Result, nil) on success or (nil, error) on failure.

  • Success: the handler's result is published to the dispatcher. If fields like ID, Agent, CompletedAt, or Duration are zero in the returned Result, task fills them in automatically.
  • Failure: task publishes a StatusFailed result with the error message in Result.Error. The dispatcher's Run call returns the Result, not a Go error — a handler failure is still a valid reply.

If msg.Timeout is set (in seconds), the handler's context is bounded by that timeout.

Wire format

Message and Result are JSON envelopes with a Version field set automatically by NewMessage and NewResult. See message.go for the full schema. Versioning lets the wire format evolve without breaking older consumers — consumers seeing an unknown future version should log and skip rather than panic.
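The "log and skip" rule for unknown versions can be sketched as a decode step that checks Version before acting on the envelope. The `envelope` type and `decodeOrSkip` function are hypothetical stand-ins with only the fields needed here; `currentVersion` mirrors the documented task.Version constant (1).

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// currentVersion mirrors the documented task.Version constant.
const currentVersion = 1

// envelope is a hypothetical stand-in holding only the fields this sketch needs.
type envelope struct {
	Version int    `json:"version"`
	ID      string `json:"id"`
}

// decodeOrSkip returns (env, true) for envelopes this consumer understands and
// (nil, false) for malformed input or unknown future versions, logging
// instead of panicking.
func decodeOrSkip(data []byte) (*envelope, bool) {
	var env envelope
	if err := json.Unmarshal(data, &env); err != nil {
		log.Printf("skipping malformed envelope: %v", err)
		return nil, false
	}
	if env.Version > currentVersion {
		log.Printf("skipping envelope %q with unknown version %d", env.ID, env.Version)
		return nil, false
	}
	return &env, true
}

func main() {
	_, ok := decodeOrSkip([]byte(`{"version":2,"id":"t9"}`))
	fmt.Println(ok) // false
}
```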

Errors

Error                  Meaning
ErrClosed              Operation attempted on a closed Dispatcher or Worker.
ErrInvalidMessage      Message.Validate failed (currently: empty ID, though NewMessage and Start auto-generate an ID if empty).
ErrInvalidCapability   Empty capability string passed to Run, Start, or Handle.
ErrTimeout             Run or Future.Wait exceeded the caller's deadline before a result arrived.

Use errors.Is to check.

What task does not do

  • Task lifecycle persistence beyond JetStream. task does not maintain a "completed tasks" history or a database of past work. JetStream's retention policy is the only durability layer.
  • Idempotency enforcement. Message has an Idempotency field for consumers that want to dedupe, but task itself does not check it.
  • Retry policies beyond JetStream's MaxDeliver. If a handler fails, the result is published as StatusFailed; task does not automatically retry.
  • Cross-capability routing (workflows / DAGs). A task is dispatched to one capability. Chaining multiple tasks into a workflow is the application's responsibility — Message.Parent and Message.Prior are there to help track chains but task does not orchestrate them.

Applications that need any of the above layer them on top of the primitive, not inside it.
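For example, an application that wants deduplication could check Message.Idempotency at the top of its handler. This is a minimal sketch with a hypothetical in-memory `deduper`; it only dedupes within one process, so workers competing for the same capability would need a shared store (for example, a database) instead.

```go
package main

import (
	"fmt"
	"sync"
)

// deduper is a hypothetical in-memory idempotency guard keyed by
// Message.Idempotency. It is safe for concurrent use within one process.
type deduper struct {
	mu   sync.Mutex
	seen map[string]bool
}

func newDeduper() *deduper { return &deduper{seen: make(map[string]bool)} }

// firstTime reports whether key has not been seen before and records it.
// An empty key opts out of deduplication.
func (d *deduper) firstTime(key string) bool {
	if key == "" {
		return true
	}
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.seen[key] {
		return false
	}
	d.seen[key] = true
	return true
}

func main() {
	d := newDeduper()
	fmt.Println(d.firstTime("order-42"), d.firstTime("order-42")) // true false
}
```

A handler would return an early result (or skip work) when firstTime reports false; what that result should contain is an application decision.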

Documentation

Overview

Package task distributes units of work across an agent swarm using NATS JetStream for durable delivery.

A Dispatcher publishes tasks for a named capability and retrieves the resulting Result. A Worker registers handlers for one or more capabilities and processes tasks as they arrive. Tasks are durable: a dispatched task persists in JetStream until a worker processes it, so dispatchers and workers do not need to be online simultaneously.

Dispatching

ctx := context.Background()

disp, _ := task.NewDispatcher(task.Config{
    NATS: messaging.NATSConfig{URL: "nats://localhost:4222"},
})
defer disp.Close()

// Synchronous: block until the result arrives.
result, _ := disp.Run(ctx, "coder", task.NewMessage("", map[string]string{
    "file": "main.go",
}))

// Asynchronous: start and wait later.
inputs := map[string]string{"file": "main.go"}
future, _ := disp.Start(ctx, "coder", task.NewMessage("", inputs))
// ... do other work ...
result, _ = future.Wait(ctx)

Handling

worker, _ := task.NewWorker(task.Config{
    NATS: messaging.NATSConfig{URL: "nats://localhost:4222"},
})
defer worker.Close()

worker.Handle("coder", func(ctx context.Context, msg *task.Message) (*task.Result, error) {
    // ... do the work ...
    return task.NewResult(msg.ID, "worker-1", task.StatusSuccess), nil
})

Capabilities and routing

Tasks are routed by capability name. Dispatchers pass the capability to Dispatcher.Run or Dispatcher.Start; workers register a handler for the capability with Worker.Handle. Multiple workers handling the same capability share a JetStream pull consumer and compete for messages — each message is delivered to exactly one worker.

The underlying NATS subjects (work.<capability>.<taskID> and done.<taskID>) and the JetStream stream name are implementation details hidden from the consumer.

Versioning

Every Message and Result carries a Version field set by NewMessage and NewResult. Consumers receiving an envelope with an unknown future version should log and skip rather than panic.

Index

Constants

const Version = 1

Version is the current wire format version for Message and Result. Consumers receiving an envelope with an unknown future version should log and skip rather than panic.

Variables

var (
	// ErrClosed is returned when an operation is attempted on a closed
	// Dispatcher or Worker.
	ErrClosed = errors.New("task: closed")

	// ErrInvalidMessage is returned when a Message fails validation
	// (for example, an empty ID).
	ErrInvalidMessage = errors.New("task: invalid message")

	// ErrInvalidCapability is returned when a capability name is empty
	// or otherwise invalid.
	ErrInvalidCapability = errors.New("task: invalid capability")

	// ErrTimeout is returned when Run or Future.Wait exceeds the
	// caller's deadline before a Result arrives.
	ErrTimeout = errors.New("task: timeout waiting for result")
)

Sentinel errors returned by Dispatcher and Worker.

Functions

This section is empty.

Types

type Config

type Config struct {
	// NATS controls the NATS connection used to publish and consume
	// tasks. The dispatcher and worker dial their own connection using
	// this config.
	NATS messaging.NATSConfig
}

Config configures a task Dispatcher or Worker.

type Dispatcher

type Dispatcher interface {
	// Run dispatches msg to a worker registered for capability and blocks
	// until a Result arrives or ctx is done. Returns [ErrTimeout] if the
	// caller's deadline passes before the result arrives.
	Run(ctx context.Context, capability string, msg *Message) (*Result, error)

	// Start dispatches msg to a worker registered for capability and
	// returns a [Future] that resolves when the Result arrives. Use
	// Start for fire-and-forget plus deferred retrieval.
	Start(ctx context.Context, capability string, msg *Message) (Future, error)

	// Shutdown waits for outstanding futures to resolve, up to ctx's
	// deadline, then closes the dispatcher. Use this for graceful
	// shutdown so late-arriving results are not lost. If ctx expires
	// before all futures resolve, remaining ones are aborted with
	// [ErrClosed] and Shutdown returns ctx.Err().
	//
	// Shutdown's signature matches agentkit/shutdown.Handler so
	// instances can be registered with shutdown.Sequence directly.
	Shutdown(ctx context.Context) error

	// Close releases the underlying NATS connection and any open
	// subscriptions immediately. Outstanding futures are aborted
	// with [ErrClosed]. After Close, further operations return
	// [ErrClosed]. Use Shutdown for graceful drain.
	Close() error
}

Dispatcher publishes tasks and retrieves their results. Safe for concurrent use by multiple goroutines.

func NewDispatcher

func NewDispatcher(cfg Config) (Dispatcher, error)

NewDispatcher creates a Dispatcher that dials NATS using cfg.NATS and ensures the TASKS JetStream stream exists.

type Future

type Future interface {
	// ID returns the task ID this Future refers to.
	ID() string

	// Wait blocks until a Result is available or ctx is done. Returns
	// [ErrTimeout] if the wait exceeds the caller's deadline. Calling
	// Wait more than once is safe: subsequent calls return the same
	// Result.
	Wait(ctx context.Context) (*Result, error)
}

Future is a handle to an in-flight task dispatched via Dispatcher.Start. It is resolved when a Result is published on the task's reply subject, or when the caller's context is cancelled.

type Handler

type Handler func(ctx context.Context, msg *Message) (*Result, error)

Handler processes a task and returns a Result. If the handler returns a non-nil error, the worker publishes a StatusFailed Result carrying the error message.

type Message

type Message struct {
	// Wire format version. Set automatically by NewMessage.
	Version int `json:"version"`

	// ID uniquely identifies this task instance. NewMessage generates one
	// if empty.
	ID string `json:"id"`

	// Idempotency is an optional deduplication key. Consumers that want
	// to deduplicate retried submissions treat equal values as the same
	// logical task. This package does not enforce idempotency itself.
	Idempotency string `json:"idempotency,omitempty"`

	// CorrelationID spans an entire request chain for distributed
	// tracing. Propagate it unchanged through child tasks.
	CorrelationID string `json:"correlation_id,omitempty"`

	// Parent is the ID of the task that spawned this one, if any.
	Parent string `json:"parent,omitempty"`

	// Root is the ID of the original task that started a chain, if any.
	Root string `json:"root,omitempty"`

	// Timeout caps execution time in seconds. Zero means the consumer's
	// default.
	Timeout int `json:"timeout,omitempty"`

	// MaxAttempts limits retries. Zero means no retries.
	MaxAttempts int `json:"max_attempts,omitempty"`

	// Attempt is the current attempt number, one-indexed.
	Attempt int `json:"attempt,omitempty"`

	// Inputs is the task payload.
	Inputs map[string]string `json:"inputs"`

	// Prior carries outputs from upstream tasks in a multi-step workflow,
	// keyed by the upstream task's identifier.
	Prior map[string]json.RawMessage `json:"prior,omitempty"`

	// CreatedAt is when the message was created by the caller.
	CreatedAt time.Time `json:"created_at"`

	// SubmittedAt is when the message was submitted for dispatch.
	SubmittedAt time.Time `json:"submitted_at"`

	// SubmittedBy identifies the submitter (orchestrator, user, etc.).
	SubmittedBy string `json:"submitted_by,omitempty"`

	// Tags are free-form labels for filtering.
	Tags []string `json:"tags,omitempty"`

	// Metadata is arbitrary key-value context.
	Metadata map[string]string `json:"metadata,omitempty"`
}

Message is the payload envelope dispatched to a worker. It carries identity, correlation, execution control, inputs, and metadata.

Routing (which worker receives the message) is determined by the capability string passed to Dispatcher.Run or Dispatcher.Start — it is not a field on Message.

func NewMessage

func NewMessage(id string, inputs map[string]string) *Message

NewMessage constructs a Message with the given ID (generating one if id is empty) and inputs, and stamps the current wire version.

func UnmarshalMessage

func UnmarshalMessage(data []byte) (*Message, error)

UnmarshalMessage deserializes a message from JSON.

func (*Message) Marshal

func (m *Message) Marshal() ([]byte, error)

Marshal serializes the message to JSON.

func (*Message) Validate

func (m *Message) Validate() error

Validate reports whether the message has the minimum required fields. Returns ErrInvalidMessage if ID is empty. Initializes a nil Inputs map to an empty one as a convenience.
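The chain-tracking fields (CorrelationID, Parent, Root) are meant to be filled by the application when one task spawns another. This sketch shows one way to do it, following the field comments above: propagate CorrelationID unchanged, set Parent to the spawning task, and keep Root pointing at the task that started the chain. The `msg` type and `childOf` helper are hypothetical stand-ins, not part of the task API.

```go
package main

import "fmt"

// msg is a stand-in for task.Message with only the chain-tracking fields.
type msg struct {
	ID            string
	CorrelationID string
	Parent        string
	Root          string
}

// childOf (hypothetical) links a follow-up task to the task that spawned it.
func childOf(parent msg, id string) msg {
	root := parent.Root
	if root == "" { // the parent is itself the start of the chain
		root = parent.ID
	}
	return msg{
		ID:            id,
		CorrelationID: parent.CorrelationID, // propagated unchanged
		Parent:        parent.ID,
		Root:          root,
	}
}

func main() {
	a := msg{ID: "a", CorrelationID: "req-1"}
	b := childOf(a, "b")
	c := childOf(b, "c")
	fmt.Println(c.Parent, c.Root, c.CorrelationID) // b a req-1
}
```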

type Result

type Result struct {
	// Wire format version. Set automatically by NewResult.
	Version int `json:"version"`

	// ID is the task ID this Result corresponds to.
	ID string `json:"id"`

	// CorrelationID propagates the correlation from the originating
	// Message unchanged.
	CorrelationID string `json:"correlation_id,omitempty"`

	// Status is the outcome of the task.
	Status Status `json:"status"`

	// Outputs is the worker's structured result payload.
	Outputs any `json:"outputs,omitempty"`

	// Error carries the failure reason when Status is StatusFailed.
	Error string `json:"error,omitempty"`

	// Agent identifies the worker that produced this Result.
	Agent string `json:"agent"`

	// Attempt is the attempt number this Result was produced on,
	// one-indexed.
	Attempt int `json:"attempt"`

	// Duration is the execution time in milliseconds.
	Duration int64 `json:"duration"`

	// CompletedAt is when the Result was produced.
	CompletedAt time.Time `json:"completed_at"`

	// Metadata is arbitrary key-value context.
	Metadata map[string]string `json:"metadata,omitempty"`
}

Result is the response envelope returned after a Message is executed.

func NewResult

func NewResult(id, agent string, status Status) *Result

NewResult constructs a Result with required fields set and the current wire version stamped.

func UnmarshalResult

func UnmarshalResult(data []byte) (*Result, error)

UnmarshalResult deserializes a result from JSON.

func (*Result) Marshal

func (r *Result) Marshal() ([]byte, error)

Marshal serializes the result to JSON.

type Status

type Status string

Status is the outcome of a task execution as reported by a worker.

const (
	StatusSuccess Status = "success"
	StatusFailed  Status = "failed"
	StatusTimeout Status = "timeout"
)

type Worker

type Worker interface {
	// Handle registers fn as the worker for capability and starts
	// consuming tasks. Registration returns immediately; tasks are
	// processed asynchronously until Close or Shutdown. Multiple
	// Handle calls for different capabilities may coexist on the
	// same Worker.
	Handle(capability string, fn Handler) error

	// Shutdown stops accepting new tasks and waits for in-flight
	// handlers to finish, up to ctx's deadline. After Shutdown
	// returns, the underlying NATS connection is closed regardless
	// of whether handlers completed in time. If ctx expires before
	// handlers finish, Shutdown returns ctx.Err() and the connection
	// is closed immediately, killing any still-running handlers.
	//
	// Use Shutdown for graceful drain (e.g. SIGTERM with a 30s
	// deadline). Use Close for immediate stop.
	//
	// Shutdown's signature matches agentkit/shutdown.Handler so
	// instances can be registered with shutdown.Sequence directly.
	Shutdown(ctx context.Context) error

	// Close stops all consumers and releases the NATS connection
	// immediately. After Close, further calls to Handle return
	// [ErrClosed]. In-flight handlers are allowed to finish (Close
	// blocks until they do); use Shutdown to bound the wait.
	Close() error
}

Worker subscribes to a JetStream pull consumer for a given capability and invokes the registered Handler for each message delivered.

func NewWorker

func NewWorker(cfg Config) (Worker, error)

NewWorker creates a Worker that dials NATS using cfg.NATS.
