Documentation
¶
Overview ¶
Package bus provides message bus clients for agent-to-agent communication.
The MessageBus interface enables pub/sub and request/reply patterns over various backends (NATS, in-memory). All implementations use channel-based APIs for Go-idiomatic concurrent use.
Package bus provides message bus clients for agent-to-agent communication.
Overview ¶
The MessageBus interface enables pub/sub and request/reply patterns for distributed agent communication. All implementations use channel-based APIs for Go-idiomatic concurrent use.
Available Implementations ¶
- NATSBus: Production-grade messaging using NATS
- MemoryBus: In-memory implementation for testing and single-process use
Patterns ¶
Pub/Sub - broadcast to all subscribers:
bus.Publish("events.user", data)
sub, _ := bus.Subscribe("events.user")
for msg := range sub.Messages() {
	// Handle message
}
Queue Groups - load balanced across workers:
sub, _ := bus.QueueSubscribe("tasks", "workers")
// Only one worker in the group receives each message
Request/Reply - synchronous RPC:
// Responder
sub, _ := bus.Subscribe("service")
for msg := range sub.Messages() {
	bus.Publish(msg.Reply, response)
}
// Requester
reply, _ := bus.Request("service", data, timeout)
Queue Groups for Swarm Agents ¶
Queue subscriptions enable load balancing across agent instances:
- Multiple agents subscribe to the same subject with the same queue name
- Each message is delivered to exactly one agent in the queue group
- Natural scaling: add more agents to handle more load
- No coordination needed between agents
Index ¶
- Variables
- func ValidateSubject(subject string) error
- type Config
- type MemoryBus
- func (b *MemoryBus) Close() error
- func (b *MemoryBus) Publish(subject string, data []byte) error
- func (b *MemoryBus) QueueSubscribe(subject, queue string) (Subscription, error)
- func (b *MemoryBus) Request(subject string, data []byte, timeout time.Duration) (*Message, error)
- func (b *MemoryBus) Subscribe(subject string) (Subscription, error)
- type Message
- type MessageBus
- type NATSBus
- func (b *NATSBus) Close() error
- func (b *NATSBus) Conn() *nats.Conn
- func (b *NATSBus) Publish(subject string, data []byte) error
- func (b *NATSBus) QueueSubscribe(subject, queue string) (Subscription, error)
- func (b *NATSBus) Request(subject string, data []byte, timeout time.Duration) (*Message, error)
- func (b *NATSBus) Subscribe(subject string) (Subscription, error)
- type NATSConfig
- type Subscription
Constants ¶
This section is empty.
Variables ¶
var (
	ErrClosed         = errors.New("bus closed")
	ErrTimeout        = errors.New("request timeout")
	ErrNoResponders   = errors.New("no responders")
	ErrInvalidSubject = errors.New("invalid subject")
)
Common errors.
Functions ¶
func ValidateSubject ¶
func ValidateSubject(subject string) error
ValidateSubject checks whether a subject is valid.
Types ¶
type Config ¶
type Config struct {
// BufferSize for subscription channels.
// Default: 256
BufferSize int
}
Config holds common bus configuration.
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns configuration with sensible defaults.
type MemoryBus ¶
type MemoryBus struct {
// contains filtered or unexported fields
}
MemoryBus implements MessageBus using in-memory channels. Useful for testing and single-process scenarios.
func NewMemoryBus ¶
NewMemoryBus creates a new in-memory message bus.
func (*MemoryBus) QueueSubscribe ¶
func (b *MemoryBus) QueueSubscribe(subject, queue string) (Subscription, error)
QueueSubscribe creates a queue subscription.
type Message ¶
type Message struct {
// Subject the message was published to.
Subject string
// Data is the message payload.
Data []byte
// Reply is the reply subject for request/reply pattern.
// Empty for regular pub/sub messages.
Reply string
}
Message represents a message received from the bus.
type MessageBus ¶
type MessageBus interface {
// Publish sends a message to all subscribers of a subject.
Publish(subject string, data []byte) error
// Subscribe creates a subscription to a subject.
// All subscribers receive all messages.
Subscribe(subject string) (Subscription, error)
// QueueSubscribe creates a queue subscription.
// Messages are load-balanced across queue members.
QueueSubscribe(subject, queue string) (Subscription, error)
// Request sends a request and waits for a single reply.
// Returns ErrTimeout if no reply arrives within timeout.
Request(subject string, data []byte, timeout time.Duration) (*Message, error)
// Close shuts down the bus connection.
Close() error
}
MessageBus provides pub/sub and request/reply messaging.
type NATSBus ¶
type NATSBus struct {
// contains filtered or unexported fields
}
NATSBus implements MessageBus using NATS.
func NewNATSBus ¶
func NewNATSBus(cfg NATSConfig) (*NATSBus, error)
NewNATSBus creates a new NATS message bus.
func NewNATSBusFromConn ¶
func NewNATSBusFromConn(conn *nats.Conn, cfg NATSConfig) *NATSBus
NewNATSBusFromConn creates a NATSBus from an existing connection.
func (*NATSBus) QueueSubscribe ¶
func (b *NATSBus) QueueSubscribe(subject, queue string) (Subscription, error)
QueueSubscribe creates a queue subscription.
type NATSConfig ¶
type NATSConfig struct {
Config // Embed base config
// URL is the NATS server URL (e.g., "nats://localhost:4222").
URL string
// Name is the client name for identification.
Name string
// Token for token-based auth.
Token string
// User and Password for basic auth.
User string
Password string
// ReconnectWait is the time to wait between reconnection attempts.
ReconnectWait time.Duration
// MaxReconnects is the maximum number of reconnection attempts.
// -1 = unlimited
MaxReconnects int
// ConnectTimeout for initial connection.
ConnectTimeout time.Duration
}
NATSConfig holds NATS connection configuration.
func DefaultNATSConfig ¶
func DefaultNATSConfig() NATSConfig
DefaultNATSConfig returns configuration with sensible defaults.
type Subscription ¶
type Subscription interface {
// Messages returns the channel for incoming messages.
// Channel is closed when subscription ends.
Messages() <-chan *Message
// Unsubscribe cancels the subscription.
Unsubscribe() error
}
Subscription represents an active subscription.