Documentation ¶
Overview ¶
Package dispatch provides a flexible message routing framework for event-driven systems.
The dispatch package routes messages from multiple sources (EventBridge, SNS, Step Functions, Kinesis, or custom formats) to typed handlers. It handles envelope parsing, payload unmarshaling, validation, and completion semantics, so handler code can focus on business logic.
Quick Start ¶
Define a handler for your event type:
type UserCreatedHandler struct {
onboarding Onboarding
}
type UserCreatedPayload struct {
UserID string `json:"user_id"`
Email string `json:"email"`
}
func (h *UserCreatedHandler) Handle(ctx context.Context, p UserCreatedPayload) error {
return h.onboarding.RegisterUser(ctx, p.UserID, p.Email)
}
Create a router, add sources, and register handlers:
r := dispatch.New()
r.AddSource(myEventBridgeSource)
dispatch.Register(r, "user/created", &UserCreatedHandler{onboarding})
// Process messages
err := r.Process(ctx, rawMessageBytes)
Design Philosophy ¶
The package separates concerns into three layers:
- Sources: Parse raw bytes and extract routing keys + payloads
- Router: Matches keys to handlers, orchestrates the dispatch flow
- Handlers: Pure business logic with typed payloads
This separation allows:
- Multiple message formats on a single queue
- Transport-agnostic handler code
- Consistent observability via hooks
- Easy testing with mock sources
Discriminator Pattern ¶
Sources implement a two-phase matching strategy for efficient routing:
- Discriminator: Cheap field presence/value checks
- Parse: Full envelope parsing only after discriminator matches
This avoids expensive JSON parsing when messages don't match a source, and enables O(1) hot-path matching via adaptive ordering (the last successful source is tried first on subsequent messages).
func (s *mySource) Discriminator() dispatch.Discriminator {
return dispatch.And(
dispatch.HasFields("source", "detail-type"),
dispatch.FieldEquals("source", "my.service"),
)
}
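The adaptive ordering described above can be sketched with the standard library alone. This is a minimal illustration, not the package's implementation: the `source`, `matcher`, and `find` names are hypothetical, messages are matched with plain substring checks, and the sketch is not safe for concurrent use.

```go
package main

import (
	"fmt"
	"strings"
)

// source is a hypothetical stand-in for a dispatch source: a name plus a
// cheap match predicate. It is not the package's Source interface.
type source struct {
	name  string
	match func(raw string) bool
}

// matcher remembers which source matched last and tries it first.
type matcher struct {
	sources []source
	hot     int // index of the last source that matched
}

// find returns the name of the first matching source and the number of
// predicates evaluated along the way.
func (m *matcher) find(raw string) (name string, checks int) {
	if len(m.sources) == 0 {
		return "", 0
	}
	// Hot path: try the previous winner first.
	checks++
	if m.sources[m.hot].match(raw) {
		return m.sources[m.hot].name, checks
	}
	// Slow path: fall back to registration order, skipping the hot source.
	for i, s := range m.sources {
		if i == m.hot {
			continue
		}
		checks++
		if s.match(raw) {
			m.hot = i
			return s.name, checks
		}
	}
	return "", checks
}

func main() {
	m := &matcher{sources: []source{
		{name: "eventbridge", match: func(raw string) bool { return strings.Contains(raw, `"detail-type"`) }},
		{name: "sns", match: func(raw string) bool { return strings.Contains(raw, `"TopicArn"`) }},
	}}
	msg := `{"TopicArn": "arn:aws:sns:example", "Message": "hi"}`
	for i := 0; i < 2; i++ {
		name, checks := m.find(msg)
		fmt.Printf("%s matched after %d check(s)\n", name, checks)
	}
}
```

The first call needs two checks and the second needs one, which is the effect the hot path is meant to have when a queue carries mostly one format.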
Composable discriminators are provided:
- HasFields: Check for field presence
- FieldEquals: Check field value
- And: All discriminators must match
- Or: Any discriminator must match
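To show how such combinators compose, here is a minimal stdlib-only sketch. The lowercase names (`view`, `discriminator`, `hasFields`, `fieldEquals`, `and`, `or`) are hypothetical stand-ins, and the flattened `map[string]string` message is an assumption made to keep the example short; the package's own types may differ.

```go
package main

import "fmt"

// view is a hypothetical, flattened message: field path -> string value.
type view map[string]string

// discriminator reports whether a message looks like it belongs to a source.
type discriminator func(v view) bool

// hasFields matches when every path is present.
func hasFields(paths ...string) discriminator {
	return func(v view) bool {
		for _, p := range paths {
			if _, ok := v[p]; !ok {
				return false
			}
		}
		return true
	}
}

// fieldEquals matches when path is present and holds exactly value.
func fieldEquals(path, value string) discriminator {
	return func(v view) bool {
		got, ok := v[path]
		return ok && got == value
	}
}

// and matches when all discriminators match (true for an empty list).
func and(ds ...discriminator) discriminator {
	return func(v view) bool {
		for _, d := range ds {
			if !d(v) {
				return false
			}
		}
		return true
	}
}

// or matches when at least one discriminator matches (false for an empty list).
func or(ds ...discriminator) discriminator {
	return func(v view) bool {
		for _, d := range ds {
			if d(v) {
				return true
			}
		}
		return false
	}
}

func main() {
	mine := and(
		hasFields("source", "detail-type"),
		or(fieldEquals("source", "my.service"), fieldEquals("source", "my.other")),
	)
	fmt.Println(mine(view{"source": "my.service", "detail-type": "UserCreated"})) // true
	fmt.Println(mine(view{"source": "elsewhere", "detail-type": "UserCreated"}))  // false
}
```

Because each combinator returns another discriminator, arbitrarily nested conditions stay cheap: evaluation stops at the first failing `and` branch or the first passing `or` branch.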
Inspector and View ¶
The Inspector/View abstraction enables format-agnostic field access:
type Inspector interface {
Inspect(raw []byte) (View, error)
}
type View interface {
HasField(path string) bool
GetString(path string) (string, bool)
GetBytes(path string) ([]byte, bool)
}
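As an illustration of what a View implementation involves, the sketch below backs the three methods with `encoding/json`. It is an assumption-laden simplification: `jsonView` and `inspect` are hypothetical names, and only top-level field names are supported rather than nested paths.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// jsonView is a hypothetical View over the top-level fields of a JSON object.
type jsonView map[string]json.RawMessage

// inspect decodes only the object's outer layer; field values stay raw.
func inspect(raw []byte) (jsonView, error) {
	var v jsonView
	if err := json.Unmarshal(raw, &v); err != nil {
		return nil, err
	}
	return v, nil
}

// HasField reports whether the top-level field exists.
func (v jsonView) HasField(path string) bool {
	_, ok := v[path]
	return ok
}

// GetString returns the field's value if it exists and is a JSON string.
func (v jsonView) GetString(path string) (string, bool) {
	raw, ok := v[path]
	if !ok {
		return "", false
	}
	var val any
	if err := json.Unmarshal(raw, &val); err != nil {
		return "", false
	}
	s, isString := val.(string)
	return s, isString
}

// GetBytes returns the raw JSON value, including quotes for strings.
func (v jsonView) GetBytes(path string) ([]byte, bool) {
	raw, ok := v[path]
	return raw, ok
}

func main() {
	v, err := inspect([]byte(`{"source": "my.service", "detail": {"id": 7}}`))
	if err != nil {
		fmt.Println("inspect failed:", err)
		return
	}
	s, _ := v.GetString("source")
	b, _ := v.GetBytes("source")
	fmt.Println(v.HasField("detail"), s, string(b))
}
```

An inspector for another format would expose the same three methods over its own decoding, which is what lets discriminators stay format-agnostic.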
By default, the router uses JSONInspector for all sources. For mixed formats (e.g., JSON and protobuf), use AddGroup with a custom inspector:
r := dispatch.New()
r.AddSource(jsonSource) // Uses default JSON inspector
r.AddGroup(protoInspector, grpcSource, kafkaSource) // Custom inspector
Sources ¶
A Source parses raw message bytes and returns routing information:
type Source interface {
Name() string
Discriminator() Discriminator
Parse(raw []byte) (Parsed, bool)
}
Sources are matched using their Discriminator, then parsed in registration order. The first source whose discriminator matches and Parse succeeds handles the message.
The Parsed struct contains:
- Key: routing key to match against registered handlers
- Payload: raw JSON to unmarshal into the handler's type
- Complete: optional callback for completion semantics (e.g., Step Functions)
Example source implementation:
type mySource struct{}
func (s *mySource) Name() string { return "my-source" }
func (s *mySource) Discriminator() dispatch.Discriminator {
return dispatch.HasFields("type", "payload")
}
func (s *mySource) Parse(raw []byte) (dispatch.Parsed, bool) {
var env struct {
Type string `json:"type"`
Payload json.RawMessage `json:"payload"`
}
if err := json.Unmarshal(raw, &env); err != nil || env.Type == "" {
return dispatch.Parsed{}, false
}
return dispatch.Parsed{
Key: env.Type,
Payload: env.Payload,
}, true
}
Use SourceFunc for simple sources without a struct:
r.AddSource(dispatch.SourceFunc("custom", dispatch.HasFields("event"), parseFunc))
Handlers ¶
Handlers implement the Handler interface with a typed payload:
type Handler[T any] interface {
Handle(ctx context.Context, payload T) error
}
The router automatically:
- Unmarshals the JSON payload to the handler's type
- Validates the payload if it implements validation.Validatable
- Calls the handler with the typed payload
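The three steps above can be sketched as a generic wrapper that erases the payload type so handlers for different types fit in one map. This is one plausible shape, not the package's code: `erased`, `wrap`, `register`, `process`, `setup`, and the local `validatable` interface are all hypothetical, and checking the pointer for `Validate` is an assumption.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
)

// validatable is a hypothetical stand-in for validation.Validatable.
type validatable interface{ Validate() error }

// erased is the type-erased form a router could store per routing key.
type erased func(ctx context.Context, raw json.RawMessage) error

// wrap adapts a typed handler function to the erased form.
func wrap[T any](fn func(ctx context.Context, payload T) error) erased {
	return func(ctx context.Context, raw json.RawMessage) error {
		var p T
		if err := json.Unmarshal(raw, &p); err != nil {
			return fmt.Errorf("unmarshal: %w", err)
		}
		// Check the pointer so Validate methods with pointer receivers count.
		if v, ok := any(&p).(validatable); ok {
			if err := v.Validate(); err != nil {
				return fmt.Errorf("validate: %w", err)
			}
		}
		return fn(ctx, p)
	}
}

// handlers maps routing keys to erased handlers.
var handlers = map[string]erased{}

// register is a package-level generic function because Go methods cannot
// declare their own type parameters.
func register[T any](key string, fn func(ctx context.Context, payload T) error) {
	handlers[key] = wrap(fn)
}

// process looks up the handler for key and runs it against raw.
func process(key, raw string) error {
	h, ok := handlers[key]
	if !ok {
		return fmt.Errorf("no handler for %q", key)
	}
	return h(context.Background(), json.RawMessage(raw))
}

type pingPayload struct {
	Message string `json:"message"`
}

func (p *pingPayload) Validate() error {
	if p.Message == "" {
		return errors.New("message is required")
	}
	return nil
}

// lastPing records the most recent message seen by the ping handler.
var lastPing string

// setup registers the example handler.
func setup() {
	register("ping", func(ctx context.Context, p pingPayload) error {
		lastPing = p.Message
		return nil
	})
}

func main() {
	setup()
	fmt.Println(process("ping", `{"message": "hello"}`), lastPing)
	fmt.Println(process("ping", `{}`))
}
```

The handler body never sees raw bytes or validation failures; both are turned into errors before `fn` is called.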
Use HandlerFunc for simple cases without a struct:
dispatch.RegisterFunc(r, "ping", func(ctx context.Context, p PingPayload) error {
return nil
})
Hooks ¶
Hooks provide observability without coupling to specific logging or metrics systems. Use functional options to configure hooks:
r := dispatch.New(
dispatch.WithOnParse(func(ctx context.Context, source, key string) context.Context {
return logx.WithCtx(ctx, slog.String("source", source), slog.String("key", key))
}),
dispatch.WithOnSuccess(func(ctx context.Context, source, key string, d time.Duration) {
metrics.Timing("dispatch.success", d, "source:"+source)
}),
dispatch.WithOnFailure(func(ctx context.Context, source, key string, err error, d time.Duration) {
metrics.Incr("dispatch.error", "source:"+source)
}),
)
Available hooks:
- WithOnParse: Called after parsing, enriches context
- WithOnDispatch: Called just before handler executes
- WithOnSuccess: Called after handler succeeds
- WithOnFailure: Called after handler fails
- WithOnNoSource: Called when no source matches
- WithOnNoHandler: Called when no handler is registered
- WithOnUnmarshalError: Called on JSON unmarshal errors
- WithOnValidationError: Called on validation errors
Multiple hooks of the same type are called in order.
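For context-returning hooks, "called in order" implies that each hook sees the context produced by the one before it. A minimal sketch of that chaining follows; `onParseFunc` mirrors the hook shape shown above, while `runOnParse`, `ctxKey`, and `trail` are hypothetical names used only for illustration.

```go
package main

import (
	"context"
	"fmt"
)

// onParseFunc mirrors the OnParse hook shape shown above.
type onParseFunc func(ctx context.Context, source, key string) context.Context

// ctxKey is an unexported key type, so values stored under it cannot collide
// with context keys from other packages.
type ctxKey string

// runOnParse calls hooks in registration order, threading the context through.
func runOnParse(ctx context.Context, hooks []onParseFunc, source, key string) context.Context {
	for _, h := range hooks {
		ctx = h(ctx, source, key)
	}
	return ctx
}

// trail runs two hooks and returns what they left in the context.
func trail(source, key string) string {
	hooks := []onParseFunc{
		// First hook records the source name.
		func(ctx context.Context, source, key string) context.Context {
			return context.WithValue(ctx, ctxKey("trail"), "source="+source)
		},
		// Second hook reads the first hook's value and extends it.
		func(ctx context.Context, source, key string) context.Context {
			prev, _ := ctx.Value(ctxKey("trail")).(string)
			return context.WithValue(ctx, ctxKey("trail"), prev+" key="+key)
		},
	}
	ctx := runOnParse(context.Background(), hooks, source, key)
	s, _ := ctx.Value(ctxKey("trail")).(string)
	return s
}

func main() {
	fmt.Println(trail("simple", "user/created")) // source=simple key=user/created
}
```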
Source-Specific Hooks ¶
Sources can implement optional hook interfaces to add source-specific behavior. These hooks run after global hooks, and both are always called:
type OnParseHook interface {
OnParse(ctx context.Context, key string) context.Context
}
type OnSuccessHook interface {
OnSuccess(ctx context.Context, key string, duration time.Duration)
}
type OnFailureHook interface {
OnFailure(ctx context.Context, key string, err error, duration time.Duration)
}
For error-returning hooks, if either global or source returns an error, that error is returned. This allows sources to override global skip/fail policies.
Completion Callbacks ¶
Sources can provide a Complete callback in Parsed for transport-specific completion semantics. For example, Step Functions requires SendTaskSuccess or SendTaskFailure after processing:
func (s *sfnSource) Parse(raw []byte) (dispatch.Parsed, bool) {
// ... parse envelope ...
return dispatch.Parsed{
Key: taskType,
Payload: payload,
Complete: func(ctx context.Context, err error) error {
if err != nil {
return s.sfn.SendTaskFailure(...)
}
return s.sfn.SendTaskSuccess(...)
},
}, true
}
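How a router might invoke such a callback can be sketched as follows. The `parsed` struct is a trimmed, hypothetical copy of Parsed, and `finish`, `run`, and `errBoom` are illustrative names. One point is an assumption: the sketch returns the handler's error when Complete itself succeeds, whereas the documentation only states that an error from Complete is returned from Process.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// parsed is a hypothetical, trimmed-down copy of the Parsed struct.
type parsed struct {
	Key      string
	Complete func(ctx context.Context, err error) error
}

// finish runs the handler, then reports its result to Complete if one is set.
func finish(ctx context.Context, p parsed, handle func(context.Context) error) error {
	herr := handle(ctx)
	if p.Complete == nil {
		return herr // no completion semantics for this transport
	}
	// Complete runs on success and on failure; herr tells it which one.
	if cerr := p.Complete(ctx, herr); cerr != nil {
		return cerr
	}
	return herr // assumption: the handler's error still surfaces
}

// errBoom is a sample handler failure.
var errBoom = errors.New("boom")

// run processes one fake task and returns what Complete was told.
func run(handlerErr error) (reported string, err error) {
	p := parsed{
		Key: "process",
		Complete: func(ctx context.Context, herr error) error {
			if herr != nil {
				reported = "failure: " + herr.Error()
			} else {
				reported = "success"
			}
			return nil
		},
	}
	err = finish(context.Background(), p, func(context.Context) error { return handlerErr })
	return reported, err
}

func main() {
	reported, err := run(nil)
	fmt.Println(reported, err)
	reported, err = run(errBoom)
	fmt.Println(reported, err)
}
```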
Validation ¶
Payloads that implement validation.Validatable are automatically validated after unmarshaling:
type UserPayload struct {
UserID string `json:"user_id"`
Email string `json:"email"`
}
func (p *UserPayload) Validate() error {
return validation.ValidateStruct(p,
validation.Field(&p.UserID, validation.Required),
validation.Field(&p.Email, validation.Required, is.Email),
)
}
Validation errors trigger the OnValidationError hook.
Error Handling ¶
The OnNoSource, OnNoHandler, OnUnmarshalError, and OnValidationError hooks control what happens when errors occur:
- Return nil to skip the message (it goes to DLQ if configured)
- Return an error to fail (message retries based on queue configuration)
By default, all errors cause failures. Override with hooks to skip bad messages:
r := dispatch.New(
dispatch.WithOnUnmarshalError(func(ctx context.Context, source, key string, err error) error {
logger.Error("bad payload", "error", err)
return nil // skip to DLQ, don't retry
}),
)
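The skip-or-fail policy can be modelled in a few lines. This sketch simplifies the hook signature to a single error argument and uses hypothetical names (`errorHook`, `resolve`, `skip`, `fail`); it assumes every hook is called and the first non-nil result is kept, which is one reading of "first error wins".

```go
package main

import (
	"errors"
	"fmt"
)

// errorHook decides what to do with a routing error: nil skips the message,
// a non-nil error fails it. The real hooks also receive ctx, source, and key.
type errorHook func(err error) error

// resolve applies the policy. With no hooks the original error fails the
// message; otherwise all hooks run in order and the first error is returned.
func resolve(hooks []errorHook, err error) error {
	if len(hooks) == 0 {
		return err
	}
	var first error
	for _, h := range hooks {
		if herr := h(err); herr != nil && first == nil {
			first = herr
		}
	}
	return first
}

var (
	errBadPayload = errors.New("bad payload")

	// skip swallows the error so the message is not retried.
	skip errorHook = func(error) error { return nil }
	// fail passes the error through so the message is retried.
	fail errorHook = func(err error) error { return err }
)

func main() {
	fmt.Println(resolve(nil, errBadPayload))                     // bad payload
	fmt.Println(resolve([]errorHook{skip}, errBadPayload))       // <nil>
	fmt.Println(resolve([]errorHook{skip, fail}, errBadPayload)) // bad payload
}
```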
Thread Safety ¶
Router is safe for concurrent use after configuration is complete. Do not call AddSource, AddGroup, or Register after calling Process.
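The configure-then-run contract can be illustrated with a small stand-in. The `router`, `register`, `process`, and `runWorkers` names below are hypothetical; the point is only that registration writes to shared state and must finish before concurrent processing, which merely reads it.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// router is a hypothetical stand-in with the same configure-then-run contract.
type router struct {
	handlers map[string]func()
}

// register mutates the map, so it must not run concurrently with process.
func (r *router) register(key string, fn func()) {
	r.handlers[key] = fn
}

// process only reads the map, which is safe from many goroutines as long as
// no goroutine is writing to it.
func (r *router) process(key string) bool {
	fn, ok := r.handlers[key]
	if ok {
		fn()
	}
	return ok
}

// runWorkers configures the router fully, then fans out n concurrent calls
// and returns how many were handled.
func runWorkers(n int) int64 {
	var handled atomic.Int64

	// Phase 1: configuration, single goroutine.
	r := &router{handlers: map[string]func(){}}
	r.register("ping", func() { handled.Add(1) })

	// Phase 2: processing, many goroutines, no further registration.
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			r.process("ping")
		}()
	}
	wg.Wait()
	return handled.Load()
}

func main() {
	fmt.Println(runWorkers(8)) // 8
}
```

Handlers themselves still need their own synchronization for any state they share, as the atomic counter does here.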
Example ¶
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/bjaus/dispatch"
)
// UserCreatedPayload is the payload for user/created events.
type UserCreatedPayload struct {
UserID string `json:"user_id"`
Email string `json:"email"`
}
// UserCreatedHandler handles user/created events.
type UserCreatedHandler struct{}
func (h *UserCreatedHandler) Handle(ctx context.Context, p UserCreatedPayload) error {
fmt.Printf("User created: %s (%s)\n", p.UserID, p.Email)
return nil
}
// simpleSource is a minimal source implementation for examples.
type simpleSource struct{}
func (s *simpleSource) Name() string { return "simple" }
func (s *simpleSource) Discriminator() dispatch.Discriminator {
return dispatch.HasFields("type", "payload")
}
func (s *simpleSource) Parse(raw []byte) (dispatch.Parsed, bool) {
var env struct {
Type string `json:"type"`
Payload json.RawMessage `json:"payload"`
}
if err := json.Unmarshal(raw, &env); err != nil || env.Type == "" {
return dispatch.Parsed{}, false
}
return dispatch.Parsed{
Key: env.Type,
Payload: env.Payload,
}, true
}
func main() {
// Create router with hooks
r := dispatch.New(
dispatch.WithOnSuccess(func(ctx context.Context, source, key string, d time.Duration) {
log.Printf("[%s] %s succeeded (%v)", source, key, d)
}),
dispatch.WithOnFailure(func(ctx context.Context, source, key string, err error, d time.Duration) {
log.Printf("[%s] %s failed: %v (%v)", source, key, err, d)
}),
)
// Add source
r.AddSource(&simpleSource{})
// Register handler
dispatch.Register(r, "user/created", &UserCreatedHandler{})
// Process a message
msg := []byte(`{"type": "user/created", "payload": {"user_id": "123", "email": "test@example.com"}}`)
if err := r.Process(context.Background(), msg); err != nil {
log.Fatal(err)
}
}
Output: User created: 123 (test@example.com)
Example (Completion) ¶
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/bjaus/dispatch"
)
// completionSource demonstrates a source with completion callback.
type completionSource struct{}
func (s *completionSource) Name() string { return "completion" }
func (s *completionSource) Discriminator() dispatch.Discriminator {
return dispatch.HasFields("task", "token", "payload")
}
func (s *completionSource) Parse(raw []byte) (dispatch.Parsed, bool) {
var env struct {
Task string `json:"task"`
Token string `json:"token"`
Payload json.RawMessage `json:"payload"`
}
if err := json.Unmarshal(raw, &env); err != nil || env.Token == "" {
return dispatch.Parsed{}, false
}
return dispatch.Parsed{
Key: env.Task,
Payload: env.Payload,
Complete: func(ctx context.Context, err error) error {
if err != nil {
fmt.Printf("Task %s failed: %v\n", env.Token, err)
} else {
fmt.Printf("Task %s succeeded\n", env.Token)
}
return nil
},
}, true
}
func main() {
r := dispatch.New()
r.AddSource(&completionSource{})
dispatch.RegisterFunc(r, "process", func(ctx context.Context, p struct{ Value int }) error {
fmt.Println("Processing value:", p.Value)
return nil
})
msg := []byte(`{"task": "process", "token": "abc123", "payload": {"value": 42}}`)
_ = r.Process(context.Background(), msg)
}
Output:
Processing value: 42
Task abc123 succeeded
Example (HandlerFunc) ¶
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/bjaus/dispatch"
)
// simpleSource is a minimal source implementation for examples.
type simpleSource struct{}
func (s *simpleSource) Name() string { return "simple" }
func (s *simpleSource) Discriminator() dispatch.Discriminator {
return dispatch.HasFields("type", "payload")
}
func (s *simpleSource) Parse(raw []byte) (dispatch.Parsed, bool) {
var env struct {
Type string `json:"type"`
Payload json.RawMessage `json:"payload"`
}
if err := json.Unmarshal(raw, &env); err != nil || env.Type == "" {
return dispatch.Parsed{}, false
}
return dispatch.Parsed{
Key: env.Type,
Payload: env.Payload,
}, true
}
func main() {
r := dispatch.New()
r.AddSource(&simpleSource{})
// Register with a function instead of a struct
dispatch.RegisterFunc(r, "ping", func(ctx context.Context, p struct{ Message string }) error {
fmt.Println("Ping:", p.Message)
return nil
})
msg := []byte(`{"type": "ping", "payload": {"message": "hello"}}`)
_ = r.Process(context.Background(), msg)
}
Output: Ping: hello
Example (MultipleHooks) ¶
package main
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/bjaus/dispatch"
)
// simpleSource is a minimal source implementation for examples.
type simpleSource struct{}
func (s *simpleSource) Name() string { return "simple" }
func (s *simpleSource) Discriminator() dispatch.Discriminator {
return dispatch.HasFields("type", "payload")
}
func (s *simpleSource) Parse(raw []byte) (dispatch.Parsed, bool) {
var env struct {
Type string `json:"type"`
Payload json.RawMessage `json:"payload"`
}
if err := json.Unmarshal(raw, &env); err != nil || env.Type == "" {
return dispatch.Parsed{}, false
}
return dispatch.Parsed{
Key: env.Type,
Payload: env.Payload,
}, true
}
func main() {
// Pass multiple hooks to New
r := dispatch.New(
dispatch.WithOnDispatch(func(ctx context.Context, source, key string) {
fmt.Printf("Processing %s from %s\n", key, source)
}),
dispatch.WithOnSuccess(func(ctx context.Context, source, key string, d time.Duration) {
fmt.Printf("Metric: %s.%s.success\n", source, key)
}),
)
r.AddSource(&simpleSource{})
dispatch.RegisterFunc(r, "test", func(ctx context.Context, p struct{}) error {
return nil
})
msg := []byte(`{"type": "test", "payload": {}}`)
_ = r.Process(context.Background(), msg)
}
Output:
Processing test from simple
Metric: simple.test.success
Example (SkipBadMessages) ¶
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/bjaus/dispatch"
)
// simpleSource is a minimal source implementation for examples.
type simpleSource struct{}
func (s *simpleSource) Name() string { return "simple" }
func (s *simpleSource) Discriminator() dispatch.Discriminator {
return dispatch.HasFields("type", "payload")
}
func (s *simpleSource) Parse(raw []byte) (dispatch.Parsed, bool) {
var env struct {
Type string `json:"type"`
Payload json.RawMessage `json:"payload"`
}
if err := json.Unmarshal(raw, &env); err != nil || env.Type == "" {
return dispatch.Parsed{}, false
}
return dispatch.Parsed{
Key: env.Type,
Payload: env.Payload,
}, true
}
func main() {
r := dispatch.New(
dispatch.WithOnNoHandler(func(ctx context.Context, source, key string) error {
fmt.Println("Skipping unknown event:", key)
return nil // return nil to skip, error to fail
}),
dispatch.WithOnUnmarshalError(func(ctx context.Context, source, key string, err error) error {
fmt.Println("Skipping bad payload:", err)
return nil // skip bad payloads
}),
)
r.AddSource(&simpleSource{})
// No handler registered for "unknown" - will be skipped
msg := []byte(`{"type": "unknown", "payload": {}}`)
err := r.Process(context.Background(), msg)
fmt.Println("Error:", err)
}
Output:
Skipping unknown event: unknown
Error: <nil>
Example (SourceFunc) ¶
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/bjaus/dispatch"
)
func main() {
r := dispatch.New()
// Use SourceFunc for simple sources
r.AddSource(dispatch.SourceFunc("custom", dispatch.HasFields("event", "data"), func(raw []byte) (dispatch.Parsed, bool) {
var env struct {
Event string `json:"event"`
Data json.RawMessage `json:"data"`
}
if err := json.Unmarshal(raw, &env); err != nil || env.Event == "" {
return dispatch.Parsed{}, false
}
return dispatch.Parsed{Key: env.Event, Payload: env.Data}, true
}))
dispatch.RegisterFunc(r, "hello", func(ctx context.Context, p struct{ Name string }) error {
fmt.Println("Hello,", p.Name)
return nil
})
msg := []byte(`{"event": "hello", "data": {"name": "World"}}`)
_ = r.Process(context.Background(), msg)
}
Output: Hello, World
Index ¶
- Variables
- func Register[T any](r *Router, key string, h Handler[T])
- func RegisterFunc[T any](r *Router, key string, fn func(ctx context.Context, payload T) error)
- type Discriminator
- type Handler
- type HandlerFunc
- type Inspector
- type OnDispatchFunc
- type OnDispatchHook
- type OnFailureFunc
- type OnFailureHook
- type OnNoHandlerFunc
- type OnNoHandlerHook
- type OnNoSourceFunc
- type OnParseFunc
- type OnParseHook
- type OnSuccessFunc
- type OnSuccessHook
- type OnUnmarshalErrorFunc
- type OnUnmarshalErrorHook
- type OnValidationErrorFunc
- type OnValidationErrorHook
- type Option
- func WithInspector(i Inspector) Option
- func WithOnDispatch(fn OnDispatchFunc) Option
- func WithOnFailure(fn OnFailureFunc) Option
- func WithOnNoHandler(fn OnNoHandlerFunc) Option
- func WithOnNoSource(fn OnNoSourceFunc) Option
- func WithOnParse(fn OnParseFunc) Option
- func WithOnSuccess(fn OnSuccessFunc) Option
- func WithOnUnmarshalError(fn OnUnmarshalErrorFunc) Option
- func WithOnValidationError(fn OnValidationErrorFunc) Option
- type Parsed
- type Router
- type Source
- type View
Variables ¶
var ErrInvalidJSON = errors.New("invalid JSON")
ErrInvalidJSON is returned when the input is not valid JSON.
Functions ¶
func Register ¶
func Register[T any](r *Router, key string, h Handler[T])
Register adds a handler for a routing key. The key must match the Key field returned by a source's Parse method.
This is a package-level function (not a method) due to Go generics limitations: methods cannot have type parameters independent of the receiver.
Example:
dispatch.Register(r, "user/created", &UserCreatedHandler{db: db})
dispatch.Register(r, "user/deleted", &UserDeletedHandler{db: db})
Types ¶
type Discriminator ¶
Discriminator determines if a source should handle a message based on the message content. Discriminators are cheap to evaluate compared to full parsing.
func And ¶
func And(ds ...Discriminator) Discriminator
And returns a Discriminator that matches when all discriminators match.
func FieldEquals ¶
func FieldEquals(path, value string) Discriminator
FieldEquals returns a Discriminator that matches when the path exists and equals the given string value.
func HasFields ¶
func HasFields(paths ...string) Discriminator
HasFields returns a Discriminator that matches when all paths exist.
func Or ¶
func Or(ds ...Discriminator) Discriminator
Or returns a Discriminator that matches when any discriminator matches.
type Handler ¶
Handler processes a typed payload. This is the primary interface to implement.
The type parameter T is the payload type that the handler expects. The router automatically unmarshals the raw JSON payload to this type and validates it if T implements validation.Validatable.
Example:
type UserCreatedHandler struct {
db *sql.DB
}
func (h *UserCreatedHandler) Handle(ctx context.Context, p UserCreatedPayload) error {
_, err := h.db.ExecContext(ctx, "INSERT INTO users ...", p.UserID, p.Email)
return err
}
type HandlerFunc ¶
HandlerFunc is a function adapter for Handler. Use this for simple handlers that don't need a struct:
dispatch.RegisterFunc(r, "ping", func(ctx context.Context, p PingPayload) error {
return nil
})
type Inspector ¶
Inspector examines raw bytes and returns a View for field queries. Different inspectors handle different formats (JSON, protobuf, etc.).
func JSONInspector ¶
func JSONInspector() Inspector
JSONInspector returns an Inspector that uses gjson for field access.
type OnDispatchFunc ¶
OnDispatchFunc is called just before the handler executes.
type OnDispatchHook ¶
OnDispatchHook is an optional interface that sources can implement to add source-specific pre-dispatch behavior. Called after global OnDispatch hooks.
type OnFailureFunc ¶
OnFailureFunc is called after the handler fails.
type OnFailureHook ¶
type OnFailureHook interface {
OnFailure(ctx context.Context, key string, err error, duration time.Duration)
}
OnFailureHook is an optional interface that sources can implement to add source-specific behavior on handler failure. Called after global OnFailure hooks.
type OnNoHandlerFunc ¶
OnNoHandlerFunc is called when no handler is registered for the routing key. Return nil to skip, return an error to fail.
type OnNoHandlerHook ¶
OnNoHandlerHook is an optional interface that sources can implement to add source-specific behavior when no handler is found. Called after global hooks; if either returns an error, that error is used.
type OnNoSourceFunc ¶
OnNoSourceFunc is called when no source can parse the message. Return nil to skip the message, return an error to fail.
type OnParseFunc ¶
OnParseFunc is called after a source successfully parses a message. Use this to enrich the context with logging fields or trace spans. The returned context is used for the rest of the request.
type OnParseHook ¶
OnParseHook is an optional interface that sources can implement to add source-specific context enrichment. Called after global OnParse hooks.
type OnSuccessFunc ¶
OnSuccessFunc is called after the handler completes successfully.
type OnSuccessHook ¶
OnSuccessHook is an optional interface that sources can implement to add source-specific behavior on handler success. Called after global OnSuccess hooks.
type OnUnmarshalErrorFunc ¶
OnUnmarshalErrorFunc is called when JSON unmarshaling fails. Return nil to skip, return an error to fail.
type OnUnmarshalErrorHook ¶
type OnUnmarshalErrorHook interface {
OnUnmarshalError(ctx context.Context, key string, err error) error
}
OnUnmarshalErrorHook is an optional interface that sources can implement to add source-specific behavior on unmarshal errors. Called after global hooks; if either returns an error, that error is used.
type OnValidationErrorFunc ¶
OnValidationErrorFunc is called when payload validation fails. Return nil to skip, return an error to fail.
type OnValidationErrorHook ¶
type OnValidationErrorHook interface {
OnValidationError(ctx context.Context, key string, err error) error
}
OnValidationErrorHook is an optional interface that sources can implement to add source-specific behavior on validation errors. Called after global hooks; if either returns an error, that error is used.
type Option ¶
type Option func(*Router)
Option configures Router behavior.
func WithInspector ¶
func WithInspector(i Inspector) Option
WithInspector sets the default inspector for sources added with AddSource.
func WithOnDispatch ¶
func WithOnDispatch(fn OnDispatchFunc) Option
WithOnDispatch adds a hook called just before the handler executes. Multiple hooks are called in order.
Example:
dispatch.WithOnDispatch(func(ctx context.Context, source, key string) {
logger.Info(ctx, "dispatching event", "key", key)
})
func WithOnFailure ¶
func WithOnFailure(fn OnFailureFunc) Option
WithOnFailure adds a hook called after the handler fails. Multiple hooks are called in order.
Example:
dispatch.WithOnFailure(func(ctx context.Context, source, key string, err error, d time.Duration) {
metrics.Incr("dispatch.failure", "source:"+source)
logger.Error(ctx, "handler failed", "error", err)
})
func WithOnNoHandler ¶
func WithOnNoHandler(fn OnNoHandlerFunc) Option
WithOnNoHandler adds a hook called when no handler is registered for the key. Return nil to skip, return an error to fail. Multiple hooks are called in order; first error wins.
Example:
dispatch.WithOnNoHandler(func(ctx context.Context, source, key string) error {
logger.Warn(ctx, "no handler", "key", key)
return nil // skip
})
func WithOnNoSource ¶
func WithOnNoSource(fn OnNoSourceFunc) Option
WithOnNoSource adds a hook called when no source can parse the message. Return nil to skip, return an error to fail. Multiple hooks are called in order; first error wins.
Example:
dispatch.WithOnNoSource(func(ctx context.Context, raw []byte) error {
logger.Warn(ctx, "unknown message format")
return nil // skip to DLQ
})
func WithOnParse ¶
func WithOnParse(fn OnParseFunc) Option
WithOnParse adds a hook called after a source successfully parses a message. Multiple hooks are called in order, with context chaining through each.
Example:
dispatch.WithOnParse(func(ctx context.Context, source, key string) context.Context {
return logx.WithCtx(ctx, slog.String("source", source))
})
func WithOnSuccess ¶
func WithOnSuccess(fn OnSuccessFunc) Option
WithOnSuccess adds a hook called after the handler completes successfully. Multiple hooks are called in order.
Example:
dispatch.WithOnSuccess(func(ctx context.Context, source, key string, d time.Duration) {
metrics.Timing("dispatch.success", d, "source:"+source)
})
func WithOnUnmarshalError ¶
func WithOnUnmarshalError(fn OnUnmarshalErrorFunc) Option
WithOnUnmarshalError adds a hook called when JSON unmarshaling fails. Return nil to skip, return an error to fail. Multiple hooks are called in order; first error wins.
Example:
dispatch.WithOnUnmarshalError(func(ctx context.Context, source, key string, err error) error {
logger.Error(ctx, "bad payload", "error", err)
return nil // skip bad payloads
})
func WithOnValidationError ¶
func WithOnValidationError(fn OnValidationErrorFunc) Option
WithOnValidationError adds a hook called when payload validation fails. Return nil to skip, return an error to fail. Multiple hooks are called in order; first error wins.
Example:
dispatch.WithOnValidationError(func(ctx context.Context, source, key string, err error) error {
logger.Error(ctx, "validation failed", "error", err)
return nil // skip invalid payloads
})
type Parsed ¶
type Parsed struct {
// Key is the routing key used to find the handler.
// This is matched against keys passed to Register.
Key string
// Payload is the raw JSON to unmarshal into the handler's type.
Payload json.RawMessage
// Complete is called after the handler finishes, regardless of success or failure.
// Use this for transport-specific completion semantics like Step Functions
// SendTaskSuccess/SendTaskFailure.
//
// If Complete is nil, no completion callback is made.
// If Complete returns an error, that error is returned from Process.
// The err parameter is the handler's error (or nil on success).
Complete func(ctx context.Context, err error) error
}
Parsed contains the result of source parsing.
type Router ¶
type Router struct {
// contains filtered or unexported fields
}
Router dispatches messages to registered handlers based on routing keys.
Usage:
- Create a router with New
- Add sources with AddSource (or AddGroup for custom inspectors)
- Register handlers with Register
- Process messages with Process
Router is safe for concurrent use after configuration. Do not call AddSource, AddGroup, or Register after calling Process.
func New ¶
New creates a Router with the given options.
By default, the router uses JSONInspector for source matching. Use WithInspector to override.
Example:
r := dispatch.New(
dispatch.WithOnParse(func(ctx context.Context, source, key string) context.Context {
return logx.WithCtx(ctx, slog.String("source", source))
}),
dispatch.WithOnSuccess(func(ctx context.Context, source, key string, d time.Duration) {
metrics.Timing("dispatch.success", d)
}),
)
func (*Router) AddGroup ¶
AddGroup registers sources with a custom inspector. Use this when you have sources that use a different message format (e.g., protobuf).
Groups are checked after the default group, in registration order.
Example:
r.AddGroup(protoInspector, grpcSource, kafkaSource)
func (*Router) AddSource ¶
AddSource registers a source to the default inspector group. Sources are matched using their Discriminator, then parsed in registration order.
Example:
r.AddSource(eventBridgeSource)
r.AddSource(snsSource)
r.AddSource(sfnSource)
func (*Router) Process ¶
Process parses the raw message, routes to the appropriate handler, and executes completion callbacks.
The processing flow:
- Use discriminators to find a matching source
- Parse the message with the matched source
- Look up the handler by the parsed routing key
- Unmarshal the payload to the handler's type
- Validate the payload if it implements Validatable
- Call the handler
- Call the source's Complete callback if provided
Hooks are called at appropriate points throughout this flow.
Example:
// In an SQS consumer
func (s *Subscriber) ProcessMessage(ctx context.Context, msg sqs.Message) error {
return s.router.Process(ctx, []byte(*msg.Body))
}
// In a Lambda handler
func handler(ctx context.Context, event json.RawMessage) error {
return router.Process(ctx, event)
}
type Source ¶
type Source interface {
// Name returns the source identifier for logging and metrics.
Name() string
// Discriminator returns a predicate for cheap message detection.
// The router calls this before Parse to avoid expensive parsing
// when the message format doesn't match.
Discriminator() Discriminator
// Parse attempts to parse raw bytes as this source's format.
// Returns the parsed result and true if successful, or false if this
// source does not recognize the message format.
Parse(raw []byte) (Parsed, bool)
}
Source parses raw message bytes and extracts routing information.
Sources are registered with Router.AddSource and matched using their Discriminator before Parse is called. This allows cheap detection before expensive parsing.
Implement Source to support different message formats:
- EventBridge events
- SNS notifications
- Step Functions task tokens
- Kinesis records
- Custom formats
Example:
type mySource struct{}
func (s *mySource) Name() string { return "my-source" }
func (s *mySource) Discriminator() dispatch.Discriminator {
return dispatch.HasFields("type", "payload")
}
func (s *mySource) Parse(raw []byte) (dispatch.Parsed, bool) {
var env struct {
Type string `json:"type"`
Payload json.RawMessage `json:"payload"`
}
if err := json.Unmarshal(raw, &env); err != nil || env.Type == "" {
return dispatch.Parsed{}, false
}
return dispatch.Parsed{Key: env.Type, Payload: env.Payload}, true
}
func SourceFunc ¶
SourceFunc creates a Source from a name, discriminator, and parse function. Use this for simple sources that don't need a struct:
r.AddSource(dispatch.SourceFunc(
"legacy",
dispatch.HasFields("type", "payload"),
func(raw []byte) (dispatch.Parsed, bool) {
// Parse logic goes here. Return false when the format is not recognized.
return dispatch.Parsed{}, false
},
))
type View ¶
type View interface {
// HasField returns true if the path exists in the message.
HasField(path string) bool
// GetString returns the string value at path, or false if not found
// or not a string.
GetString(path string) (string, bool)
// GetBytes returns the raw bytes at path, or false if not found.
// For JSON, this returns the raw JSON value (including quotes for strings).
GetBytes(path string) ([]byte, bool)
}
View provides format-agnostic field access for discriminator matching.