Documentation
¶
Overview ¶
Package message provides CloudEvents-aligned message handling with type-based routing.
This package is part of gopipe, a composable data pipeline toolkit for Go. The gopipe family includes:
- channel — Stateless transforms, filters, fan-in/out
- pipe — Stateful components with lifecycle management
- message (this package) — CloudEvents message routing with type-based handlers
The package centers around the Engine, which orchestrates message flow between inputs, handlers, and outputs. Messages follow the CloudEvents specification with typed data payloads and context attributes.
Quick Start ¶
engine := message.NewEngine(message.EngineConfig{
Marshaler: message.NewJSONMarshaler(),
})
handler := message.NewCommandHandler(
func(ctx context.Context, cmd OrderCmd) ([]OrderEvent, error) {
return []OrderEvent{{ID: cmd.ID, Status: "created"}}, nil
},
message.CommandHandlerConfig{Source: "/orders", Naming: message.KebabNaming},
)
engine.AddHandler("orders", nil, handler)
engine.AddRawInput("in", nil, inputCh)
output, _ := engine.AddRawOutput("out", nil)
done, _ := engine.Start(ctx)
Architecture ¶
The engine uses a single merger for all inputs. Each raw input has its own unmarshal pipe that feeds typed messages into the shared merger. Typed inputs feed directly into the merger, then route to handlers via the router.
See README.md in this package for detailed architecture diagrams.
Design Notes ¶
Handler is self-describing via [Handler.EventType] and [Handler.NewInput], eliminating the need for a central type registry. The engine reads these methods to route messages and create instances for unmarshaling.
[Matcher.Match] uses Attributes instead of *Message because all matchers only access attributes, avoiding allocation when matching raw messages.
Add* methods take direct parameters (name, matcher) rather than config structs for simplicity. Config structs are reserved for constructors where extensibility is more important.
For rejected alternatives and common mistakes, see AGENTS.md in the repository root.
Acknowledgment ¶
Messages support acknowledgment callbacks for broker integration. All pipeline components auto-nack on failure (unmarshal error, no handler, handler error, distribution failure, shutdown). Acking on success is explicit.
Basic pattern - ack after output delivery:
// Input source sets up acking
raw := message.NewRaw(data, attrs, message.NewAcking(
func() { broker.Ack(msgID) },
func(err error) { broker.Nack(msgID) },
))
// Output consumer acks after successful delivery
for msg := range output {
if err := broker.Publish(msg); err == nil {
msg.Ack()
}
}
For automatic ack-on-handler-success, use middleware.AutoAck:
engine.Use(middleware.AutoAck())
Batch Processing ¶
When flattening batches (1 input → N outputs), use NewSharedAcking for all-or-nothing semantics:
func flatten(batch *Message) []*Message {
items := extractItems(batch.Data)
shared := message.NewSharedAcking(
func() { batch.Ack() },
func(err error) { batch.Nack(err) },
len(items),
)
outputs := make([]*Message, len(items))
for i, item := range items {
outputs[i] = message.New(item, batch.Attributes, shared)
}
return outputs
}
Alternative strategies for batches:
- Ack immediately, track failures via metrics/DLQ (high throughput)
- Route failures to dead-letter queue, ack individual items
- Threshold-based: nack batch only if failure rate exceeds threshold
Copy shares the acking pointer, preserving acknowledgment through transforms.
Message Types ¶
TypedMessage is the generic base type. Message (any data) and RawMessage ([]byte data) are type aliases for common use cases.
Subpackages ¶
- cloudevents: Integration with CloudEvents SDK protocol bindings
- match: Matchers for filtering messages by attributes
- middleware: Cross-cutting concerns (correlation ID, logging)
- plugin: Reusable engine plugins
Index ¶
- Constants
- Variables
- func IsExtensionAttr(key string) bool
- func IsOptionalAttr(key string) bool
- func IsRequiredAttr(key string) bool
- func NewID() string
- type AckStrategy
- type Acking
- type Attributes
- type CommandHandlerConfig
- type Distributor
- type DistributorConfig
- type Engine
- func (e *Engine) AddHandler(name string, matcher Matcher, h Handler) error
- func (e *Engine) AddInput(name string, matcher Matcher, ch <-chan *Message) (<-chan struct{}, error)
- func (e *Engine) AddOutput(name string, matcher Matcher) (<-chan *Message, error)
- func (e *Engine) AddPlugin(plugins ...Plugin) error
- func (e *Engine) AddRawInput(name string, matcher Matcher, ch <-chan *RawMessage) (<-chan struct{}, error)
- func (e *Engine) AddRawOutput(name string, matcher Matcher) (<-chan *RawMessage, error)
- func (e *Engine) Start(ctx context.Context) (<-chan struct{}, error)
- func (e *Engine) Use(m ...Middleware) error
- type EngineConfig
- type ErrorHandler
- type EventTypeNaming
- type FactoryMap
- type Handler
- type InputRegistry
- type JSONMarshaler
- type Logger
- type MarshalPipe
- type Marshaler
- type Matcher
- type Merger
- type MergerConfig
- type Message
- type Middleware
- type PipeConfig
- type Plugin
- type PoolConfig
- type ProcessFunc
- type RawMessage
- type Router
- type TypedMessage
- func (m *TypedMessage[T]) Ack() bool
- func (m *TypedMessage[T]) Context(parent context.Context) context.Context
- func (m *TypedMessage[T]) CorrelationID() string
- func (m *TypedMessage[T]) DataContentType() string
- func (m *TypedMessage[T]) DataSchema() string
- func (m *TypedMessage[T]) Done() <-chan struct{}
- func (m *TypedMessage[T]) Err() error
- func (m *TypedMessage[T]) ExpiryTime() time.Time
- func (m *TypedMessage[T]) ID() string
- func (m *TypedMessage[T]) MarshalJSON() ([]byte, error)
- func (m *TypedMessage[T]) Nack(err error) bool
- func (m *TypedMessage[T]) Source() string
- func (m *TypedMessage[T]) SpecVersion() string
- func (m *TypedMessage[T]) String() string
- func (m *TypedMessage[T]) Subject() string
- func (m *TypedMessage[T]) Time() time.Time
- func (m *TypedMessage[T]) Type() string
- func (m *TypedMessage[T]) WriteTo(w io.Writer) (int64, error)
- type UnmarshalPipe
Constants ¶
const ( // AttrID is required by CloudEvents. Unique event identifier. AttrID = "id" // AttrType is required by CloudEvents. Event type (e.g., "order.created"). AttrType = "type" // AttrSource is required by CloudEvents. Event source URI. AttrSource = "source" // AttrSpecVersion is required by CloudEvents. Spec version (default "1.0"). AttrSpecVersion = "specversion" // AttrSubject is optional in CloudEvents. Event subject/context. AttrSubject = "subject" // AttrTime is optional in CloudEvents. Event timestamp (RFC3339). AttrTime = "time" // AttrDataContentType is optional in CloudEvents. Data content type. AttrDataContentType = "datacontenttype" // AttrDataSchema is optional in CloudEvents. Data schema URI. AttrDataSchema = "dataschema" )
CloudEvents attribute keys for use in Attributes map literals.
const ( // AttrCorrelationID correlates related events. Propagated by middleware.CorrelationID. AttrCorrelationID = "correlationid" // AttrExpiryTime is the event expiration timestamp. Used by middleware.Deadline. AttrExpiryTime = "expirytime" )
CloudEvents extension attribute keys. Extensions are custom attributes not defined in the core CloudEvents spec. Extension names follow CloudEvents naming rules: lowercase a-z, 0-9 only, max 20 chars.
Variables ¶
var ( // ErrAlreadyStarted is returned when Start() is called on a running engine. ErrAlreadyStarted = errors.New("engine already started") // ErrInputRejected is returned when a message is rejected by input matcher. ErrInputRejected = errors.New("message rejected by input matcher") // ErrNoHandler is returned when no handler exists for a message type. ErrNoHandler = errors.New("no handler for message type") // ErrHandlerRejected is returned when a message is rejected by handler matcher. ErrHandlerRejected = errors.New("message rejected by handler matcher") // ErrUnknownType is returned when unmarshaling a message with unknown type. ErrUnknownType = errors.New("unknown message type") // ErrHandlerExists is returned when registering a handler for an event type that already has one. ErrHandlerExists = errors.New("handler already registered for event type") )
Functions ¶
func IsExtensionAttr ¶ added in v0.13.0
IsExtensionAttr returns true if key is a CloudEvents extension attribute (not defined in the core CloudEvents spec). Examples: correlationid, expirytime.
func IsOptionalAttr ¶ added in v0.13.0
IsOptionalAttr returns true if key is an optional CloudEvents context attribute per CloudEvents 1.0 spec: subject, time, datacontenttype, dataschema.
func IsRequiredAttr ¶ added in v0.13.0
IsRequiredAttr returns true if key is a required CloudEvents context attribute per CloudEvents 1.0 spec: id, source, specversion, type.
Types ¶
type AckStrategy ¶ added in v0.17.0
type AckStrategy int
AckStrategy determines how the router handles message acknowledgment.
const ( // AckOnSuccess automatically acks on successful processing // and nacks on error. This is the default. AckOnSuccess AckStrategy = iota // AckManual means the handler is responsible for acking/nacking. // Provides maximum control but requires explicit ack/nack calls. AckManual // AckForward forwards acknowledgment to output messages. // The input is acked only when ALL outputs are acked. // If ANY output nacks, the input is immediately nacked. // Useful for event sourcing where a command should only be // acked after all resulting events are processed. AckForward )
func (AckStrategy) String ¶ added in v0.17.0
func (s AckStrategy) String() string
String implements fmt.Stringer.
type Acking ¶
type Acking struct {
// contains filtered or unexported fields
}
Acking coordinates acknowledgment across one or more messages. When expectedAckCount messages call Ack(), the ack callback is invoked. If any message calls Nack(), the nack callback is invoked immediately, and the done channel is closed. Acking is thread-safe and can be shared between multiple messages.
func NewAcking ¶
NewAcking creates an Acking for a single message. Returns nil if either callback is nil.
The ack and nack callbacks must not panic. If they do, the panic will propagate after cleanup (the done channel will still be closed to prevent resource leaks, but the program will crash).
func NewSharedAcking ¶
NewSharedAcking creates an Acking shared across multiple messages. The ack callback is invoked after expectedCount Ack() calls. If any message nacks, all sibling messages' done channels are closed. Returns nil if expectedCount <= 0 or if either callback is nil.
The ack and nack callbacks must not panic. If they do, the panic will propagate after cleanup (the done channel will still be closed to prevent resource leaks, but the program will crash).
type Attributes ¶
Attributes is a map of message context attributes per CloudEvents spec. CloudEvents defines attributes as the metadata that describes the event.
Thread safety: Attributes is not safe for concurrent read/write access. Handlers receive a single message at a time, so concurrent access is rare. If sharing attributes between goroutines, use external synchronization.
func AttributesFromContext ¶
func AttributesFromContext(ctx context.Context) Attributes
AttributesFromContext retrieves message attributes from context. Returns nil if no attributes are present.
type CommandHandlerConfig ¶
type CommandHandlerConfig struct {
Source string // required, CE source attribute
Naming EventTypeNaming // optional, derives CE types for input and output
Attributes Attributes // optional, merged into all output messages
}
CommandHandlerConfig configures a command handler.
type Distributor ¶ added in v0.16.0
type Distributor struct {
// contains filtered or unexported fields
}
Distributor routes messages to multiple outputs based on matchers. Automatically nacks messages on errors and provides consistent logging.
func NewDistributor ¶ added in v0.16.0
func NewDistributor(cfg DistributorConfig) *Distributor
NewDistributor creates a new message distributor. Messages are automatically nacked on distribution failures.
func (*Distributor) AddOutput ¶ added in v0.16.0
func (d *Distributor) AddOutput(matcher Matcher) (<-chan *Message, error)
AddOutput registers an output with the given matcher. Returns a channel that receives messages matching the criteria. Can be called before or after Distribute().
func (*Distributor) Distribute ¶ added in v0.16.0
func (d *Distributor) Distribute(ctx context.Context, in <-chan *Message) (<-chan struct{}, error)
Distribute starts distributing messages from the input to registered outputs. Returns a done channel that closes when distribution is complete.
type DistributorConfig ¶ added in v0.16.0
type DistributorConfig struct {
// BufferSize is the per-output channel buffer size (default: 100).
BufferSize int
// ShutdownTimeout controls shutdown behavior on context cancellation.
// If <= 0, forces immediate shutdown (no grace period).
// If > 0, waits up to this duration for natural completion, then forces shutdown.
ShutdownTimeout time.Duration
// Logger for distributor events (default: slog.Default()).
Logger Logger
// ErrorHandler is called on distribution errors after auto-nack (optional).
ErrorHandler ErrorHandler
}
DistributorConfig configures the message distributor.
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
Engine orchestrates message flow between inputs, handlers, and outputs. Uses a single merger for all inputs (typed and unmarshaled raw) and a single distributor for all outputs (typed and pre-marshal raw).
func (*Engine) AddHandler ¶
AddHandler registers a handler to the default pool. The optional matcher is applied after type matching.
func (*Engine) AddInput ¶
func (e *Engine) AddInput(name string, matcher Matcher, ch <-chan *Message) (<-chan struct{}, error)
AddInput registers a typed input channel. Typed inputs go directly to the merger, bypassing unmarshaling. Use for internal messaging, testing, or when data is already typed. Can be called before or after Start().
func (*Engine) AddOutput ¶
AddOutput registers a typed output and returns the channel to consume from. Typed outputs receive messages directly from the distributor without marshaling. Use for internal messaging or when you need typed access to messages. Can be called before or after Start().
func (*Engine) AddPlugin ¶
AddPlugin registers plugins that configure the engine. Plugins can add handlers, middleware, inputs, outputs, etc. Must be called before Start().
Ordering: Plugins that add outputs are matched first-wins. Call AddPlugin before AddOutput if the plugin should have priority when matchers overlap.
func (*Engine) AddRawInput ¶
func (e *Engine) AddRawInput(name string, matcher Matcher, ch <-chan *RawMessage) (<-chan struct{}, error)
AddRawInput registers a raw input channel. Raw inputs are filtered, unmarshaled, and fed to the merger. Use for broker integration (Kafka, NATS, RabbitMQ, etc.). Can be called before or after Start().
func (*Engine) AddRawOutput ¶
func (e *Engine) AddRawOutput(name string, matcher Matcher) (<-chan *RawMessage, error)
AddRawOutput registers a raw output and returns the channel to consume from. Raw outputs receive messages after marshaling to bytes. Use for broker integration (Kafka, NATS, RabbitMQ, etc.). Can be called before or after Start().
func (*Engine) Start ¶
Start begins processing messages. Returns a done channel that closes when the engine has fully stopped.
Shutdown behavior:
For graceful shutdown, close all input channels first, then cancel the context. This allows the pipeline to drain naturally without message loss.
If context is cancelled with inputs still open:
- ShutdownTimeout <= 0: forces immediate shutdown, drops messages in inputs
- ShutdownTimeout > 0: waits up to timeout for natural completion, then forces shutdown and drops messages still in input channels
Messages that pass the merger are guaranteed to be delivered to outputs. The router, distributor, and marshal pipes use the same ShutdownTimeout, creating a cascading drain that ensures delivery after the merger.
func (*Engine) Use ¶
func (e *Engine) Use(m ...Middleware) error
Use registers middleware to wrap message processing. Middleware is applied in order: first registered wraps outermost. Must be called before Start().
type EngineConfig ¶
type EngineConfig struct {
// Marshaler converts between typed data and raw bytes.
Marshaler Marshaler
// BufferSize is the engine buffer size for merger and distributor (default: 100).
BufferSize int
// RouterPool configures the router's worker pool (default: 1 worker, 100 buffer).
RouterPool PoolConfig
// ProcessTimeout sets a per-message processing deadline (default: 0, no timeout).
// Applied to all handlers in the router and marshal/unmarshal pipes.
// If > 0, each handler invocation is wrapped with a timeout context.
ProcessTimeout time.Duration
// AckStrategy determines how messages are acknowledged (default: AckOnSuccess).
// AckOnSuccess: auto-ack on success, auto-nack on error.
// AckManual: handler responsible for acking/nacking.
// AckForward: ack when all outputs ack.
AckStrategy AckStrategy
// ErrorHandler is called on processing errors (default: no-op).
// Errors are logged via Logger; use ErrorHandler for custom handling
// like metrics, alerting, or recovery logic.
ErrorHandler ErrorHandler
// Logger for engine events (default: slog.Default()).
Logger Logger
// ShutdownTimeout controls shutdown behavior on context cancellation.
// If <= 0, forces immediate shutdown (no grace period).
// If > 0, waits up to this duration for natural completion, then forces shutdown.
// On forced shutdown, remaining messages are drained and reported via ErrorHandler.
ShutdownTimeout time.Duration
}
EngineConfig configures the message engine.
type ErrorHandler ¶
ErrorHandler processes engine errors.
type EventTypeNaming ¶
EventTypeNaming derives CloudEvents event types from Go types.
var DefaultNaming EventTypeNaming = defaultNaming{}
DefaultNaming uses the Go type name as-is, without transformation. Example: OrderCreated → "OrderCreated"
var KebabNaming EventTypeNaming = kebabNaming{}
KebabNaming converts PascalCase to dot-separated lowercase. Example: OrderCreated → "order.created"
var SnakeNaming EventTypeNaming = snakeNaming{}
SnakeNaming converts PascalCase to underscore-separated lowercase. Example: OrderCreated → "order_created"
type FactoryMap ¶
FactoryMap is a simple InputRegistry for standalone use.
func (FactoryMap) NewInput ¶
func (m FactoryMap) NewInput(eventType string) any
type Handler ¶
type Handler interface {
// EventType returns the CE type this handler processes.
EventType() string
// NewInput creates a new instance for unmarshaling input data.
NewInput() any
// Handle processes a message and returns output messages.
Handle(ctx context.Context, msg *Message) ([]*Message, error)
}
Handler processes messages of a specific CE type.
func NewCommandHandler ¶
func NewCommandHandler[C, E any]( fn func(ctx context.Context, cmd C) ([]E, error), cfg CommandHandlerConfig, ) Handler
NewCommandHandler creates a handler that receives commands directly. Config provides Source and EventTypeNaming for deriving CE types. If Naming is nil, DefaultNaming is used.
func NewHandler ¶
func NewHandler[T any]( fn func(ctx context.Context, msg *Message) ([]*Message, error), naming EventTypeNaming, ) Handler
NewHandler creates a handler from a typed function. The generic type T is used for unmarshaling and event type derivation. EventTypeNaming derives EventType from T. If nil, DefaultNaming is used.
type InputRegistry ¶
InputRegistry creates typed instances for unmarshaling.
type JSONMarshaler ¶
type JSONMarshaler struct{}
JSONMarshaler implements Marshaler using JSON encoding.
func NewJSONMarshaler ¶
func NewJSONMarshaler() *JSONMarshaler
NewJSONMarshaler creates a new JSON marshaler.
func (*JSONMarshaler) DataContentType ¶
func (m *JSONMarshaler) DataContentType() string
DataContentType returns "application/json".
type Logger ¶
type Logger interface {
// Debug logs a message at debug level.
Debug(msg string, args ...any)
// Info logs a message at info level.
Info(msg string, args ...any)
// Warn logs a message at warning level.
Warn(msg string, args ...any)
// Error logs a message at error level.
Error(msg string, args ...any)
}
Logger defines an interface for logging at different severity levels.
type MarshalPipe ¶ added in v0.16.0
type MarshalPipe struct {
// contains filtered or unexported fields
}
MarshalPipe converts Message to RawMessage using a marshaler. Automatically nacks messages on errors and provides consistent logging.
func NewMarshalPipe ¶
func NewMarshalPipe(marshaler Marshaler, cfg PipeConfig) *MarshalPipe
NewMarshalPipe creates a pipe that marshals Message to RawMessage. Messages are automatically nacked on marshal failures.
func (*MarshalPipe) Pipe ¶ added in v0.16.0
func (p *MarshalPipe) Pipe(ctx context.Context, in <-chan *Message) (<-chan *RawMessage, error)
Pipe starts the marshal pipeline.
func (*MarshalPipe) Use ¶ added in v0.17.1
func (p *MarshalPipe) Use(mw ...middleware.Middleware[*Message, *RawMessage]) error
Use adds middleware to the marshal processing chain. Middleware is applied in the order it is added. Returns ErrAlreadyStarted if the pipe has already been started.
type Marshaler ¶
type Marshaler interface {
// Marshal encodes a value to bytes.
Marshal(v any) ([]byte, error)
// Unmarshal decodes bytes into a value.
Unmarshal(data []byte, v any) error
// DataContentType returns the CloudEvents datacontenttype attribute value.
// Example: "application/json"
DataContentType() string
}
Marshaler handles serialization and deserialization of message data.
type Matcher ¶
type Matcher interface {
Match(attrs Attributes) bool
}
Matcher tests whether attributes match a condition. Used for input filtering and output routing. Operates on Attributes only (not full Message) to work with both Message and RawMessage without wrapper allocation.
type Merger ¶ added in v0.16.0
type Merger struct {
// contains filtered or unexported fields
}
Merger combines multiple message input channels into a single output. Automatically nacks messages on errors and provides consistent logging.
func NewMerger ¶ added in v0.16.0
func NewMerger(cfg MergerConfig) *Merger
NewMerger creates a new message merger. Messages are automatically nacked on merge failures (e.g., shutdown timeout).
type MergerConfig ¶ added in v0.16.0
type MergerConfig struct {
// BufferSize is the output channel buffer size (default: 100).
BufferSize int
// ShutdownTimeout controls shutdown behavior on context cancellation.
// If <= 0, forces immediate shutdown (no grace period).
// If > 0, waits up to this duration for natural completion, then forces shutdown.
ShutdownTimeout time.Duration
// Logger for merger events (default: slog.Default()).
Logger Logger
// ErrorHandler is called on merge errors after auto-nack (optional).
ErrorHandler ErrorHandler
}
MergerConfig configures the message merger.
type Message ¶
type Message = TypedMessage[any]
Message is the internal message type used by handlers and middleware. Data holds any typed payload after unmarshaling from RawMessage.
func FromContext ¶ added in v0.17.0
FromContext is an alias for MessageFromContext.
func MessageFromContext ¶ added in v0.17.0
MessageFromContext retrieves the Message from context. Returns nil if no message is present.
type Middleware ¶
type Middleware func(ProcessFunc) ProcessFunc
Middleware wraps a ProcessFunc to add cross-cutting concerns.
type PipeConfig ¶ added in v0.16.0
type PipeConfig struct {
// Pool configures concurrency and buffering (default: 1 worker, 100 buffer).
Pool PoolConfig
// ProcessTimeout sets a per-message processing deadline (default: 0, no timeout).
// If > 0, each handler invocation is wrapped with a timeout context.
// During normal operation, handlers are cancelled if they exceed ProcessTimeout.
// During shutdown grace period, handlers continue with their ProcessTimeout.
// On forced shutdown (grace period expired), handlers are cancelled immediately.
ProcessTimeout time.Duration
// ShutdownTimeout controls shutdown behavior on context cancellation.
// If <= 0, forces immediate shutdown (no grace period).
// If > 0, waits up to this duration for natural completion, then forces shutdown.
ShutdownTimeout time.Duration
// AckStrategy determines how messages are acknowledged (default: AckOnSuccess).
// AckOnSuccess: auto-ack on success, auto-nack on error.
// AckManual: handler responsible for acking/nacking.
// AckForward: ack when all outputs ack.
AckStrategy AckStrategy
// Logger for pipe events (default: slog.Default()).
Logger Logger
// ErrorHandler is called on processing errors (default: no-op, errors logged via Logger).
ErrorHandler ErrorHandler
}
PipeConfig configures process pipes (Router, UnmarshalPipe, MarshalPipe).
type PoolConfig ¶ added in v0.14.0
type PoolConfig struct {
// Workers is the number of concurrent workers (default: 1).
Workers int
// BufferSize is the output channel buffer size (default: 100).
BufferSize int
}
PoolConfig configures concurrency and buffering for process pipes.
type ProcessFunc ¶
ProcessFunc is the message processing function signature.
type RawMessage ¶
type RawMessage = TypedMessage[[]byte]
RawMessage is the broker boundary message type with serialized []byte data. Used for pub/sub integrations (Kafka, RabbitMQ, NATS, etc.).
func NewRaw ¶
func NewRaw(data []byte, attrs Attributes, acking *Acking) *RawMessage
NewRaw creates a RawMessage for broker integration. Pass nil for attrs or acking if not needed.
func ParseRaw ¶ added in v0.12.0
func ParseRaw(r io.Reader) (*RawMessage, error)
ParseRaw parses CloudEvents structured JSON from r into a RawMessage. Handles both data and data_base64 fields per CloudEvents spec.
func RawMessageFromContext ¶ added in v0.17.0
func RawMessageFromContext(ctx context.Context) *RawMessage
RawMessageFromContext retrieves the RawMessage from context. Returns nil if no raw message is present.
type Router ¶
type Router struct {
// contains filtered or unexported fields
}
Router dispatches messages to handlers by CE type. Implements Pipe signature for composability with pipe.Apply(). Uses pipe.ProcessPipe internally for middleware, concurrency, and error handling.
func (*Router) AddHandler ¶
AddHandler registers a handler. The optional matcher is applied after type matching.
func (*Router) NewInput ¶
NewInput creates a typed instance for unmarshaling. Implements InputRegistry.
func (*Router) Pipe ¶
Pipe routes messages to handlers and returns outputs. Signature matches pipe.Pipe[*Message, *Message] for composability.
Built-in middleware applied automatically (innermost, closest to handler):
- Acking: handles ack/nack based on AckStrategy (default: AckOnSuccess)
User middleware via Use() wraps outside the acking middleware.
func (*Router) Use ¶
func (r *Router) Use(m ...Middleware) error
Use registers middleware to wrap message processing. Middleware is applied in order: first registered wraps outermost. Must be called before Pipe().
type TypedMessage ¶
type TypedMessage[T any] struct { // Data is the event payload per CloudEvents spec. Data T // Attributes contains the context attributes per CloudEvents spec. Attributes Attributes // contains filtered or unexported fields }
TypedMessage wraps a typed data payload with attributes and acknowledgment callbacks. This is the base generic type for all message variants. Ack/Nack operations are mutually exclusive and idempotent.
func Copy ¶
func Copy[In, Out any](msg *TypedMessage[In], data Out) *TypedMessage[Out]
Copy creates a new message with different data while preserving attributes (cloned) and acknowledgment callbacks (shared).
func NewTyped ¶
func NewTyped[T any](data T, attrs Attributes, acking *Acking) *TypedMessage[T]
NewTyped creates a generic typed message. Pass nil for attrs or acking if not needed.
func (*TypedMessage[T]) Ack ¶
func (m *TypedMessage[T]) Ack() bool
Ack acknowledges successful processing of the message. Returns true if acknowledgment succeeded or was already performed. Returns false if no ack callback was provided or if the message was already nacked. The ack callback is invoked at most once when all stages have acked. Thread-safe. Callbacks are invoked outside the mutex to prevent deadlocks.
func (*TypedMessage[T]) Context ¶ added in v0.17.0
func (m *TypedMessage[T]) Context(parent context.Context) context.Context
Context returns a context derived from parent with message-specific behavior.
The context provides:
- Deadline from minimum of parent deadline and message ExpiryTime
- Message reference via MessageFromContext or RawMessageFromContext
- Attributes via AttributesFromContext
- Parent cancellation propagation
Note: This method reports the deadline via ctx.Deadline() but does not create timers or enforce the deadline. Use middleware.Deadline() for enforcement with proper timer cleanup.
For settlement detection (ack/nack), use msg.Done() directly. This keeps context cancellation (lifecycle) separate from message settlement (domain logic).
func (*TypedMessage[T]) CorrelationID ¶ added in v0.13.0
func (m *TypedMessage[T]) CorrelationID() string
CorrelationID returns the correlation ID extension. Returns empty string if not set.
func (*TypedMessage[T]) DataContentType ¶ added in v0.12.0
func (m *TypedMessage[T]) DataContentType() string
DataContentType returns the data content type. Returns empty string if not set.
func (*TypedMessage[T]) DataSchema ¶ added in v0.12.0
func (m *TypedMessage[T]) DataSchema() string
DataSchema returns the data schema URI. Returns empty string if not set.
func (*TypedMessage[T]) Done ¶ added in v0.17.0
func (m *TypedMessage[T]) Done() <-chan struct{}
Done returns a channel that is closed when the message is settled (acked or nacked). Returns nil if no acking is set.
func (*TypedMessage[T]) Err ¶ added in v0.17.0
func (m *TypedMessage[T]) Err() error
Err returns the error from Nack, or nil if pending or acked. Use with Done() to check settlement status, similar to context.Context:
select {
case <-msg.Done():
if err := msg.Err(); err != nil {
// nacked with error
} else {
// acked successfully
}
default:
// still pending
}
func (*TypedMessage[T]) ExpiryTime ¶ added in v0.13.0
func (m *TypedMessage[T]) ExpiryTime() time.Time
ExpiryTime returns the expiry time extension. Returns zero time if not set or invalid.
func (*TypedMessage[T]) ID ¶ added in v0.12.0
func (m *TypedMessage[T]) ID() string
ID returns the event identifier. Returns empty string if not set.
func (*TypedMessage[T]) MarshalJSON ¶ added in v0.12.0
func (m *TypedMessage[T]) MarshalJSON() ([]byte, error)
MarshalJSON implements json.Marshaler. Returns the message as CloudEvents structured JSON.
func (*TypedMessage[T]) Nack ¶
func (m *TypedMessage[T]) Nack(err error) bool
Nack negatively acknowledges the message due to a processing error. Returns true if negative acknowledgment succeeded or was already performed. Returns false if no nack callback was provided or if the message was already acked. The nack callback is invoked immediately with the first error, permanently blocking all further acks. Also closes the done channel, allowing sibling messages to detect the settlement. Thread-safe. Callbacks are invoked outside the mutex to prevent deadlocks.
func (*TypedMessage[T]) Source ¶ added in v0.12.0
func (m *TypedMessage[T]) Source() string
Source returns the event source. Returns empty string if not set.
func (*TypedMessage[T]) SpecVersion ¶ added in v0.12.0
func (m *TypedMessage[T]) SpecVersion() string
SpecVersion returns the CloudEvents spec version. Returns empty string if not set.
func (*TypedMessage[T]) String ¶
func (m *TypedMessage[T]) String() string
String implements fmt.Stringer. Returns the message as CloudEvents structured JSON.
func (*TypedMessage[T]) Subject ¶ added in v0.12.0
func (m *TypedMessage[T]) Subject() string
Subject returns the event subject. Returns empty string if not set.
func (*TypedMessage[T]) Time ¶ added in v0.12.0
func (m *TypedMessage[T]) Time() time.Time
Time returns the event timestamp. Returns zero time if not set or invalid.
func (*TypedMessage[T]) Type ¶ added in v0.12.0
func (m *TypedMessage[T]) Type() string
Type returns the event type. Returns empty string if not set.
type UnmarshalPipe ¶ added in v0.16.0
type UnmarshalPipe struct {
// contains filtered or unexported fields
}
UnmarshalPipe converts RawMessage to Message using a registry and marshaler. Automatically nacks messages on errors and provides consistent logging.
func NewUnmarshalPipe ¶
func NewUnmarshalPipe(registry InputRegistry, marshaler Marshaler, cfg PipeConfig) *UnmarshalPipe
NewUnmarshalPipe creates a pipe that unmarshals RawMessage to Message. Uses registry to create typed instances for unmarshaling. Messages are automatically nacked on unmarshal failures.
func (*UnmarshalPipe) Pipe ¶ added in v0.16.0
func (p *UnmarshalPipe) Pipe(ctx context.Context, in <-chan *RawMessage) (<-chan *Message, error)
Pipe starts the unmarshal pipeline.
func (*UnmarshalPipe) Use ¶ added in v0.17.1
func (p *UnmarshalPipe) Use(mw ...middleware.Middleware[*RawMessage, *Message]) error
Use adds middleware to the unmarshal processing chain. Middleware is applied in the order it is added. Returns ErrAlreadyStarted if the pipe has already been started.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
Package cloudevents provides integration between gopipe's message.Engine and the CloudEvents SDK protocol bindings.
|
Package cloudevents provides integration between gopipe's message.Engine and the CloudEvents SDK protocol bindings. |
|
Package http provides HTTP pub/sub for CloudEvents using standard library net/http.
|
Package http provides HTTP pub/sub for CloudEvents using standard library net/http. |