goflux

v0.4.2
Published: Apr 21, 2026 License: MIT Imports: 14 Imported by: 0

README

Generic, transport-agnostic messaging patterns for Go.

Write business logic against core interfaces. Swap transports without touching handler code.

Architecture

Layer               What it provides
Core Interfaces     Publisher[T], Subscriber[T], Requester[Req, Resp], Responder[Req, Resp], Message[T], Handler[T]
Transports          Channel (in-process), NATS, JetStream, HTTP; each implements the core interfaces
Middleware          Chain, AutoAck, RetryAck, InjectMessageID, InjectHeader, ForwardMessageID
Pipeline Operators  pipe.New, pipe.NewMap, pipe.NewFlatMap, ToChan, bridge.ToStream, bridge.FromStream, BindPublisher, RetryPublisher
Stream Processing   Fan-out, fan-in, round-robin, filtering, dedup, throttling via goflow
Lifecycle           Group: coordinated startup, fail-fast shutdown for multiple handlers
Telemetry           OpenTelemetry tracing and metrics built into every transport

Supported Patterns

  • Fire & Forget — publish with no delivery guarantee (channels, NATS core)
  • At-Least-Once — ack/nak with auto-ack or manual control (JetStream)
  • Pull Consumer — JetStream pull consumers via Subscriber[T] with middleware composition (JetStream)
  • Request-Reply — typed request/response (NATS, HTTP)
  • Queue Groups — competing consumers (NATS)
  • Stream Processing — bridge to goflow via bridge.ToStream/bridge.FromStream for bounded concurrency, filtering, dedup, fan-out/fan-in, and more
  • Fan-Out / Fan-In — broadcast, merge, round-robin via goflow stream operators

Transport Feature Matrix

Interface             Channel  NATS  JetStream  HTTP
Publisher[T]          yes      yes   yes        yes
Subscriber[T]         yes      yes   yes        yes
Requester[Req, Resp]  -        yes   -          yes
Responder[Req, Resp]  -        yes   -          yes

Installation

go get github.com/foomo/goflux

Quick Start

package main

import (
  "context"
  "fmt"

  "github.com/foomo/goflux"
  "github.com/foomo/goflux/transport/channel"
)

func main() {
  ctx, cancel := context.WithCancel(context.Background())
  defer cancel()

  bus := channel.NewBus[string]()
  pub := channel.NewPublisher(bus)
  sub, _ := channel.NewSubscriber(bus, 1)

  go sub.Subscribe(ctx, "greetings", func(_ context.Context, msg goflux.Message[string]) error {
    fmt.Println(msg.Subject, msg.Payload)
    cancel()
    return nil
  })

  _ = pub.Publish(ctx, "greetings", "Hello, goflux!")
  <-ctx.Done()
}

Swap to NATS by changing the import and constructor — the handler stays the same. See the Getting Started guide.

Documentation

Full documentation: https://foomo.github.io/goflux/

Contributing

make check   # tidy + generate + lint + test + audit (full CI flow)

See CONTRIBUTING.md for details.

License

Distributed under MIT License, see LICENSE for details.

Made with ♥ foomo by bestbytes

Documentation

Constants

const MessageIDHeader = "X-Message-ID"

MessageIDHeader is the HTTP header name used to propagate a message ID across the HTTP transport.

Variables

var (
	// ErrPublish indicates a failure in the publish path.
	ErrPublish = errors.New("publish")
	// ErrSubscribe indicates a failure in the subscribe path.
	ErrSubscribe = errors.New("subscribe")
	// ErrEncode indicates a serialization failure.
	ErrEncode = errors.New("encode")
	// ErrDecode indicates a deserialization failure.
	ErrDecode = errors.New("decode")
	// ErrTransport indicates a transport-level failure (network, protocol).
	ErrTransport = errors.New("transport")
)

Sentinel errors for classifying failures. Transports join these with the causal error via errors.Join, so callers can inspect with errors.Is:

if errors.Is(err, goflux.ErrEncode) { /* codec problem */ }
if errors.Is(err, goflux.ErrPublish) { /* publish-path failure */ }
var ErrNonRetryable = errors.New("non-retryable")

ErrNonRetryable is a sentinel error that marks a handler failure as permanent. When a RetryPolicy encounters this error (via errors.Is), it returns RetryTerm so that the message is terminated rather than redelivered.

Functions

func IsNonRetryable added in v0.2.0

func IsNonRetryable(err error) bool

IsNonRetryable reports whether err (or any error in its chain) is marked as non-retryable.

func MessageID

func MessageID(ctx context.Context) string

MessageID returns the message ID stored in ctx, or "" if none is set.

func NonRetryable added in v0.2.0

func NonRetryable(err error) error

NonRetryable wraps err so that errors.Is(err, ErrNonRetryable) returns true. Use this to signal that a handler error is permanent and the message should not be retried.

Example

ExampleNonRetryable demonstrates wrapping and detecting non-retryable errors.

package main

import (
	"errors"
	"fmt"

	"github.com/foomo/goflux"
)

func main() {
	orig := errors.New("invalid payload")
	wrapped := goflux.NonRetryable(orig)

	fmt.Println("is non-retryable:", goflux.IsNonRetryable(wrapped))
	fmt.Println("is original:", errors.Is(wrapped, orig))
	fmt.Println("plain error:", goflux.IsNonRetryable(errors.New("transient")))
}
Output:
is non-retryable: true
is original: true
plain error: false

func ToChan

func ToChan[T any](ctx context.Context, sub Subscriber[T], subject string, bufSize int) <-chan Message[T]

ToChan bridges a Subscriber into a plain channel. It launches Subscribe in a goroutine and forwards each message (including acker) into a buffered channel. The returned channel closes when ctx is cancelled.

bufSize controls backpressure: a full buffer blocks the subscriber's handler until the consumer catches up.

Example
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/foomo/goflux"
	"github.com/foomo/goflux/transport/channel"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	bus := channel.NewBus[string]()
	pub := channel.NewPublisher(bus)

	sub, _ := channel.NewSubscriber(bus, 1)

	ch := goflux.ToChan[string](ctx, sub, "test", 4)

	time.Sleep(10 * time.Millisecond)

	_ = pub.Publish(ctx, "test", "alpha")
	_ = pub.Publish(ctx, "test", "bravo")

	fmt.Println((<-ch).Payload)
	fmt.Println((<-ch).Payload)
}
Output:
alpha
bravo

func WithHeader

func WithHeader(ctx context.Context, h Header) context.Context

WithHeader returns a copy of ctx with the given header attached. Transports read this header during Publish and merge it into the outgoing transport headers.

func WithMessageID

func WithMessageID(ctx context.Context, id string) context.Context

WithMessageID returns a copy of ctx with the given message ID attached. The ID is purely opt-in: if set, transports propagate it via headers and RecordPublish / RecordProcess attach it as the goflux.message.id span attribute.

Types

type Acker

type Acker interface {
	Ack() error
	Nak() error
}

Acker is the minimal acknowledgment interface. Transports that support at-least-once delivery implement this on their message wrapper.

type BackoffFunc added in v0.2.0

type BackoffFunc func(attempt int) time.Duration

BackoffFunc returns the delay before the next retry attempt. attempt starts at 0 for the first retry (i.e. the second overall call).

type BoundPublisher

type BoundPublisher[T any] interface {
	// Publish serializes v and delivers it to the bound subject.
	Publish(ctx context.Context, v T) error
	// Close releases any underlying connections.
	Close() error
}

BoundPublisher publishes to a fixed subject, so no subject parameter is needed.

func BindPublisher added in v0.4.0

func BindPublisher[T any](pub Publisher[T], subject string) BoundPublisher[T]

BindPublisher wraps a Publisher with a fixed subject.

Example

ExampleBindPublisher demonstrates creating a BoundPublisher that fixes the subject. Callers only need to provide the payload; the subject is always "orders".

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

bus := channel.NewBus[Event]()
pub := channel.NewPublisher(bus)

sub, err := channel.NewSubscriber(bus, 1)
if err != nil {
	panic(err)
}

// BindPublisher fixes the subject to "orders".
bound := goflux.BindPublisher[Event](pub, "orders")

gofuncy.StartWithReady(ctx, func(ctx context.Context, ready gofuncy.ReadyFunc) error {
	ready()

	return sub.Subscribe(ctx, "orders", func(_ context.Context, msg goflux.Message[Event]) error {
		fmt.Println(msg.Subject, msg.Payload.Name)
		cancel()

		return nil
	})
}, gofuncy.WithName("subscriber"))

// Allow subscriber to register.
time.Sleep(10 * time.Millisecond)

// No subject argument: the bound publisher always uses "orders".
if err := bound.Publish(ctx, Event{ID: "1", Name: "widget"}); err != nil {
	panic(err)
}

<-ctx.Done()
Output:
orders widget

type BoundSubscriber added in v0.3.0

type BoundSubscriber[T any] interface {
	// Subscribe registers handler for the bound subject. The call blocks until
	// ctx is canceled or the implementation encounters a fatal error.
	Subscribe(ctx context.Context, handler Handler[T]) error
	// Close unsubscribes and releases resources.
	Close() error
}

BoundSubscriber subscribes to a fixed subject, so no subject parameter is needed.

func BindSubscriber added in v0.3.0

func BindSubscriber[T any](sub Subscriber[T], subject string) BoundSubscriber[T]

BindSubscriber wraps a Subscriber with a fixed subject.

Example

ExampleBindSubscriber demonstrates creating a BoundSubscriber that fixes the subject. Callers only need to provide the handler; the subject is always "orders".

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

bus := channel.NewBus[Event]()
pub := channel.NewPublisher(bus)

sub, err := channel.NewSubscriber(bus, 1)
if err != nil {
	panic(err)
}

// BindSubscriber fixes the subject to "orders".
bound := goflux.BindSubscriber[Event](sub, "orders")

gofuncy.StartWithReady(ctx, func(ctx context.Context, ready gofuncy.ReadyFunc) error {
	ready()

	// No subject argument: the bound subscriber always uses "orders".
	return bound.Subscribe(ctx, func(_ context.Context, msg goflux.Message[Event]) error {
		fmt.Println(msg.Subject, msg.Payload.Name)
		cancel()

		return nil
	})
}, gofuncy.WithName("subscriber"))

// Allow subscriber to register.
time.Sleep(10 * time.Millisecond)

if err := pub.Publish(ctx, "orders", Event{ID: "1", Name: "widget"}); err != nil {
	panic(err)
}

<-ctx.Done()
Output:
orders widget

type BoundTopic added in v0.4.0

type BoundTopic[T any] struct {
	BoundPublisher[T]
	BoundSubscriber[T]
}

BoundTopic bundles a BoundPublisher and BoundSubscriber sharing the same fixed subject. Use it when a service needs to both produce and consume the same message type on a known subject.

func BindTopic added in v0.4.0

func BindTopic[T any](pub Publisher[T], sub Subscriber[T], subject string) *BoundTopic[T]

BindTopic wraps a Publisher and Subscriber with a fixed subject.
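
The idea of fixing the routing key for both directions can be sketched without the module. The example below uses a hypothetical synchronous in-memory `bus` and deliberately simplified signatures (no context, no error return, non-blocking Subscribe), so it shows the binding pattern only and is not the package's implementation.

```go
package main

import "fmt"

// bus is a hypothetical synchronous in-memory transport keyed by routing key.
type bus[T any] struct {
	handlers map[string][]func(T)
}

func newBus[T any]() *bus[T] {
	return &bus[T]{handlers: make(map[string][]func(T))}
}

// Publish calls every handler registered for the key, in order.
func (b *bus[T]) Publish(key string, v T) {
	for _, h := range b.handlers[key] {
		h(v)
	}
}

// Subscribe registers a handler and returns immediately.
func (b *bus[T]) Subscribe(key string, h func(T)) {
	b.handlers[key] = append(b.handlers[key], h)
}

// boundTopic remembers the key so that neither side has to repeat it.
type boundTopic[T any] struct {
	bus *bus[T]
	key string
}

func bindTopic[T any](b *bus[T], key string) *boundTopic[T] {
	return &boundTopic[T]{bus: b, key: key}
}

func (t *boundTopic[T]) Publish(v T)         { t.bus.Publish(t.key, v) }
func (t *boundTopic[T]) Subscribe(h func(T)) { t.bus.Subscribe(t.key, h) }

func main() {
	orders := bindTopic(newBus[string](), "orders")

	orders.Subscribe(func(v string) { fmt.Println("received:", v) })
	orders.Publish("widget") // received: widget
}
```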

type DelayedNaker

type DelayedNaker interface {
	Acker
	NakWithDelay(d time.Duration) error
}

DelayedNaker extends Acker with delayed negative acknowledgment, causing the message to be redelivered after the given delay.

type Handler

type Handler[T any] func(ctx context.Context, msg Message[T]) error

Handler is the callback signature used by Subscriber.Subscribe. Returning a non-nil error signals the subscriber to nack / requeue the message (behavior is implementation-specific).

type Header map[string][]string

Header carries message metadata (trace context, message ID, custom key-value pairs). Keys are case-sensitive and values are slices, mirroring the map[string][]string layout of http.Header.

func HeaderFromContext

func HeaderFromContext(ctx context.Context) Header

HeaderFromContext returns the header stored in ctx, or nil if none is set.

func (Header) Add

func (h Header) Add(key, value string)

Add appends a value for the given key.

func (Header) Clone

func (h Header) Clone() Header

Clone returns a deep copy of the header.

func (Header) Del

func (h Header) Del(key string)

Del removes the given key.

func (Header) Get

func (h Header) Get(key string) string

Get returns the first value for the given key, or "" if not present.

func (Header) Set

func (h Header) Set(key, value string)

Set replaces any existing values for the given key.
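
The documented method semantics can be tried out with a local stand-in. The method bodies below are an assumption written to match the descriptions above (append on Add, replace on Set, first value on Get, deep copy on Clone, case-sensitive keys); they are not copied from the package source.

```go
package main

import "fmt"

// Header is a local stand-in for goflux.Header.
type Header map[string][]string

// Add appends a value for the key.
func (h Header) Add(key, value string) { h[key] = append(h[key], value) }

// Set replaces any existing values for the key.
func (h Header) Set(key, value string) { h[key] = []string{value} }

// Get returns the first value for the key, or "" if not present.
func (h Header) Get(key string) string {
	if vs := h[key]; len(vs) > 0 {
		return vs[0]
	}
	return ""
}

// Del removes the key.
func (h Header) Del(key string) { delete(h, key) }

// Clone copies the map and every value slice, so later changes to the
// original do not leak into the copy.
func (h Header) Clone() Header {
	out := make(Header, len(h))
	for k, vs := range h {
		out[k] = append([]string(nil), vs...)
	}
	return out
}

func main() {
	h := Header{}
	h.Add("X-Tag", "a")
	h.Add("X-Tag", "b")

	snapshot := h.Clone()
	h.Set("X-Tag", "z")

	fmt.Println(h.Get("X-Tag"))       // z
	fmt.Println(snapshot["X-Tag"])    // [a b]
	fmt.Println(h.Get("x-tag") == "") // true: keys are case-sensitive
}
```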

type Message

type Message[T any] struct {
	Subject string `json:"nats"`
	Payload T      `json:"payload"`
	Header  Header `json:"header,omitempty"`
	// contains filtered or unexported fields
}

Message is the unit passed to every Handler. Subject carries the routing key (e.g. a NATS subject or HTTP path); Payload holds the decoded value; Header carries optional metadata; the unexported acker field provides acknowledgment controls.

func NewMessage

func NewMessage[T any](subject string, payload T) Message[T]

NewMessage creates a new Message.

func NewMessageWithHeader

func NewMessageWithHeader[T any](subject string, payload T, header Header) Message[T]

NewMessageWithHeader creates a new Message with the given header.

func (Message[T]) Ack

func (m Message[T]) Ack() error

Ack acknowledges successful processing. No-op if the transport does not support acknowledgments.

func (Message[T]) HasAcker

func (m Message[T]) HasAcker() bool

HasAcker reports whether the message carries acknowledgment controls.

func (Message[T]) Nak

func (m Message[T]) Nak() error

Nak signals processing failure; the message should be redelivered. No-op if the transport does not support acknowledgments.

func (Message[T]) NakWithDelay

func (m Message[T]) NakWithDelay(d time.Duration) error

NakWithDelay signals processing failure with a redelivery delay hint. Falls back to Nak if the transport does not support delayed redelivery.

func (Message[T]) Term

func (m Message[T]) Term() error

Term terminates processing — the message will not be redelivered. Falls back to Ack if the transport does not support terminal rejection, to prevent infinite redelivery loops.
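
A fallback of this kind is typically expressed as an optional-interface check. The sketch below assumes that approach: `terminate`, `basicAcker`, and `termAcker` are hypothetical names, and the Acker and Terminator interfaces are redeclared locally so the example is self-contained.

```go
package main

import "fmt"

// Local copies of the documented interfaces.
type Acker interface {
	Ack() error
	Nak() error
}

type Terminator interface {
	Acker
	Term() error
}

// terminate prefers Term when the acker supports it and falls back to
// Ack otherwise, so the message is never redelivered either way.
// A nil acker is treated as a no-op.
func terminate(a Acker) error {
	if a == nil {
		return nil
	}
	if term, ok := a.(Terminator); ok {
		return term.Term()
	}
	return a.Ack()
}

// basicAcker supports only Ack and Nak and records which call was made.
type basicAcker struct{ calls []string }

func (b *basicAcker) Ack() error { b.calls = append(b.calls, "ack"); return nil }
func (b *basicAcker) Nak() error { b.calls = append(b.calls, "nak"); return nil }

// termAcker additionally supports terminal rejection.
type termAcker struct{ basicAcker }

func (t *termAcker) Term() error { t.calls = append(t.calls, "term"); return nil }

func main() {
	plain := &basicAcker{}
	full := &termAcker{}

	_ = terminate(plain)
	_ = terminate(full)

	fmt.Println(plain.calls) // [ack]
	fmt.Println(full.calls)  // [term]
}
```

The same pattern would apply to NakWithDelay, with DelayedNaker as the optional interface and Nak as the fallback.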

func (Message[T]) WithAcker

func (m Message[T]) WithAcker(a Acker) Message[T]

WithAcker returns a copy of the message with the given acker attached. This is intended for transport implementations, not application code.

type Middleware

type Middleware[T any] func(Handler[T]) Handler[T]

Middleware wraps a Handler[T] to add cross-cutting behaviour such as logging, rate-limiting, or circuit-breaking.

func Chain

func Chain[T any](mws ...Middleware[T]) Middleware[T]

Chain composes middlewares left-to-right: the first middleware in the list is the outermost wrapper. Chain(a, b)(h) is equivalent to a(b(h)).

Example

ExampleChain demonstrates composing handler middlewares. Middlewares execute left-to-right (outermost first).

var trace []string

mwA := func(next goflux.Handler[Event]) goflux.Handler[Event] {
	return func(ctx context.Context, msg goflux.Message[Event]) error {
		trace = append(trace, "A-before")
		err := next(ctx, msg)

		trace = append(trace, "A-after")

		return err
	}
}

mwB := func(next goflux.Handler[Event]) goflux.Handler[Event] {
	return func(ctx context.Context, msg goflux.Message[Event]) error {
		trace = append(trace, "B-before")
		err := next(ctx, msg)

		trace = append(trace, "B-after")

		return err
	}
}

base := func(_ context.Context, msg goflux.Message[Event]) error {
	trace = append(trace, "handler:"+msg.Payload.Name)
	return nil
}

handler := goflux.Chain[Event](mwA, mwB)(base)

msg := goflux.NewMessage("events", Event{ID: "1", Name: "hello"})
_ = handler(context.Background(), msg)

for _, s := range trace {
	fmt.Println(s)
}
Output:
A-before
B-before
handler:hello
B-after
A-after

type ProcessOption

type ProcessOption func(*processConfig)

ProcessOption configures Telemetry.RecordProcess.

func WithRemoteSpanContext

func WithRemoteSpanContext(sc trace.SpanContext) ProcessOption

WithRemoteSpanContext attaches the given span context as a span link instead of using it as the parent. Use this for async transports (e.g. NATS) where the producer and consumer are temporally decoupled — the consumer span becomes a root span linked to the producer, rather than a child of it.

type Publisher

type Publisher[T any] interface {
	// Publish serializes v via the bound Codec and delivers it to the subject.
	Publish(ctx context.Context, subject string, v T) error
	// Close releases any underlying connections.
	Close() error
}

Publisher sends encoded messages to a subject/topic.

func RetryPublisher added in v0.2.0

func RetryPublisher[T any](pub Publisher[T], maxAttempts int, backoff BackoffFunc) Publisher[T]

RetryPublisher wraps a Publisher with retry logic. On publish failure, it retries up to maxAttempts times with delays determined by backoff. Context cancellation aborts the retry loop immediately.

If all attempts fail, the last error is returned.

Example

ExampleRetryPublisher demonstrates wrapping a publisher with retry logic. Transient errors are retried with the given backoff until success.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/foomo/goflux"
)

// ExampleRetryPublisher demonstrates wrapping a publisher with retry logic.
// Transient errors are retried with the given backoff until success.
func main() {
	// A publisher that fails twice then succeeds.
	inner := &countingPublisher{failUntil: 2}

	pub := goflux.RetryPublisher[string](inner, 3, func(_ int) time.Duration {
		return time.Millisecond // fast backoff for example
	})

	err := pub.Publish(context.Background(), "events", "hello")

	fmt.Println("error:", err)
	fmt.Println("attempts:", inner.attempts)
}

// countingPublisher fails the first failUntil attempts, then succeeds.
type countingPublisher struct {
	failUntil int
	attempts  int
}

func (p *countingPublisher) Publish(_ context.Context, _ string, _ string) error {
	p.attempts++

	if p.attempts <= p.failUntil {
		return errors.New("transient error")
	}

	return nil
}

func (p *countingPublisher) Close() error { return nil }
Output:
error: <nil>
attempts: 3

type PublisherMiddleware added in v0.2.0

type PublisherMiddleware[T any] func(Publisher[T]) Publisher[T]

PublisherMiddleware wraps a Publisher[T] to add cross-cutting behaviour such as retry, rate-limiting, or circuit-breaking on the publish path.

type RequestHandler

type RequestHandler[Req, Resp any] func(ctx context.Context, req Req) (Resp, error)

RequestHandler processes a request and returns a response.

type Requester

type Requester[Req, Resp any] interface {
	// Request sends req to the given subject and returns the response.
	Request(ctx context.Context, subject string, req Req) (Resp, error)
	// Close releases any underlying resources.
	Close() error
}

Requester sends a typed request and waits for a typed response.

type Responder

type Responder[Req, Resp any] interface {
	// Serve registers the handler for the given subject. The call blocks
	// until ctx is cancelled or a fatal error occurs.
	Serve(ctx context.Context, subject string, handler RequestHandler[Req, Resp]) error
	// Close releases any underlying resources.
	Close() error
}

Responder handles incoming requests and produces typed responses.

type Subscriber

type Subscriber[T any] interface {
	// Subscribe registers handler for the subject. The call blocks until ctx is
	// canceled or the implementation encounters a fatal error.
	Subscribe(ctx context.Context, subject string, handler Handler[T]) error
	// Close unsubscribes and releases resources.
	Close() error
}

Subscriber listens on one or more subjects and dispatches decoded messages to a Handler.

type Telemetry

type Telemetry struct {
	// contains filtered or unexported fields
}

Telemetry holds OTel instruments (tracer, metrics, propagator) for a single transport instance. Construct with NewTelemetry.

func DefaultTelemetry added in v0.2.0

func DefaultTelemetry(tel *Telemetry) *Telemetry

DefaultTelemetry returns tel if non-nil, otherwise creates a new Telemetry from OTel globals. If that fails, it falls back to a noop implementation. This is the standard fallback logic used by all transports.

func NewNoopTelemetry added in v0.2.0

func NewNoopTelemetry() *Telemetry

NewNoopTelemetry returns a Telemetry backed by OTel's noop implementations. All Record* calls are safe but produce no spans or metrics.

func NewTelemetry

func NewTelemetry(opts ...TelemetryOption) (*Telemetry, error)

NewTelemetry creates a Telemetry instance. Without options it reads from the current OTel globals, so callers that have already called otel.SetTracerProvider / otel.SetMeterProvider need not pass anything.

func (*Telemetry) ExtractContext

func (t *Telemetry) ExtractContext(ctx context.Context, carrier propagation.TextMapCarrier) context.Context

ExtractContext extracts span context from carrier and returns an enriched context with the remote span as parent. Use this for synchronous transports (e.g. HTTP) where parent-child relationship is appropriate.

func (*Telemetry) ExtractSpanContext

func (t *Telemetry) ExtractSpanContext(ctx context.Context, carrier propagation.TextMapCarrier) trace.SpanContext

ExtractSpanContext extracts the remote span context from carrier without injecting it as parent into ctx. Use this with WithRemoteSpanContext for async transports where the consumer span should link to (not be a child of) the producer span.

func (*Telemetry) InjectContext

func (t *Telemetry) InjectContext(ctx context.Context, carrier propagation.TextMapCarrier)

InjectContext injects the span context from ctx into the carrier. Transports call this on the publish side to propagate trace context across wire boundaries (e.g. NATS headers, HTTP headers).

func (*Telemetry) RecordAckOutcome added in v0.2.0

func (t *Telemetry) RecordAckOutcome(ctx context.Context, action, subject string, err error)

RecordAckOutcome records an acknowledgment outcome (ack, nak, nak_with_delay, term) with an optional error label when the ack operation itself fails.

func (*Telemetry) RecordFetch

func (t *Telemetry) RecordFetch(ctx context.Context, subject string, system semconvmsg.SystemAttr, count int, fn func(context.Context) error) error

RecordFetch opens a consumer span for a pull-based fetch operation.

func (*Telemetry) RecordProcess

func (t *Telemetry) RecordProcess(ctx context.Context, subject string, system semconvmsg.SystemAttr, fn func(context.Context) error, opts ...ProcessOption) error

RecordProcess opens a consumer span, calls fn, records duration and counter. Pass WithRemoteSpanContext to attach the producer span as a link rather than a parent (recommended for async transports like NATS).

func (*Telemetry) RecordPublish

func (t *Telemetry) RecordPublish(ctx context.Context, subject string, system semconvmsg.SystemAttr, fn func(context.Context) error) error

RecordPublish opens a producer span, calls fn, records duration and counter.

func (*Telemetry) RecordRequest

func (t *Telemetry) RecordRequest(ctx context.Context, subject string, system semconvmsg.SystemAttr, fn func(context.Context) error) error

RecordRequest opens a client span for a request-reply call.

func (*Telemetry) RegisterLag

func (t *Telemetry) RegisterLag(subject string, lagFn func() int64) (metric.Int64ObservableGauge, error)

RegisterLag registers the goflux.consumer.lag observable gauge. Uses the meter provider that was passed to NewTelemetry.

type TelemetryOption

type TelemetryOption func(*telemetryConfig)

TelemetryOption configures a Telemetry instance.

func WithMeterProvider

func WithMeterProvider(mp metric.MeterProvider) TelemetryOption

WithMeterProvider sets the meter provider. Defaults to otel.GetMeterProvider.

func WithPropagator

WithPropagator sets the text-map propagator. Defaults to otel.GetTextMapPropagator.

func WithTracerProvider

func WithTracerProvider(tp trace.TracerProvider) TelemetryOption

WithTracerProvider sets the tracer provider. Defaults to otel.GetTracerProvider.

type Terminator

type Terminator interface {
	Acker
	Term() error
}

Terminator extends Acker with terminal rejection — the message will not be redelivered. Use for dead-letter patterns where handler errors are non-retryable.

type Topic

type Topic[T any] struct {
	Publisher[T]
	Subscriber[T]
}

Topic bundles a Publisher and Subscriber sharing the same Codec. Use it when a service needs to both produce and consume the same message type.

Example

ExampleTopic demonstrates bundling a Publisher and Subscriber into a single Topic value. This is useful when a service needs to both produce and consume the same message type.

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

bus := channel.NewBus[Event]()

sub, err := channel.NewSubscriber(bus, 1)
if err != nil {
	panic(err)
}

topic := goflux.Topic[Event]{
	Publisher:  channel.NewPublisher(bus),
	Subscriber: sub,
}

gofuncy.StartWithReady(ctx, func(ctx context.Context, ready gofuncy.ReadyFunc) error {
	ready()

	return topic.Subscribe(ctx, "events", func(_ context.Context, msg goflux.Message[Event]) error {
		fmt.Println(msg.Payload.Name)
		cancel()

		return nil
	})
}, gofuncy.WithName("subscriber"))

time.Sleep(10 * time.Millisecond)

if err := topic.Publish(ctx, "events", Event{ID: "1", Name: "bundled"}); err != nil {
	panic(err)
}

<-ctx.Done()
Output:
bundled

Directories

Path Synopsis
bridge module
subject
transport
jetstream module
nats module
