bus

package
v0.2.0
Published: Mar 5, 2026 | License: Apache-2.0 | Imports: 6 | Imported by: 0

Documentation

Overview

Package bus provides message bus clients for agent-to-agent communication.

The MessageBus interface enables pub/sub and request/reply patterns for distributed agent communication over various backends (NATS, in-memory). All implementations use channel-based APIs for Go-idiomatic concurrent use.

Available Implementations

  • NATSBus: Production-grade messaging using NATS
  • MemoryBus: In-memory implementation for testing and single-process use

Patterns

Pub/Sub - broadcast to all subscribers:

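sub, err := bus.Subscribe("events.user")
if err != nil {
    return err
}
// Subscribe first: a message published before the subscription exists is typically not delivered.
if err := bus.Publish("events.user", data); err != nil {
    return err
}
for msg := range sub.Messages() {
    // Handle msg.Data
}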
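To make the broadcast behaviour concrete, here is a minimal, self-contained sketch of channel-based fan-out. The `fanout` type, its `buf` parameter, and the drop-when-full policy are assumptions made for illustration; they are not the package's MemoryBus and say nothing about how it handles slow subscribers.

```go
package main

import (
	"fmt"
	"sync"
)

// Message mirrors the documented Message fields used in this sketch.
type Message struct {
	Subject string
	Data    []byte
}

// fanout is a hypothetical stand-in for a bus, not the package's MemoryBus.
type fanout struct {
	mu   sync.Mutex
	subs map[string][]chan *Message
}

func newFanout() *fanout {
	return &fanout{subs: make(map[string][]chan *Message)}
}

// Subscribe registers a buffered channel for the subject.
func (f *fanout) Subscribe(subject string, buf int) <-chan *Message {
	ch := make(chan *Message, buf)
	f.mu.Lock()
	f.subs[subject] = append(f.subs[subject], ch)
	f.mu.Unlock()
	return ch
}

// Publish copies the message to every subscriber of the subject and
// reports how many received it. A full buffer drops the message
// (an assumed policy, chosen so the publisher never blocks).
func (f *fanout) Publish(subject string, data []byte) int {
	f.mu.Lock()
	defer f.mu.Unlock()
	delivered := 0
	for _, ch := range f.subs[subject] {
		select {
		case ch <- &Message{Subject: subject, Data: data}:
			delivered++
		default: // subscriber is slow; skip it
		}
	}
	return delivered
}

func main() {
	f := newFanout()
	a := f.Subscribe("events.user", 4)
	b := f.Subscribe("events.user", 4)
	n := f.Publish("events.user", []byte("created"))
	fmt.Println(n, string((<-a).Data), string((<-b).Data)) // prints "2 created created"
}
```

Both subscribers see the same payload, which is the property that separates plain subscriptions from queue groups.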

Queue Groups - load balanced across workers:

sub, _ := bus.QueueSubscribe("tasks", "workers")
// Only one worker in the group receives each message

Request/Reply - synchronous RPC:

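// Responder
sub, _ := bus.Subscribe("service")
for msg := range sub.Messages() {
    if msg.Reply == "" {
        continue // plain publish; nobody is waiting for a reply
    }
    bus.Publish(msg.Reply, response)
}

// Requester
reply, err := bus.Request("service", data, timeout)
if err != nil {
    return err // ErrTimeout if no reply arrived within timeout
}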
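The timeout behaviour of a request can be sketched with plain channels. In the example below, `request`, `call`, and `errTimeout` are hypothetical names: the private reply channel plays the role of Message.Reply and `errTimeout` stands in for ErrTimeout. It illustrates the pattern only and is not how either backend is implemented.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTimeout plays the role of the package's ErrTimeout in this sketch.
var errTimeout = errors.New("request timeout")

// request is a hypothetical carrier: a payload plus a private reply channel.
type request struct {
	data  string
	reply chan string
}

// call sends one request and waits for a single reply, giving up after timeout.
func call(service chan<- request, data string, timeout time.Duration) (string, error) {
	// Buffered so a reply that arrives after the timeout never blocks the responder.
	req := request{data: data, reply: make(chan string, 1)}
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case service <- req:
	case <-timer.C:
		return "", errTimeout // no responder picked the request up in time
	}
	select {
	case resp := <-req.reply:
		return resp, nil
	case <-timer.C:
		return "", errTimeout // responder accepted the request but was too slow
	}
}

func main() {
	service := make(chan request)
	go func() { // responder
		for req := range service {
			req.reply <- "echo:" + req.data
		}
	}()

	resp, err := call(service, "ping", time.Second)
	fmt.Println(resp, err) // prints "echo:ping <nil>"

	_, err = call(make(chan request), "ping", 10*time.Millisecond) // nobody listening
	fmt.Println(errors.Is(err, errTimeout))                        // prints "true"
}
```

A single timer covers both the send and the wait, so the caller never blocks for longer than the timeout it asked for.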

Queue Groups for Swarm Agents

Queue subscriptions enable load balancing across agent instances:

  • Multiple agents subscribe to the same subject with the same queue name
  • Each message is delivered to exactly one agent in the queue group
  • Natural scaling: add more agents to handle more load
  • No coordination is needed between agents
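The delivery rule for queue groups can be modelled in a few lines. This sketch assumes round-robin selection purely for determinism; the package does not document how a member is chosen, and NATS itself picks a member at random. The `queueGroup` type is hypothetical and not safe for concurrent use.

```go
package main

import "fmt"

// queueGroup is a hypothetical model of one queue group on one subject.
type queueGroup struct {
	members []chan string
	next    int
}

// join adds a worker to the group and returns its channel.
func (g *queueGroup) join(buf int) <-chan string {
	ch := make(chan string, buf)
	g.members = append(g.members, ch)
	return ch
}

// deliver hands the message to exactly one member, rotating through the
// group. It returns the index of the chosen member, or -1 if the group
// has no members.
func (g *queueGroup) deliver(msg string) int {
	if len(g.members) == 0 {
		return -1
	}
	i := g.next % len(g.members)
	g.members[i] <- msg // blocks if that member's buffer is full
	g.next++
	return i
}

func main() {
	g := &queueGroup{}
	w0 := g.join(4)
	w1 := g.join(4)
	for _, task := range []string{"t1", "t2", "t3", "t4"} {
		g.deliver(task)
	}
	fmt.Println(len(w0), len(w1)) // prints "2 2": each worker holds half the tasks
}
```

Adding a third call to `join` would spread the same tasks over three workers without any change to the publisher, which is the scaling property described above.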

Variables

var (
	ErrClosed         = errors.New("bus closed")
	ErrTimeout        = errors.New("request timeout")
	ErrNoResponders   = errors.New("no responders")
	ErrInvalidSubject = errors.New("invalid subject")
)

Common errors.
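Sentinel errors like these are usually matched with errors.Is, which keeps working even if a caller or an implementation wraps them. The sketch below redeclares the four values locally so it compiles on its own. The `retryable` helper and its policy are hypothetical, and whether the package wraps its errors is not stated in this documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented sentinels so the sketch is self-contained.
var (
	ErrClosed         = errors.New("bus closed")
	ErrTimeout        = errors.New("request timeout")
	ErrNoResponders   = errors.New("no responders")
	ErrInvalidSubject = errors.New("invalid subject")
)

// retryable is a hypothetical policy helper: timeouts and missing
// responders may clear up on their own, the other cases will not.
func retryable(err error) bool {
	switch {
	case errors.Is(err, ErrTimeout), errors.Is(err, ErrNoResponders):
		return true
	default:
		return false // nil, closed bus, invalid subject, or unknown error
	}
}

func main() {
	wrapped := fmt.Errorf("request to %q: %w", "service", ErrTimeout)
	fmt.Println(retryable(wrapped), retryable(ErrClosed)) // prints "true false"
}
```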

Functions

func ValidateSubject

func ValidateSubject(subject string) error

ValidateSubject checks if a subject is valid.
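The rules ValidateSubject enforces are not listed here. As an illustration only, the sketch below applies assumed NATS-style rules: a subject must be non-empty, contain no whitespace, and have no empty dot-separated tokens. The function name `validateSubjectSketch` and the rule set are assumptions, so the real function may accept or reject different inputs.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// errInvalidSubject stands in for the package's ErrInvalidSubject.
var errInvalidSubject = errors.New("invalid subject")

// validateSubjectSketch applies ASSUMED NATS-style rules; it is not the
// package's ValidateSubject.
func validateSubjectSketch(subject string) error {
	if subject == "" || strings.ContainsAny(subject, " \t\r\n") {
		return errInvalidSubject
	}
	for _, token := range strings.Split(subject, ".") {
		if token == "" { // leading, trailing, or doubled dot
			return errInvalidSubject
		}
	}
	return nil
}

func main() {
	for _, s := range []string{"events.user", "", "events..user", "bad subject"} {
		fmt.Printf("%q -> %v\n", s, validateSubjectSketch(s))
	}
}
```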

Types

type Config

type Config struct {
	// BufferSize for subscription channels.
	// Default: 256
	BufferSize int
}

Config holds common bus configuration.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns configuration with sensible defaults.
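A common way to honour a documented default is to fill it in when the field is left at its zero value. The sketch below mirrors the Config struct locally and uses the documented default of 256. The `withDefaults` helper is hypothetical, and treating a zero or negative BufferSize as "unset" is an assumption about behaviour this documentation does not spell out.

```go
package main

import "fmt"

// Config mirrors the documented struct.
type Config struct {
	BufferSize int
}

// withDefaults is a hypothetical helper: it substitutes the documented
// default of 256 when BufferSize is zero or negative (assumed meaning: unset).
func withDefaults(cfg Config) Config {
	if cfg.BufferSize <= 0 {
		cfg.BufferSize = 256
	}
	return cfg
}

func main() {
	fmt.Println(withDefaults(Config{}).BufferSize, withDefaults(Config{BufferSize: 8}).BufferSize) // prints "256 8"
}
```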

type MemoryBus

type MemoryBus struct {
	// contains filtered or unexported fields
}

MemoryBus implements MessageBus using in-memory channels. Useful for testing and single-process scenarios.

func NewMemoryBus

func NewMemoryBus(cfg Config) *MemoryBus

NewMemoryBus creates a new in-memory message bus.

func (*MemoryBus) Close

func (b *MemoryBus) Close() error

Close shuts down the bus.

func (*MemoryBus) Publish

func (b *MemoryBus) Publish(subject string, data []byte) error

Publish sends a message to all subscribers.

func (*MemoryBus) QueueSubscribe

func (b *MemoryBus) QueueSubscribe(subject, queue string) (Subscription, error)

QueueSubscribe creates a queue subscription.

func (*MemoryBus) Request

func (b *MemoryBus) Request(subject string, data []byte, timeout time.Duration) (*Message, error)

Request sends a request and waits for a reply.

func (*MemoryBus) Subscribe

func (b *MemoryBus) Subscribe(subject string) (Subscription, error)

Subscribe creates a subscription to a subject.

type Message

type Message struct {
	// Subject the message was published to.
	Subject string

	// Data is the message payload.
	Data []byte

	// Reply is the reply subject for request/reply pattern.
	// Empty for regular pub/sub messages.
	Reply string
}

Message represents a message received from the bus.

type MessageBus

type MessageBus interface {
	// Publish sends a message to all subscribers of a subject.
	Publish(subject string, data []byte) error

	// Subscribe creates a subscription to a subject.
	// All subscribers receive all messages.
	Subscribe(subject string) (Subscription, error)

	// QueueSubscribe creates a queue subscription.
	// Messages are load-balanced across queue members.
	QueueSubscribe(subject, queue string) (Subscription, error)

	// Request sends a request and waits for a single reply.
	// Returns ErrTimeout if no reply within timeout.
	Request(subject string, data []byte, timeout time.Duration) (*Message, error)

	// Close shuts down the bus connection.
	Close() error
}

MessageBus provides pub/sub and request/reply messaging.
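Because callers depend only on the interface, a backend can be swapped without touching agent code, and a small test double can stand in for a real bus. The sketch below redeclares the documented types locally so it compiles on its own. The `recorder` type, the `announce` function, and the "agents.<id>.online" subject are hypothetical names chosen for the example.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Local mirrors of the documented types.
type Message struct {
	Subject string
	Data    []byte
	Reply   string
}

type Subscription interface {
	Messages() <-chan *Message
	Unsubscribe() error
}

type MessageBus interface {
	Publish(subject string, data []byte) error
	Subscribe(subject string) (Subscription, error)
	QueueSubscribe(subject, queue string) (Subscription, error)
	Request(subject string, data []byte, timeout time.Duration) (*Message, error)
	Close() error
}

var errUnsupported = errors.New("not supported by recorder")

// recorder is a hypothetical test double that only records published subjects.
type recorder struct{ subjects []string }

func (r *recorder) Publish(subject string, _ []byte) error {
	r.subjects = append(r.subjects, subject)
	return nil
}
func (r *recorder) Subscribe(string) (Subscription, error)              { return nil, errUnsupported }
func (r *recorder) QueueSubscribe(string, string) (Subscription, error) { return nil, errUnsupported }
func (r *recorder) Request(string, []byte, time.Duration) (*Message, error) {
	return nil, errUnsupported
}
func (r *recorder) Close() error { return nil }

// Compile-time check that recorder satisfies the interface.
var _ MessageBus = (*recorder)(nil)

// announce depends only on the interface, so any backend can be passed in.
func announce(b MessageBus, agentID string) error {
	return b.Publish("agents."+agentID+".online", nil)
}

func main() {
	r := &recorder{}
	if err := announce(r, "a1"); err != nil {
		fmt.Println("publish failed:", err)
	}
	fmt.Println(r.subjects) // prints "[agents.a1.online]"
}
```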

type NATSBus

type NATSBus struct {
	// contains filtered or unexported fields
}

NATSBus implements MessageBus using NATS.

func NewNATSBus

func NewNATSBus(cfg NATSConfig) (*NATSBus, error)

NewNATSBus creates a new NATS message bus.

func NewNATSBusFromConn

func NewNATSBusFromConn(conn *nats.Conn, cfg NATSConfig) *NATSBus

NewNATSBusFromConn creates a NATSBus from an existing connection.

func (*NATSBus) Close

func (b *NATSBus) Close() error

Close shuts down the NATS connection.

func (*NATSBus) Conn

func (b *NATSBus) Conn() *nats.Conn

Conn returns the underlying NATS connection for advanced use.

func (*NATSBus) Publish

func (b *NATSBus) Publish(subject string, data []byte) error

Publish sends a message to a subject.

func (*NATSBus) QueueSubscribe

func (b *NATSBus) QueueSubscribe(subject, queue string) (Subscription, error)

QueueSubscribe creates a queue subscription.

func (*NATSBus) Request

func (b *NATSBus) Request(subject string, data []byte, timeout time.Duration) (*Message, error)

Request sends a request and waits for a reply.

func (*NATSBus) Subscribe

func (b *NATSBus) Subscribe(subject string) (Subscription, error)

Subscribe creates a subscription to a subject.

type NATSConfig

type NATSConfig struct {
	Config // Embed base config

	// URL is the NATS server URL (e.g., "nats://localhost:4222").
	URL string

	// Name is the client name for identification.
	Name string

	// Token for token-based auth.
	Token string

	// User and Password for basic auth.
	User     string
	Password string

	// ReconnectWait is the time to wait between reconnection attempts.
	ReconnectWait time.Duration

	// MaxReconnects is the maximum number of reconnection attempts.
	// -1 = unlimited
	MaxReconnects int

	// ConnectTimeout for initial connection.
	ConnectTimeout time.Duration
}

NATSConfig holds NATS connection configuration.

func DefaultNATSConfig

func DefaultNATSConfig() NATSConfig

DefaultNATSConfig returns configuration with sensible defaults.

type Subscription

type Subscription interface {
	// Messages returns the channel for incoming messages.
	// Channel is closed when subscription ends.
	Messages() <-chan *Message

	// Unsubscribe cancels the subscription.
	Unsubscribe() error
}

Subscription represents an active subscription.
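Since the message channel is closed when a subscription ends, a consumer can either range over it or select on it together with a cancellation signal. The sketch below shows the second form. The `chanSub` type is a hypothetical Subscription written for this example, and the `consume` helper and its `done` parameter are likewise assumptions rather than part of the package.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Message mirrors the documented Message fields used in this sketch.
type Message struct {
	Subject string
	Data    []byte
}

// chanSub is a hypothetical Subscription: Unsubscribe closes the channel once.
type chanSub struct {
	ch   chan *Message
	once sync.Once
}

func (s *chanSub) Messages() <-chan *Message { return s.ch }

func (s *chanSub) Unsubscribe() error {
	s.once.Do(func() { close(s.ch) }) // safe to call more than once
	return nil
}

// consume handles messages until the channel closes or done is signalled,
// and returns the number of messages handled.
func consume(done <-chan struct{}, msgs <-chan *Message, handle func(*Message)) int {
	n := 0
	for {
		select {
		case msg, ok := <-msgs:
			if !ok {
				return n // subscription ended
			}
			handle(msg)
			n++
		case <-done:
			return n
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	sub := &chanSub{ch: make(chan *Message, 2)}
	sub.ch <- &Message{Subject: "tasks", Data: []byte("a")}
	sub.ch <- &Message{Subject: "tasks", Data: []byte("b")}
	sub.Unsubscribe() // buffered messages are still drained after close

	n := consume(ctx.Done(), sub.Messages(), func(m *Message) {
		fmt.Println(string(m.Data))
	})
	fmt.Println(n)
}
```

Passing `ctx.Done()` lets an agent stop consuming on shutdown even when the subscription itself stays open.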
