bus

package
v0.2.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 9, 2026 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Overview

Package bus exposes the application-facing API of OpenAgentIO: Publish, Subscribe, Invoke, StreamInvoke, and the corresponding handler registrations for service implementations.

The runtime is backed by a transport.Transport and a codec.Codec; both are provided via functional options when constructing the Bus.

Index

Constants

View Source
const DefaultSubjectPrefix = "acp.v1"

DefaultSubjectPrefix is used when WithSubjectPrefix is not supplied. Override it via bus.WithSubjectPrefix("...") to coexist with legacy `agent.*` namespaces.

Variables

View Source
var ErrIdleTimeout = errors.New("bus: stream idle timeout")

ErrIdleTimeout is reported by a Stream when the gap between two received frames exceeds the InvokeOption WithIdleTimeout deadline.

View Source
var ErrNotImplemented = errors.New("bus: not implemented (v0.1 skeleton)")

ErrNotImplemented is returned by methods whose bodies are reserved for the upcoming v0.1 implementation milestones. Callers can `errors.Is(err, bus.ErrNotImplemented)` to detect skeleton calls in tests.

Functions

func DLQSink

func DLQSink(prefix string, codec codec.Codec, tr transport.Transport) middleware.DLQSink

DLQSink returns a middleware.DLQSink that publishes a cloned envelope onto the dead-letter subject `{prefix}.dlq.{event_type}` using the supplied codec and transport. This is the canonical NATS Core–based DLQ implementation; it does not depend on JetStream.

The sink stamps two metadata keys on the clone before publishing:

  • acp.dlq.original_event_type — the original envelope.EventType
  • acp.dlq.last_error — the string representation of lastErr

Types

type Bus

type Bus interface {
	Publish(ctx context.Context, e *event.Envelope) error
	Subscribe(ctx context.Context, eventType string, h Handler, opts ...SubOption) (Subscription, error)

	Invoke(ctx context.Context, target string, payload any, opts ...InvokeOption) (*event.Envelope, error)
	StreamInvoke(ctx context.Context, target string, payload any, opts ...InvokeOption) (Stream, error)

	HandleInvoke(target string, h InvokeHandler, opts ...HandleOption) error
	HandleStream(target string, h StreamHandler, opts ...HandleOption) error

	Close() error
}

Bus is the contract every runtime implementation satisfies.

func New

func New(opts ...Option) (Bus, error)

New constructs a Bus from the supplied options. It validates the required inputs (Transport, AgentID) and connects the underlying transport.

type EnvelopePreparer

type EnvelopePreparer func(ctx context.Context, e *event.Envelope)

EnvelopePreparer mutates an outbound envelope just before it is encoded onto the wire. The chain runs for every Bus.Publish / Invoke / StreamInvoke call, in registration order, so a later preparer observes the mutations of earlier ones. The canonical use case is the OpenTelemetry bridge in pkg/middleware/otel, which writes the active span's traceparent into envelope.Traceparent.

Preparers must be cheap and non-blocking — they run on the publishing goroutine inside the hot path. Errors are not propagated; if a preparer needs to bail, it should leave the envelope unchanged.

type HandleOption

type HandleOption func(*handleOpts)

HandleOption configures HandleInvoke / HandleStream.

func WithHandleQueue

func WithHandleQueue(q string) HandleOption

WithHandleQueue places the handler in a queue group so multiple replicas load-balance the work.

type Handler

type Handler func(ctx context.Context, e *event.Envelope) error

Handler consumes a delivered envelope. Returning an error is allowed but the runtime currently treats handler errors as terminal for the message; retry semantics live in middleware.

type InvokeHandler

type InvokeHandler func(ctx context.Context, e *event.Envelope) (any, error)

InvokeHandler is the server-side counterpart of Bus.Invoke: it returns a single value that the runtime wraps into a final response envelope.

type InvokeOption

type InvokeOption func(*invokeOpts)

InvokeOption configures Bus.Invoke / Bus.StreamInvoke.

func WithIdleTimeout

func WithIdleTimeout(d time.Duration) InvokeOption

WithIdleTimeout sets the maximum gap between two streaming frames.

func WithTimeout

func WithTimeout(d time.Duration) InvokeOption

WithTimeout sets the overall deadline for the invocation.

type Option

type Option func(*Options)

Option mutates Options.

func WithAgentID

func WithAgentID(id string) Option

func WithCodec

func WithCodec(c codec.Codec) Option

func WithDefaultTimeout

func WithDefaultTimeout(d time.Duration) Option

func WithEnvelopePreparer

func WithEnvelopePreparer(p ...EnvelopePreparer) Option

WithEnvelopePreparer registers one or more preparers that run on every outbound envelope just before it is encoded. See EnvelopePreparer.

func WithLogger

func WithLogger(l *slog.Logger) Option

func WithMiddleware

func WithMiddleware(mw ...middleware.Middleware) Option

func WithSubjectPrefix

func WithSubjectPrefix(p string) Option

func WithTenant

func WithTenant(t string) Option

func WithTransport

func WithTransport(t transport.Transport) Option

type Options

type Options struct {
	AgentID           string
	Tenant            string
	SubjectPrefix     string
	Codec             codec.Codec
	Transport         transport.Transport
	Logger            *slog.Logger
	Middleware        []middleware.Middleware
	EnvelopePreparers []EnvelopePreparer
	DefaultTimeout    time.Duration
}

Options bundles every Bus-level setting.

type Stream

type Stream interface {
	Events() iter.Seq2[*event.Envelope, error]
	Close() error
}

Stream is the client side of a streaming response. Range over Events() to consume incoming envelopes; Close cancels the underlying inbox.

type StreamHandler

type StreamHandler func(ctx context.Context, e *event.Envelope, w StreamWriter) error

StreamHandler is the server-side counterpart of Bus.StreamInvoke. It receives a StreamWriter that lets it emit started/delta/final/error frames in order.

type StreamWriter

type StreamWriter interface {
	Started(meta any) error
	Delta(chunk any) error
	Final(result any) error
	Error(err error) error
}

StreamWriter is the server side of a streaming response. Started/Final/Error must each be called at most once per stream; Final and Error are mutually exclusive.

type SubOption

type SubOption func(*subOpts)

SubOption configures Bus.Subscribe.

func WithQueue

func WithQueue(q string) SubOption

WithQueue puts the subscriber into a queue group; messages on the subject are load-balanced across all members of the same queue.

type Subscription

type Subscription interface {
	Unsubscribe() error
}

Subscription represents a live subscription. Closing it stops further deliveries; idempotent.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL