Documentation
¶
Overview ¶
Package dispatch provides a lightweight, type-safe event bus implementation for building event-driven applications in Go. It supports both synchronous and asynchronous event handling, with features like wildcard pattern matching and typed payload handling through generics.
Basic Usage:
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
dispatcher := dispatch.NewDispatcher(logger)
// Register a handler
dispatcher.On("user.created", func(ctx context.Context, event dispatch.Event) {
// Handle the event
})
// Emit an event
dispatcher.Emit(context.Background(), "user.created", userData)
Event Signatures:
Events use dot-notation signatures that typically follow the pattern:
<source>.<action>
For example:
- user.created
- order.completed
- email.sent
Wildcard pattern matching is supported when registering handlers:
- "user.*" matches all user events
- "*.created" matches all creation events
- "system.*" matches all system events
Type-Safe Payload Handling:
The package provides several helpers for safe payload type conversion:
// Direct conversion
user, err := dispatch.PayloadAs[User](event)
// Type-safe handler
dispatcher.On("user.created", dispatch.HandlePayload[User](func(ctx context.Context, user User) {
// Work with strongly typed user data
}))
// Collection helpers
config, err := dispatch.PayloadAsMap(event) // For map[string]any
items, err := dispatch.PayloadAsSlice(event) // For []any
regions, err := dispatch.PayloadMapAs[Region](event) // For map[string]Region
users, err := dispatch.PayloadSliceAs[User](event) // For []User
Event Emission:
Events can be emitted either asynchronously (non-blocking) or synchronously (blocking):
// Async emission (handlers run in goroutines) dispatcher.Emit(ctx, "user.created", userData) // Sync emission (waits for all handlers to complete) dispatcher.EmitSync(ctx, "user.created", userData)
Context Support:
All event handlers receive a context.Context that can be used for cancellation, timeouts, and passing request-scoped values:
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() dispatcher.EmitSync(ctx, "long.process", data)
Thread Safety:
The event dispatcher is thread-safe and can be safely used from multiple goroutines. Handler registration and event emission are protected by appropriate synchronization.
When to Use:
This package is designed for single-binary applications needing simple, type-safe, in-memory event handling. It's ideal for monolithic applications using the Hop framework where events don't need persistence or distributed processing. For distributed systems, message persistence, or advanced features like message routing and transformation, consider using a more comprehensive solution like [Watermill](https://github.com/ThreeDotsLabs/watermill) or a message queue.
Error Handling:
The event dispatcher automatically recovers from panics in event handlers and logs them using the provided logger. This ensures that a failing handler won't affect other handlers or the stability of the event bus.
Index ¶
- func IsPayloadType[T any](e Event) bool
- func MustPayloadAs[T any](e Event) T
- func PayloadAs[T any](e Event) (T, error)
- func PayloadAsMap(e Event) (map[string]any, error)
- func PayloadAsSlice(e Event) ([]any, error)
- func PayloadMapAs[T any](e Event) (map[string]T, error)
- func PayloadSliceAs[T any](e Event) ([]T, error)
- type Dispatcher
- type Event
- type Handler
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func IsPayloadType ¶
IsPayloadType checks if an event's payload is of the specified type T.
func MustPayloadAs ¶
MustPayloadAs converts an event's payload to the specified type T. Panics if the conversion fails.
func PayloadAs ¶
PayloadAs safely converts an event's payload to the specified type T. Returns the typed payload and any conversion error.
Example ¶
package main
import (
"fmt"
"github.com/patrickward/hop/dispatch"
)
func main() {
// Example event with a structured payload
type UserCreated struct {
ID string
Name string
}
evt := dispatch.NewEvent("user.created", UserCreated{
ID: "123",
Name: "John Doe",
})
// Safe conversion with error handling
user, err := dispatch.PayloadAs[UserCreated](evt)
if err != nil {
fmt.Printf("Error: %v\n", err)
return
}
fmt.Printf("User created: %s\n", user.Name)
}
Output: User created: John Doe
func PayloadAsMap ¶
PayloadAsMap is a convenience function for working with map[string]any payloads, which are common when dealing with JSON data.
Example ¶
package main
import (
"fmt"
"github.com/patrickward/hop/dispatch"
)
func main() {
// Create an event with a map payload
evt := dispatch.NewEvent("config.updated", map[string]any{
"database": "postgres",
"port": 5432,
})
// Use the convenience function for map payloads
config, err := dispatch.PayloadAsMap(evt)
if err != nil {
fmt.Printf("Error: %v\n", err)
return
}
if dbName, ok := config["database"].(string); ok {
fmt.Printf("Database: %s\n", dbName)
}
}
Output: Database: postgres
func PayloadAsSlice ¶
PayloadAsSlice is a convenience function for working with []any payloads.
Example ¶
package main
import (
"fmt"
"github.com/patrickward/hop/dispatch"
)
func main() {
// Create an event with a slice payload
evt := dispatch.NewEvent("users.updated", []any{
"john",
"jane",
})
// Use the convenience function for slice payloads
users, err := dispatch.PayloadAsSlice(evt)
if err != nil {
fmt.Printf("Error: %v\n", err)
return
}
fmt.Printf("Users: %v\n", users)
}
Output: Users: [john jane]
func PayloadMapAs ¶
PayloadMapAs converts a map payload into a map with typed values. Returns an error if the payload is not a map or if any value cannot be converted to type T.
Example:
type User struct { ID string }
userMap, err := PayloadMapAs[User](event)
Example ¶
package main
import (
"fmt"
"sort"
"sync"
"github.com/patrickward/hop/dispatch"
)
func main() {
// Define a type we want to convert to
type Region struct {
Code string
Name string
}
var mu sync.Mutex
var results []string
// Create an event with a map payload
evt := dispatch.NewEvent("regions.updated", map[string]any{
"us-east": Region{Code: "USE", Name: "US East"},
"us-west": Region{Code: "USW", Name: "US West"},
})
// Convert the payload to a map of Regions
regions, err := dispatch.PayloadMapAs[Region](evt)
if err != nil {
fmt.Printf("Error: %v\n", err)
return
}
// Process the typed regions
for key, region := range regions {
mu.Lock()
results = append(results, fmt.Sprintf("Region %s: %s (%s)", key, region.Name, region.Code))
mu.Unlock()
}
// Sort and print results for consistent output
sort.Strings(results)
for _, result := range results {
fmt.Println(result)
}
}
Output: Region us-east: US East (USE) Region us-west: US West (USW)
func PayloadSliceAs ¶
PayloadSliceAs converts a slice payload into a slice of typed elements. Returns an error if the payload is not a slice or if any element cannot be converted to type T.
Example:
type User struct { ID string }
users, err := PayloadSliceAs[User](event)
Example ¶
package main
import (
"fmt"
"github.com/patrickward/hop/dispatch"
)
func main() {
// Define a type we want to convert to
type User struct {
ID string
Name string
}
// Create an event with a slice payload
evt := dispatch.NewEvent("users.imported", []any{
User{ID: "1", Name: "Alice"},
User{ID: "2", Name: "Bob"},
})
// Convert the payload to a slice of Users
users, err := dispatch.PayloadSliceAs[User](evt)
if err != nil {
fmt.Printf("Error: %v\n", err)
return
}
// Process the typed users
for _, user := range users {
fmt.Printf("User: %s (ID: %s)\n", user.Name, user.ID)
}
}
Output: User: Alice (ID: 1) User: Bob (ID: 2)
Types ¶
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
Dispatcher manages event publishing and subscription
Example (Basic) ¶
package main
import (
"context"
"fmt"
"log/slog"
"os"
"time"
"github.com/patrickward/hop/dispatch"
)
func main() {
// Create a new event dispatcher with a basic logger
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
dispatcher := dispatch.NewDispatcher(logger)
// Register an event handler
dispatcher.On("user.login", func(ctx context.Context, event dispatch.Event) {
payload := event.Payload.(map[string]string)
fmt.Printf("User logged in: %s\n", payload["username"])
})
// Emit an event
dispatcher.Emit(context.Background(), "user.login", map[string]string{
"username": "alice",
})
// Wait for async handler to complete
time.Sleep(10 * time.Millisecond)
}
Output: User logged in: alice
Example (ContextCancellation) ¶
package main
import (
"context"
"fmt"
"log/slog"
"os"
"time"
"github.com/patrickward/hop/dispatch"
)
func main() {
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
dispatcher := dispatch.NewDispatcher(logger)
// Create a context with cancellation
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
// Register a handler that respects context cancellation
dispatcher.On("long.task", func(ctx context.Context, event dispatch.Event) {
select {
case <-ctx.Done():
fmt.Println("Task cancelled")
return
case <-time.After(100 * time.Millisecond):
fmt.Println("Task completed")
}
})
// Emit event with cancellable context
dispatcher.EmitSync(ctx, "long.task", nil)
}
Output: Task cancelled
Example (MultipleHandlers) ¶
package main
import (
"context"
"fmt"
"log/slog"
"os"
"sort"
"sync"
"github.com/patrickward/hop/dispatch"
)
func main() {
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
dispatcher := dispatch.NewDispatcher(logger)
var mu sync.Mutex
var results []string
// Register multiple handlers for the same event
dispatcher.On("notification.sent", func(ctx context.Context, event dispatch.Event) {
mu.Lock()
results = append(results, "Logging notification")
mu.Unlock()
})
dispatcher.On("notification.sent", func(ctx context.Context, event dispatch.Event) {
mu.Lock()
results = append(results, "Sending analytics")
mu.Unlock()
})
dispatcher.On("notification.sent", func(ctx context.Context, event dispatch.Event) {
mu.Lock()
results = append(results, "Updating cache")
mu.Unlock()
})
// Emit event synchronously - all handlers will be called
dispatcher.EmitSync(context.Background(), "notification.sent", nil)
// Sort and print results
sort.Strings(results)
for _, result := range results {
fmt.Println(result)
}
}
Output: Logging notification Sending analytics Updating cache
Example (SyncEmit) ¶
package main
import (
"context"
"fmt"
"log/slog"
"os"
"time"
"github.com/patrickward/hop/dispatch"
)
func main() {
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
dispatcher := dispatch.NewDispatcher(logger)
// Register event handlers
dispatcher.On("task.process", func(ctx context.Context, event dispatch.Event) {
fmt.Println("Processing task...")
time.Sleep(10 * time.Millisecond)
fmt.Println("Task completed")
})
// EmitSync will wait for all handlers to complete
fmt.Println("Starting task")
dispatcher.EmitSync(context.Background(), "task.process", nil)
fmt.Println("All processing complete")
}
Output: Starting task Processing task... Task completed All processing complete
Example (Wildcards) ¶
package main
import (
"context"
"fmt"
"log/slog"
"os"
"sort"
"sync"
"github.com/patrickward/hop/dispatch"
)
func main() {
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
dispatcher := dispatch.NewDispatcher(logger)
var mu sync.Mutex
var results []string
// Register handlers with wildcards
dispatcher.On("user.*", func(ctx context.Context, event dispatch.Event) {
mu.Lock()
results = append(results, fmt.Sprintf("User event: %s", event.Signature))
mu.Unlock()
})
dispatcher.On("*.created", func(ctx context.Context, event dispatch.Event) {
mu.Lock()
results = append(results, fmt.Sprintf("Created event: %s", event.Signature))
mu.Unlock()
})
// Emit events synchronously
dispatcher.EmitSync(context.Background(), "user.created", nil)
dispatcher.EmitSync(context.Background(), "user.deleted", nil)
dispatcher.EmitSync(context.Background(), "post.created", nil)
// Sort and print results
sort.Strings(results)
for _, result := range results {
fmt.Println(result)
}
}
Output: Created event: post.created Created event: user.created User event: user.created User event: user.deleted
func NewDispatcher ¶
func NewDispatcher(logger *slog.Logger) *Dispatcher
NewDispatcher creates a new event bus/dispatcher
func (*Dispatcher) Emit ¶
func (b *Dispatcher) Emit(ctx context.Context, signature string, payload any)
Emit sends an event to all registered handlers asynchronously
func (*Dispatcher) EmitSync ¶
func (b *Dispatcher) EmitSync(ctx context.Context, signature string, payload any)
EmitSync sends an event and waits for all handlers to complete
func (*Dispatcher) On ¶
func (b *Dispatcher) On(signature string, handler Handler)
On registers a handler for an event signature Supports wildcards: "hop.*" or "*.system.start"
type Event ¶
type Event struct {
ID string `json:"id"`
Signature string `json:"signature"` // e.g. "hop.system.start"
Payload any `json:"payload,omitempty"`
Timestamp time.Time `json:"timestamp"`
}
Event represents a system event with a simplified structure
func NewEvent ¶
NewEvent creates an event with the given signature and optional payload
Example ¶
package main
import (
"fmt"
"github.com/patrickward/hop/dispatch"
)
func main() {
// Create a new event with a payload
evt := dispatch.NewEvent("user.created", map[string]string{
"id": "123",
"email": "user@example.com",
})
fmt.Printf("Signature: %s\n", evt.Signature)
}
Output: Signature: user.created
type Handler ¶
Handler processes an event
func HandlePayload ¶
HandlePayload creates an event handler that automatically converts the payload to the specified type T and calls the provided typed handler function. If type conversion fails, logs the error and returns without calling the handler.
Example ¶
type UserCreated struct {
ID string
Name string
}
logger := newTestLogger(os.Stderr)
// Create a test event bus
bus := dispatch.NewDispatcher(logger) // You'd normally pass a logger here
// Register handler with automatic payload conversion
bus.On("user.created", dispatch.HandlePayload[UserCreated](func(ctx context.Context, user UserCreated) {
fmt.Printf("Processing user: %s\n", user.Name)
}))
// Emit an event
ctx := context.Background()
bus.EmitSync(ctx, "user.created", UserCreated{
ID: "123",
Name: "John Doe",
})
Output: Processing user: John Doe