Documentation
¶
Overview ¶
Package results provides task result publication and retrieval for agent coordination.
The ResultPublisher interface enables agents to publish task outputs to a shared store, allowing other agents to subscribe to and retrieve results. This decouples result production from consumption and enables async task pipelines.
Two implementations are provided:
- MemoryPublisher: In-memory storage for testing and single-process scenarios
- BusPublisher: Bus-backed storage with pub/sub notification for distributed systems
Basic Usage ¶
// Create a memory publisher for testing
pub := results.NewMemoryPublisher()
// Publish a result
err := pub.Publish(ctx, "task-123", results.Result{
TaskID: "task-123",
Status: results.StatusSuccess,
Output: []byte(`{"answer": 42}`),
Metadata: map[string]string{"model": "gpt-4"},
})
// Retrieve a result
result, err := pub.Get(ctx, "task-123")
// Subscribe to result updates
ch, err := pub.Subscribe("task-123")
for result := range ch {
fmt.Printf("Result received: %s\n", result.Status)
}
Bus-based Publisher ¶
For distributed systems, use the bus-backed publisher:
nbus, _ := bus.NewNATSBus(bus.NATSConfig{URL: "nats://localhost:4222"})
pub := results.NewBusPublisher(nbus, results.BusPublisherConfig{
SubjectPrefix: "results",
})
// Results are stored in memory but notifications go over the bus
pub.Publish(ctx, "task-123", result)
// Remote subscribers receive notifications via bus
ch, _ := pub.Subscribe("task-123")
The bus publisher enables agents to be notified of result availability without polling, supporting efficient distributed task coordination.
Index ¶
- Variables
- func ValidateResult(r Result) error
- func ValidateTaskID(taskID string) error
- type BusPublisher
- func (p *BusPublisher) Close() error
- func (p *BusPublisher) Delete(ctx context.Context, taskID string) error
- func (p *BusPublisher) Get(ctx context.Context, taskID string) (*Result, error)
- func (p *BusPublisher) List(filter ResultFilter) ([]*Result, error)
- func (p *BusPublisher) Publish(ctx context.Context, taskID string, result Result) error
- func (p *BusPublisher) Subscribe(taskID string) (<-chan *Result, error)
- type BusPublisherConfig
- type MemoryPublisher
- func (p *MemoryPublisher) Close() error
- func (p *MemoryPublisher) Delete(ctx context.Context, taskID string) error
- func (p *MemoryPublisher) Get(ctx context.Context, taskID string) (*Result, error)
- func (p *MemoryPublisher) List(filter ResultFilter) ([]*Result, error)
- func (p *MemoryPublisher) Publish(ctx context.Context, taskID string, result Result) error
- func (p *MemoryPublisher) Subscribe(taskID string) (<-chan *Result, error)
- type Result
- type ResultFilter
- type ResultPublisher
- type ResultStatus
- type Subscription
Constants ¶
This section is empty.
Variables ¶
var ( ErrNotFound = errors.New("result not found") ErrAlreadyExists = errors.New("result already exists") ErrClosed = errors.New("publisher closed") ErrInvalidTaskID = errors.New("invalid task ID") ErrInvalidStatus = errors.New("invalid result status") )
Common errors.
Functions ¶
func ValidateResult ¶
ValidateResult checks if a result is valid for publishing.
func ValidateTaskID ¶
ValidateTaskID checks if a task ID is valid.
Types ¶
type BusPublisher ¶
type BusPublisher struct {
// contains filtered or unexported fields
}
BusPublisher implements ResultPublisher using a message bus for notifications. Results are stored in memory, but updates are broadcast over the bus, enabling distributed subscribers to receive notifications.
func NewBusPublisher ¶
func NewBusPublisher(mb bus.MessageBus, cfg BusPublisherConfig) *BusPublisher
NewBusPublisher creates a new bus-backed result publisher.
func (*BusPublisher) Delete ¶
func (p *BusPublisher) Delete(ctx context.Context, taskID string) error
Delete removes a result by task ID.
func (*BusPublisher) List ¶
func (p *BusPublisher) List(filter ResultFilter) ([]*Result, error)
List returns results matching the filter criteria.
type BusPublisherConfig ¶
type BusPublisherConfig struct {
// SubjectPrefix is the prefix for result subjects.
// Default: "results"
SubjectPrefix string
// BufferSize for subscription channels.
// Default: 16
BufferSize int
}
BusPublisherConfig configures the bus-backed result publisher.
func DefaultBusPublisherConfig ¶
func DefaultBusPublisherConfig() BusPublisherConfig
DefaultBusPublisherConfig returns configuration with sensible defaults.
type MemoryPublisher ¶
type MemoryPublisher struct {
// contains filtered or unexported fields
}
MemoryPublisher implements ResultPublisher using in-memory storage. Useful for testing and single-process scenarios.
func NewMemoryPublisher ¶
func NewMemoryPublisher() *MemoryPublisher
NewMemoryPublisher creates a new in-memory result publisher.
func (*MemoryPublisher) Close ¶
func (p *MemoryPublisher) Close() error
Close shuts down the publisher.
func (*MemoryPublisher) Delete ¶
func (p *MemoryPublisher) Delete(ctx context.Context, taskID string) error
Delete removes a result by task ID.
func (*MemoryPublisher) List ¶
func (p *MemoryPublisher) List(filter ResultFilter) ([]*Result, error)
List returns results matching the filter criteria.
type Result ¶
type Result struct {
// TaskID uniquely identifies the task.
TaskID string
// Status indicates the current state of the result.
Status ResultStatus
// Output contains the task's output data.
// Empty for pending or failed tasks.
Output []byte
// Error contains the error message if Status is StatusFailed.
Error string
// Metadata contains additional key-value data about the result.
Metadata map[string]string
// CreatedAt is when the result was first created.
CreatedAt time.Time
// UpdatedAt is when the result was last updated.
UpdatedAt time.Time
}
Result represents a task's output.
type ResultFilter ¶
type ResultFilter struct {
// Status filters by result status. Empty means all statuses.
Status ResultStatus
// TaskIDPrefix filters by task ID prefix.
TaskIDPrefix string
// CreatedAfter filters results created after this time.
CreatedAfter time.Time
// CreatedBefore filters results created before this time.
CreatedBefore time.Time
// Limit caps the number of results returned. 0 means no limit.
Limit int
// Metadata filters by metadata key-value pairs (all must match).
Metadata map[string]string
}
ResultFilter specifies criteria for listing results.
func (ResultFilter) Matches ¶
func (f ResultFilter) Matches(r *Result) bool
Matches returns true if the result matches the filter criteria.
type ResultPublisher ¶
type ResultPublisher interface {
// Publish stores or updates a task result.
// If the result already exists, it is updated.
Publish(ctx context.Context, taskID string, result Result) error
// Get retrieves a result by task ID.
// Returns ErrNotFound if the result doesn't exist.
Get(ctx context.Context, taskID string) (*Result, error)
// Subscribe returns a channel that receives updates for a task.
// The channel is closed when the task reaches a terminal state
// or when the subscription is cancelled.
// If the result already exists, it is sent immediately.
Subscribe(taskID string) (<-chan *Result, error)
// List returns results matching the filter criteria.
List(filter ResultFilter) ([]*Result, error)
// Delete removes a result by task ID.
// Returns ErrNotFound if the result doesn't exist.
Delete(ctx context.Context, taskID string) error
// Close shuts down the publisher and releases resources.
Close() error
}
ResultPublisher provides result storage, retrieval, and subscription.
type ResultStatus ¶
type ResultStatus string
ResultStatus represents the state of a task result.
const ( // StatusPending indicates the task is still in progress. StatusPending ResultStatus = "pending" // StatusSuccess indicates the task completed successfully. StatusSuccess ResultStatus = "success" // StatusFailed indicates the task failed. StatusFailed ResultStatus = "failed" )
func (ResultStatus) IsTerminal ¶
func (s ResultStatus) IsTerminal() bool
IsTerminal returns true if the status represents a final state.
func (ResultStatus) Valid ¶
func (s ResultStatus) Valid() bool
Valid returns true if the status is a known value.
type Subscription ¶
type Subscription interface {
// Results returns the channel for incoming result updates.
Results() <-chan *Result
// Cancel cancels the subscription.
Cancel() error
}
Subscription represents an active result subscription.