dispatch

package module
v0.0.0-...-1a336ea Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 7, 2026 License: MIT Imports: 7 Imported by: 0

README

dispatch

Go Reference Go Report Card CI codecov

A flexible message routing framework for event-driven Go applications.

Features

  • Multi-Source Routing — Route messages from webhooks, message queues, or custom formats through a single processor
  • Discriminator Pattern — Cheap detection before expensive parsing for O(1) hot-path matching
  • Typed Handlers — Automatic JSON unmarshaling and validation with generics
  • Pluggable Hooks — Observability without coupling to specific logging or metrics systems
  • Completion Callbacks — Built-in support for async acknowledgment patterns
  • Format Agnostic — Inspector/View abstraction supports JSON, protobuf, or custom formats
  • Zero Allocation Matching — Uses gjson for efficient JSON field lookups

Installation

go get github.com/bjaus/dispatch

Requires Go 1.25 or later.

Quick Start

package main

import (
    "context"
    "encoding/json"
    "log"

    "github.com/bjaus/dispatch"
)

// Define your payload type
type UserCreatedPayload struct {
    UserID string `json:"user_id"`
    Email  string `json:"email"`
}

// Define your handler
type UserCreatedHandler struct{}

func (h *UserCreatedHandler) Handle(ctx context.Context, p UserCreatedPayload) error {
    log.Printf("User created: %s (%s)", p.UserID, p.Email)
    return nil
}

// Define a source to parse your message format
type mySource struct{}

func (s *mySource) Name() string { return "my-source" }

func (s *mySource) Discriminator() dispatch.Discriminator {
    return dispatch.HasFields("type", "payload")
}

func (s *mySource) Parse(raw []byte) (dispatch.Parsed, bool) {
    var env struct {
        Type    string          `json:"type"`
        Payload json.RawMessage `json:"payload"`
    }
    if err := json.Unmarshal(raw, &env); err != nil || env.Type == "" {
        return dispatch.Parsed{}, false
    }
    return dispatch.Parsed{Key: env.Type, Payload: env.Payload}, true
}

func main() {
    // Create router
    r := dispatch.New()

    // Add source
    r.AddSource(&mySource{})

    // Register handler
    dispatch.Register(r, "user/created", &UserCreatedHandler{})

    // Process a message
    msg := []byte(`{"type": "user/created", "payload": {"user_id": "123", "email": "test@example.com"}}`)
    if err := r.Process(context.Background(), msg); err != nil {
        log.Fatal(err)
    }
}

Architecture

The package separates concerns into three layers:

Layer Responsibility
Sources Parse raw bytes, extract routing key + payload
Router Match keys to handlers, orchestrate dispatch flow
Handlers Pure business logic with typed payloads
Discriminator Pattern

Sources implement a two-phase matching strategy:

  1. Discriminator — Cheap field presence/value checks using the Inspector/View abstraction
  2. Parse — Full envelope parsing only after discriminator matches

This avoids expensive parsing when messages don't match, and enables O(1) hot-path matching via adaptive ordering (last successful source is tried first).

func (s *mySource) Discriminator() dispatch.Discriminator {
    // Cheap check: does the message have these fields?
    return dispatch.And(
        dispatch.HasFields("source", "detail-type", "detail"),
        dispatch.FieldEquals("source", "my.service"),
    )
}
Inspector Groups

By default, all sources use the JSON inspector. For mixed formats (e.g., JSON + protobuf), use groups:

r := dispatch.New()

// Default group uses JSON inspector
r.AddSource(webhookSource)
r.AddSource(apiSource)

// Custom group for protobuf messages
r.AddGroup(protoInspector, grpcSource, kafkaSource)

Discriminators

Composable predicates for source matching:

// Check field presence
dispatch.HasFields("type", "payload")

// Check field value
dispatch.FieldEquals("source", "aws.events")

// Combine with And/Or
dispatch.And(
    dispatch.HasFields("detail-type"),
    dispatch.Or(
        dispatch.FieldEquals("source", "service.a"),
        dispatch.FieldEquals("source", "service.b"),
    ),
)

Hooks

Add observability without coupling to specific systems:

r := dispatch.New(
    dispatch.WithOnParse(func(ctx context.Context, source, key string) context.Context {
        slog.InfoContext(ctx, "parsing message", "source", source, "key", key)
        return ctx
    }),
    dispatch.WithOnSuccess(func(ctx context.Context, source, key string, d time.Duration) {
        slog.InfoContext(ctx, "handler succeeded", "source", source, "key", key, "duration", d)
    }),
    dispatch.WithOnFailure(func(ctx context.Context, source, key string, err error, d time.Duration) {
        slog.ErrorContext(ctx, "handler failed", "source", source, "key", key, "error", err, "duration", d)
    }),
)
Available Hooks
Hook Called When
WithOnParse After source parses message (enriches context)
WithOnDispatch Just before handler executes
WithOnSuccess After handler succeeds
WithOnFailure After handler fails
WithOnNoSource No source matches the message
WithOnNoHandler No handler registered for key
WithOnUnmarshalError JSON unmarshal fails
WithOnValidationError Payload validation fails
Source-Specific Hooks

Sources can implement hook interfaces for source-specific behavior:

type OnParseHook interface {
    OnParse(ctx context.Context, key string) context.Context
}

type OnSuccessHook interface {
    OnSuccess(ctx context.Context, key string, duration time.Duration)
}

Completion Callbacks

For transports that require explicit acknowledgment after processing:

func (s *taskSource) Parse(raw []byte) (dispatch.Parsed, bool) {
    var env struct {
        TaskID  string          `json:"task_id"`
        Type    string          `json:"type"`
        Payload json.RawMessage `json:"payload"`
    }
    if err := json.Unmarshal(raw, &env); err != nil {
        return dispatch.Parsed{}, false
    }
    return dispatch.Parsed{
        Key:     env.Type,
        Payload: env.Payload,
        Complete: func(ctx context.Context, err error) error {
            if err != nil {
                return s.taskQueue.ReportFailure(ctx, env.TaskID, err)
            }
            return s.taskQueue.ReportSuccess(ctx, env.TaskID)
        },
    }, true
}

Validation

Payloads implementing Validate() error are automatically validated:

type UserPayload struct {
    UserID string `json:"user_id"`
    Email  string `json:"email"`
}

func (p *UserPayload) Validate() error {
    if p.UserID == "" {
        return errors.New("user_id is required")
    }
    if p.Email == "" {
        return errors.New("email is required")
    }
    return nil
}

Works with any validation library (ozzo-validation, go-playground/validator, etc.) as long as your payload has a Validate() error method.

Error Handling

Error hooks control skip vs. fail behavior:

r := dispatch.New(
    // Skip unknown events (go to DLQ)
    dispatch.WithOnNoHandler(func(ctx context.Context, source, key string) error {
        log.Printf("skipping unknown event: %s", key)
        return nil // nil = skip, error = fail
    }),

    // Skip malformed payloads
    dispatch.WithOnUnmarshalError(func(ctx context.Context, source, key string, err error) error {
        log.Printf("bad payload: %v", err)
        return nil
    }),
)

Integration Patterns

HTTP Webhook Handler
func webhookHandler(w http.ResponseWriter, r *http.Request) {
    body, _ := io.ReadAll(r.Body)
    if err := router.Process(r.Context(), body); err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }
    w.WriteHeader(http.StatusOK)
}
Message Queue Consumer
func consume(ctx context.Context, queue MessageQueue) error {
    for {
        msg, err := queue.Receive(ctx)
        if err != nil {
            return err
        }
        if err := router.Process(ctx, msg.Body); err != nil {
            msg.Nack() // retry later
            continue
        }
        msg.Ack()
    }
}
Kafka Consumer
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        if err := c.router.Process(session.Context(), msg.Value); err != nil {
            slog.Error("processing failed", "error", err, "topic", msg.Topic)
            continue
        }
        session.MarkMessage(msg, "")
    }
    return nil
}

Testing

go test -v ./...

License

MIT License - see LICENSE for details.

Documentation

Overview

Package dispatch provides a flexible message routing framework for event-driven systems.

The dispatch package routes messages from multiple sources (EventBridge, SNS, Step Functions, Kinesis, or custom formats) to typed handlers. It handles envelope parsing, payload unmarshaling, validation, and completion semantics — letting you focus on business logic.

Quick Start

Define a handler for your event type:

type UserCreatedHandler struct {
    onboarding Onboarding
}

type UserCreatedPayload struct {
    UserID string `json:"user_id"`
    Email  string `json:"email"`
}

func (h *UserCreatedHandler) Handle(ctx context.Context, p UserCreatedPayload) error {
    return h.onboarding.RegisterUser(ctx, p.UserID, p.Email)
}

Create a router, add sources, and register handlers:

r := dispatch.New()

r.AddSource(myEventBridgeSource)

dispatch.Register(r, "user/created", &UserCreatedHandler{onboarding})

// Process messages
err := r.Process(ctx, rawMessageBytes)

Design Philosophy

The package separates concerns into three layers:

  • Sources: Parse raw bytes and extract routing keys + payloads
  • Router: Matches keys to handlers, orchestrates the dispatch flow
  • Handlers: Pure business logic with typed payloads

This separation allows:

  • Multiple message formats on a single queue
  • Transport-agnostic handler code
  • Consistent observability via hooks
  • Easy testing with mock sources

Discriminator Pattern

Sources implement a two-phase matching strategy for efficient routing:

  1. Discriminator: Cheap field presence/value checks
  2. Parse: Full envelope parsing only after discriminator matches

This avoids expensive JSON parsing when messages don't match a source, and enables O(1) hot-path matching via adaptive ordering (the last successful source is tried first on subsequent messages).

func (s *mySource) Discriminator() dispatch.Discriminator {
    return dispatch.And(
        dispatch.HasFields("source", "detail-type"),
        dispatch.FieldEquals("source", "my.service"),
    )
}

Composable discriminators are provided:

  • HasFields: Check for field presence
  • FieldEquals: Check field value
  • And: All discriminators must match
  • Or: Any discriminator must match

Inspector and View

The Inspector/View abstraction enables format-agnostic field access:

type Inspector interface {
    Inspect(raw []byte) (View, error)
}

type View interface {
    HasField(path string) bool
    GetString(path string) (string, bool)
    GetBytes(path string) ([]byte, bool)
}

By default, the router uses JSONInspector for all sources. For mixed formats (e.g., JSON and protobuf), use AddGroup with a custom inspector:

r := dispatch.New()
r.AddSource(jsonSource)                          // Uses default JSON inspector
r.AddGroup(protoInspector, grpcSource, kafkaSource) // Custom inspector

Sources

A Source parses raw message bytes and returns routing information:

type Source interface {
    Name() string
    Discriminator() Discriminator
    Parse(raw []byte) (Parsed, bool)
}

Sources are matched using their Discriminator, then parsed in registration order. The first source whose discriminator matches and Parse succeeds handles the message.

The Parsed struct contains:

  • Key: routing key to match against registered handlers
  • Payload: raw JSON to unmarshal into the handler's type
  • Complete: optional callback for completion semantics (e.g., Step Functions)

Example source implementation:

type mySource struct{}

func (s *mySource) Name() string { return "my-source" }

func (s *mySource) Discriminator() dispatch.Discriminator {
    return dispatch.HasFields("type", "payload")
}

func (s *mySource) Parse(raw []byte) (dispatch.Parsed, bool) {
    var env struct {
        Type    string          `json:"type"`
        Payload json.RawMessage `json:"payload"`
    }
    if err := json.Unmarshal(raw, &env); err != nil || env.Type == "" {
        return dispatch.Parsed{}, false
    }
    return dispatch.Parsed{
        Key:     env.Type,
        Payload: env.Payload,
    }, true
}

Use SourceFunc for simple sources without a struct:

r.AddSource(dispatch.SourceFunc("custom", dispatch.HasFields("event"), parseFunc))

Handlers

Handlers implement the Handler interface with a typed payload:

type Handler[T any] interface {
    Handle(ctx context.Context, payload T) error
}

The router automatically:

  • Unmarshals the JSON payload to the handler's type
  • Validates the payload if it implements validation.Validatable
  • Calls the handler with the typed payload

Use HandlerFunc for simple cases without a struct:

dispatch.RegisterFunc(r, "ping", func(ctx context.Context, p PingPayload) error {
    return nil
})

Hooks

Hooks provide observability without coupling to specific logging or metrics systems. Use functional options to configure hooks:

r := dispatch.New(
    dispatch.WithOnParse(func(ctx context.Context, source, key string) context.Context {
        return logx.WithCtx(ctx, slog.String("source", source), slog.String("key", key))
    }),
    dispatch.WithOnSuccess(func(ctx context.Context, source, key string, d time.Duration) {
        metrics.Timing("dispatch.success", d, "source:"+source)
    }),
    dispatch.WithOnFailure(func(ctx context.Context, source, key string, err error, d time.Duration) {
        metrics.Incr("dispatch.error", "source:"+source)
    }),
)

Available hooks:

  • WithOnParse: Called after parsing, enriches context
  • WithOnDispatch: Called just before handler executes
  • WithOnSuccess: Called after handler succeeds
  • WithOnFailure: Called after handler fails
  • WithOnNoSource: Called when no source matches
  • WithOnNoHandler: Called when no handler is registered
  • WithOnUnmarshalError: Called on JSON unmarshal errors
  • WithOnValidationError: Called on validation errors

Multiple hooks of the same type are called in order.

Source-Specific Hooks

Sources can implement optional hook interfaces to add source-specific behavior. These hooks run after global hooks, and both are always called:

type OnParseHook interface {
    OnParse(ctx context.Context, key string) context.Context
}

type OnSuccessHook interface {
    OnSuccess(ctx context.Context, key string, duration time.Duration)
}

type OnFailureHook interface {
    OnFailure(ctx context.Context, key string, err error, duration time.Duration)
}

For error-returning hooks, if either global or source returns an error, that error is returned. This allows sources to override global skip/fail policies.

Completion Callbacks

Sources can provide a Complete callback in Parsed for transport-specific completion semantics. For example, Step Functions requires SendTaskSuccess or SendTaskFailure after processing:

func (s *sfnSource) Parse(raw []byte) (dispatch.Parsed, bool) {
    // ... parse envelope ...
    return dispatch.Parsed{
        Key:     taskType,
        Payload: payload,
        Complete: func(ctx context.Context, err error) error {
            if err != nil {
                return s.sfn.SendTaskFailure(...)
            }
            return s.sfn.SendTaskSuccess(...)
        },
    }, true
}

Validation

Payloads that implement validation.Validatable are automatically validated after unmarshaling:

type UserPayload struct {
    UserID string `json:"user_id"`
    Email  string `json:"email"`
}

func (p *UserPayload) Validate() error {
    return validation.ValidateStruct(p,
        validation.Field(&p.UserID, validation.Required),
        validation.Field(&p.Email, validation.Required, is.Email),
    )
}

Validation errors trigger the OnValidationError hook.

Error Handling

The OnNoSource, OnNoHandler, OnUnmarshalError, and OnValidationError hooks control what happens when errors occur:

  • Return nil to skip the message (it goes to DLQ if configured)
  • Return an error to fail (message retries based on queue configuration)

By default, all errors cause failures. Override with hooks to skip bad messages:

r := dispatch.New(
    dispatch.WithOnUnmarshalError(func(ctx context.Context, source, key string, err error) error {
        logger.Error("bad payload", "error", err)
        return nil // skip to DLQ, don't retry
    }),
)

Thread Safety

Router is safe for concurrent use after configuration is complete. Do not call AddSource, AddGroup, or Register after calling Process.

Example
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/bjaus/dispatch"
)

// UserCreatedPayload is the payload for user/created events.
type UserCreatedPayload struct {
	UserID string `json:"user_id"`
	Email  string `json:"email"`
}

// UserCreatedHandler handles user/created events.
type UserCreatedHandler struct{}

func (h *UserCreatedHandler) Handle(ctx context.Context, p UserCreatedPayload) error {
	fmt.Printf("User created: %s (%s)\n", p.UserID, p.Email)
	return nil
}

// simpleSource is a minimal source implementation for examples.
type simpleSource struct{}

func (s *simpleSource) Name() string { return "simple" }

func (s *simpleSource) Discriminator() dispatch.Discriminator {
	return dispatch.HasFields("type", "payload")
}

func (s *simpleSource) Parse(raw []byte) (dispatch.Parsed, bool) {
	var env struct {
		Type    string          `json:"type"`
		Payload json.RawMessage `json:"payload"`
	}
	if err := json.Unmarshal(raw, &env); err != nil || env.Type == "" {
		return dispatch.Parsed{}, false
	}
	return dispatch.Parsed{
		Key:     env.Type,
		Payload: env.Payload,
	}, true
}

func main() {
	// Create router with hooks
	r := dispatch.New(
		dispatch.WithOnSuccess(func(ctx context.Context, source, key string, d time.Duration) {
			log.Printf("[%s] %s succeeded (%v)", source, key, d)
		}),
		dispatch.WithOnFailure(func(ctx context.Context, source, key string, err error, d time.Duration) {
			log.Printf("[%s] %s failed: %v (%v)", source, key, err, d)
		}),
	)

	// Add source
	r.AddSource(&simpleSource{})

	// Register handler
	dispatch.Register(r, "user/created", &UserCreatedHandler{})

	// Process a message
	msg := []byte(`{"type": "user/created", "payload": {"user_id": "123", "email": "test@example.com"}}`)
	if err := r.Process(context.Background(), msg); err != nil {
		log.Fatal(err)
	}

}
Output:

User created: 123 (test@example.com)
Example (Completion)
package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/bjaus/dispatch"
)

// completionSource demonstrates a source with completion callback.
type completionSource struct{}

func (s *completionSource) Name() string { return "completion" }

func (s *completionSource) Discriminator() dispatch.Discriminator {
	return dispatch.HasFields("task", "token", "payload")
}

func (s *completionSource) Parse(raw []byte) (dispatch.Parsed, bool) {
	var env struct {
		Task    string          `json:"task"`
		Token   string          `json:"token"`
		Payload json.RawMessage `json:"payload"`
	}
	if err := json.Unmarshal(raw, &env); err != nil || env.Token == "" {
		return dispatch.Parsed{}, false
	}
	return dispatch.Parsed{
		Key:     env.Task,
		Payload: env.Payload,
		Complete: func(ctx context.Context, err error) error {
			if err != nil {
				fmt.Printf("Task %s failed: %v\n", env.Token, err)
			} else {
				fmt.Printf("Task %s succeeded\n", env.Token)
			}
			return nil
		},
	}, true
}

func main() {
	r := dispatch.New()
	r.AddSource(&completionSource{})

	dispatch.RegisterFunc(r, "process", func(ctx context.Context, p struct{ Value int }) error {
		fmt.Println("Processing value:", p.Value)
		return nil
	})

	msg := []byte(`{"task": "process", "token": "abc123", "payload": {"value": 42}}`)
	_ = r.Process(context.Background(), msg)

}
Output:

Processing value: 42
Task abc123 succeeded
Example (HandlerFunc)
package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/bjaus/dispatch"
)

// simpleSource is a minimal source implementation for examples.
type simpleSource struct{}

func (s *simpleSource) Name() string { return "simple" }

func (s *simpleSource) Discriminator() dispatch.Discriminator {
	return dispatch.HasFields("type", "payload")
}

func (s *simpleSource) Parse(raw []byte) (dispatch.Parsed, bool) {
	var env struct {
		Type    string          `json:"type"`
		Payload json.RawMessage `json:"payload"`
	}
	if err := json.Unmarshal(raw, &env); err != nil || env.Type == "" {
		return dispatch.Parsed{}, false
	}
	return dispatch.Parsed{
		Key:     env.Type,
		Payload: env.Payload,
	}, true
}

func main() {
	r := dispatch.New()
	r.AddSource(&simpleSource{})

	// Register with a function instead of a struct
	dispatch.RegisterFunc(r, "ping", func(ctx context.Context, p struct{ Message string }) error {
		fmt.Println("Ping:", p.Message)
		return nil
	})

	msg := []byte(`{"type": "ping", "payload": {"message": "hello"}}`)
	_ = r.Process(context.Background(), msg)

}
Output:

Ping: hello
Example (MultipleHooks)
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/bjaus/dispatch"
)

// simpleSource is a minimal source implementation for examples.
type simpleSource struct{}

func (s *simpleSource) Name() string { return "simple" }

func (s *simpleSource) Discriminator() dispatch.Discriminator {
	return dispatch.HasFields("type", "payload")
}

func (s *simpleSource) Parse(raw []byte) (dispatch.Parsed, bool) {
	var env struct {
		Type    string          `json:"type"`
		Payload json.RawMessage `json:"payload"`
	}
	if err := json.Unmarshal(raw, &env); err != nil || env.Type == "" {
		return dispatch.Parsed{}, false
	}
	return dispatch.Parsed{
		Key:     env.Type,
		Payload: env.Payload,
	}, true
}

func main() {
	// Pass multiple hooks to New
	r := dispatch.New(
		dispatch.WithOnDispatch(func(ctx context.Context, source, key string) {
			fmt.Printf("Processing %s from %s\n", key, source)
		}),
		dispatch.WithOnSuccess(func(ctx context.Context, source, key string, d time.Duration) {
			fmt.Printf("Metric: %s.%s.success\n", source, key)
		}),
	)
	r.AddSource(&simpleSource{})

	dispatch.RegisterFunc(r, "test", func(ctx context.Context, p struct{}) error {
		return nil
	})

	msg := []byte(`{"type": "test", "payload": {}}`)
	_ = r.Process(context.Background(), msg)

}
Output:

Processing test from simple
Metric: simple.test.success
Example (SkipBadMessages)
package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/bjaus/dispatch"
)

// simpleSource is a minimal source implementation for examples.
type simpleSource struct{}

func (s *simpleSource) Name() string { return "simple" }

func (s *simpleSource) Discriminator() dispatch.Discriminator {
	return dispatch.HasFields("type", "payload")
}

func (s *simpleSource) Parse(raw []byte) (dispatch.Parsed, bool) {
	var env struct {
		Type    string          `json:"type"`
		Payload json.RawMessage `json:"payload"`
	}
	if err := json.Unmarshal(raw, &env); err != nil || env.Type == "" {
		return dispatch.Parsed{}, false
	}
	return dispatch.Parsed{
		Key:     env.Type,
		Payload: env.Payload,
	}, true
}

func main() {
	r := dispatch.New(
		dispatch.WithOnNoHandler(func(ctx context.Context, source, key string) error {
			fmt.Println("Skipping unknown event:", key)
			return nil // return nil to skip, error to fail
		}),
		dispatch.WithOnUnmarshalError(func(ctx context.Context, source, key string, err error) error {
			fmt.Println("Skipping bad payload:", err)
			return nil // skip bad payloads
		}),
	)

	r.AddSource(&simpleSource{})

	// No handler registered for "unknown" - will be skipped
	msg := []byte(`{"type": "unknown", "payload": {}}`)
	err := r.Process(context.Background(), msg)
	fmt.Println("Error:", err)

}
Output:

Skipping unknown event: unknown
Error: <nil>
Example (SourceFunc)
package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/bjaus/dispatch"
)

func main() {
	r := dispatch.New()

	// Use SourceFunc for simple sources
	r.AddSource(dispatch.SourceFunc("custom", dispatch.HasFields("event", "data"), func(raw []byte) (dispatch.Parsed, bool) {
		var env struct {
			Event string          `json:"event"`
			Data  json.RawMessage `json:"data"`
		}
		if err := json.Unmarshal(raw, &env); err != nil || env.Event == "" {
			return dispatch.Parsed{}, false
		}
		return dispatch.Parsed{Key: env.Event, Payload: env.Data}, true
	}))

	dispatch.RegisterFunc(r, "hello", func(ctx context.Context, p struct{ Name string }) error {
		fmt.Println("Hello,", p.Name)
		return nil
	})

	msg := []byte(`{"event": "hello", "data": {"name": "World"}}`)
	_ = r.Process(context.Background(), msg)

}
Output:

Hello, World

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrInvalidJSON = errors.New("invalid JSON")

ErrInvalidJSON is returned when the input is not valid JSON.

Functions

func Register

func Register[T any](r *Router, key string, h Handler[T])

Register adds a handler for a routing key. The key must match the Key field returned by a source's Parse method.

This is a package-level function (not a method) due to Go generics limitations: methods cannot have type parameters independent of the receiver.

Example:

dispatch.Register(r, "user/created", &UserCreatedHandler{db: db})
dispatch.Register(r, "user/deleted", &UserDeletedHandler{db: db})

func RegisterFunc

func RegisterFunc[T any](r *Router, key string, fn func(ctx context.Context, payload T) error)

RegisterFunc is a convenience function for registering a handler function.

Example:

dispatch.RegisterFunc(r, "ping", func(ctx context.Context, p PingPayload) error {
    return nil
})

Types

type Discriminator

type Discriminator interface {
	Match(v View) bool
}

Discriminator determines if a source should handle a message based on the message content. Discriminators are cheap to evaluate compared to full parsing.

func And

func And(ds ...Discriminator) Discriminator

And returns a Discriminator that matches when all discriminators match.

func FieldEquals

func FieldEquals(path, value string) Discriminator

FieldEquals returns a Discriminator that matches when the path exists and equals the given string value.

func HasFields

func HasFields(paths ...string) Discriminator

HasFields returns a Discriminator that matches when all paths exist.

func Or

func Or(ds ...Discriminator) Discriminator

Or returns a Discriminator that matches when any discriminator matches.

type Handler

type Handler[T any] interface {
	Handle(ctx context.Context, payload T) error
}

Handler processes a typed payload. This is the primary interface to implement.

The type parameter T is the payload type that the handler expects. The router automatically unmarshals the raw JSON payload to this type and validates it if T implements validation.Validatable.

Example:

type UserCreatedHandler struct {
    db *sql.DB
}

func (h *UserCreatedHandler) Handle(ctx context.Context, p UserCreatedPayload) error {
    _, err := h.db.ExecContext(ctx, "INSERT INTO users ...", p.UserID, p.Email)
    return err
}

type HandlerFunc

type HandlerFunc[T any] func(ctx context.Context, payload T) error

HandlerFunc is a function adapter for Handler. Use this for simple handlers that don't need a struct:

dispatch.RegisterFunc(r, "ping", func(ctx context.Context, p PingPayload) error {
    return nil
})

func (HandlerFunc[T]) Handle

func (f HandlerFunc[T]) Handle(ctx context.Context, payload T) error

Handle implements the Handler interface.

type Inspector

type Inspector interface {
	Inspect(raw []byte) (View, error)
}

Inspector examines raw bytes and returns a View for field queries. Different inspectors handle different formats (JSON, protobuf, etc.).

func JSONInspector

func JSONInspector() Inspector

JSONInspector returns an Inspector that uses gjson for field access.

type OnDispatchFunc

type OnDispatchFunc func(ctx context.Context, source, key string)

OnDispatchFunc is called just before the handler executes.

type OnDispatchHook

type OnDispatchHook interface {
	OnDispatch(ctx context.Context, key string)
}

OnDispatchHook is an optional interface that sources can implement to add source-specific pre-dispatch behavior. Called after global OnDispatch hooks.

type OnFailureFunc

type OnFailureFunc func(ctx context.Context, source, key string, err error, duration time.Duration)

OnFailureFunc is called after the handler fails.

type OnFailureHook

type OnFailureHook interface {
	OnFailure(ctx context.Context, key string, err error, duration time.Duration)
}

OnFailureHook is an optional interface that sources can implement to add source-specific behavior on handler failure. Called after global OnFailure hooks.

type OnNoHandlerFunc

type OnNoHandlerFunc func(ctx context.Context, source, key string) error

OnNoHandlerFunc is called when no handler is registered for the routing key. Return nil to skip, return an error to fail.

type OnNoHandlerHook

type OnNoHandlerHook interface {
	OnNoHandler(ctx context.Context, key string) error
}

OnNoHandlerHook is an optional interface that sources can implement to add source-specific behavior when no handler is found. Called after global hooks; if either returns an error, that error is used.

type OnNoSourceFunc

type OnNoSourceFunc func(ctx context.Context, raw []byte) error

OnNoSourceFunc is called when no source can parse the message. Return nil to skip the message, return an error to fail.

type OnParseFunc

type OnParseFunc func(ctx context.Context, source, key string) context.Context

OnParseFunc is called after a source successfully parses a message. Use this to enrich the context with logging fields or trace spans. The returned context is used for the rest of the request.

type OnParseHook

type OnParseHook interface {
	OnParse(ctx context.Context, key string) context.Context
}

OnParseHook is an optional interface that sources can implement to add source-specific context enrichment. Called after global OnParse hooks.

type OnSuccessFunc

type OnSuccessFunc func(ctx context.Context, source, key string, duration time.Duration)

OnSuccessFunc is called after the handler completes successfully.

type OnSuccessHook

type OnSuccessHook interface {
	OnSuccess(ctx context.Context, key string, duration time.Duration)
}

OnSuccessHook is an optional interface that sources can implement to add source-specific behavior on handler success. Called after global OnSuccess hooks.

type OnUnmarshalErrorFunc

type OnUnmarshalErrorFunc func(ctx context.Context, source, key string, err error) error

OnUnmarshalErrorFunc is called when JSON unmarshaling fails. Return nil to skip, return an error to fail.

type OnUnmarshalErrorHook

type OnUnmarshalErrorHook interface {
	OnUnmarshalError(ctx context.Context, key string, err error) error
}

OnUnmarshalErrorHook is an optional interface that sources can implement to add source-specific behavior on unmarshal errors. Called after global hooks; if either returns an error, that error is used.

type OnValidationErrorFunc

type OnValidationErrorFunc func(ctx context.Context, source, key string, err error) error

OnValidationErrorFunc is called when payload validation fails. Return nil to skip, return an error to fail.

type OnValidationErrorHook

type OnValidationErrorHook interface {
	OnValidationError(ctx context.Context, key string, err error) error
}

OnValidationErrorHook is an optional interface that sources can implement to add source-specific behavior on validation errors. Called after global hooks; if either returns an error, that error is used.

type Option

type Option func(*Router)

Option configures Router behavior.

func WithInspector

func WithInspector(i Inspector) Option

WithInspector sets the default inspector for sources added with AddSource.

func WithOnDispatch

func WithOnDispatch(fn OnDispatchFunc) Option

WithOnDispatch adds a hook called just before the handler executes. Multiple hooks are called in order.

Example:

dispatch.WithOnDispatch(func(ctx context.Context, source, key string) {
    logger.Info(ctx, "dispatching event", "key", key)
})

func WithOnFailure

func WithOnFailure(fn OnFailureFunc) Option

WithOnFailure adds a hook called after the handler fails. Multiple hooks are called in order.

Example:

dispatch.WithOnFailure(func(ctx context.Context, source, key string, err error, d time.Duration) {
    metrics.Incr("dispatch.failure", "source:"+source)
    logger.Error(ctx, "handler failed", "error", err)
})

func WithOnNoHandler

func WithOnNoHandler(fn OnNoHandlerFunc) Option

WithOnNoHandler adds a hook called when no handler is registered for the key. Return nil to skip, return an error to fail. Multiple hooks are called in order; first error wins.

Example:

dispatch.WithOnNoHandler(func(ctx context.Context, source, key string) error {
    logger.Warn(ctx, "no handler", "key", key)
    return nil // skip
})

func WithOnNoSource

func WithOnNoSource(fn OnNoSourceFunc) Option

WithOnNoSource adds a hook called when no source can parse the message. Return nil to skip, return an error to fail. Multiple hooks are called in order; first error wins.

Example:

dispatch.WithOnNoSource(func(ctx context.Context, raw []byte) error {
    logger.Warn(ctx, "unknown message format")
    return nil // skip to DLQ
})

func WithOnParse

func WithOnParse(fn OnParseFunc) Option

WithOnParse adds a hook called after a source successfully parses a message. Multiple hooks are called in order, with context chaining through each.

Example:

dispatch.WithOnParse(func(ctx context.Context, source, key string) context.Context {
    return logx.WithCtx(ctx, slog.String("source", source))
})

func WithOnSuccess

func WithOnSuccess(fn OnSuccessFunc) Option

WithOnSuccess adds a hook called after the handler completes successfully. Multiple hooks are called in order.

Example:

dispatch.WithOnSuccess(func(ctx context.Context, source, key string, d time.Duration) {
    metrics.Timing("dispatch.success", d, "source:"+source)
})

func WithOnUnmarshalError

func WithOnUnmarshalError(fn OnUnmarshalErrorFunc) Option

WithOnUnmarshalError adds a hook called when JSON unmarshaling fails. Return nil to skip, return an error to fail. Multiple hooks are called in order; first error wins.

Example:

dispatch.WithOnUnmarshalError(func(ctx context.Context, source, key string, err error) error {
    logger.Error(ctx, "bad payload", "error", err)
    return nil // skip bad payloads
})

func WithOnValidationError

func WithOnValidationError(fn OnValidationErrorFunc) Option

WithOnValidationError adds a hook called when payload validation fails. Return nil to skip, return an error to fail. Multiple hooks are called in order; first error wins.

Example:

dispatch.WithOnValidationError(func(ctx context.Context, source, key string, err error) error {
    logger.Error(ctx, "validation failed", "error", err)
    return nil // skip invalid payloads
})

type Parsed

type Parsed struct {
	// Key is the routing key used to find the handler.
	// This is matched against keys passed to Register.
	Key string

	// Payload is the raw JSON to unmarshal into the handler's type.
	Payload json.RawMessage

	// Complete is called after the handler finishes, regardless of success or failure.
	// Use this for transport-specific completion semantics like Step Functions
	// SendTaskSuccess/SendTaskFailure.
	//
	// If Complete is nil, no completion callback is made.
	// If Complete returns an error, that error is returned from Process.
	// The err parameter is the handler's error (or nil on success).
	Complete func(ctx context.Context, err error) error
}

Parsed contains the result of source parsing.

type Router

type Router struct {
	// contains filtered or unexported fields
}

Router dispatches messages to registered handlers based on routing keys.

Usage:

  1. Create a router with New
  2. Add sources with AddSource (or AddGroup for custom inspectors)
  3. Register handlers with Register
  4. Process messages with Process

Router is safe for concurrent use after configuration. Do not call AddSource, AddGroup, or Register after calling Process.

func New

func New(opts ...Option) *Router

New creates a Router with the given options.

By default, the router uses JSONInspector for source matching. Use WithInspector to override.

Example:

r := dispatch.New(
    dispatch.WithOnParse(func(ctx context.Context, source, key string) context.Context {
        return logx.WithCtx(ctx, slog.String("source", source))
    }),
    dispatch.WithOnSuccess(func(ctx context.Context, source, key string, d time.Duration) {
        metrics.Timing("dispatch.success", d)
    }),
)

func (*Router) AddGroup

func (r *Router) AddGroup(inspector Inspector, sources ...Source)

AddGroup registers sources with a custom inspector. Use this when you have sources that use a different message format (e.g., protobuf).

Groups are checked after the default group, in registration order.

Example:

r.AddGroup(protoInspector, grpcSource, kafkaSource)

func (*Router) AddSource

func (r *Router) AddSource(s Source)

AddSource registers a source to the default inspector group. Sources are matched using their Discriminator, then parsed in registration order.

Example:

r.AddSource(eventBridgeSource)
r.AddSource(snsSource)
r.AddSource(sfnSource)

func (*Router) Process

func (r *Router) Process(ctx context.Context, raw []byte) error

Process parses the raw message, routes to the appropriate handler, and executes completion callbacks.

The processing flow:

  1. Use discriminators to find a matching source
  2. Parse the message with the matched source
  3. Look up the handler by the parsed routing key
  4. Unmarshal the payload to the handler's type
  5. Validate the payload if it implements Validatable
  6. Call the handler
  7. Call the source's Complete callback if provided

Hooks are called at appropriate points throughout this flow.

Example:

// In an SQS consumer
func (s *Subscriber) ProcessMessage(ctx context.Context, msg sqs.Message) error {
    return s.router.Process(ctx, []byte(*msg.Body))
}

// In a Lambda handler
func handler(ctx context.Context, event json.RawMessage) error {
    return router.Process(ctx, event)
}

type Source

type Source interface {
	// Name returns the source identifier for logging and metrics.
	Name() string

	// Discriminator returns a predicate for cheap message detection.
	// The router calls this before Parse to avoid expensive parsing
	// when the message format doesn't match.
	Discriminator() Discriminator

	// Parse attempts to parse raw bytes as this source's format.
	// Returns the parsed result and true if successful, or false if this
	// source does not recognize the message format.
	Parse(raw []byte) (Parsed, bool)
}

Source parses raw message bytes and extracts routing information.

Sources are registered with Router.AddSource and matched using their Discriminator before Parse is called. This allows cheap detection before expensive parsing.

Implement Source to support different message formats:

  • EventBridge events
  • SNS notifications
  • Step Functions task tokens
  • Kinesis records
  • Custom formats

Example:

type mySource struct{}

func (s *mySource) Name() string { return "my-source" }

func (s *mySource) Discriminator() dispatch.Discriminator {
    return dispatch.HasFields("type", "payload")
}

func (s *mySource) Parse(raw []byte) (dispatch.Parsed, bool) {
    var env struct {
        Type    string          `json:"type"`
        Payload json.RawMessage `json:"payload"`
    }
    if err := json.Unmarshal(raw, &env); err != nil || env.Type == "" {
        return dispatch.Parsed{}, false
    }
    return dispatch.Parsed{Key: env.Type, Payload: env.Payload}, true
}

func SourceFunc

func SourceFunc(name string, disc Discriminator, parse func([]byte) (Parsed, bool)) Source

SourceFunc creates a Source from a name, discriminator, and parse function. Use this for simple sources that don't need a struct:

r.AddSource(dispatch.SourceFunc(
    "legacy",
    dispatch.HasFields("type", "payload"),
    func(raw []byte) (dispatch.Parsed, bool) {
        // parse logic
    },
))

type View

type View interface {
	// HasField returns true if the path exists in the message.
	HasField(path string) bool

	// GetString returns the string value at path, or false if not found
	// or not a string.
	GetString(path string) (string, bool)

	// GetBytes returns the raw bytes at path, or false if not found.
	// For JSON, this returns the raw JSON value (including quotes for strings).
	GetBytes(path string) ([]byte, bool)
}

View provides format-agnostic field access for discriminator matching.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL