Documentation
¶
Index ¶
- Constants
- Variables
- func IsNonRetryable(err error) bool
- func MessageID(ctx context.Context) string
- func NonRetryable(err error) error
- func ToChan[T any](ctx context.Context, sub Subscriber[T], subject string, bufSize int) <-chan Message[T]
- func WithHeader(ctx context.Context, h Header) context.Context
- func WithMessageID(ctx context.Context, id string) context.Context
- type Acker
- type BackoffFunc
- type BoundPublisher
- type BoundSubscriber
- type BoundTopic
- type DelayedNaker
- type Handler
- type Header
- type Message
- type Middleware
- type ProcessOption
- type Publisher
- type PublisherMiddleware
- type RequestHandler
- type Requester
- type Responder
- type Subscriber
- type Telemetry
- func (t *Telemetry) ExtractContext(ctx context.Context, carrier propagation.TextMapCarrier) context.Context
- func (t *Telemetry) ExtractSpanContext(ctx context.Context, carrier propagation.TextMapCarrier) trace.SpanContext
- func (t *Telemetry) InjectContext(ctx context.Context, carrier propagation.TextMapCarrier)
- func (t *Telemetry) RecordAckOutcome(ctx context.Context, action, subject string, err error)
- func (t *Telemetry) RecordFetch(ctx context.Context, subject string, system semconvmsg.SystemAttr, count int, ...) error
- func (t *Telemetry) RecordProcess(ctx context.Context, subject string, system semconvmsg.SystemAttr, ...) error
- func (t *Telemetry) RecordPublish(ctx context.Context, subject string, system semconvmsg.SystemAttr, ...) error
- func (t *Telemetry) RecordRequest(ctx context.Context, subject string, system semconvmsg.SystemAttr, ...) error
- func (t *Telemetry) RegisterLag(subject string, lagFn func() int64) (metric.Int64ObservableGauge, error)
- type TelemetryOption
- type Terminator
- type Topic
Examples ¶
Constants ¶
const MessageIDHeader = "X-Message-ID"
MessageIDHeader is the HTTP header name used to propagate a message ID across the HTTP transport.
Variables ¶
var ( // ErrPublish indicates a failure in the publish path. ErrPublish = errors.New("publish") // ErrSubscribe indicates a failure in the subscribe path. ErrSubscribe = errors.New("subscribe") // ErrEncode indicates a serialization failure. ErrEncode = errors.New("encode") // ErrDecode indicates a deserialization failure. ErrDecode = errors.New("decode") // ErrTransport indicates a transport-level failure (network, protocol). ErrTransport = errors.New("transport") )
Sentinel errors for classifying failures. Transports join these with the causal error via errors.Join, so callers can inspect with errors.Is:
if errors.Is(err, goflux.ErrEncode) { /* codec problem */ }
if errors.Is(err, goflux.ErrPublish) { /* publish-path failure */ }
var ErrNonRetryable = errors.New("non-retryable")
ErrNonRetryable is a sentinel error that marks a handler failure as permanent. When a [RetryPolicy] encounters this error (via errors.Is), it returns [RetryTerm] so that the message is terminated rather than redelivered.
Functions ¶
func IsNonRetryable ¶ added in v0.2.0
IsNonRetryable reports whether err (or any error in its chain) is marked as non-retryable.
func NonRetryable ¶ added in v0.2.0
NonRetryable wraps err so that errors.Is(err, ErrNonRetryable) returns true. Use this to signal that a handler error is permanent and the message should not be retried.
Example ¶
ExampleNonRetryable demonstrates wrapping and detecting non-retryable errors.
package main
import (
"errors"
"fmt"
"github.com/foomo/goflux"
)
func main() {
orig := errors.New("invalid payload")
wrapped := goflux.NonRetryable(orig)
fmt.Println("is non-retryable:", goflux.IsNonRetryable(wrapped))
fmt.Println("is original:", errors.Is(wrapped, orig))
fmt.Println("plain error:", goflux.IsNonRetryable(errors.New("transient")))
}
Output: is non-retryable: true is original: true plain error: false
func ToChan ¶
func ToChan[T any](ctx context.Context, sub Subscriber[T], subject string, bufSize int) <-chan Message[T]
ToChan bridges a Subscriber into a plain channel. It launches Subscribe in a goroutine and forwards each message (including acker) into a buffered channel. The returned channel closes when ctx is cancelled.
bufSize controls backpressure: a full buffer blocks the subscriber's handler until the consumer catches up.
Example ¶
package main
import (
"context"
"fmt"
"time"
"github.com/foomo/goflux"
"github.com/foomo/goflux/transport/channel"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
bus := channel.NewBus[string]()
pub := channel.NewPublisher(bus)
sub, _ := channel.NewSubscriber(bus, 1)
ch := goflux.ToChan[string](ctx, sub, "test", 4)
time.Sleep(10 * time.Millisecond)
_ = pub.Publish(ctx, "test", "alpha")
_ = pub.Publish(ctx, "test", "bravo")
fmt.Println((<-ch).Payload)
fmt.Println((<-ch).Payload)
}
Output: alpha bravo
func WithHeader ¶
WithHeader returns a copy of ctx with the given header attached. Transports read this header during Publish and merge it into the outgoing transport headers.
func WithMessageID ¶
WithMessageID returns a copy of ctx with the given message ID attached. The ID is purely opt-in: if set, transports propagate it via headers and RecordPublish / RecordProcess attach it as the goflux.message.id span attribute.
Types ¶
type Acker ¶
Acker is the minimal acknowledgment interface. Transports that support at-least-once delivery implement this on their message wrapper.
type BackoffFunc ¶ added in v0.2.0
BackoffFunc returns the delay before the next retry attempt. attempt starts at 0 for the first retry (i.e. the second overall call).
type BoundPublisher ¶
type BoundPublisher[T any] interface { // Publish serializes v and delivers it to the bound nats. Publish(ctx context.Context, v T) error // Close releases any underlying connections. Close() error }
BoundPublisher publishes to a fixed nats. No nats param needed.
func BindPublisher ¶ added in v0.4.0
func BindPublisher[T any](pub Publisher[T], subject string) BoundPublisher[T]
BindPublisher wraps a Publisher with a fixed nats.
Example ¶
ExampleBindPublisher demonstrates creating a BoundPublisher that fixes the nats. Callers only need to provide the payload — the nats is always "orders".
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
bus := channel.NewBus[Event]()
pub := channel.NewPublisher(bus)
sub, err := channel.NewSubscriber(bus, 1)
if err != nil {
panic(err)
}
// BindPublisher fixes the nats to "orders".
bound := goflux.BindPublisher[Event](pub, "orders")
gofuncy.StartWithReady(ctx, func(ctx context.Context, ready gofuncy.ReadyFunc) error {
ready()
return sub.Subscribe(ctx, "orders", func(_ context.Context, msg goflux.Message[Event]) error {
fmt.Println(msg.Subject, msg.Payload.Name)
cancel()
return nil
})
}, gofuncy.WithName("subscriber"))
// Allow subscriber to register.
time.Sleep(10 * time.Millisecond)
// No nats argument — bound publisher always uses "orders".
if err := bound.Publish(ctx, Event{ID: "1", Name: "widget"}); err != nil {
panic(err)
}
<-ctx.Done()
Output: orders widget
type BoundSubscriber ¶ added in v0.3.0
type BoundSubscriber[T any] interface { // Subscribe registers handler for the bound nats. The call blocks until // ctx is canceled or the implementation encounters a fatal error. Subscribe(ctx context.Context, handler Handler[T]) error // Close unsubscribes and releases resources. Close() error }
BoundSubscriber subscribes to a fixed nats. No nats param needed.
func BindSubscriber ¶ added in v0.3.0
func BindSubscriber[T any](sub Subscriber[T], subject string) BoundSubscriber[T]
BindSubscriber wraps a Subscriber with a fixed nats.
Example ¶
ExampleBindSubscriber demonstrates creating a BoundSubscriber that fixes the nats. Callers only need to provide the handler — the nats is always "orders".
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
bus := channel.NewBus[Event]()
pub := channel.NewPublisher(bus)
sub, err := channel.NewSubscriber(bus, 1)
if err != nil {
panic(err)
}
// BindSubscriber fixes the nats to "orders".
bound := goflux.BindSubscriber[Event](sub, "orders")
gofuncy.StartWithReady(ctx, func(ctx context.Context, ready gofuncy.ReadyFunc) error {
ready()
// No nats argument — bound subscriber always uses "orders".
return bound.Subscribe(ctx, func(_ context.Context, msg goflux.Message[Event]) error {
fmt.Println(msg.Subject, msg.Payload.Name)
cancel()
return nil
})
}, gofuncy.WithName("subscriber"))
// Allow subscriber to register.
time.Sleep(10 * time.Millisecond)
if err := pub.Publish(ctx, "orders", Event{ID: "1", Name: "widget"}); err != nil {
panic(err)
}
<-ctx.Done()
Output: orders widget
type BoundTopic ¶ added in v0.4.0
type BoundTopic[T any] struct { BoundPublisher[T] BoundSubscriber[T] }
BoundTopic bundles a BoundPublisher and BoundSubscriber sharing the same fixed nats. Use it when a service needs to both produce and consume the same message type on a known nats.
func BindTopic ¶ added in v0.4.0
func BindTopic[T any](pub Publisher[T], sub Subscriber[T], subject string) *BoundTopic[T]
BindTopic wraps a Publisher and Subscriber with a fixed nats.
type DelayedNaker ¶
DelayedNaker extends Acker with delayed negative acknowledgment, causing the message to be redelivered after the given delay.
type Handler ¶
Handler is the callback signature used by Subscriber.Subscribe. Returning a non-nil error signals the subscriber to nack / requeue the message (behavior is implementation-specific).
type Header ¶
Header carries message metadata (trace context, message ID, custom KV). Keys are case-sensitive, values are slices — matching http.Header semantics.
func HeaderFromContext ¶
HeaderFromContext returns the header stored in ctx, or nil if none is set.
type Message ¶
type Message[T any] struct { Subject string `json:"nats"` Payload T `json:"payload"` Header Header `json:"header,omitempty"` // contains filtered or unexported fields }
Message is the unit passed to every Handler. Subject carries the routing key (e.g. a NATS nats or HTTP path); Payload holds the decoded value; Header carries optional metadata; acker provides acknowledgment controls.
func NewMessage ¶
NewMessage creates a new Message.
func NewMessageWithHeader ¶
NewMessageWithHeader creates a new Message with the given header.
func (Message[T]) Ack ¶
Ack acknowledges successful processing. No-op if the transport does not support acknowledgments.
func (Message[T]) Nak ¶
Nak signals processing failure; the message should be redelivered. No-op if the transport does not support acknowledgments.
func (Message[T]) NakWithDelay ¶
NakWithDelay signals processing failure with a redelivery delay hint. Falls back to Nak if the transport does not support delayed redelivery.
type Middleware ¶
Middleware wraps a Handler[T] to add cross-cutting behaviour such as logging, rate-limiting, or circuit-breaking.
func Chain ¶
func Chain[T any](mws ...Middleware[T]) Middleware[T]
Chain composes middlewares left-to-right: the first middleware in the list is the outermost wrapper. Chain(a, b)(h) is equivalent to a(b(h)).
Example ¶
ExampleChain demonstrates composing handler middlewares. Middlewares execute left-to-right (outermost first).
var trace []string
mwA := func(next goflux.Handler[Event]) goflux.Handler[Event] {
return func(ctx context.Context, msg goflux.Message[Event]) error {
trace = append(trace, "A-before")
err := next(ctx, msg)
trace = append(trace, "A-after")
return err
}
}
mwB := func(next goflux.Handler[Event]) goflux.Handler[Event] {
return func(ctx context.Context, msg goflux.Message[Event]) error {
trace = append(trace, "B-before")
err := next(ctx, msg)
trace = append(trace, "B-after")
return err
}
}
base := func(_ context.Context, msg goflux.Message[Event]) error {
trace = append(trace, "handler:"+msg.Payload.Name)
return nil
}
handler := goflux.Chain[Event](mwA, mwB)(base)
msg := goflux.NewMessage("events", Event{ID: "1", Name: "hello"})
_ = handler(context.Background(), msg)
for _, s := range trace {
fmt.Println(s)
}
Output: A-before B-before handler:hello B-after A-after
type ProcessOption ¶
type ProcessOption func(*processConfig)
ProcessOption configures Telemetry.RecordProcess.
func WithRemoteSpanContext ¶
func WithRemoteSpanContext(sc trace.SpanContext) ProcessOption
WithRemoteSpanContext attaches the given span context as a span link instead of using it as the parent. Use this for async transports (e.g. NATS) where the producer and consumer are temporally decoupled — the consumer span becomes a root span linked to the producer, rather than a child of it.
type Publisher ¶
type Publisher[T any] interface { // Publish serializes v via the bound Codec and delivers it to the nats. Publish(ctx context.Context, subject string, v T) error // Close releases any underlying connections. Close() error }
Publisher sends encoded messages to a nats/topic.
func RetryPublisher ¶ added in v0.2.0
func RetryPublisher[T any](pub Publisher[T], maxAttempts int, backoff BackoffFunc) Publisher[T]
RetryPublisher wraps a Publisher with retry logic. On publish failure, it retries up to maxAttempts times with delays determined by backoff. Context cancellation aborts the retry loop immediately.
If all attempts fail, the last error is returned.
Example ¶
ExampleRetryPublisher demonstrates wrapping a publisher with retry logic. Transient errors are retried with the given backoff until success.
package main
import (
"context"
"errors"
"fmt"
"time"
"github.com/foomo/goflux"
)
// ExampleRetryPublisher demonstrates wrapping a publisher with retry logic.
// Transient errors are retried with the given backoff until success.
func main() {
// A publisher that fails twice then succeeds.
inner := &countingPublisher{failUntil: 2}
pub := goflux.RetryPublisher[string](inner, 3, func(_ int) time.Duration {
return time.Millisecond // fast backoff for example
})
err := pub.Publish(context.Background(), "events", "hello")
fmt.Println("error:", err)
fmt.Println("attempts:", inner.attempts)
}
// countingPublisher fails the first failUntil attempts, then succeeds.
type countingPublisher struct {
failUntil int
attempts int
}
func (p *countingPublisher) Publish(_ context.Context, _ string, _ string) error {
p.attempts++
if p.attempts <= p.failUntil {
return errors.New("transient error")
}
return nil
}
func (p *countingPublisher) Close() error { return nil }
Output: error: <nil> attempts: 3
type PublisherMiddleware ¶ added in v0.2.0
PublisherMiddleware wraps a Publisher[T] to add cross-cutting behaviour such as retry, rate-limiting, or circuit-breaking on the publish path.
type RequestHandler ¶
RequestHandler processes a request and returns a response.
type Requester ¶
type Requester[Req, Resp any] interface { // Request sends req to the given nats and returns the response. Request(ctx context.Context, subject string, req Req) (Resp, error) // Close releases any underlying resources. Close() error }
Requester sends a typed request and waits for a typed response.
type Responder ¶
type Responder[Req, Resp any] interface { // Serve registers the handler for the given nats. The call blocks // until ctx is cancelled or a fatal error occurs. Serve(ctx context.Context, subject string, handler RequestHandler[Req, Resp]) error // Close releases any underlying resources. Close() error }
Responder handles incoming requests and produces typed responses.
type Subscriber ¶
type Subscriber[T any] interface { // Subscribe registers handler for the nats. The call blocks until ctx is // canceled or the implementation encounters a fatal error. Subscribe(ctx context.Context, subject string, handler Handler[T]) error // Close unsubscribes and releases resources. Close() error }
Subscriber listens on one or more subjects and dispatches decoded messages to a Handler.
type Telemetry ¶
type Telemetry struct {
// contains filtered or unexported fields
}
Telemetry holds OTel instruments (tracer, metrics, propagator) for a single transport instance. Construct with NewTelemetry.
func DefaultTelemetry ¶ added in v0.2.0
DefaultTelemetry returns tel if non-nil, otherwise creates a new Telemetry from OTel globals. If that fails, it falls back to a noop implementation. This is the standard fallback logic used by all transports.
func NewNoopTelemetry ¶ added in v0.2.0
func NewNoopTelemetry() *Telemetry
NewNoopTelemetry returns a Telemetry backed by OTel's noop implementations. All Record* calls are safe but produce no spans or metrics.
func NewTelemetry ¶
func NewTelemetry(opts ...TelemetryOption) (*Telemetry, error)
NewTelemetry creates a Telemetry instance. Without options it reads from the current OTel globals, so callers that have already called otel.SetTracerProvider / otel.SetMeterProvider need not pass anything.
func (*Telemetry) ExtractContext ¶
func (t *Telemetry) ExtractContext(ctx context.Context, carrier propagation.TextMapCarrier) context.Context
ExtractContext extracts span context from carrier and returns an enriched context with the remote span as parent. Use this for synchronous transports (e.g. HTTP) where parent-child relationship is appropriate.
func (*Telemetry) ExtractSpanContext ¶
func (t *Telemetry) ExtractSpanContext(ctx context.Context, carrier propagation.TextMapCarrier) trace.SpanContext
ExtractSpanContext extracts the remote span context from carrier without injecting it as parent into ctx. Use this with WithRemoteSpanContext for async transports where the consumer span should link to (not be a child of) the producer span.
func (*Telemetry) InjectContext ¶
func (t *Telemetry) InjectContext(ctx context.Context, carrier propagation.TextMapCarrier)
InjectContext injects the span context from ctx into the carrier. Transports call this on the publish side to propagate trace context across wire boundaries (e.g. NATS headers, HTTP headers).
func (*Telemetry) RecordAckOutcome ¶ added in v0.2.0
RecordAckOutcome records an acknowledgment outcome (ack, nak, nak_with_delay, term) with an optional error label when the ack operation itself fails.
func (*Telemetry) RecordFetch ¶
func (t *Telemetry) RecordFetch(ctx context.Context, subject string, system semconvmsg.SystemAttr, count int, fn func(context.Context) error) error
RecordFetch opens a consumer span for a pull-based fetch operation.
func (*Telemetry) RecordProcess ¶
func (t *Telemetry) RecordProcess(ctx context.Context, subject string, system semconvmsg.SystemAttr, fn func(context.Context) error, opts ...ProcessOption) error
RecordProcess opens a consumer span, calls fn, records duration and counter. Pass WithRemoteSpanContext to attach the producer span as a link rather than a parent (recommended for async transports like NATS).
func (*Telemetry) RecordPublish ¶
func (t *Telemetry) RecordPublish(ctx context.Context, subject string, system semconvmsg.SystemAttr, fn func(context.Context) error) error
RecordPublish opens a producer span, calls fn, records duration and counter.
func (*Telemetry) RecordRequest ¶
func (t *Telemetry) RecordRequest(ctx context.Context, subject string, system semconvmsg.SystemAttr, fn func(context.Context) error) error
RecordRequest opens a client span for a request-reply call.
func (*Telemetry) RegisterLag ¶
func (t *Telemetry) RegisterLag(subject string, lagFn func() int64) (metric.Int64ObservableGauge, error)
RegisterLag registers the goflux.consumer.lag observable gauge. Uses the meter provider that was passed to NewTelemetry.
type TelemetryOption ¶
type TelemetryOption func(*telemetryConfig)
TelemetryOption configures a Telemetry instance.
func WithMeterProvider ¶
func WithMeterProvider(mp metric.MeterProvider) TelemetryOption
WithMeterProvider sets the meter provider. Defaults to otel.GetMeterProvider.
func WithPropagator ¶
func WithPropagator(p propagation.TextMapPropagator) TelemetryOption
WithPropagator sets the text-map propagator. Defaults to otel.GetTextMapPropagator.
func WithTracerProvider ¶
func WithTracerProvider(tp trace.TracerProvider) TelemetryOption
WithTracerProvider sets the tracer provider. Defaults to otel.GetTracerProvider.
type Terminator ¶
Terminator extends Acker with terminal rejection — the message will not be redelivered. Use for dead-letter patterns where handler errors are non-retryable.
type Topic ¶
type Topic[T any] struct { Publisher[T] Subscriber[T] }
Topic bundles a Publisher and Subscriber sharing the same Codec. Use it when a service needs to both produce and consume the same message type.
Example ¶
ExampleTopic demonstrates bundling a Publisher and Subscriber into a single Topic value. This is useful when a service needs to both produce and consume the same message type.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
bus := channel.NewBus[Event]()
sub, err := channel.NewSubscriber(bus, 1)
if err != nil {
panic(err)
}
topic := goflux.Topic[Event]{
Publisher: channel.NewPublisher(bus),
Subscriber: sub,
}
gofuncy.StartWithReady(ctx, func(ctx context.Context, ready gofuncy.ReadyFunc) error {
ready()
return topic.Subscribe(ctx, "events", func(_ context.Context, msg goflux.Message[Event]) error {
fmt.Println(msg.Payload.Name)
cancel()
return nil
})
}, gofuncy.WithName("subscriber"))
time.Sleep(10 * time.Millisecond)
if err := topic.Publish(ctx, "events", Event{ID: "1", Name: "bundled"}); err != nil {
panic(err)
}
<-ctx.Done()
Output: bundled