message

v0.17.1
Published: Feb 10, 2026 | License: MIT | Imports: 18 | Imported by: 0

README

message

CloudEvents message handling with type-based routing.

Overview

The message package provides:

  • Message - CloudEvents-aligned message with typed data and attributes
  • Engine - Orchestrates message flow between inputs, handlers, and outputs
  • Handler - Type-safe command/event handlers with automatic marshaling

Engine Architecture

RawInput₁ → Unmarshal ─┐
RawInput₂ → Unmarshal ─┼─→ Merger → Router → Distributor
TypedInput ────────────┘                            │
                                         ┌──────────┴──────────┐
                                   TypedOutput            Marshal
                                                             ↓
                                                         RawOutput

The Engine uses a single merger for all message flows:

  • Merger combines typed inputs and unmarshaled raw inputs
  • Router routes messages to handlers by CE type
  • Distributor routes output to consumers using first-match-wins semantics
  • TypedOutput bypasses marshaling (for internal use)
  • RawOutput marshals to bytes (for broker integration)
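
The first-match-wins rule used by the Distributor can be illustrated with a small standalone sketch. It is not the package's implementation: `output`, `typePrefix`, and `route` are hypothetical names, and treating a nil matcher as "accept everything" is an assumption based on the usage examples passing nil.

```go
package main

import (
	"fmt"
	"strings"
)

// Attributes mirrors the package's map-based attribute type.
type Attributes map[string]any

// output is a hypothetical stand-in for one registered output.
type output struct {
	name  string
	match func(Attributes) bool // assumption: nil accepts every message
}

// typePrefix builds a matcher that accepts CE types starting with prefix.
func typePrefix(prefix string) func(Attributes) bool {
	return func(a Attributes) bool {
		t, _ := a["type"].(string)
		return strings.HasPrefix(t, prefix)
	}
}

// route returns the name of the first output whose matcher accepts attrs,
// or "" when none does. Later outputs are not consulted after a match.
func route(outputs []output, attrs Attributes) string {
	for _, o := range outputs {
		if o.match == nil || o.match(attrs) {
			return o.name
		}
	}
	return ""
}

func main() {
	outputs := []output{
		{name: "orders-out", match: typePrefix("order.")},
		{name: "catch-all"}, // registered last, so it only sees the remainder
	}
	fmt.Println(route(outputs, Attributes{"type": "order.created"})) // orders-out
	fmt.Println(route(outputs, Attributes{"type": "user.created"}))  // catch-all
}
```

Because the first match wins, registration order matters: a catch-all output registered first would starve every output after it.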

Usage

Raw I/O (Broker Integration)

engine := message.NewEngine(message.EngineConfig{
    Marshaler: message.NewJSONMarshaler(),
})

// Register handlers
handler := message.NewCommandHandler(
    func(ctx context.Context, cmd OrderCommand) ([]OrderEvent, error) {
        return []OrderEvent{{ID: cmd.ID, Status: "created"}}, nil
    },
    message.CommandHandlerConfig{
        Source: "/orders",
        Naming: message.KebabNaming,
    },
)
engine.AddHandler("orders", nil, handler)

// Add raw inputs and outputs (for broker integration)
input := make(chan *message.RawMessage, 100)
engine.AddRawInput("orders-in", nil, input)
output, _ := engine.AddRawOutput("orders-out", nil)

// Start engine
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
done, _ := engine.Start(ctx)

// Send/receive raw messages (bytes)
input <- &message.RawMessage{
    Data:       []byte(`{"id": "123"}`),
    Attributes: message.Attributes{"type": "order.command"},
}

out := <-output
// out.Data contains marshaled OrderEvent as []byte

Typed I/O (Internal Use / Testing)

engine := message.NewEngine(message.EngineConfig{
    Marshaler: message.NewJSONMarshaler(),
})

// Register handlers
handler := message.NewCommandHandler(
    func(ctx context.Context, cmd OrderCommand) ([]OrderEvent, error) {
        return []OrderEvent{{ID: cmd.ID, Status: "created"}}, nil
    },
    message.CommandHandlerConfig{
        Source: "/orders",
        Naming: message.KebabNaming,
    },
)
engine.AddHandler("orders", nil, handler)

// Add typed inputs and outputs (no marshal/unmarshal)
input := make(chan *message.Message, 100)
engine.AddInput("orders-in", nil, input)
output, _ := engine.AddOutput("orders-out", nil)

// Start engine
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
done, _ := engine.Start(ctx)

// Send/receive typed messages directly
input <- &message.Message{
    Data:       OrderCommand{ID: "123"},
    Attributes: message.Attributes{"type": "order.command"},
}

out := <-output
// out.Data contains OrderEvent as typed struct (any)
event := out.Data.(OrderEvent)

Dynamic Input/Output

Inputs and outputs can be added after Start():

engine.Start(ctx)

// Add new raw input dynamically (broker integration)
newRawInput := make(chan *message.RawMessage, 100)
engine.AddRawInput("new-raw-input", nil, newRawInput)

// Add new typed input dynamically (internal use)
newTypedInput := make(chan *message.Message, 100)
engine.AddInput("new-typed-input", nil, newTypedInput)

// Add new outputs dynamically
newRawOutput, _ := engine.AddRawOutput("orders-out", match.Types("order.%"))
newTypedOutput, _ := engine.AddOutput("internal-out", match.Types("internal.%"))

Message Types

RawMessage

Raw bytes with CloudEvents attributes:

type RawMessage = TypedMessage[[]byte]

Message

Typed message with unmarshaled data:

msg := &message.Message{
    Data:       myStruct,
    Attributes: message.Attributes{
        "type":   "order.created",
        "source": "/orders",
    },
}

Attributes

CloudEvents-aligned attribute keys:

const (
    AttrID              = "id"
    AttrType            = "type"
    AttrSource          = "source"
    AttrSubject         = "subject"
    AttrTime            = "time"
    AttrDataContentType = "datacontenttype"
    AttrDataSchema      = "dataschema"
    AttrSpecVersion     = "specversion"
)

Handlers

CommandHandler

Processes commands and returns events:

handler := message.NewCommandHandler(
    func(ctx context.Context, cmd CreateOrder) ([]OrderCreated, error) {
        return []OrderCreated{{OrderID: cmd.ID}}, nil
    },
    message.CommandHandlerConfig{
        Source: "/orders",
        Naming: message.KebabNaming,
    },
)

Handler Interface

type Handler interface {
    EventType() string
    NewInput() any
    Handle(ctx context.Context, msg *Message) ([]*Message, error)
}

Documentation

Overview

Package message provides CloudEvents-aligned message handling with type-based routing.

This package is part of gopipe, a composable data pipeline toolkit for Go. The gopipe family includes:

  • channel — Stateless transforms, filters, fan-in/out
  • pipe — Stateful components with lifecycle management
  • message (this package) — CloudEvents message routing with type-based handlers

The package centers around the Engine, which orchestrates message flow between inputs, handlers, and outputs. Messages follow the CloudEvents specification with typed data payloads and context attributes.

Quick Start

engine := message.NewEngine(message.EngineConfig{
	Marshaler: message.NewJSONMarshaler(),
})

handler := message.NewCommandHandler(
	func(ctx context.Context, cmd OrderCmd) ([]OrderEvent, error) {
		return []OrderEvent{{ID: cmd.ID, Status: "created"}}, nil
	},
	message.CommandHandlerConfig{Source: "/orders", Naming: message.KebabNaming},
)
engine.AddHandler("orders", nil, handler)

engine.AddRawInput("in", nil, inputCh)
output, _ := engine.AddRawOutput("out", nil)

done, _ := engine.Start(ctx)

Architecture

The engine uses a single merger for all inputs. Each raw input has its own unmarshal pipe that feeds typed messages into the shared merger, while typed inputs feed into the merger directly. The merged stream is then routed to handlers by the router.

See README.md in this package for detailed architecture diagrams.

Design Notes

Handler is self-describing via [Handler.EventType] and [Handler.NewInput], eliminating the need for a central type registry. The engine reads these methods to route messages and create instances for unmarshaling.

[Matcher.Match] uses Attributes instead of *Message because all matchers only access attributes, avoiding allocation when matching raw messages.

Add* methods take direct parameters (name, matcher) rather than config structs for simplicity. Config structs are reserved for constructors where extensibility is more important.

For rejected alternatives and common mistakes, see AGENTS.md in the repository root.

Acknowledgment

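Messages support acknowledgment callbacks for broker integration. All pipeline components auto-nack on failure (unmarshal error, no handler, handler error, distribution failure, shutdown). Whether success is acked automatically depends on the router's AckStrategy: the default, AckOnSuccess, acks after successful processing, while AckManual leaves acking to the caller.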

Basic pattern - ack after output delivery:

// Input source sets up acking
raw := message.NewRaw(data, attrs, message.NewAcking(
	func() { broker.Ack(msgID) },
	func(err error) { broker.Nack(msgID) },
))

// Output consumer acks after successful delivery
for msg := range output {
	if err := broker.Publish(msg); err == nil {
		msg.Ack()
	}
}

For automatic ack-on-handler-success, use middleware.AutoAck:

engine.Use(middleware.AutoAck())

Batch Processing

When flattening batches (1 input → N outputs), use NewSharedAcking for all-or-nothing semantics:

func flatten(batch *Message) []*Message {
	items := extractItems(batch.Data)
	shared := message.NewSharedAcking(
		func() { batch.Ack() },
		func(err error) { batch.Nack(err) },
		len(items),
	)
	outputs := make([]*Message, len(items))
	for i, item := range items {
		outputs[i] = message.New(item, batch.Attributes, shared)
	}
	return outputs
}

Alternative strategies for batches:

  • Ack immediately, track failures via metrics/DLQ (high throughput)
  • Route failures to dead-letter queue, ack individual items
  • Threshold-based: nack batch only if failure rate exceeds threshold

Copy shares the acking pointer, preserving acknowledgment through transforms.

Message Types

TypedMessage is the generic base type. Message (any data) and RawMessage ([]byte data) are type aliases for common use cases.

Subpackages

  • cloudevents: Integration with CloudEvents SDK protocol bindings
  • match: Matchers for filtering messages by attributes
  • middleware: Cross-cutting concerns (correlation ID, logging)
  • plugin: Reusable engine plugins

Index

Constants

const (
	// AttrID is required by CloudEvents. Unique event identifier.
	AttrID = "id"
	// AttrType is required by CloudEvents. Event type (e.g., "order.created").
	AttrType = "type"
	// AttrSource is required by CloudEvents. Event source URI.
	AttrSource = "source"
	// AttrSpecVersion is required by CloudEvents. Spec version (default "1.0").
	AttrSpecVersion = "specversion"
	// AttrSubject is optional in CloudEvents. Event subject/context.
	AttrSubject = "subject"
	// AttrTime is optional in CloudEvents. Event timestamp (RFC3339).
	AttrTime = "time"
	// AttrDataContentType is optional in CloudEvents. Data content type.
	AttrDataContentType = "datacontenttype"
	// AttrDataSchema is optional in CloudEvents. Data schema URI.
	AttrDataSchema = "dataschema"
)

CloudEvents attribute keys for use in Attributes map literals.

const (
	// AttrCorrelationID correlates related events. Propagated by middleware.CorrelationID.
	AttrCorrelationID = "correlationid"

	// AttrExpiryTime is the event expiration timestamp. Used by middleware.Deadline.
	AttrExpiryTime = "expirytime"
)

CloudEvents extension attribute keys. Extensions are custom attributes not defined in the core CloudEvents spec. Extension names follow CloudEvents naming rules: lowercase a-z, 0-9 only, max 20 chars.
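
The naming rule quoted above can be checked with a few lines of code. This is a minimal sketch, not a function exported by the package: `validExtensionName` is a hypothetical helper, and rejecting the empty string is an added assumption.

```go
package main

import "fmt"

// validExtensionName reports whether name follows the rule quoted above:
// at most 20 characters, using only lowercase a-z and digits 0-9.
// Rejecting the empty string is an assumption of this sketch.
func validExtensionName(name string) bool {
	if len(name) == 0 || len(name) > 20 {
		return false
	}
	for i := 0; i < len(name); i++ {
		c := name[i]
		lower := c >= 'a' && c <= 'z'
		digit := c >= '0' && c <= '9'
		if !lower && !digit {
			return false
		}
	}
	return true
}

func main() {
	for _, n := range []string{"correlationid", "expirytime", "Correlation-ID"} {
		fmt.Printf("%-16s %v\n", n, validExtensionName(n))
	}
}
```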

Variables

var (
	// ErrAlreadyStarted is returned when Start() is called on a running engine.
	ErrAlreadyStarted = errors.New("engine already started")

	// ErrInputRejected is returned when a message is rejected by input matcher.
	ErrInputRejected = errors.New("message rejected by input matcher")

	// ErrNoHandler is returned when no handler exists for a message type.
	ErrNoHandler = errors.New("no handler for message type")

	// ErrHandlerRejected is returned when a message is rejected by handler matcher.
	ErrHandlerRejected = errors.New("message rejected by handler matcher")

	// ErrUnknownType is returned when unmarshaling a message with unknown type.
	ErrUnknownType = errors.New("unknown message type")

	// ErrHandlerExists is returned when registering a handler for an event type that already has one.
	ErrHandlerExists = errors.New("handler already registered for event type")
)

Functions

func IsExtensionAttr added in v0.13.0

func IsExtensionAttr(key string) bool

IsExtensionAttr returns true if key is a CloudEvents extension attribute (not defined in the core CloudEvents spec). Examples: correlationid, expirytime.

func IsOptionalAttr added in v0.13.0

func IsOptionalAttr(key string) bool

IsOptionalAttr returns true if key is an optional CloudEvents context attribute per CloudEvents 1.0 spec: subject, time, datacontenttype, dataschema.

func IsRequiredAttr added in v0.13.0

func IsRequiredAttr(key string) bool

IsRequiredAttr returns true if key is a required CloudEvents context attribute per CloudEvents 1.0 spec: id, source, specversion, type.

func NewID added in v0.12.0

func NewID() string

NewID generates a new unique message ID (UUID v4).

Types

type AckStrategy added in v0.17.0

type AckStrategy int

AckStrategy determines how the router handles message acknowledgment.

const (
	// AckOnSuccess automatically acks on successful processing
	// and nacks on error. This is the default.
	AckOnSuccess AckStrategy = iota

	// AckManual means the handler is responsible for acking/nacking.
	// Provides maximum control but requires explicit ack/nack calls.
	AckManual

	// AckForward forwards acknowledgment to output messages.
	// The input is acked only when ALL outputs are acked.
	// If ANY output nacks, the input is immediately nacked.
	// Useful for event sourcing where a command should only be
	// acked after all resulting events are processed.
	AckForward
)

func (AckStrategy) String added in v0.17.0

func (s AckStrategy) String() string

String implements fmt.Stringer.

type Acking

type Acking struct {
	// contains filtered or unexported fields
}

Acking coordinates acknowledgment across one or more messages. When expectedAckCount messages call Ack(), the ack callback is invoked. If any message calls Nack(), the nack callback is invoked immediately, and the done channel is closed. Acking is thread-safe and can be shared between multiple messages.

func NewAcking

func NewAcking(ack func(), nack func(error)) *Acking

NewAcking creates an Acking for a single message. Returns nil if either callback is nil.

The ack and nack callbacks must not panic. If they do, the panic will propagate after cleanup (the done channel will still be closed to prevent resource leaks, but the program will crash).

func NewSharedAcking

func NewSharedAcking(ack func(), nack func(error), expectedCount int) *Acking

NewSharedAcking creates an Acking shared across multiple messages. The ack callback is invoked after expectedCount Ack() calls. If any message nacks, all sibling messages' done channels are closed. Returns nil if expectedCount <= 0 or if either callback is nil.

The ack and nack callbacks must not panic. If they do, the panic will propagate after cleanup (the done channel will still be closed to prevent resource leaks, but the program will crash).
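
To make the all-or-nothing behaviour concrete, here is a minimal standalone sketch of a shared acknowledgment counter. It only models the semantics described above; `sharedAck` and its fields are hypothetical and say nothing about how Acking is implemented internally (the done channel is left out).

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// sharedAck is a hypothetical stand-in for an Acking shared by several messages.
type sharedAck struct {
	mu       sync.Mutex
	expected int
	acked    int
	done     bool
	onAck    func()
	onNack   func(error)
}

// Ack counts one success. The ack callback fires once, when the expected
// number of acks has been reached and no nack happened before.
func (s *sharedAck) Ack() {
	s.mu.Lock()
	if s.done {
		s.mu.Unlock()
		return
	}
	s.acked++
	fire := s.acked == s.expected
	if fire {
		s.done = true
	}
	s.mu.Unlock()
	if fire {
		s.onAck() // called outside the lock
	}
}

// Nack fails the whole group immediately. Later Ack or Nack calls are ignored.
func (s *sharedAck) Nack(err error) {
	s.mu.Lock()
	if s.done {
		s.mu.Unlock()
		return
	}
	s.done = true
	s.mu.Unlock()
	s.onNack(err)
}

func main() {
	batch := &sharedAck{
		expected: 3,
		onAck:    func() { fmt.Println("batch acked") },
		onNack:   func(err error) { fmt.Println("batch nacked:", err) },
	}
	batch.Ack()
	batch.Ack()
	batch.Nack(errors.New("item 3 failed")) // prints "batch nacked: item 3 failed"
	batch.Ack()                             // ignored: the batch is already settled
}
```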

type Attributes

type Attributes map[string]any

Attributes is a map of message context attributes per CloudEvents spec. CloudEvents defines attributes as the metadata that describes the event.

Thread safety: Attributes is not safe for concurrent read/write access. Handlers receive a single message at a time, so concurrent access is rare. If sharing attributes between goroutines, use external synchronization.

func AttributesFromContext

func AttributesFromContext(ctx context.Context) Attributes

AttributesFromContext retrieves message attributes from context. Returns nil if no attributes are present.

type CommandHandlerConfig

type CommandHandlerConfig struct {
	Source     string          // required, CE source attribute
	Naming     EventTypeNaming // optional, derives CE types for input and output
	Attributes Attributes      // optional, merged into all output messages
}

CommandHandlerConfig configures a command handler.

type Distributor added in v0.16.0

type Distributor struct {
	// contains filtered or unexported fields
}

Distributor routes messages to multiple outputs based on matchers. Automatically nacks messages on errors and provides consistent logging.

func NewDistributor added in v0.16.0

func NewDistributor(cfg DistributorConfig) *Distributor

NewDistributor creates a new message distributor. Messages are automatically nacked on distribution failures.

func (*Distributor) AddOutput added in v0.16.0

func (d *Distributor) AddOutput(matcher Matcher) (<-chan *Message, error)

AddOutput registers an output with the given matcher. Returns a channel that receives messages matching the criteria. Can be called before or after Distribute().

func (*Distributor) Distribute added in v0.16.0

func (d *Distributor) Distribute(ctx context.Context, in <-chan *Message) (<-chan struct{}, error)

Distribute starts distributing messages from the input to registered outputs. Returns a done channel that closes when distribution is complete.

type DistributorConfig added in v0.16.0

type DistributorConfig struct {
	// BufferSize is the per-output channel buffer size (default: 100).
	BufferSize int
	// ShutdownTimeout controls shutdown behavior on context cancellation.
	// If <= 0, forces immediate shutdown (no grace period).
	// If > 0, waits up to this duration for natural completion, then forces shutdown.
	ShutdownTimeout time.Duration
	// Logger for distributor events (default: slog.Default()).
	Logger Logger
	// ErrorHandler is called on distribution errors after auto-nack (optional).
	ErrorHandler ErrorHandler
}

DistributorConfig configures the message distributor.

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine orchestrates message flow between inputs, handlers, and outputs. Uses a single merger for all inputs (typed and unmarshaled raw) and a single distributor for all outputs (typed and pre-marshal raw).

func NewEngine

func NewEngine(cfg EngineConfig) *Engine

NewEngine creates a new message engine.

func (*Engine) AddHandler

func (e *Engine) AddHandler(name string, matcher Matcher, h Handler) error

AddHandler registers a handler to the default pool. The optional matcher is applied after type matching.

func (*Engine) AddInput

func (e *Engine) AddInput(name string, matcher Matcher, ch <-chan *Message) (<-chan struct{}, error)

AddInput registers a typed input channel. Typed inputs go directly to the merger, bypassing unmarshaling. Use for internal messaging, testing, or when data is already typed. Can be called before or after Start().

func (*Engine) AddOutput

func (e *Engine) AddOutput(name string, matcher Matcher) (<-chan *Message, error)

AddOutput registers a typed output and returns the channel to consume from. Typed outputs receive messages directly from the distributor without marshaling. Use for internal messaging or when you need typed access to messages. Can be called before or after Start().

func (*Engine) AddPlugin

func (e *Engine) AddPlugin(plugins ...Plugin) error

AddPlugin registers plugins that configure the engine. Plugins can add handlers, middleware, inputs, outputs, etc. Must be called before Start().

Ordering: Outputs are matched first-match-wins in registration order, and this includes outputs added by plugins. Call AddPlugin before AddOutput if the plugin should have priority when matchers overlap.

func (*Engine) AddRawInput

func (e *Engine) AddRawInput(name string, matcher Matcher, ch <-chan *RawMessage) (<-chan struct{}, error)

AddRawInput registers a raw input channel. Raw inputs are filtered, unmarshaled, and fed to the merger. Use for broker integration (Kafka, NATS, RabbitMQ, etc.). Can be called before or after Start().

func (*Engine) AddRawOutput

func (e *Engine) AddRawOutput(name string, matcher Matcher) (<-chan *RawMessage, error)

AddRawOutput registers a raw output and returns the channel to consume from. Raw outputs receive messages after marshaling to bytes. Use for broker integration (Kafka, NATS, RabbitMQ, etc.). Can be called before or after Start().

func (*Engine) Start

func (e *Engine) Start(ctx context.Context) (<-chan struct{}, error)

Start begins processing messages. Returns a done channel that closes when the engine has fully stopped.

Shutdown behavior:

For graceful shutdown, close all input channels first, then cancel the context. This allows the pipeline to drain naturally without message loss.

If context is cancelled with inputs still open:

  • ShutdownTimeout <= 0: forces immediate shutdown, drops messages in inputs
  • ShutdownTimeout > 0: waits up to timeout for natural completion, then forces shutdown and drops messages still in input channels

Messages that pass the merger are guaranteed to be delivered to outputs. The router, distributor, and marshal pipes use the same ShutdownTimeout, creating a cascading drain that ensures delivery after the merger.
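
The recommended shutdown order (close inputs, let the pipeline drain, then cancel) can be sketched with plain channels. The `run` function below is a hypothetical stand-in for the engine, reduced to a single forwarding goroutine; it is not how Engine is built.

```go
package main

import (
	"context"
	"fmt"
)

// run is a hypothetical stand-in for the engine: it forwards values until the
// input is closed (natural completion) or the context is cancelled.
func run(ctx context.Context, in <-chan int) (<-chan int, <-chan struct{}) {
	out := make(chan int)
	done := make(chan struct{})
	go func() {
		defer close(done)
		defer close(out)
		for {
			select {
			case v, ok := <-in:
				if !ok {
					return // input closed and fully drained
				}
				out <- v
			case <-ctx.Done():
				return // forced stop: values still queued in `in` are dropped
			}
		}
	}()
	return out, done
}

// drainGracefully sends n values, shuts down in the recommended order, and
// reports how many values reached the output.
func drainGracefully(n int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	in := make(chan int, n)
	out, done := run(ctx, in)
	for i := 0; i < n; i++ {
		in <- i
	}

	close(in) // 1. close inputs so the pipeline can drain
	delivered := 0
	for range out {
		delivered++
	}
	cancel() // 2. cancel the context only after the drain
	<-done   // 3. wait until the stand-in engine has stopped
	return delivered
}

func main() {
	fmt.Println(drainGracefully(10)) // 10
}
```

Cancelling before closing the input would let the `ctx.Done()` branch win while values are still queued, which is the message loss the shutdown notes warn about.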

func (*Engine) Use

func (e *Engine) Use(m ...Middleware) error

Use registers middleware to wrap message processing. Middleware is applied in order: first registered wraps outermost. Must be called before Start().
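
The "first registered wraps outermost" rule is easiest to see by tracing calls. The sketch below uses local stand-ins for ProcessFunc and Middleware (the message is reduced to a string), and `chain`, `trace`, and `order` are hypothetical helpers rather than package functions.

```go
package main

import (
	"context"
	"fmt"
)

// Local stand-ins; the message payload is reduced to a string for brevity.
type ProcessFunc func(ctx context.Context, msg string) ([]string, error)
type Middleware func(ProcessFunc) ProcessFunc

// chain wraps fn so that the first middleware in mws ends up outermost.
func chain(fn ProcessFunc, mws ...Middleware) ProcessFunc {
	for i := len(mws) - 1; i >= 0; i-- {
		fn = mws[i](fn)
	}
	return fn
}

// trace returns a middleware that records when it is entered and left.
func trace(name string, log *[]string) Middleware {
	return func(next ProcessFunc) ProcessFunc {
		return func(ctx context.Context, msg string) ([]string, error) {
			*log = append(*log, name+" in")
			out, err := next(ctx, msg)
			*log = append(*log, name+" out")
			return out, err
		}
	}
}

// order runs a handler behind two tracing middleware and returns the call log.
func order() []string {
	var log []string
	handler := func(ctx context.Context, msg string) ([]string, error) {
		log = append(log, "handler")
		return []string{msg}, nil
	}
	fn := chain(handler, trace("first", &log), trace("second", &log))
	_, _ = fn(context.Background(), "hello")
	return log
}

func main() {
	fmt.Println(order()) // [first in second in handler second out first out]
}
```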

type EngineConfig

type EngineConfig struct {
	// Marshaler converts between typed data and raw bytes.
	Marshaler Marshaler
	// BufferSize is the engine buffer size for merger and distributor (default: 100).
	BufferSize int
	// RouterPool configures the router's worker pool (default: 1 worker, 100 buffer).
	RouterPool PoolConfig
	// ProcessTimeout sets a per-message processing deadline (default: 0, no timeout).
	// Applied to all handlers in the router and marshal/unmarshal pipes.
	// If > 0, each handler invocation is wrapped with a timeout context.
	ProcessTimeout time.Duration
	// AckStrategy determines how messages are acknowledged (default: AckOnSuccess).
	// AckOnSuccess: auto-ack on success, auto-nack on error.
	// AckManual: handler responsible for acking/nacking.
	// AckForward: ack when all outputs ack.
	AckStrategy AckStrategy
	// ErrorHandler is called on processing errors (default: no-op).
	// Errors are logged via Logger; use ErrorHandler for custom handling
	// like metrics, alerting, or recovery logic.
	ErrorHandler ErrorHandler
	// Logger for engine events (default: slog.Default()).
	Logger Logger
	// ShutdownTimeout controls shutdown behavior on context cancellation.
	// If <= 0, forces immediate shutdown (no grace period).
	// If > 0, waits up to this duration for natural completion, then forces shutdown.
	// On forced shutdown, remaining messages are drained and reported via ErrorHandler.
	ShutdownTimeout time.Duration
}

EngineConfig configures the message engine.

type ErrorHandler

type ErrorHandler func(msg *Message, err error)

ErrorHandler processes engine errors.

type EventTypeNaming

type EventTypeNaming interface {
	EventType(t reflect.Type) string
}

EventTypeNaming derives CloudEvents event types from Go types.

var DefaultNaming EventTypeNaming = defaultNaming{}

DefaultNaming uses the Go type name as-is, without transformation. Example: OrderCreated → "OrderCreated"

var KebabNaming EventTypeNaming = kebabNaming{}

KebabNaming converts PascalCase to dot-separated lowercase. Example: OrderCreated → "order.created"

var SnakeNaming EventTypeNaming = snakeNaming{}

SnakeNaming converts PascalCase to underscore-separated lowercase. Example: OrderCreated → "order_created"
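
A naming strategy of this kind can be approximated in a few lines. The following is a sketch under assumptions, not the package's code: `splitWords`, `kebab`, and `snake` are hypothetical helpers that split at every upper-case letter, so acronyms such as "HTTP" would be split letter by letter, which may differ from what the library does.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
	"unicode"
)

// splitWords splits a PascalCase identifier at each upper-case letter
// and lowercases the parts.
func splitWords(name string) []string {
	var words []string
	start := 0
	for i, r := range name {
		if i > 0 && unicode.IsUpper(r) {
			words = append(words, strings.ToLower(name[start:i]))
			start = i
		}
	}
	return append(words, strings.ToLower(name[start:]))
}

// kebab mimics the documented KebabNaming result (dot-separated lowercase).
func kebab(name string) string { return strings.Join(splitWords(name), ".") }

// snake mimics the documented SnakeNaming result (underscore-separated lowercase).
func snake(name string) string { return strings.Join(splitWords(name), "_") }

type OrderCreated struct{}

func main() {
	// The interface receives a reflect.Type; its Name() is the Go type name.
	name := reflect.TypeOf(OrderCreated{}).Name()
	fmt.Println(name)        // OrderCreated
	fmt.Println(kebab(name)) // order.created
	fmt.Println(snake(name)) // order_created
}
```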

type FactoryMap

type FactoryMap map[string]func() any

FactoryMap is a simple InputRegistry for standalone use.

func (FactoryMap) NewInput

func (m FactoryMap) NewInput(eventType string) any
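
As an illustration of how a factory map can drive unmarshaling, the sketch below redeclares FactoryMap locally and pairs it with encoding/json. The `decode` helper and the `OrderCreated` type are hypothetical, and having factories return pointers is an assumption made so that json.Unmarshal can fill the value.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// FactoryMap is redeclared locally so the sketch is self-contained.
type FactoryMap map[string]func() any

// NewInput returns a fresh instance for eventType, or nil if the type is unknown.
func (m FactoryMap) NewInput(eventType string) any {
	if f, ok := m[eventType]; ok {
		return f()
	}
	return nil
}

type OrderCreated struct {
	ID string `json:"id"`
}

// decode looks up a target instance by event type and unmarshals data into it.
func decode(reg FactoryMap, eventType string, data []byte) (any, error) {
	v := reg.NewInput(eventType)
	if v == nil {
		return nil, fmt.Errorf("unknown message type %q", eventType)
	}
	if err := json.Unmarshal(data, v); err != nil {
		return nil, err
	}
	return v, nil
}

func main() {
	reg := FactoryMap{"order.created": func() any { return &OrderCreated{} }}

	v, err := decode(reg, "order.created", []byte(`{"id":"123"}`))
	fmt.Println(v.(*OrderCreated).ID, err) // 123 <nil>

	_, err = decode(reg, "order.deleted", nil)
	fmt.Println(err) // unknown message type "order.deleted"
}
```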

type Handler

type Handler interface {
	// EventType returns the CE type this handler processes.
	EventType() string

	// NewInput creates a new instance for unmarshaling input data.
	NewInput() any

	// Handle processes a message and returns output messages.
	Handle(ctx context.Context, msg *Message) ([]*Message, error)
}

Handler processes messages of a specific CE type.
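
A hand-written handler shows how the three methods fit together. The sketch below redeclares simplified Message and Handler types locally so that it compiles on its own; `Ping`, `Pong`, `pingHandler`, and `pong` are hypothetical, and whether the engine delivers Data as a pointer (as assumed here) depends on what NewInput returns.

```go
package main

import (
	"context"
	"fmt"
)

// Simplified local stand-ins for the package's types.
type Attributes map[string]any

type Message struct {
	Data       any
	Attributes Attributes
}

type Handler interface {
	EventType() string
	NewInput() any
	Handle(ctx context.Context, msg *Message) ([]*Message, error)
}

type Ping struct{ N int }
type Pong struct{ N int }

// pingHandler describes itself: the type it handles and how to allocate input.
type pingHandler struct{}

func (pingHandler) EventType() string { return "ping" }
func (pingHandler) NewInput() any     { return &Ping{} }

func (pingHandler) Handle(ctx context.Context, msg *Message) ([]*Message, error) {
	p, ok := msg.Data.(*Ping) // assumption: data arrives as the NewInput pointer
	if !ok {
		return nil, fmt.Errorf("unexpected data type %T", msg.Data)
	}
	return []*Message{{
		Data:       Pong{N: p.N + 1},
		Attributes: Attributes{"type": "pong"},
	}}, nil
}

// pong drives the handler the way a router might: allocate, fill, handle.
func pong(n int) (int, error) {
	var h Handler = pingHandler{}
	in := h.NewInput().(*Ping)
	in.N = n
	out, err := h.Handle(context.Background(), &Message{
		Data:       in,
		Attributes: Attributes{"type": h.EventType()},
	})
	if err != nil {
		return 0, err
	}
	return out[0].Data.(Pong).N, nil
}

func main() {
	fmt.Println(pong(41)) // 42 <nil>
}
```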

func NewCommandHandler

func NewCommandHandler[C, E any](
	fn func(ctx context.Context, cmd C) ([]E, error),
	cfg CommandHandlerConfig,
) Handler

NewCommandHandler creates a handler that receives commands directly. Config provides Source and EventTypeNaming for deriving CE types. If Naming is nil, DefaultNaming is used.

func NewHandler

func NewHandler[T any](
	fn func(ctx context.Context, msg *Message) ([]*Message, error),
	naming EventTypeNaming,
) Handler

NewHandler creates a handler from a typed function. The generic type T is used for unmarshaling and event type derivation. EventTypeNaming derives EventType from T. If nil, DefaultNaming is used.

type InputRegistry

type InputRegistry interface {
	NewInput(eventType string) any // nil if unknown type
}

InputRegistry creates typed instances for unmarshaling.

type JSONMarshaler

type JSONMarshaler struct{}

JSONMarshaler implements Marshaler using JSON encoding.

func NewJSONMarshaler

func NewJSONMarshaler() *JSONMarshaler

NewJSONMarshaler creates a new JSON marshaler.

func (*JSONMarshaler) DataContentType

func (m *JSONMarshaler) DataContentType() string

DataContentType returns "application/json".

func (*JSONMarshaler) Marshal

func (m *JSONMarshaler) Marshal(v any) ([]byte, error)

Marshal encodes a value to JSON bytes.

func (*JSONMarshaler) Unmarshal

func (m *JSONMarshaler) Unmarshal(data []byte, v any) error

Unmarshal decodes JSON bytes into a value.

type Logger

type Logger interface {
	// Debug logs a message at debug level.
	Debug(msg string, args ...any)
	// Info logs a message at info level.
	Info(msg string, args ...any)
	// Warn logs a message at warning level.
	Warn(msg string, args ...any)
	// Error logs a message at error level.
	Error(msg string, args ...any)
}

Logger defines an interface for logging at different severity levels.
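
The method set matches the standard library's *slog.Logger, so a slog logger can be passed wherever this interface is expected. The sketch below redeclares the interface locally to show the compile-time check; `newLogger` is a hypothetical helper.

```go
package main

import (
	"log/slog"
	"os"
)

// Logger is redeclared locally with the same four methods as documented above.
type Logger interface {
	Debug(msg string, args ...any)
	Info(msg string, args ...any)
	Warn(msg string, args ...any)
	Error(msg string, args ...any)
}

// Compile-time check: *slog.Logger provides all four methods.
var _ Logger = (*slog.Logger)(nil)

// newLogger builds a text logger that writes to standard error.
func newLogger() Logger {
	return slog.New(slog.NewTextHandler(os.Stderr, nil))
}

func main() {
	newLogger().Info("engine started", "inputs", 2)
}
```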

type MarshalPipe added in v0.16.0

type MarshalPipe struct {
	// contains filtered or unexported fields
}

MarshalPipe converts Message to RawMessage using a marshaler. Automatically nacks messages on errors and provides consistent logging.

func NewMarshalPipe

func NewMarshalPipe(marshaler Marshaler, cfg PipeConfig) *MarshalPipe

NewMarshalPipe creates a pipe that marshals Message to RawMessage. Messages are automatically nacked on marshal failures.

func (*MarshalPipe) Pipe added in v0.16.0

func (p *MarshalPipe) Pipe(ctx context.Context, in <-chan *Message) (<-chan *RawMessage, error)

Pipe starts the marshal pipeline.

func (*MarshalPipe) Use added in v0.17.1

Use adds middleware to the marshal processing chain. Middleware is applied in the order it is added. Returns ErrAlreadyStarted if the pipe has already been started.

type Marshaler

type Marshaler interface {
	// Marshal encodes a value to bytes.
	Marshal(v any) ([]byte, error)

	// Unmarshal decodes bytes into a value.
	Unmarshal(data []byte, v any) error

	// DataContentType returns the CloudEvents datacontenttype attribute value.
	// Example: "application/json"
	DataContentType() string
}

Marshaler handles serialization and deserialization of message data.
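
Any type with these three methods can replace the JSON marshaler. As a sketch, here is an XML variant built on encoding/xml; `XMLMarshaler`, `Order`, and `roundTrip` are hypothetical names, and the interface is redeclared locally so the example stands alone.

```go
package main

import (
	"encoding/xml"
	"fmt"
)

// Marshaler is redeclared locally with the documented method set.
type Marshaler interface {
	Marshal(v any) ([]byte, error)
	Unmarshal(data []byte, v any) error
	DataContentType() string
}

// XMLMarshaler is a hypothetical alternative to the JSON marshaler.
type XMLMarshaler struct{}

func (XMLMarshaler) Marshal(v any) ([]byte, error)      { return xml.Marshal(v) }
func (XMLMarshaler) Unmarshal(data []byte, v any) error { return xml.Unmarshal(data, v) }
func (XMLMarshaler) DataContentType() string            { return "application/xml" }

// Compile-time check that XMLMarshaler satisfies the interface.
var _ Marshaler = XMLMarshaler{}

type Order struct {
	ID string `xml:"id"`
}

// roundTrip encodes in with m and decodes the bytes back into a new Order.
func roundTrip(m Marshaler, in Order) (Order, error) {
	data, err := m.Marshal(in)
	if err != nil {
		return Order{}, err
	}
	var out Order
	err = m.Unmarshal(data, &out)
	return out, err
}

func main() {
	m := XMLMarshaler{}
	data, _ := m.Marshal(Order{ID: "123"})
	fmt.Println(string(data), m.DataContentType())
	fmt.Println(roundTrip(m, Order{ID: "123"})) // {123} <nil>
}
```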

type Matcher

type Matcher interface {
	Match(attrs Attributes) bool
}

Matcher tests whether attributes match a condition. Used for input filtering and output routing. Operates on Attributes only (not full Message) to work with both Message and RawMessage without wrapper allocation.

type Merger added in v0.16.0

type Merger struct {
	// contains filtered or unexported fields
}

Merger combines multiple message input channels into a single output. Automatically nacks messages on errors and provides consistent logging.

func NewMerger added in v0.16.0

func NewMerger(cfg MergerConfig) *Merger

NewMerger creates a new message merger. Messages are automatically nacked on merge failures (e.g., shutdown timeout).

func (*Merger) AddInput added in v0.16.0

func (m *Merger) AddInput(ch <-chan *Message) (<-chan struct{}, error)

AddInput registers an input channel. Returns a done channel that closes when the input is fully consumed. Can be called before or after Merge().

func (*Merger) Merge added in v0.16.0

func (m *Merger) Merge(ctx context.Context) (<-chan *Message, error)

Merge starts merging all registered inputs into a single output channel. The output channel closes when all inputs are closed (or shutdown timeout).
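
The fan-in behaviour described here (one output that closes once every input has closed) follows a common Go pattern. The sketch below shows that pattern with int channels; `merge` is a hypothetical function and omits the shutdown timeout, nacking, and dynamic AddInput that the real Merger provides.

```go
package main

import (
	"fmt"
	"sync"
)

// merge forwards every value from inputs to one output channel and closes
// the output after all inputs have been closed and drained.
func merge(inputs ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	for _, in := range inputs {
		wg.Add(1)
		go func(in <-chan int) {
			defer wg.Done()
			for v := range in {
				out <- v
			}
		}(in)
	}
	go func() {
		wg.Wait() // all forwarders finished, so no more sends can happen
		close(out)
	}()
	return out
}

func main() {
	a, b := make(chan int, 2), make(chan int, 1)
	a <- 1
	a <- 2
	b <- 3
	close(a)
	close(b)

	sum := 0
	for v := range merge(a, b) {
		sum += v
	}
	fmt.Println(sum) // 6
}
```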

type MergerConfig added in v0.16.0

type MergerConfig struct {
	// BufferSize is the output channel buffer size (default: 100).
	BufferSize int
	// ShutdownTimeout controls shutdown behavior on context cancellation.
	// If <= 0, forces immediate shutdown (no grace period).
	// If > 0, waits up to this duration for natural completion, then forces shutdown.
	ShutdownTimeout time.Duration
	// Logger for merger events (default: slog.Default()).
	Logger Logger
	// ErrorHandler is called on merge errors after auto-nack (optional).
	ErrorHandler ErrorHandler
}

MergerConfig configures the message merger.

type Message

type Message = TypedMessage[any]

Message is the internal message type used by handlers and middleware. Data holds any typed payload after unmarshaling from RawMessage.

func FromContext added in v0.17.0

func FromContext(ctx context.Context) *Message

FromContext is an alias for MessageFromContext.

func MessageFromContext added in v0.17.0

func MessageFromContext(ctx context.Context) *Message

MessageFromContext retrieves the Message from context. Returns nil if no message is present.

func New

func New(data any, attrs Attributes, acking *Acking) *Message

New creates a Message for engine input channels. Pass nil for attrs or acking if not needed.

type Middleware

type Middleware func(ProcessFunc) ProcessFunc

Middleware wraps a ProcessFunc to add cross-cutting concerns.

type PipeConfig added in v0.16.0

type PipeConfig struct {
	// Pool configures concurrency and buffering (default: 1 worker, 100 buffer).
	Pool PoolConfig
	// ProcessTimeout sets a per-message processing deadline (default: 0, no timeout).
	// If > 0, each handler invocation is wrapped with a timeout context.
	// During normal operation, handlers are cancelled if they exceed ProcessTimeout.
	// During shutdown grace period, handlers continue with their ProcessTimeout.
	// On forced shutdown (grace period expired), handlers are cancelled immediately.
	ProcessTimeout time.Duration
	// ShutdownTimeout controls shutdown behavior on context cancellation.
	// If <= 0, forces immediate shutdown (no grace period).
	// If > 0, waits up to this duration for natural completion, then forces shutdown.
	ShutdownTimeout time.Duration
	// AckStrategy determines how messages are acknowledged (default: AckOnSuccess).
	// AckOnSuccess: auto-ack on success, auto-nack on error.
	// AckManual: handler responsible for acking/nacking.
	// AckForward: ack when all outputs ack.
	AckStrategy AckStrategy
	// Logger for pipe events (default: slog.Default()).
	Logger Logger
	// ErrorHandler is called on processing errors (default: no-op, errors logged via Logger).
	ErrorHandler ErrorHandler
}

PipeConfig configures process pipes (Router, UnmarshalPipe, MarshalPipe).
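
The ProcessTimeout behaviour ("each handler invocation is wrapped with a timeout context") can be sketched with context.WithTimeout. The names `handlerFunc`, `withTimeout`, `slowHandler`, and `timedOut` are hypothetical, and the sketch covers only normal operation, not the shutdown grace period.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// handlerFunc is a simplified stand-in for a message handler.
type handlerFunc func(ctx context.Context) error

// withTimeout gives each invocation its own deadline. A non-positive
// duration means no timeout, matching the documented default of 0.
func withTimeout(d time.Duration, next handlerFunc) handlerFunc {
	if d <= 0 {
		return next
	}
	return func(ctx context.Context) error {
		ctx, cancel := context.WithTimeout(ctx, d)
		defer cancel()
		return next(ctx)
	}
}

// slowHandler works for 500ms unless its context ends first.
func slowHandler(ctx context.Context) error {
	select {
	case <-time.After(500 * time.Millisecond):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// timedOut reports whether slowHandler hit a deadline of ms milliseconds.
func timedOut(ms int) bool {
	h := withTimeout(time.Duration(ms)*time.Millisecond, slowHandler)
	return errors.Is(h(context.Background()), context.DeadlineExceeded)
}

func main() {
	fmt.Println(timedOut(10)) // true
}
```

A handler only benefits from the deadline if it observes `ctx.Done()` or passes the context to calls that do.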

type Plugin

type Plugin func(*Engine) error

Plugin registers handlers, middleware, or other components with an engine.

type PoolConfig added in v0.14.0

type PoolConfig struct {
	// Workers is the number of concurrent workers (default: 1).
	Workers int
	// BufferSize is the output channel buffer size (default: 100).
	BufferSize int
}

PoolConfig configures concurrency and buffering for process pipes.

type ProcessFunc

type ProcessFunc func(context.Context, *Message) ([]*Message, error)

ProcessFunc is the message processing function signature.

type RawMessage

type RawMessage = TypedMessage[[]byte]

RawMessage is the broker boundary message type with serialized []byte data. Used for pub/sub integrations (Kafka, RabbitMQ, NATS, etc.).

func NewRaw

func NewRaw(data []byte, attrs Attributes, acking *Acking) *RawMessage

NewRaw creates a RawMessage for broker integration. Pass nil for attrs or acking if not needed.

func ParseRaw added in v0.12.0

func ParseRaw(r io.Reader) (*RawMessage, error)

ParseRaw parses CloudEvents structured JSON from r into a RawMessage. Handles both data and data_base64 fields per CloudEvents spec.
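
To illustrate the two payload fields, the sketch below parses a structured-mode JSON event with the standard library. It is not ParseRaw itself: `rawMessage`, `parseStructured`, and `parseString` are hypothetical, and treating every other top-level field as an attribute is an assumption of the sketch.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// rawMessage is a simplified stand-in for RawMessage.
type rawMessage struct {
	Data       []byte
	Attributes map[string]any
}

// parseStructured reads one structured-mode JSON event from r.
// "data" is kept as raw JSON bytes, "data_base64" is base64-decoded,
// and all remaining top-level fields become attributes.
func parseStructured(r io.Reader) (*rawMessage, error) {
	var fields map[string]json.RawMessage
	if err := json.NewDecoder(r).Decode(&fields); err != nil {
		return nil, err
	}
	msg := &rawMessage{Attributes: map[string]any{}}
	for key, value := range fields {
		switch key {
		case "data":
			msg.Data = value
		case "data_base64":
			var encoded string
			if err := json.Unmarshal(value, &encoded); err != nil {
				return nil, err
			}
			decoded, err := base64.StdEncoding.DecodeString(encoded)
			if err != nil {
				return nil, err
			}
			msg.Data = decoded
		default:
			var attr any
			if err := json.Unmarshal(value, &attr); err != nil {
				return nil, err
			}
			msg.Attributes[key] = attr
		}
	}
	return msg, nil
}

// parseString is a convenience wrapper for string input.
func parseString(s string) (*rawMessage, error) {
	return parseStructured(strings.NewReader(s))
}

func main() {
	msg, err := parseString(`{"specversion":"1.0","id":"1","source":"/orders",` +
		`"type":"order.created","data_base64":"aGk="}`)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(msg.Data), msg.Attributes["type"]) // hi order.created
}
```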

func RawMessageFromContext added in v0.17.0

func RawMessageFromContext(ctx context.Context) *RawMessage

RawMessageFromContext retrieves the RawMessage from context. Returns nil if no raw message is present.

type Router

type Router struct {
	// contains filtered or unexported fields
}

Router dispatches messages to handlers by CE type. Implements Pipe signature for composability with pipe.Apply(). Uses pipe.ProcessPipe internally for middleware, concurrency, and error handling.

func NewRouter

func NewRouter(cfg PipeConfig) *Router

NewRouter creates a new message router.

func (*Router) AddHandler

func (r *Router) AddHandler(name string, matcher Matcher, h Handler) error

AddHandler registers a handler. The optional matcher is applied after type matching.
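
The two-step selection (type first, then the optional matcher) can be sketched as follows. All types and names here (event, matcher, handler, router, sketchRouter) are hypothetical and local to the example. Unlike the real AddHandler, the sketch takes the event type as an explicit argument and assumes one handler per type.

```go
package main

import (
	"errors"
	"fmt"
)

type event struct {
	Type   string
	Source string
}

type matcher func(event) bool
type handler func(event) string

type registration struct {
	name  string
	match matcher // nil means "accept every event of this type"
	run   handler
}

type router struct {
	byType map[string]registration
}

var errNoHandler = errors.New("no handler accepted the event")

// addHandler registers h for eventType; m may be nil.
func (r *router) addHandler(name, eventType string, m matcher, h handler) error {
	if _, dup := r.byType[eventType]; dup {
		return fmt.Errorf("handler already registered for type %q", eventType)
	}
	if r.byType == nil {
		r.byType = make(map[string]registration)
	}
	r.byType[eventType] = registration{name: name, match: m, run: h}
	return nil
}

// dispatch looks up the handler by type, then applies the matcher if present.
func (r *router) dispatch(e event) (string, error) {
	reg, ok := r.byType[e.Type]
	if !ok {
		return "", errNoHandler
	}
	if reg.match != nil && !reg.match(e) {
		return "", errNoHandler
	}
	return reg.run(e), nil
}

// sketchRouter builds a router with one filtered and one unfiltered handler.
func sketchRouter() *router {
	r := &router{}
	fromOrders := func(e event) bool { return e.Source == "/orders" }
	_ = r.addHandler("orders", "order.created", fromOrders, func(event) string { return "orders" })
	_ = r.addHandler("audit", "audit.logged", nil, func(event) string { return "audit" })
	return r
}

func main() {
	r := sketchRouter()
	fmt.Println(r.dispatch(event{Type: "order.created", Source: "/orders"}))
	fmt.Println(r.dispatch(event{Type: "order.created", Source: "/billing"}))
}
```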

func (*Router) NewInput

func (r *Router) NewInput(eventType string) any

NewInput creates a typed instance for unmarshaling. Implements InputRegistry.

func (*Router) Pipe

func (r *Router) Pipe(ctx context.Context, in <-chan *Message) (<-chan *Message, error)

Pipe routes messages to handlers and returns outputs. Signature matches pipe.Pipe[*Message, *Message] for composability.

Built-in middleware applied automatically (innermost, closest to handler):

  • Acking: handles ack/nack based on AckStrategy (default: AckOnSuccess)

User middleware via Use() wraps outside the acking middleware.

func (*Router) Use

func (r *Router) Use(m ...Middleware) error

Use registers middleware to wrap message processing. Middleware is applied in order: first registered wraps outermost. Must be called before Pipe().
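
The ordering rule can be illustrated with a small standalone sketch. The processFunc and middleware types below are simplified local stand-ins (they operate on strings, not on *Message), and chain, tag, and trace are hypothetical helpers. The point is only that wrapping in reverse registration order leaves the first registered middleware outermost.

```go
package main

import (
	"fmt"
	"strings"
)

type processFunc func(in string) string
type middleware func(next processFunc) processFunc

// chain wraps base so that the first middleware in mws ends up outermost.
func chain(base processFunc, mws ...middleware) processFunc {
	for i := len(mws) - 1; i >= 0; i-- {
		base = mws[i](base)
	}
	return base
}

// tag returns a middleware that records when it runs around the next step.
func tag(name string, log *[]string) middleware {
	return func(next processFunc) processFunc {
		return func(in string) string {
			*log = append(*log, name+":before")
			out := next(in)
			*log = append(*log, name+":after")
			return out
		}
	}
}

// trace runs one call through two middlewares and returns the recorded order.
func trace() string {
	var log []string
	handler := func(in string) string {
		log = append(log, "handler")
		return in
	}
	wrapped := chain(handler, tag("first", &log), tag("second", &log))
	wrapped("msg")
	return strings.Join(log, " ")
}

func main() {
	fmt.Println(trace())
}
```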

type TypedMessage

type TypedMessage[T any] struct {
	// Data is the event payload per CloudEvents spec.
	Data T

	// Attributes contains the context attributes per CloudEvents spec.
	Attributes Attributes
	// contains filtered or unexported fields
}

TypedMessage wraps a typed data payload with attributes and acknowledgment callbacks. This is the base generic type for all message variants. Ack/Nack operations are mutually exclusive and idempotent.

func Copy

func Copy[In, Out any](msg *TypedMessage[In], data Out) *TypedMessage[Out]

Copy creates a new message with different data while preserving attributes (cloned) and acknowledgment callbacks (shared).
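
A rough sketch of the "cloned attributes, shared acknowledgment" behavior, using local stand-in types (typed, ackState, copyWith are hypothetical names). Whether the library clones attributes shallowly or deeply is not stated here, so the shallow copy below is an assumption.

```go
package main

import "fmt"

// ackState stands in for the shared acknowledgment state.
type ackState struct{ label string }

// typed is a simplified stand-in for a generic message.
type typed[T any] struct {
	Data       T
	Attributes map[string]any
	ack        *ackState
}

// copyWith returns a new message carrying data, a shallow clone of the
// attributes, and the same acknowledgment pointer as msg.
func copyWith[In, Out any](msg *typed[In], data Out) *typed[Out] {
	attrs := make(map[string]any, len(msg.Attributes))
	for k, v := range msg.Attributes {
		attrs[k] = v
	}
	return &typed[Out]{Data: data, Attributes: attrs, ack: msg.ack}
}

func main() {
	src := &typed[[]byte]{
		Data:       []byte(`{"id":1}`),
		Attributes: map[string]any{"type": "order.created"},
		ack:        &ackState{label: "shared"},
	}
	dst := copyWith(src, 42)
	dst.Attributes["type"] = "changed"

	// The source attributes are untouched; the ack state is the same pointer.
	fmt.Println(src.Attributes["type"], dst.Attributes["type"], src.ack == dst.ack)
}
```

Sharing the acknowledgment state means that settling the copy also settles the original, which is why a transformed message can be acked downstream on behalf of the message it was derived from.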

func NewTyped

func NewTyped[T any](data T, attrs Attributes, acking *Acking) *TypedMessage[T]

NewTyped creates a generic typed message. Pass nil for attrs or acking if not needed.

func (*TypedMessage[T]) Ack

func (m *TypedMessage[T]) Ack() bool

Ack acknowledges successful processing of the message. Returns true if acknowledgment succeeded or was already performed. Returns false if no ack callback was provided or if the message was already nacked. The ack callback is invoked at most once when all stages have acked. Thread-safe. Callbacks are invoked outside the mutex to prevent deadlocks.

func (*TypedMessage[T]) Context added in v0.17.0

func (m *TypedMessage[T]) Context(parent context.Context) context.Context

Context returns a context derived from parent with message-specific behavior.

The context provides:

  • Deadline: the earlier of the parent deadline and the message ExpiryTime
  • Message reference via MessageFromContext or RawMessageFromContext
  • Attributes via AttributesFromContext
  • Parent cancellation propagation

Note: This method reports the deadline via ctx.Deadline() but does not create timers or enforce the deadline. Use middleware.Deadline() for enforcement with proper timer cleanup.

For settlement detection (ack/nack), use msg.Done() directly. This keeps context cancellation (lifecycle) separate from message settlement (domain logic).
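
The deadline rule above (the earlier of the parent deadline and the message expiry) can be sketched with the standard library. The helpers effectiveDeadline and pick are hypothetical and only compute which deadline would be reported; like the method described above, they do not enforce it.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// effectiveDeadline returns the earlier of the parent's deadline and expiry.
// A zero expiry means the message carries no expiry time.
func effectiveDeadline(parent context.Context, expiry time.Time) (time.Time, bool) {
	parentDeadline, hasParent := parent.Deadline()
	switch {
	case expiry.IsZero():
		return parentDeadline, hasParent
	case !hasParent || expiry.Before(parentDeadline):
		return expiry, true
	default:
		return parentDeadline, true
	}
}

// pick reports which source wins for offsets given in seconds from a fixed
// base time. An offset of zero means "not set".
func pick(parentSec, expirySec int) string {
	base := time.Date(2030, 1, 1, 0, 0, 0, 0, time.UTC)
	ctx := context.Background()
	if parentSec > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithDeadline(ctx, base.Add(time.Duration(parentSec)*time.Second))
		defer cancel()
	}
	var expiry time.Time
	if expirySec > 0 {
		expiry = base.Add(time.Duration(expirySec) * time.Second)
	}
	deadline, ok := effectiveDeadline(ctx, expiry)
	switch {
	case !ok:
		return "none"
	case deadline.Equal(expiry):
		return "expiry"
	default:
		return "parent"
	}
}

func main() {
	fmt.Println(pick(10, 5), pick(5, 10), pick(0, 0))
}
```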

func (*TypedMessage[T]) CorrelationID added in v0.13.0

func (m *TypedMessage[T]) CorrelationID() string

CorrelationID returns the correlation ID extension. Returns empty string if not set.

func (*TypedMessage[T]) DataContentType added in v0.12.0

func (m *TypedMessage[T]) DataContentType() string

DataContentType returns the data content type. Returns empty string if not set.

func (*TypedMessage[T]) DataSchema added in v0.12.0

func (m *TypedMessage[T]) DataSchema() string

DataSchema returns the data schema URI. Returns empty string if not set.

func (*TypedMessage[T]) Done added in v0.17.0

func (m *TypedMessage[T]) Done() <-chan struct{}

Done returns a channel that is closed when the message is settled (acked or nacked). Returns nil if no acking is set.

func (*TypedMessage[T]) Err added in v0.17.0

func (m *TypedMessage[T]) Err() error

Err returns the error from Nack, or nil if pending or acked. Use with Done() to check settlement status, similar to context.Context:

select {
case <-msg.Done():
    if err := msg.Err(); err != nil {
        // nacked with error
    } else {
        // acked successfully
    }
default:
    // still pending
}

func (*TypedMessage[T]) ExpiryTime added in v0.13.0

func (m *TypedMessage[T]) ExpiryTime() time.Time

ExpiryTime returns the expiry time extension. Returns zero time if not set or invalid.

func (*TypedMessage[T]) ID added in v0.12.0

func (m *TypedMessage[T]) ID() string

ID returns the event identifier. Returns empty string if not set.

func (*TypedMessage[T]) MarshalJSON added in v0.12.0

func (m *TypedMessage[T]) MarshalJSON() ([]byte, error)

MarshalJSON implements json.Marshaler. Returns the message as CloudEvents structured JSON.

func (*TypedMessage[T]) Nack

func (m *TypedMessage[T]) Nack(err error) bool

Nack negatively acknowledges the message due to a processing error. Returns true if negative acknowledgment succeeded or was already performed. Returns false if no nack callback was provided or if the message was already acked. The nack callback is invoked immediately with the first error, permanently blocking all further acks. Also closes the done channel, allowing sibling messages to detect the settlement. Thread-safe. Callbacks are invoked outside the mutex to prevent deadlocks.
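
The return-value rules for Ack and Nack can be easier to follow in code. The sketch below is a simplified, single-stage model with hypothetical names (settlement, newSettlement, phase, errSketch). It is not the library's implementation and leaves out the multi-stage ack counting mentioned above. It does follow the documented pattern of changing state under a mutex and invoking callbacks after unlocking.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type phase int

const (
	pending phase = iota
	acked
	nacked
)

var errSketch = errors.New("sketch failure")

type settlement struct {
	mu     sync.Mutex
	phase  phase
	err    error
	done   chan struct{}
	onAck  func()
	onNack func(error)
}

func newSettlement(onAck func(), onNack func(error)) *settlement {
	return &settlement{done: make(chan struct{}), onAck: onAck, onNack: onNack}
}

// Ack reports true if the message is (or already was) acked.
func (s *settlement) Ack() bool {
	if s.onAck == nil {
		return false
	}
	s.mu.Lock()
	if s.phase != pending {
		wasAcked := s.phase == acked
		s.mu.Unlock()
		return wasAcked
	}
	s.phase = acked
	close(s.done)
	s.mu.Unlock()
	s.onAck() // invoked outside the mutex
	return true
}

// Nack reports true if the message is (or already was) nacked.
func (s *settlement) Nack(err error) bool {
	if s.onNack == nil {
		return false
	}
	s.mu.Lock()
	if s.phase != pending {
		wasNacked := s.phase == nacked
		s.mu.Unlock()
		return wasNacked
	}
	s.phase = nacked
	s.err = err
	close(s.done)
	s.mu.Unlock()
	s.onNack(err) // invoked outside the mutex
	return true
}

// Err returns the nack error, or nil while pending or after an ack.
func (s *settlement) Err() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.err
}

func main() {
	s := newSettlement(func() { fmt.Println("acked") }, func(err error) { fmt.Println("nacked:", err) })
	fmt.Println(s.Nack(errSketch), s.Nack(errSketch), s.Ack())
}
```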

func (*TypedMessage[T]) Source added in v0.12.0

func (m *TypedMessage[T]) Source() string

Source returns the event source. Returns empty string if not set.

func (*TypedMessage[T]) SpecVersion added in v0.12.0

func (m *TypedMessage[T]) SpecVersion() string

SpecVersion returns the CloudEvents spec version. Returns empty string if not set.

func (*TypedMessage[T]) String

func (m *TypedMessage[T]) String() string

String implements fmt.Stringer. Returns the message as CloudEvents structured JSON.

func (*TypedMessage[T]) Subject added in v0.12.0

func (m *TypedMessage[T]) Subject() string

Subject returns the event subject. Returns empty string if not set.

func (*TypedMessage[T]) Time added in v0.12.0

func (m *TypedMessage[T]) Time() time.Time

Time returns the event timestamp. Returns zero time if not set or invalid.

func (*TypedMessage[T]) Type added in v0.12.0

func (m *TypedMessage[T]) Type() string

Type returns the event type. Returns empty string if not set.

func (*TypedMessage[T]) WriteTo

func (m *TypedMessage[T]) WriteTo(w io.Writer) (int64, error)

WriteTo implements io.WriterTo. Writes the message as CloudEvents structured JSON.

type UnmarshalPipe added in v0.16.0

type UnmarshalPipe struct {
	// contains filtered or unexported fields
}

UnmarshalPipe converts RawMessage to Message using a registry and marshaler. Automatically nacks messages on errors and provides consistent logging.

func NewUnmarshalPipe

func NewUnmarshalPipe(registry InputRegistry, marshaler Marshaler, cfg PipeConfig) *UnmarshalPipe

NewUnmarshalPipe creates a pipe that unmarshals RawMessage to Message. Uses registry to create typed instances for unmarshaling. Messages are automatically nacked on unmarshal failures.
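
The registry-driven step can be pictured as "ask the registry for an empty typed value, then decode into it". The sketch below uses encoding/json directly instead of this package's Marshaler, and the names registry, orderCreated, sketchRegistry, and unmarshalTyped are hypothetical. Only the NewInput(eventType string) any method shape is taken from the documentation above, and returning nil for an unknown type is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// orderCreated is an example payload type.
type orderCreated struct {
	ID string `json:"id"`
}

// registry maps an event type to a factory for its typed payload.
type registry map[string]func() any

// NewInput returns a fresh typed instance, or nil for an unknown type.
func (r registry) NewInput(eventType string) any {
	if factory, ok := r[eventType]; ok {
		return factory()
	}
	return nil
}

// sketchRegistry registers a single example type.
func sketchRegistry() registry {
	return registry{
		"order.created": func() any { return &orderCreated{} },
	}
}

// unmarshalTyped decodes data into the instance the registry provides.
// In a pipeline, an error here is the point at which a message would be nacked.
func unmarshalTyped(r registry, eventType string, data []byte) (any, error) {
	target := r.NewInput(eventType)
	if target == nil {
		return nil, fmt.Errorf("no input type registered for %q", eventType)
	}
	if err := json.Unmarshal(data, target); err != nil {
		return nil, fmt.Errorf("unmarshal %q: %w", eventType, err)
	}
	return target, nil
}

func main() {
	v, err := unmarshalTyped(sketchRegistry(), "order.created", []byte(`{"id":"42"}`))
	fmt.Printf("%+v %v\n", v, err)
}
```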

func (*UnmarshalPipe) Pipe added in v0.16.0

func (p *UnmarshalPipe) Pipe(ctx context.Context, in <-chan *RawMessage) (<-chan *Message, error)

Pipe starts the unmarshal pipeline.

func (*UnmarshalPipe) Use added in v0.17.1

Use adds middleware to the unmarshal processing chain. Middleware is applied in the order it is added. Returns ErrAlreadyStarted if the pipe has already been started.

Directories

Path Synopsis
Package cloudevents provides integration between gopipe's message.Engine and the CloudEvents SDK protocol bindings.
Package http provides HTTP pub/sub for CloudEvents using standard library net/http.
