herald

package module
v1.0.1 Latest

Warning: This package is not in the latest version of its module.
Published: Jan 9, 2026 License: MIT Imports: 7 Imported by: 0

README

herald

Bidirectional bindings between capitan events and message brokers.

Emit a capitan event, herald publishes it. Herald receives a message, capitan emits it. Same types, same signals, automatic serialization.

Bidirectional Event Distribution

// capitan → broker: Publish events to external systems
pub := herald.NewPublisher(provider, signal, key, nil)
pub.Start()

// broker → capitan: Subscribe to external messages as events
sub := herald.NewSubscriber(provider, signal, key, nil)
sub.Start(ctx)

One provider, one signal, one key. Herald handles serialization, acknowledgment, and error routing.

Installation

go get github.com/zoobzio/herald

Requires Go 1.23+.

Quick Start

package main

import (
    "context"
    "fmt"

    kafkago "github.com/segmentio/kafka-go"
    "github.com/zoobzio/capitan"
    "github.com/zoobzio/herald"
    "github.com/zoobzio/herald/kafka"
)

type Order struct {
    ID    string  `json:"id"`
    Total float64 `json:"total"`
}

func main() {
    ctx := context.Background()

    // Define signal and typed key
    orderCreated := capitan.NewSignal("order.created", "New order")
    orderKey := capitan.NewKey[Order]("order", "app.Order")

    // Create Kafka provider
    writer := &kafkago.Writer{Addr: kafkago.TCP("localhost:9092"), Topic: "orders"}
    reader := kafkago.NewReader(kafkago.ReaderConfig{
        Brokers: []string{"localhost:9092"},
        Topic:   "orders",
        GroupID: "order-processor",
    })
    provider := kafka.New("orders", kafka.WithWriter(writer), kafka.WithReader(reader))
    defer provider.Close()

    // Publish: capitan events → Kafka
    pub := herald.NewPublisher(provider, orderCreated, orderKey, nil)
    pub.Start()
    defer pub.Close()

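    // Subscribe: Kafka → capitan events
    // (Publisher and Subscriber share a process here only for brevity; a node should be
    // one or the other for a given signal, never both, to avoid event loops.)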
    sub := herald.NewSubscriber(provider, orderCreated, orderKey, nil)
    sub.Start(ctx)
    defer sub.Close()

    // Handle incoming messages with standard capitan hooks
    capitan.Hook(orderCreated, func(ctx context.Context, e *capitan.Event) {
        order, _ := orderKey.From(e)
        fmt.Printf("Received order: %s\n", order.ID)
    })

    // Emit an event — automatically published to Kafka
    capitan.Emit(ctx, orderCreated, orderKey.Field(Order{ID: "ORD-123", Total: 99.99}))

    capitan.Shutdown()
}

Capabilities

Feature | Description | Docs
Bidirectional Flow | Publish capitan events to brokers or subscribe broker messages as events | Publishing, Subscribing
Type-Safe Generics | Compile-time checked publishers and subscribers | Overview
11 Providers | Kafka, NATS, JetStream, Pub/Sub, Redis, SQS, AMQP, SNS, Bolt, Firestore, io | Providers
Pipeline Middleware | Validation, transformation, and side effects via processors | Reliability
Reliability Patterns | Retry, backoff, timeout, circuit breaker, rate limiting via pipz | Reliability
Auto Acknowledgment | Messages acked/nacked based on processing outcome | Subscribing
Custom Codecs | Pluggable serialization (JSON default, custom supported) | Codecs
Error Observability | All errors emit as capitan events | Error Handling

Why herald?

  • Type-safe — Generic publishers and subscribers with compile-time checking
  • Bidirectional — Publish to brokers or subscribe from brokers
  • 11 providers — Kafka, NATS, JetStream, Pub/Sub, Redis, SQS, RabbitMQ, SNS, BoltDB, Firestore, io
  • Reliable — Pipeline middleware for retry, backoff, timeout, circuit breaker, rate limiting
  • Observable — Errors flow through capitan

Unified Event Flow

Herald enables a pattern: internal events become external messages, external messages become internal events.

Your application emits capitan events as usual. Herald publishes them to any broker. Other services publish to brokers. Herald subscribes and emits them as capitan events. Same signals, same keys, same hooks — the boundary between internal and external disappears.

// Service A: emit locally, publish externally
capitan.Emit(ctx, orderCreated, orderKey.Field(order))

// Service B: subscribe externally, handle locally
capitan.Hook(orderCreated, processOrder)

Two services, one event type, zero coupling. The broker is just a transport.

Providers

Provider | Package | Use Case
Kafka | kafka | High-throughput streaming
NATS | nats | Lightweight cloud messaging
JetStream | jetstream | NATS with persistence and headers
Google Pub/Sub | pubsub | GCP managed messaging
Redis Streams | redis | In-memory with persistence
AWS SQS | sqs | AWS managed queues
RabbitMQ/AMQP | amqp | Traditional message broker
AWS SNS | sns | Pub/sub fanout
BoltDB | bolt | Embedded local queues
Firestore | firestore | Firebase/GCP document store
io | io | Testing with io.Reader/Writer

Processing Hooks

Add processing steps via middleware processors:

pub := herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
    herald.WithMiddleware(
        herald.UseApply[Order]("validate", func(ctx context.Context, env *herald.Envelope[Order]) (*herald.Envelope[Order], error) {
            if env.Value.Total < 0 {
                return env, errors.New("invalid total")
            }
            return env, nil
        }),
        herald.UseEffect[Order]("log", func(ctx context.Context, env *herald.Envelope[Order]) error {
            log.Printf("order %s", env.Value.ID)
            return nil
        }),
        herald.UseTransform[Order]("enrich", func(ctx context.Context, env *herald.Envelope[Order]) *herald.Envelope[Order] {
            // Assumes Order carries a ProcessedAt time.Time field (not part of the Quick Start struct).
            env.Value.ProcessedAt = time.Now()
            return env
        }),
    ),
})
  • UseApply — Transform envelope with possible error
  • UseEffect — Side effect, envelope passes through unchanged
  • UseTransform — Pure transform, cannot fail

Pipeline Options

Add reliability features via pipz:

pub := herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
    herald.WithRetry[Order](3),
    herald.WithBackoff[Order](3, 100*time.Millisecond),
    herald.WithTimeout[Order](5*time.Second),
    herald.WithCircuitBreaker[Order](5, 30*time.Second),
    herald.WithRateLimit[Order](100, 10),
})

See Reliability Guide for middleware and pipeline details.

Acknowledgment

Herald handles message acknowledgment automatically:

Outcome | Action
Message processed successfully | Ack(): message acknowledged
Deserialization fails | Nack(): message returned for redelivery
Provider doesn't support ack | No-op (e.g., NATS core, SNS)
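
The decision rule in the table can be sketched in isolation. The snippet below is a hypothetical stand-in, not herald's implementation: `message` mirrors only the Data, Ack, and Nack fields of the Message type, `handle` is an illustrative name, payloads are assumed to be JSON, and a nil callback is used to model a provider without acknowledgment support.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Order is the example payload type from the Quick Start.
type Order struct {
	ID    string  `json:"id"`
	Total float64 `json:"total"`
}

// message is a local stand-in for the Data, Ack, and Nack fields of
// herald.Message so the sketch compiles on its own.
type message struct {
	Data []byte
	Ack  func() error
	Nack func() error
}

// handle decodes the payload and reports which acknowledgment action
// was taken: "nack" when deserialization fails, "ack" on success, and
// "noop" when the relevant callback is nil (assumed to mean the
// provider has no ack support).
func handle(msg message) (string, error) {
	var order Order
	if err := json.Unmarshal(msg.Data, &order); err != nil {
		if msg.Nack != nil {
			_ = msg.Nack() // hand the message back for redelivery
			return "nack", err
		}
		return "noop", err
	}
	if msg.Ack != nil {
		return "ack", msg.Ack()
	}
	return "noop", nil
}

func main() {
	done := func() error { return nil }
	good := message{Data: []byte(`{"id":"ORD-123","total":99.99}`), Ack: done, Nack: done}
	bad := message{Data: []byte(`{"id":`), Ack: done, Nack: done}

	action, err := handle(good)
	fmt.Println(action, err)
	action, err = handle(bad)
	fmt.Println(action, err != nil)
}
```

How a real provider signals missing ack support is not described here; the nil check is only one way to model it.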

Error Handling

All errors flow through capitan:

capitan.Hook(herald.ErrorSignal, func(ctx context.Context, e *capitan.Event) {
    err, _ := herald.ErrorKey.From(e)
    log.Printf("[herald] %s: %v", err.Operation, err.Err)
})

See Error Handling Guide for details.

Documentation

Full documentation is available in the docs/ directory:

Learn
  • Overview — Architecture and philosophy
  • Publishing — Forward capitan events to brokers
  • Subscribing — Consume broker messages as capitan events
  • Providers — Available broker implementations
Guides
  • Reliability — Retry, backoff, circuit breaker, rate limiting
  • Codecs — Custom serialization formats
  • Error Handling — Centralized error management
  • Testing — Testing herald-based applications
Reference

Contributing

See CONTRIBUTING.md for guidelines.

License

MIT License — see LICENSE for details.

Documentation

Overview

Package herald provides bidirectional bindings between Capitan events and distributed messaging systems.

Herald bridges in-process event coordination (Capitan) with external message brokers, enabling seamless integration with distributed systems. Each broker is typed to a specific struct that represents the message contract.

Publishers observe Capitan signals and forward them to broker topics. Subscribers consume from broker topics and emit to Capitan signals.

A node should be either a Publisher OR Subscriber for a given signal, never both, preventing event loops in distributed topologies.

Constants

This section is empty.

Variables

var (
	// ErrNoWriter is returned when Publish is called on a provider without a writer configured.
	ErrNoWriter = errors.New("herald: no writer configured for publishing")

	// ErrNoReader is returned when Subscribe is called on a provider without a reader configured.
	ErrNoReader = errors.New("herald: no reader configured for subscribing")
)

Sentinel errors for provider misconfiguration.

var (
	// ErrorSignal is emitted when herald encounters an operational error.
	// This includes publish failures, subscribe errors, and unmarshal failures.
	ErrorSignal = capitan.NewSignal("herald.error", "Herald operational error")

	// ErrorKey extracts Error from events on ErrorSignal.
	ErrorKey = capitan.NewKey[Error]("error", "herald.Error")

	// MetadataKey extracts Metadata from events emitted by subscribers.
	// Use this in Capitan hooks to access broker message headers.
	MetadataKey = capitan.NewKey[Metadata]("metadata", "herald.Metadata")
)

Error signals and types for observability. Hook into ErrorSignal to receive notifications of operational failures.

Functions

func UseApply added in v0.0.4

func UseApply[T any](identity pipz.Identity, fn func(context.Context, *Envelope[T]) (*Envelope[T], error)) pipz.Chainable[*Envelope[T]]

UseApply creates a processor that can transform the envelope and fail. Use for operations like enrichment, validation, or transformation that may produce errors.

func UseBackoff added in v0.0.4

func UseBackoff[T any](maxAttempts int, baseDelay time.Duration, processor pipz.Chainable[*Envelope[T]]) pipz.Chainable[*Envelope[T]]

UseBackoff wraps a processor with exponential backoff retry logic. Failed operations are retried with increasing delays.

func UseEffect added in v0.0.4

func UseEffect[T any](identity pipz.Identity, fn func(context.Context, *Envelope[T]) error) pipz.Chainable[*Envelope[T]]

UseEffect creates a processor that performs a side effect. The envelope passes through unchanged. Use for logging, metrics, or notifications that should not affect the value.

func UseEnrich added in v0.0.4

func UseEnrich[T any](identity pipz.Identity, fn func(context.Context, *Envelope[T]) (*Envelope[T], error)) pipz.Chainable[*Envelope[T]]

UseEnrich creates a processor that attempts optional enhancement. If the enrichment fails, processing continues with the original envelope.

func UseFallback added in v0.0.4

func UseFallback[T any](primary pipz.Chainable[*Envelope[T]], fallbacks ...pipz.Chainable[*Envelope[T]]) pipz.Chainable[*Envelope[T]]

UseFallback wraps a processor with fallback alternatives. If the primary fails, each fallback is tried in order.

func UseFilter added in v0.0.4

func UseFilter[T any](identity pipz.Identity, condition func(context.Context, *Envelope[T]) bool, processor pipz.Chainable[*Envelope[T]]) pipz.Chainable[*Envelope[T]]

UseFilter wraps a processor with a condition. If the condition returns false, the envelope passes through unchanged.

func UseMutate added in v0.0.4

func UseMutate[T any](identity pipz.Identity, transformer func(context.Context, *Envelope[T]) *Envelope[T], condition func(context.Context, *Envelope[T]) bool) pipz.Chainable[*Envelope[T]]

UseMutate creates a processor that conditionally transforms the envelope. The transformer is only applied if the condition returns true.

func UseRateLimit added in v0.0.4

func UseRateLimit[T any](rate float64, burst int, processor pipz.Chainable[*Envelope[T]]) pipz.Chainable[*Envelope[T]]

UseRateLimit wraps a processor with rate limiting. Uses a token bucket algorithm with the specified rate (tokens per second) and burst size.

func UseRetry added in v0.0.4

func UseRetry[T any](maxAttempts int, processor pipz.Chainable[*Envelope[T]]) pipz.Chainable[*Envelope[T]]

UseRetry wraps a processor with retry logic. Failed operations are retried immediately up to maxAttempts times.

func UseTimeout added in v0.0.4

func UseTimeout[T any](d time.Duration, processor pipz.Chainable[*Envelope[T]]) pipz.Chainable[*Envelope[T]]

UseTimeout wraps a processor with a deadline. If processing takes longer than the specified duration, the operation fails.

func UseTransform added in v0.0.4

func UseTransform[T any](identity pipz.Identity, fn func(context.Context, *Envelope[T]) *Envelope[T]) pipz.Chainable[*Envelope[T]]

UseTransform creates a processor that transforms the envelope. Cannot fail. Use for pure transformations that always succeed.

Types

type Codec

type Codec interface {
	// Marshal serializes a value to bytes.
	Marshal(v any) ([]byte, error)

	// Unmarshal deserializes bytes into a value.
	Unmarshal(data []byte, v any) error

	// ContentType returns the MIME type for metadata propagation.
	ContentType() string
}

Codec defines the serialization contract for message payloads. Implement this interface to use alternative formats like Protobuf, MessagePack, or Avro.
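
As an illustration of implementing this interface, the sketch below defines a hypothetical GobCodec backed by encoding/gob. The Codec interface is copied locally so the example compiles without herald, and the MIME type string is an illustrative, unregistered value.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// Codec is a local copy of the interface above.
type Codec interface {
	Marshal(v any) ([]byte, error)
	Unmarshal(data []byte, v any) error
	ContentType() string
}

// GobCodec is a hypothetical codec backed by encoding/gob.
type GobCodec struct{}

// Marshal encodes v into a fresh buffer.
func (GobCodec) Marshal(v any) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(v); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// Unmarshal decodes data into v, which must be a pointer.
func (GobCodec) Unmarshal(data []byte, v any) error {
	return gob.NewDecoder(bytes.NewReader(data)).Decode(v)
}

// ContentType returns an illustrative, unregistered MIME type.
func (GobCodec) ContentType() string { return "application/x-gob" }

type Order struct {
	ID    string
	Total float64
}

func main() {
	var codec Codec = GobCodec{}

	data, err := codec.Marshal(Order{ID: "ORD-123", Total: 99.99})
	if err != nil {
		panic(err)
	}
	var out Order
	if err := codec.Unmarshal(data, &out); err != nil {
		panic(err)
	}
	fmt.Println(out.ID, out.Total, codec.ContentType())
}
```

A codec like this would be supplied through WithPublisherCodec and WithSubscriberCodec; both sides of a topic need the same codec for messages to decode.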

type Envelope added in v0.0.4

type Envelope[T any] struct {
	// Value is the typed message payload.
	Value T

	// Metadata contains message headers/attributes.
	// For publishers: set headers to send with the message.
	// For subscribers: read headers received from the broker.
	Metadata Metadata
}

Envelope wraps a value with metadata for pipeline processing. Provides type-safe access to message headers in middleware.

type Error

type Error struct {
	// Operation is the operation that failed: "publish", "subscribe", or "unmarshal"
	Operation string `json:"operation"`

	// Signal is the name of the user's signal involved in the error.
	Signal string `json:"signal"`

	// Err is the error message.
	Err string `json:"error"`

	// Nack is true if the message was nack'd for redelivery.
	Nack bool `json:"nack"`

	// Raw contains the original message bytes, if available.
	// Populated for unmarshal errors to aid debugging.
	Raw []byte `json:"raw,omitempty"`
}

Error represents an operational error in herald.
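
The struct tags determine how an Error looks when logged or forwarded as JSON. The sketch below uses a local copy of the struct and a hypothetical `encode` helper; note that encoding/json writes []byte fields as base64, so Raw needs decoding before it is human-readable.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Error is a local copy of the struct above.
type Error struct {
	Operation string `json:"operation"`
	Signal    string `json:"signal"`
	Err       string `json:"error"`
	Nack      bool   `json:"nack"`
	Raw       []byte `json:"raw,omitempty"`
}

// encode renders the error as a JSON string.
func encode(e Error) (string, error) {
	b, err := json.Marshal(e)
	return string(b), err
}

func main() {
	// Raw is omitted from the output when empty and base64-encoded otherwise.
	s, err := encode(Error{
		Operation: "unmarshal",
		Signal:    "order.created",
		Err:       "unexpected end of JSON input",
		Nack:      true,
		Raw:       []byte(`{"id":`),
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(s)
}
```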

type JSONCodec

type JSONCodec struct{}

JSONCodec implements Codec using encoding/json.

func (JSONCodec) ContentType

func (JSONCodec) ContentType() string

ContentType returns the JSON MIME type.

func (JSONCodec) Marshal

func (JSONCodec) Marshal(v any) ([]byte, error)

Marshal serializes v to JSON bytes.

func (JSONCodec) Unmarshal

func (JSONCodec) Unmarshal(data []byte, v any) error

Unmarshal deserializes JSON bytes into v.

type Message

type Message struct {
	// Data is the raw message payload.
	Data []byte

	// Metadata contains message headers/attributes.
	// Maps to broker-native headers (Kafka headers, AMQP properties, SQS attributes, etc.)
	Metadata Metadata

	// Ack acknowledges successful processing.
	// The broker will not redeliver this message.
	Ack func() error

	// Nack signals processing failure.
	// The broker will typically redeliver the message (behavior varies by broker).
	Nack func() error
}

Message represents a message received from a broker with acknowledgment controls. Ack confirms successful processing; Nack signals failure and typically triggers redelivery.

type Metadata

type Metadata map[string]string

Metadata holds message headers/attributes for cross-cutting concerns. Used for correlation IDs, tracing context, content types, and routing hints.

type Option

type Option[T any] func(pipz.Chainable[*Envelope[T]]) pipz.Chainable[*Envelope[T]]

Option modifies a pipeline for reliability features. Options wrap the terminal operation with additional behavior.

func WithBackoff

func WithBackoff[T any](maxAttempts int, baseDelay time.Duration) Option[T]

WithBackoff adds retry logic with exponential backoff to the pipeline. Failed operations are retried with increasing delays between attempts. The delay starts at baseDelay and doubles after each failure.

func WithCircuitBreaker

func WithCircuitBreaker[T any](failures int, recovery time.Duration) Option[T]

WithCircuitBreaker adds circuit breaker protection to the pipeline. After 'failures' consecutive failures, the circuit opens for 'recovery' duration.

func WithErrorHandler

func WithErrorHandler[T any](handler pipz.Chainable[*pipz.Error[*Envelope[T]]]) Option[T]

WithErrorHandler adds error handling to the pipeline. The error handler receives error context and can process/log/alert as needed.

func WithFallback added in v0.0.4

func WithFallback[T any](fallbacks ...pipz.Chainable[*Envelope[T]]) Option[T]

WithFallback wraps the pipeline with fallback alternatives. If the primary pipeline fails, each fallback is tried in order. Useful for broker failover scenarios.

func WithFilter added in v0.0.4

func WithFilter[T any](identity pipz.Identity, condition func(context.Context, *Envelope[T]) bool) Option[T]

WithFilter wraps the pipeline with a condition. If the condition returns false, the pipeline is skipped.

func WithMiddleware added in v0.0.4

func WithMiddleware[T any](processors ...pipz.Chainable[*Envelope[T]]) Option[T]

WithMiddleware wraps the pipeline with a sequence of processors. Processors execute in order, with the wrapped pipeline last.

Example:

herald.NewPublisher[Event](
    provider, signal, key,
    []herald.Option[Event]{
        herald.WithMiddleware(
            herald.UseEffect[Event](logID, logFn),
            herald.UseApply[Event](validateID, validateFn),
        ),
    },
)

func WithPipeline

func WithPipeline[T any](custom pipz.Chainable[*Envelope[T]]) Option[T]

WithPipeline allows full control over the processing pipeline. Use this for advanced composition beyond the provided options. The provided pipeline replaces any default processing.

func WithRateLimit

func WithRateLimit[T any](rate float64, burst int) Option[T]

WithRateLimit adds rate limiting to the pipeline. rate = operations per second, burst = burst capacity.

func WithRetry

func WithRetry[T any](maxAttempts int) Option[T]

WithRetry adds retry logic to the pipeline. Failed operations are retried up to maxAttempts times immediately.

func WithTimeout

func WithTimeout[T any](duration time.Duration) Option[T]

WithTimeout adds timeout protection to the pipeline. Operations exceeding this duration will be canceled.

type Provider

type Provider interface {
	// Publish sends raw bytes with metadata to the broker.
	// Metadata is mapped to broker-native headers (Kafka headers, AMQP properties, etc.)
	Publish(ctx context.Context, data []byte, metadata Metadata) error

	// Subscribe returns a stream of messages from the broker.
	// Each message includes Ack/Nack functions for explicit acknowledgment.
	// Metadata is populated from broker-native headers.
	Subscribe(ctx context.Context) <-chan Result[Message]

	// Ping verifies broker connectivity.
	// Returns nil if the connection is healthy, error otherwise.
	// Use this for health checks and readiness probes.
	Ping(ctx context.Context) error

	// Close releases broker resources.
	Close() error
}

Provider defines the interface for message broker implementations. Each provider handles broker-specific connection and message semantics.

Message ordering depends on the underlying broker implementation. Most brokers (Kafka, NATS, etc.) provide ordering guarantees within a partition or subject, but not globally. Consult your provider's documentation for specifics.

type Publisher

type Publisher[T any] struct {
	// contains filtered or unexported fields
}

Publisher observes a Capitan signal and publishes events to a broker. T is the struct type representing the message contract.

func NewPublisher

func NewPublisher[T any](provider Provider, signal capitan.Signal, key capitan.GenericKey[T], pipelineOpts []Option[T], opts ...PublisherOption[T]) *Publisher[T]

NewPublisher creates a Publisher that observes the given signal and publishes T to the broker.

Parameters:

  • provider: broker implementation (kafka, nats, sqs, etc.)
  • signal: capitan signal to observe for events
  • key: typed key for extracting T from events
  • pipelineOpts: reliability middleware (retry, timeout, circuit breaker); nil for none
  • opts: publisher configuration (custom codec, custom capitan instance)

func (*Publisher[T]) Close

func (p *Publisher[T]) Close() error

Close stops the publisher, waits for in-flight publishes, and releases resources.

func (*Publisher[T]) Start

func (p *Publisher[T]) Start()

Start begins observing the signal and publishing to the broker.

type PublisherOption

type PublisherOption[T any] func(*Publisher[T])

PublisherOption configures a Publisher.

func WithPublisherCapitan

func WithPublisherCapitan[T any](c *capitan.Capitan) PublisherOption[T]

WithPublisherCapitan sets a custom Capitan instance for the publisher.

func WithPublisherCodec

func WithPublisherCodec[T any](c Codec) PublisherOption[T]

WithPublisherCodec sets a custom codec for the publisher. If not specified, JSONCodec is used.

type Result

type Result[T any] struct {
	// contains filtered or unexported fields
}

Result represents either a successful value or an error. Used for stream-based message consumption where errors and values flow through the same channel.

func NewError

func NewError[T any](err error) Result[T]

NewError creates a failed Result containing the given error.

func NewSuccess

func NewSuccess[T any](value T) Result[T]

NewSuccess creates a successful Result containing the given value.

func (Result[T]) Error

func (r Result[T]) Error() error

Error returns the error, or nil if this is a successful Result.

func (Result[T]) IsError

func (r Result[T]) IsError() bool

IsError returns true if this Result contains an error.

func (Result[T]) IsSuccess

func (r Result[T]) IsSuccess() bool

IsSuccess returns true if this Result contains a successful value.

func (Result[T]) Value

func (r Result[T]) Value() T

Value returns the successful value. Returns the zero value if this is an error Result.

type Subscriber

type Subscriber[T any] struct {
	// contains filtered or unexported fields
}

Subscriber consumes from a broker and emits events to Capitan. T is the struct type representing the message contract.

func NewSubscriber

func NewSubscriber[T any](provider Provider, signal capitan.Signal, key capitan.GenericKey[T], pipelineOpts []Option[T], opts ...SubscriberOption[T]) *Subscriber[T]

NewSubscriber creates a Subscriber that consumes from the broker and emits T to the given signal.

Parameters:

  • provider: broker implementation (kafka, nats, sqs, etc.)
  • signal: capitan signal to emit events to
  • key: typed key for creating fields from T
  • pipelineOpts: reliability middleware (retry, timeout, circuit breaker); nil for none
  • opts: subscriber configuration (custom codec, custom capitan instance)

func (*Subscriber[T]) Close

func (s *Subscriber[T]) Close() error

Close stops the subscriber and waits for the goroutine to exit.

func (*Subscriber[T]) Start

func (s *Subscriber[T]) Start(ctx context.Context)

Start begins consuming from the broker and emitting to Capitan. The provided context controls the subscriber's lifetime; canceling it stops consumption.

type SubscriberOption

type SubscriberOption[T any] func(*Subscriber[T])

SubscriberOption configures a Subscriber.

func WithSubscriberCapitan

func WithSubscriberCapitan[T any](c *capitan.Capitan) SubscriberOption[T]

WithSubscriberCapitan sets a custom Capitan instance for the subscriber.

func WithSubscriberCodec

func WithSubscriberCodec[T any](c Codec) SubscriberOption[T]

WithSubscriberCodec sets a custom codec for the subscriber. If not specified, JSONCodec is used.

Directories

Path Synopsis
Package testing provides test utilities and helpers for herald users.
