Documentation
¶
Overview ¶
Package bus provides a message bus abstraction for agent communication. It supports publish/subscribe, request/reply, and task queue patterns. The default implementation uses NATS, with an in-memory option for testing.
Index ¶
- Variables
- type Config
- type MemoryBus
- func (b *MemoryBus) Close() error
- func (b *MemoryBus) Publish(ctx context.Context, subject string, data []byte) error
- func (b *MemoryBus) Queue(name string) TaskQueue
- func (b *MemoryBus) QueueSubscribe(ctx context.Context, subject, queue string, handler MessageHandler) (Subscription, error)
- func (b *MemoryBus) Request(ctx context.Context, subject string, data []byte, timeout time.Duration) ([]byte, error)
- func (b *MemoryBus) Subscribe(ctx context.Context, subject string, handler MessageHandler) (Subscription, error)
- type Message
- type MessageBus
- type MessageHandler
- type NATSBus
- func (b *NATSBus) Close() error
- func (b *NATSBus) Conn() *nats.Conn
- func (b *NATSBus) JetStream() jetstream.JetStream
- func (b *NATSBus) Publish(ctx context.Context, subject string, data []byte) error
- func (b *NATSBus) Queue(name string) TaskQueue
- func (b *NATSBus) QueueSubscribe(ctx context.Context, subject, queue string, handler MessageHandler) (Subscription, error)
- func (b *NATSBus) Request(ctx context.Context, subject string, data []byte, timeout time.Duration) ([]byte, error)
- func (b *NATSBus) Subscribe(ctx context.Context, subject string, handler MessageHandler) (Subscription, error)
- type Subscription
- type Task
- type TaskQueue
Constants ¶
This section is empty.
Variables ¶
// Sentinel errors returned by bus, subscription, and queue operations.
var (
	// ErrTimeout is returned when a request times out waiting for a response.
	ErrTimeout = errors.New("request timeout")

	// ErrNoResponders is returned when no subscribers are available to handle a request.
	ErrNoResponders = errors.New("no responders available")

	// ErrClosed is returned when operating on a closed bus or subscription.
	ErrClosed = errors.New("bus or subscription closed")

	// ErrQueueEmpty is returned when pulling from an empty queue with no waiters.
	ErrQueueEmpty = errors.New("queue empty")
)
Functions ¶
This section is empty.
Types ¶
type Config ¶
// Config carries the settings used to create a MessageBus.
type Config struct {
	// URL locates the NATS server, for example "nats://localhost:4222".
	// The in-memory bus ignores it.
	URL string

	// Name identifies this client for debugging and monitoring.
	Name string

	// Timeout is the default timeout applied to operations.
	Timeout time.Duration
}
Config holds configuration for creating a MessageBus.
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns a Config with sensible defaults.
type MemoryBus ¶
// MemoryBus is the in-memory MessageBus implementation intended for testing.
type MemoryBus struct {
	// contains filtered or unexported fields
}
MemoryBus is an in-memory implementation of MessageBus for testing. It supports wildcards and request/reply but does not persist messages.
func NewMemoryBus ¶
func NewMemoryBus() *MemoryBus
NewMemoryBus creates a new in-memory message bus.
func (*MemoryBus) QueueSubscribe ¶
func (b *MemoryBus) QueueSubscribe(ctx context.Context, subject, queue string, handler MessageHandler) (Subscription, error)
func (*MemoryBus) Subscribe ¶
func (b *MemoryBus) Subscribe(ctx context.Context, subject string, handler MessageHandler) (Subscription, error)
type Message ¶
// Message is a single incoming message delivered by the bus.
type Message struct {
	Subject string
	Data    []byte
	// ReplyTo is set when the sender expects a response.
	ReplyTo string
}
Message represents an incoming message from the bus.
type MessageBus ¶
// MessageBus is the core interface agents use to communicate.
// Implementations must be safe for concurrent use.
type MessageBus interface {
	// Publish delivers data to every subscriber of subject. It returns
	// immediately, without waiting for the message to be delivered.
	Publish(ctx context.Context, subject string, data []byte) error

	// Subscribe registers handler for messages arriving on subject; the
	// handler is invoked in a separate goroutine for each message.
	// Wildcards are supported: "buckley.agent.*" matches "buckley.agent.abc".
	Subscribe(ctx context.Context, subject string, handler MessageHandler) (Subscription, error)

	// Request sends data on subject and waits for a single response
	// (the request/reply pattern). It is useful for synchronous
	// agent-to-agent communication.
	Request(ctx context.Context, subject string, data []byte, timeout time.Duration) ([]byte, error)

	// QueueSubscribe creates a queue subscription: messages are
	// load-balanced across the subscribers in the same queue group.
	QueueSubscribe(ctx context.Context, subject, queue string, handler MessageHandler) (Subscription, error)

	// Queue returns the TaskQueue with the given name, backed by this bus.
	Queue(name string) TaskQueue

	// Close shuts down the bus together with all of its subscriptions.
	Close() error
}
MessageBus is the core interface for agent communication. Implementations must be safe for concurrent use.
type MessageHandler ¶
MessageHandler processes incoming messages. For request/reply, return data to send as response; return nil for no response.
type NATSBus ¶
// NATSBus is the MessageBus implementation backed by NATS.
type NATSBus struct {
	// contains filtered or unexported fields
}
NATSBus implements MessageBus using NATS.
func NewNATSBus ¶
NewNATSBus creates a new NATS-backed message bus.
func NewNATSBusFromConn ¶
NewNATSBusFromConn creates a NATSBus from an existing connection. Useful for testing with an embedded NATS server.
func (*NATSBus) Conn ¶
Conn returns the underlying NATS connection. Useful for advanced operations not exposed by MessageBus.
func (*NATSBus) QueueSubscribe ¶
func (b *NATSBus) QueueSubscribe(ctx context.Context, subject, queue string, handler MessageHandler) (Subscription, error)
func (*NATSBus) Subscribe ¶
func (b *NATSBus) Subscribe(ctx context.Context, subject string, handler MessageHandler) (Subscription, error)
type Subscription ¶
// Subscription is an active subscription that the holder can cancel.
type Subscription interface {
	// Unsubscribe stops message delivery and cleans up resources.
	Unsubscribe() error

	// Subject reports the subject pattern this subscription was made for.
	Subject() string
}
Subscription represents an active subscription that can be cancelled.
type TaskQueue ¶
// TaskQueue is a persistent work queue used to distribute tasks.
// Tasks are handed to workers and must be explicitly acknowledged.
type TaskQueue interface {
	// Push appends a task carrying data to the queue.
	Push(ctx context.Context, data []byte) error

	// Pull returns the next task in the queue. It blocks until a task
	// is available or the context is cancelled.
	Pull(ctx context.Context) (*Task, error)

	// Ack acknowledges that the task was processed successfully.
	Ack(ctx context.Context, taskID string) error

	// Nack negatively acknowledges the task, returning it to the queue
	// for retry.
	Nack(ctx context.Context, taskID string) error

	// Len reports the approximate number of pending tasks.
	Len(ctx context.Context) (int, error)

	// Name reports the name of the queue.
	Name() string
}
TaskQueue provides a persistent work queue for task distribution. Tasks are distributed to workers and must be explicitly acknowledged.