Documentation ¶
Overview ¶
Package bus exposes the application-facing API of OpenAgentIO: Publish, Subscribe, Invoke, StreamInvoke, and the corresponding handler registrations for service implementations.
The runtime is backed by a transport.Transport and a codec.Codec; both are provided via functional options when constructing the Bus.
Index ¶
- Constants
- Variables
- func DLQSink(prefix string, codec codec.Codec, tr transport.Transport) middleware.DLQSink
- type Bus
- type EnvelopePreparer
- type HandleOption
- type Handler
- type InvokeHandler
- type InvokeOption
- type Option
- func WithAgentID(id string) Option
- func WithCodec(c codec.Codec) Option
- func WithDefaultTimeout(d time.Duration) Option
- func WithEnvelopePreparer(p ...EnvelopePreparer) Option
- func WithLogger(l *slog.Logger) Option
- func WithMiddleware(mw ...middleware.Middleware) Option
- func WithSubjectPrefix(p string) Option
- func WithTenant(t string) Option
- func WithTransport(t transport.Transport) Option
- type Options
- type Stream
- type StreamHandler
- type StreamWriter
- type SubOption
- type Subscription
Constants ¶
const DefaultSubjectPrefix = "acp.v1"
DefaultSubjectPrefix is used when WithSubjectPrefix is not supplied. Override it via bus.WithSubjectPrefix("...") to coexist with legacy `agent.*` namespaces.
Variables ¶
var ErrIdleTimeout = errors.New("bus: stream idle timeout")
ErrIdleTimeout is reported by a Stream when the gap between two received frames exceeds the InvokeOption WithIdleTimeout deadline.
var ErrNotImplemented = errors.New("bus: not implemented (v0.1 skeleton)")
ErrNotImplemented is returned by methods whose bodies are reserved for the upcoming v0.1 implementation milestones. Callers can use `errors.Is(err, bus.ErrNotImplemented)` to detect skeleton calls in tests.
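The detection pattern described above can be sketched as follows. The sentinel is redeclared locally so the example compiles on its own, and `invokeSkeleton` and `isSkeleton` are hypothetical helpers, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrNotImplemented mirrors the documented sentinel; it is redeclared here
// only so the sketch is self-contained.
var ErrNotImplemented = errors.New("bus: not implemented (v0.1 skeleton)")

// invokeSkeleton is a hypothetical stand-in for a method whose body has not
// been written yet. Wrapping with %w keeps the sentinel detectable.
func invokeSkeleton(target string) error {
	return fmt.Errorf("invoke %q: %w", target, ErrNotImplemented)
}

// isSkeleton reports whether err originates from an unimplemented method.
func isSkeleton(err error) bool {
	return errors.Is(err, ErrNotImplemented)
}

func main() {
	err := invokeSkeleton("agent.echo")
	fmt.Println(isSkeleton(err))
	fmt.Println(isSkeleton(errors.New("other failure")))
}
```

Using `errors.Is` rather than `==` keeps the check working even when an intermediate layer wraps the error with extra context.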
Functions ¶
func DLQSink ¶
func DLQSink(prefix string, codec codec.Codec, tr transport.Transport) middleware.DLQSink
DLQSink returns a middleware.DLQSink that publishes a cloned envelope onto the dead-letter subject `{prefix}.dlq.{event_type}` using the supplied codec and transport. This is the canonical NATS Core–based DLQ implementation; it does not depend on JetStream.
The sink stamps two metadata keys on the clone before publishing:
- acp.dlq.original_event_type — the original envelope.EventType
- acp.dlq.last_error — the string representation of lastErr
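As an illustration of the subject layout and metadata stamping listed above, here is a minimal sketch. The `Envelope` type, its `Metadata` field, and both helper functions are assumptions made for the example; the real sink works on event.Envelope and publishes through the supplied codec and transport.

```go
package main

import (
	"errors"
	"fmt"
)

// Envelope is a hypothetical, trimmed-down stand-in for event.Envelope.
// The Metadata field name is an assumption.
type Envelope struct {
	EventType string
	Metadata  map[string]string
}

// dlqSubject builds the dead-letter subject `{prefix}.dlq.{event_type}`.
func dlqSubject(prefix, eventType string) string {
	return prefix + ".dlq." + eventType
}

// stampForDLQ returns a clone of e carrying the two documented metadata keys.
// The original envelope and its metadata map are left untouched.
func stampForDLQ(e *Envelope, lastErr error) *Envelope {
	clone := &Envelope{
		EventType: e.EventType,
		Metadata:  make(map[string]string, len(e.Metadata)+2),
	}
	for k, v := range e.Metadata {
		clone.Metadata[k] = v
	}
	clone.Metadata["acp.dlq.original_event_type"] = e.EventType
	if lastErr != nil {
		clone.Metadata["acp.dlq.last_error"] = lastErr.Error()
	}
	return clone
}

func main() {
	orig := &Envelope{EventType: "order.created", Metadata: map[string]string{"k": "v"}}
	dead := stampForDLQ(orig, errors.New("handler failed"))
	fmt.Println(dlqSubject("acp.v1", orig.EventType))
	fmt.Println(dead.Metadata["acp.dlq.last_error"])
}
```

Copying the map before stamping is what keeps the original envelope reusable by other middleware after the sink has run.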
Types ¶
type Bus ¶
type Bus interface {
Publish(ctx context.Context, e *event.Envelope) error
Subscribe(ctx context.Context, eventType string, h Handler, opts ...SubOption) (Subscription, error)
Invoke(ctx context.Context, target string, payload any, opts ...InvokeOption) (*event.Envelope, error)
StreamInvoke(ctx context.Context, target string, payload any, opts ...InvokeOption) (Stream, error)
HandleInvoke(target string, h InvokeHandler, opts ...HandleOption) error
HandleStream(target string, h StreamHandler, opts ...HandleOption) error
Close() error
}
Bus is the contract every runtime implementation satisfies.
type EnvelopePreparer ¶
EnvelopePreparer mutates an outbound envelope just before it is encoded onto the wire. The chain runs for every Bus.Publish / Invoke / StreamInvoke call, in registration order, so a later preparer observes the mutations of earlier ones. The canonical use case is the OpenTelemetry bridge in pkg/middleware/otel, which writes the active span's traceparent into envelope.Traceparent.
Preparers must be cheap and non-blocking because they run on the publishing goroutine inside the hot path. Errors are not propagated; a preparer that cannot complete its work should leave the envelope unchanged.
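The registration-order behaviour can be sketched as below. The declaration of EnvelopePreparer is not shown on this page, so the `func(*Envelope)` signature, the `Envelope` fields other than Traceparent, and the helpers `runPreparers`, `setTrace`, and `copyTrace` are all assumptions for illustration.

```go
package main

import "fmt"

// Envelope is a hypothetical stand-in for event.Envelope.
type Envelope struct {
	Traceparent string
	Metadata    map[string]string
}

// EnvelopePreparer is ASSUMED to have this shape; the real signature is not
// shown in the documentation above.
type EnvelopePreparer func(e *Envelope)

// runPreparers applies the chain in registration order, so each preparer
// sees the mutations made by the ones before it.
func runPreparers(e *Envelope, chain ...EnvelopePreparer) {
	for _, p := range chain {
		p(e)
	}
}

// setTrace writes an example traceparent value (the W3C sample string).
func setTrace(e *Envelope) {
	e.Traceparent = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
}

// copyTrace records whatever traceparent it observes at the time it runs.
func copyTrace(e *Envelope) {
	if e.Metadata == nil {
		e.Metadata = map[string]string{}
	}
	e.Metadata["seen_traceparent"] = e.Traceparent
}

func main() {
	e := &Envelope{}
	runPreparers(e, setTrace, copyTrace)
	fmt.Println(e.Metadata["seen_traceparent"] != "")
}
```

Swapping the two preparers would leave `seen_traceparent` empty, which is why order-sensitive preparers should be registered after the ones they depend on.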
type HandleOption ¶
type HandleOption func(*handleOpts)
HandleOption configures HandleInvoke / HandleStream.
func WithHandleQueue ¶
func WithHandleQueue(q string) HandleOption
WithHandleQueue places the handler in a queue group so multiple replicas load-balance the work.
type Handler ¶
Handler consumes a delivered envelope. A handler may return an error, but the runtime currently treats handler errors as terminal for the message; retry semantics live in middleware.
type InvokeHandler ¶
InvokeHandler is the server-side counterpart of Bus.Invoke: it returns a single value that the runtime wraps into a final response envelope.
type InvokeOption ¶
type InvokeOption func(*invokeOpts)
InvokeOption configures Bus.Invoke / Bus.StreamInvoke.
func WithIdleTimeout ¶
func WithIdleTimeout(d time.Duration) InvokeOption
WithIdleTimeout sets the maximum gap between two streaming frames.
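One way to picture the idle-timeout rule is a receive loop that restarts a timer for every frame. The sketch below is an assumption about the mechanism, not the package's implementation; `nextFrame` and `demoIdle` are hypothetical names, and frames are modelled as plain strings.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// ErrIdleTimeout mirrors the documented sentinel for this standalone sketch.
var ErrIdleTimeout = errors.New("bus: stream idle timeout")

// demoIdle is an arbitrary gap used only by this example.
const demoIdle = 20 * time.Millisecond

// nextFrame waits for the next frame or gives up once idle has elapsed.
// ok=false with a nil error means the channel was closed normally.
func nextFrame(frames <-chan string, idle time.Duration) (frame string, ok bool, err error) {
	timer := time.NewTimer(idle)
	defer timer.Stop()
	select {
	case f, open := <-frames:
		return f, open, nil
	case <-timer.C:
		return "", false, ErrIdleTimeout
	}
}

func main() {
	frames := make(chan string, 1)
	frames <- "delta-1"

	f, ok, err := nextFrame(frames, demoIdle)
	fmt.Println(f, ok, err)

	// No further frame arrives, so the second wait hits the idle deadline.
	_, _, err = nextFrame(frames, demoIdle)
	fmt.Println(errors.Is(err, ErrIdleTimeout))
}
```

Because a fresh timer is created per call, the deadline bounds the gap between frames and not the total stream duration, which is the job of WithTimeout.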
func WithTimeout ¶
func WithTimeout(d time.Duration) InvokeOption
WithTimeout sets the overall deadline for the invocation.
type Option ¶
type Option func(*Options)
Option mutates Options.
func WithAgentID ¶
func WithAgentID(id string) Option
func WithDefaultTimeout ¶
func WithDefaultTimeout(d time.Duration) Option
func WithEnvelopePreparer ¶
func WithEnvelopePreparer(p ...EnvelopePreparer) Option
WithEnvelopePreparer registers one or more preparers that run on every outbound envelope just before it is encoded. See EnvelopePreparer.
func WithLogger ¶
func WithLogger(l *slog.Logger) Option
func WithMiddleware ¶
func WithMiddleware(mw ...middleware.Middleware) Option
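func WithSubjectPrefix ¶
func WithSubjectPrefix(p string) Option
func WithTenant ¶
func WithTenant(t string) Option
func WithTransport ¶
func WithTransport(t transport.Transport) Option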
type Options ¶
type Options struct {
AgentID string
Tenant string
SubjectPrefix string
Codec codec.Codec
Transport transport.Transport
Logger *slog.Logger
Middleware []middleware.Middleware
EnvelopePreparers []EnvelopePreparer
DefaultTimeout time.Duration
}
Options bundles every Bus-level setting.
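The functional-options pattern behind Option and Options can be sketched as follows. The types are trimmed local copies so the example compiles on its own, and `resolve` is a hypothetical helper; this page does not show how the package applies options internally or what its constructor looks like.

```go
package main

import (
	"fmt"
	"time"
)

// DefaultSubjectPrefix mirrors the documented constant.
const DefaultSubjectPrefix = "acp.v1"

// Options is a trimmed copy of the documented struct (codec, transport,
// logger, middleware, and preparers are omitted to stay self-contained).
type Options struct {
	AgentID        string
	Tenant         string
	SubjectPrefix  string
	DefaultTimeout time.Duration
}

// Option mutates Options.
type Option func(*Options)

func WithAgentID(id string) Option  { return func(o *Options) { o.AgentID = id } }
func WithTenant(t string) Option    { return func(o *Options) { o.Tenant = t } }
func WithSubjectPrefix(p string) Option {
	return func(o *Options) { o.SubjectPrefix = p }
}
func WithDefaultTimeout(d time.Duration) Option {
	return func(o *Options) { o.DefaultTimeout = d }
}

// resolve is a hypothetical helper: it seeds the documented default prefix
// and then applies each option in order, so later options win.
func resolve(opts ...Option) Options {
	o := Options{SubjectPrefix: DefaultSubjectPrefix}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := resolve(WithAgentID("planner-1"), WithTenant("acme"), WithDefaultTimeout(5*time.Second))
	fmt.Println(o.AgentID, o.Tenant, o.SubjectPrefix, o.DefaultTimeout)
}
```

Seeding defaults before applying options means a caller only has to mention the settings that differ from the default.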
type Stream ¶
Stream is the client side of a streaming response. Range over Events() to consume incoming envelopes; Close cancels the underlying inbox.
type StreamHandler ¶
StreamHandler is the server-side counterpart of Bus.StreamInvoke. It receives a StreamWriter that lets it emit started/delta/final/error frames in order.
type StreamWriter ¶
type StreamWriter interface {
Started(meta any) error
Delta(chunk any) error
Final(result any) error
Error(err error) error
}
StreamWriter is the server side of a streaming response. Started/Final/Error must each be called at most once per stream; Final and Error are mutually exclusive.
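The call-count rules above can be made concrete with a small in-memory writer, which is also handy as a test double for StreamHandler code. `recordingWriter` and `errProtocol` are hypothetical names, and rejecting any frame after a terminal frame is an assumption that goes slightly beyond the stated contract.

```go
package main

import (
	"errors"
	"fmt"
)

// StreamWriter is copied from the documentation above.
type StreamWriter interface {
	Started(meta any) error
	Delta(chunk any) error
	Final(result any) error
	Error(err error) error
}

// errProtocol is a hypothetical error for calls that break the contract.
var errProtocol = errors.New("stream: call violates frame contract")

// recordingWriter records frame kinds in order and enforces the rules:
// Started at most once, and exactly one of Final or Error at most once.
type recordingWriter struct {
	frames  []string
	started bool
	done    bool
}

var _ StreamWriter = (*recordingWriter)(nil)

func (w *recordingWriter) Started(meta any) error {
	if w.started || w.done {
		return errProtocol
	}
	w.started = true
	w.frames = append(w.frames, "started")
	return nil
}

func (w *recordingWriter) Delta(chunk any) error {
	if w.done { // assumption: no deltas after a terminal frame
		return errProtocol
	}
	w.frames = append(w.frames, "delta")
	return nil
}

func (w *recordingWriter) Final(result any) error { return w.terminate("final") }
func (w *recordingWriter) Error(err error) error  { return w.terminate("error") }

// terminate makes Final and Error mutually exclusive.
func (w *recordingWriter) terminate(kind string) error {
	if w.done {
		return errProtocol
	}
	w.done = true
	w.frames = append(w.frames, kind)
	return nil
}

func main() {
	w := &recordingWriter{}
	_ = w.Started(nil)
	_ = w.Delta("chunk")
	_ = w.Final("result")
	fmt.Println(w.frames, w.Error(errors.New("late")) != nil)
}
```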
type Subscription ¶
type Subscription interface {
Unsubscribe() error
}
Subscription represents a live subscription. Calling Unsubscribe stops further deliveries; the call is idempotent.
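An idempotent Unsubscribe is commonly built on sync.Once. The sketch below shows that approach under stated assumptions: the `subscription` struct, its `cancel` hook, and `newCountingSub` are hypothetical and do not describe how the package implements the interface.

```go
package main

import (
	"fmt"
	"sync"
)

// Subscription is copied from the documentation above.
type Subscription interface {
	Unsubscribe() error
}

// subscription is a hypothetical implementation. cancel stands in for
// whatever teardown the transport requires.
type subscription struct {
	once   sync.Once
	cancel func() error
	err    error
}

// Unsubscribe runs the teardown exactly once; repeated calls return the
// result of the first call without running cancel again.
func (s *subscription) Unsubscribe() error {
	s.once.Do(func() { s.err = s.cancel() })
	return s.err
}

// newCountingSub returns a subscription whose teardown increments counter,
// which makes the once-only behaviour observable.
func newCountingSub(counter *int) Subscription {
	return &subscription{cancel: func() error {
		*counter++
		return nil
	}}
}

func main() {
	n := 0
	sub := newCountingSub(&n)
	_ = sub.Unsubscribe()
	_ = sub.Unsubscribe()
	fmt.Println(n)
}
```

With this shape, callers can safely combine a deferred Unsubscribe with an explicit early call.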