bus

package
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 6, 2026 License: MIT Imports: 10 Imported by: 0

Documentation

Overview

Package bus provides a message bus abstraction for agent communication. It supports publish/subscribe, request/reply, and task queue patterns. The default implementation uses NATS, with an in-memory option for testing.

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors reported by bus and queue operations. They are created with
// errors.New, so callers should test for them with errors.Is rather than by
// comparing message strings.
var (
	// ErrTimeout is returned when a request times out waiting for a response.
	ErrTimeout = errors.New("request timeout")

	// ErrNoResponders is returned when no subscribers are available to handle a request.
	ErrNoResponders = errors.New("no responders available")

	// ErrClosed is returned when operating on a closed bus or subscription.
	ErrClosed = errors.New("bus or subscription closed")

	// ErrQueueEmpty is returned when pulling from an empty queue with no waiters.
	// NOTE(review): TaskQueue.Pull is documented as blocking until a task is
	// available or the context is cancelled, so the call path that yields this
	// error is not visible here — confirm against the implementations.
	ErrQueueEmpty = errors.New("queue empty")
)

Functions

This section is empty.

Types

type Config

// Config holds configuration for creating a MessageBus.
// DefaultConfig returns a Config populated with defaults.
type Config struct {
	// URL is the NATS server URL (e.g., "nats://localhost:4222").
	// Ignored for in-memory bus.
	URL string

	// Name is a client identifier for debugging/monitoring.
	Name string

	// Timeout is the default timeout for operations.
	// NOTE(review): how a zero Timeout is interpreted (no timeout vs. a
	// built-in default) is not shown here — confirm against NewNATSBus.
	Timeout time.Duration
}

Config holds configuration for creating a MessageBus.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns a Config with sensible defaults.

type MemoryBus

// MemoryBus is an in-memory implementation of MessageBus for testing.
// It supports wildcards and request/reply but does not persist messages.
// Instances are created with NewMemoryBus.
type MemoryBus struct {
	// contains filtered or unexported fields
}

MemoryBus is an in-memory implementation of MessageBus for testing. It supports wildcards and request/reply but does not persist messages.

func NewMemoryBus

func NewMemoryBus() *MemoryBus

NewMemoryBus creates a new in-memory message bus.

func (*MemoryBus) Close

func (b *MemoryBus) Close() error

func (*MemoryBus) Publish

func (b *MemoryBus) Publish(ctx context.Context, subject string, data []byte) error

func (*MemoryBus) Queue

func (b *MemoryBus) Queue(name string) TaskQueue

func (*MemoryBus) QueueSubscribe

func (b *MemoryBus) QueueSubscribe(ctx context.Context, subject, queue string, handler MessageHandler) (Subscription, error)

func (*MemoryBus) Request

func (b *MemoryBus) Request(ctx context.Context, subject string, data []byte, timeout time.Duration) ([]byte, error)

func (*MemoryBus) Subscribe

func (b *MemoryBus) Subscribe(ctx context.Context, subject string, handler MessageHandler) (Subscription, error)

type Message

// Message represents an incoming message from the bus. It is the value
// passed to a MessageHandler.
// NOTE(review): for wildcard subscriptions Subject is presumably the concrete
// subject of the message rather than the subscription pattern — confirm.
type Message struct {
	Subject string // Subject the message was received on
	Data    []byte // Raw message payload
	ReplyTo string // Set if sender expects a response
}

Message represents an incoming message from the bus.

type MessageBus

// MessageBus is the core interface for agent communication, covering
// publish/subscribe, request/reply, and task queues.
// Implementations must be safe for concurrent use.
type MessageBus interface {
	// Publish sends a message to all subscribers of the given subject.
	// Returns immediately; does not wait for message delivery.
	Publish(ctx context.Context, subject string, data []byte) error

	// Subscribe registers a handler for messages on the given subject.
	// The handler is called in a separate goroutine for each message.
	// Supports wildcards: "buckley.agent.*" matches "buckley.agent.abc".
	// The returned Subscription is used to stop delivery via Unsubscribe.
	Subscribe(ctx context.Context, subject string, handler MessageHandler) (Subscription, error)

	// Request sends a message and waits for a single response (request/reply pattern).
	// Useful for synchronous agent-to-agent communication.
	// The package documents ErrTimeout for a request that times out waiting
	// for a response and ErrNoResponders for a request with no subscribers.
	// NOTE(review): how timeout interacts with a deadline already set on ctx
	// is not shown here — confirm per implementation.
	Request(ctx context.Context, subject string, data []byte, timeout time.Duration) ([]byte, error)

	// QueueSubscribe creates a queue subscription where messages are load-balanced
	// across subscribers in the same queue group.
	QueueSubscribe(ctx context.Context, subject, queue string, handler MessageHandler) (Subscription, error)

	// Queue returns a TaskQueue for the given name, backed by this bus.
	Queue(name string) TaskQueue

	// Close shuts down the bus and all subscriptions.
	// The package documents ErrClosed for operations on a closed bus.
	Close() error
}

MessageBus is the core interface for agent communication. Implementations must be safe for concurrent use.

type MessageHandler

// MessageHandler processes incoming messages. For request/reply, the returned
// bytes are sent as the response; return nil for no response.
type MessageHandler func(msg *Message) []byte

MessageHandler processes incoming messages. For request/reply, return data to send as response; return nil for no response.

type NATSBus

// NATSBus implements MessageBus using NATS. Instances are created with
// NewNATSBus (from a Config) or NewNATSBusFromConn (from an existing
// *nats.Conn).
type NATSBus struct {
	// contains filtered or unexported fields
}

NATSBus implements MessageBus using NATS.

func NewNATSBus

func NewNATSBus(cfg Config) (*NATSBus, error)

NewNATSBus creates a new NATS-backed message bus.

func NewNATSBusFromConn

func NewNATSBusFromConn(conn *nats.Conn) (*NATSBus, error)

NewNATSBusFromConn creates a NATSBus from an existing connection. Useful for testing with embedded NATS server.

func (*NATSBus) Close

func (b *NATSBus) Close() error

func (*NATSBus) Conn

func (b *NATSBus) Conn() *nats.Conn

Conn returns the underlying NATS connection. Useful for advanced operations not exposed by MessageBus.

func (*NATSBus) JetStream

func (b *NATSBus) JetStream() jetstream.JetStream

JetStream returns the JetStream context.

func (*NATSBus) Publish

func (b *NATSBus) Publish(ctx context.Context, subject string, data []byte) error

func (*NATSBus) Queue

func (b *NATSBus) Queue(name string) TaskQueue

func (*NATSBus) QueueSubscribe

func (b *NATSBus) QueueSubscribe(ctx context.Context, subject, queue string, handler MessageHandler) (Subscription, error)

func (*NATSBus) Request

func (b *NATSBus) Request(ctx context.Context, subject string, data []byte, timeout time.Duration) ([]byte, error)

func (*NATSBus) Subscribe

func (b *NATSBus) Subscribe(ctx context.Context, subject string, handler MessageHandler) (Subscription, error)

type Subscription

// Subscription represents an active subscription that can be cancelled.
// It is returned by MessageBus.Subscribe and MessageBus.QueueSubscribe.
type Subscription interface {
	// Unsubscribe stops receiving messages and cleans up resources.
	Unsubscribe() error

	// Subject returns the subject pattern this subscription is for.
	Subject() string
}

Subscription represents an active subscription that can be cancelled.

type Task

// Task represents a unit of work pulled from a TaskQueue.
type Task struct {
	ID   string // Task identifier; presumably the taskID expected by TaskQueue.Ack and Nack — confirm
	Data []byte // Task payload; presumably the bytes given to TaskQueue.Push — confirm
}

Task represents a unit of work pulled from a TaskQueue.

type TaskQueue

// TaskQueue provides a persistent work queue for task distribution.
// Tasks are distributed to workers and must be explicitly acknowledged
// with Ack, or returned for retry with Nack.
type TaskQueue interface {
	// Push adds a task to the queue.
	Push(ctx context.Context, data []byte) error

	// Pull retrieves the next task from the queue.
	// Blocks until a task is available or context is cancelled.
	// NOTE(review): the package also declares ErrQueueEmpty for pulling from
	// an empty queue; when Pull returns it instead of blocking is not shown
	// here — confirm per implementation.
	Pull(ctx context.Context) (*Task, error)

	// Ack acknowledges successful processing of a task.
	// taskID is presumably the ID field of a Task returned by Pull — confirm.
	Ack(ctx context.Context, taskID string) error

	// Nack negatively acknowledges a task, returning it to the queue for retry.
	// taskID is presumably the ID field of a Task returned by Pull — confirm.
	Nack(ctx context.Context, taskID string) error

	// Len returns the approximate number of pending tasks.
	Len(ctx context.Context) (int, error)

	// Name returns the queue name.
	Name() string
}

TaskQueue provides a persistent work queue for task distribution. Tasks are distributed to workers and must be explicitly acknowledged.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL