Documentation
¶
Overview ¶
Package herald provides bidirectional bindings between Capitan events and distributed messaging systems.
Herald bridges in-process event coordination (Capitan) with external message brokers, enabling seamless integration with distributed systems. Each broker is typed to a specific struct that represents the message contract.
Publishers observe Capitan signals and forward them to broker topics. Subscribers consume from broker topics and emit to Capitan signals.
A node should be either a Publisher OR Subscriber for a given signal, never both, preventing event loops in distributed topologies.
Index ¶
- Variables
- func UseApply[T any](identity pipz.Identity, ...) pipz.Chainable[*Envelope[T]]
- func UseBackoff[T any](maxAttempts int, baseDelay time.Duration, ...) pipz.Chainable[*Envelope[T]]
- func UseEffect[T any](identity pipz.Identity, fn func(context.Context, *Envelope[T]) error) pipz.Chainable[*Envelope[T]]
- func UseEnrich[T any](identity pipz.Identity, ...) pipz.Chainable[*Envelope[T]]
- func UseFallback[T any](primary pipz.Chainable[*Envelope[T]], ...) pipz.Chainable[*Envelope[T]]
- func UseFilter[T any](identity pipz.Identity, condition func(context.Context, *Envelope[T]) bool, ...) pipz.Chainable[*Envelope[T]]
- func UseMutate[T any](identity pipz.Identity, ...) pipz.Chainable[*Envelope[T]]
- func UseRateLimit[T any](rate float64, burst int, processor pipz.Chainable[*Envelope[T]]) pipz.Chainable[*Envelope[T]]
- func UseRetry[T any](maxAttempts int, processor pipz.Chainable[*Envelope[T]]) pipz.Chainable[*Envelope[T]]
- func UseTimeout[T any](d time.Duration, processor pipz.Chainable[*Envelope[T]]) pipz.Chainable[*Envelope[T]]
- func UseTransform[T any](identity pipz.Identity, fn func(context.Context, *Envelope[T]) *Envelope[T]) pipz.Chainable[*Envelope[T]]
- type Codec
- type Envelope
- type Error
- type JSONCodec
- type Message
- type Metadata
- type Option
- func WithBackoff[T any](maxAttempts int, baseDelay time.Duration) Option[T]
- func WithCircuitBreaker[T any](failures int, recovery time.Duration) Option[T]
- func WithErrorHandler[T any](handler pipz.Chainable[*pipz.Error[*Envelope[T]]]) Option[T]
- func WithFallback[T any](fallbacks ...pipz.Chainable[*Envelope[T]]) Option[T]
- func WithFilter[T any](identity pipz.Identity, condition func(context.Context, *Envelope[T]) bool) Option[T]
- func WithMiddleware[T any](processors ...pipz.Chainable[*Envelope[T]]) Option[T]
- func WithPipeline[T any](custom pipz.Chainable[*Envelope[T]]) Option[T]
- func WithRateLimit[T any](rate float64, burst int) Option[T]
- func WithRetry[T any](maxAttempts int) Option[T]
- func WithTimeout[T any](duration time.Duration) Option[T]
- type Provider
- type Publisher
- type PublisherOption
- type Result
- type Subscriber
- type SubscriberOption
Constants ¶
This section is empty.
Variables ¶
var ( // ErrNoWriter is returned when Publish is called on a provider without a writer configured. ErrNoWriter = errors.New("herald: no writer configured for publishing") // ErrNoReader is returned when Subscribe is called on a provider without a reader configured. ErrNoReader = errors.New("herald: no reader configured for subscribing") )
Sentinel errors for provider misconfiguration.
var ( // ErrorSignal is emitted when herald encounters an operational error. // This includes publish failures, subscribe errors, and unmarshal failures. ErrorSignal = capitan.NewSignal("herald.error", "Herald operational error") // ErrorKey extracts Error from events on ErrorSignal. ErrorKey = capitan.NewKey[Error]("error", "herald.Error") // MetadataKey extracts Metadata from events emitted by subscribers. // Use this in Capitan hooks to access broker message headers. MetadataKey = capitan.NewKey[Metadata]("metadata", "herald.Metadata") )
Error signals and types for observability. Hook into ErrorSignal to receive notifications of operational failures.
Functions ¶
func UseApply ¶ added in v0.0.4
func UseApply[T any](identity pipz.Identity, fn func(context.Context, *Envelope[T]) (*Envelope[T], error)) pipz.Chainable[*Envelope[T]]
UseApply creates a processor that can transform the envelope and fail. Use for operations like enrichment, validation, or transformation that may produce errors.
func UseBackoff ¶ added in v0.0.4
func UseBackoff[T any](maxAttempts int, baseDelay time.Duration, processor pipz.Chainable[*Envelope[T]]) pipz.Chainable[*Envelope[T]]
UseBackoff wraps a processor with exponential backoff retry logic. Failed operations are retried with increasing delays.
func UseEffect ¶ added in v0.0.4
func UseEffect[T any](identity pipz.Identity, fn func(context.Context, *Envelope[T]) error) pipz.Chainable[*Envelope[T]]
UseEffect creates a processor that performs a side effect. The envelope passes through unchanged. Use for logging, metrics, or notifications that should not affect the value.
func UseEnrich ¶ added in v0.0.4
func UseEnrich[T any](identity pipz.Identity, fn func(context.Context, *Envelope[T]) (*Envelope[T], error)) pipz.Chainable[*Envelope[T]]
UseEnrich creates a processor that attempts optional enhancement. If the enrichment fails, processing continues with the original envelope.
func UseFallback ¶ added in v0.0.4
func UseFallback[T any](primary pipz.Chainable[*Envelope[T]], fallbacks ...pipz.Chainable[*Envelope[T]]) pipz.Chainable[*Envelope[T]]
UseFallback wraps a processor with fallback alternatives. If the primary fails, each fallback is tried in order.
func UseFilter ¶ added in v0.0.4
func UseFilter[T any](identity pipz.Identity, condition func(context.Context, *Envelope[T]) bool, processor pipz.Chainable[*Envelope[T]]) pipz.Chainable[*Envelope[T]]
UseFilter wraps a processor with a condition. If the condition returns false, the envelope passes through unchanged.
func UseMutate ¶ added in v0.0.4
func UseMutate[T any](identity pipz.Identity, transformer func(context.Context, *Envelope[T]) *Envelope[T], condition func(context.Context, *Envelope[T]) bool) pipz.Chainable[*Envelope[T]]
UseMutate creates a processor that conditionally transforms the envelope. The transformer is only applied if the condition returns true.
func UseRateLimit ¶ added in v0.0.4
func UseRateLimit[T any](rate float64, burst int, processor pipz.Chainable[*Envelope[T]]) pipz.Chainable[*Envelope[T]]
UseRateLimit wraps a processor with rate limiting. Uses a token bucket algorithm with the specified rate (tokens per second) and burst size.
func UseRetry ¶ added in v0.0.4
func UseRetry[T any](maxAttempts int, processor pipz.Chainable[*Envelope[T]]) pipz.Chainable[*Envelope[T]]
UseRetry wraps a processor with retry logic. Failed operations are retried immediately up to maxAttempts times.
Types ¶
type Codec ¶
type Codec interface {
// Marshal serializes a value to bytes.
Marshal(v any) ([]byte, error)
// Unmarshal deserializes bytes into a value.
Unmarshal(data []byte, v any) error
// ContentType returns the MIME type for metadata propagation.
ContentType() string
}
Codec defines the serialization contract for message payloads. Implement this interface to use alternative formats like Protobuf, MessagePack, or Avro.
type Envelope ¶ added in v0.0.4
type Envelope[T any] struct { // Value is the typed message payload. Value T // Metadata contains message headers/attributes. // For publishers: set headers to send with the message. // For subscribers: read headers received from the broker. Metadata Metadata }
Envelope wraps a value with metadata for pipeline processing. Provides type-safe access to message headers in middleware.
type Error ¶
type Error struct {
// Operation is the operation that failed: "publish", "subscribe", or "unmarshal"
Operation string `json:"operation"`
// Signal is the name of the user's signal involved in the error.
Signal string `json:"signal"`
// Err is the error message.
Err string `json:"error"`
// Nack is true if the message was nack'd for redelivery.
Nack bool `json:"nack"`
// Raw contains the original message bytes, if available.
// Populated for unmarshal errors to aid debugging.
Raw []byte `json:"raw,omitempty"`
}
Error represents an operational error in herald.
type JSONCodec ¶
type JSONCodec struct{}
JSONCodec implements Codec using encoding/json.
func (JSONCodec) ContentType ¶
ContentType returns the JSON MIME type.
type Message ¶
type Message struct {
// Data is the raw message payload.
Data []byte
// Metadata contains message headers/attributes.
// Maps to broker-native headers (Kafka headers, AMQP properties, SQS attributes, etc.)
Metadata Metadata
// Ack acknowledges successful processing.
// The broker will not redeliver this message.
Ack func() error
// Nack signals processing failure.
// The broker will typically redeliver the message (behavior varies by broker).
Nack func() error
}
Message represents a message received from a broker with acknowledgment controls. Ack confirms successful processing; Nack signals failure and typically triggers redelivery.
type Metadata ¶
Metadata holds message headers/attributes for cross-cutting concerns. Used for correlation IDs, tracing context, content types, and routing hints.
type Option ¶
Option modifies a pipeline for reliability features. Options wrap the terminal operation with additional behavior.
func WithBackoff ¶
WithBackoff adds retry logic with exponential backoff to the pipeline. Failed operations are retried with increasing delays between attempts. The delay starts at baseDelay and doubles after each failure.
func WithCircuitBreaker ¶
WithCircuitBreaker adds circuit breaker protection to the pipeline. After 'failures' consecutive failures, the circuit opens for 'recovery' duration.
func WithErrorHandler ¶
WithErrorHandler adds error handling to the pipeline. The error handler receives error context and can process/log/alert as needed.
func WithFallback ¶ added in v0.0.4
WithFallback wraps the pipeline with fallback alternatives. If the primary pipeline fails, each fallback is tried in order. Useful for broker failover scenarios.
func WithFilter ¶ added in v0.0.4
func WithFilter[T any](identity pipz.Identity, condition func(context.Context, *Envelope[T]) bool) Option[T]
WithFilter wraps the pipeline with a condition. If the condition returns false, the pipeline is skipped.
func WithMiddleware ¶ added in v0.0.4
WithMiddleware wraps the pipeline with a sequence of processors. Processors execute in order, with the wrapped pipeline last.
Example:
herald.NewPublisher[Event](
provider, signal, key,
[]herald.Option[Event]{
herald.WithMiddleware(
herald.UseEffect[Event](logID, logFn),
herald.UseApply[Event](validateID, validateFn),
),
},
)
func WithPipeline ¶
WithPipeline allows full control over the processing pipeline. Use this for advanced composition beyond the provided options. The provided pipeline replaces any default processing.
func WithRateLimit ¶
WithRateLimit adds rate limiting to the pipeline. rate = operations per second, burst = burst capacity.
type Provider ¶
type Provider interface {
// Publish sends raw bytes with metadata to the broker.
// Metadata is mapped to broker-native headers (Kafka headers, AMQP properties, etc.)
Publish(ctx context.Context, data []byte, metadata Metadata) error
// Subscribe returns a stream of messages from the broker.
// Each message includes Ack/Nack functions for explicit acknowledgment.
// Metadata is populated from broker-native headers.
Subscribe(ctx context.Context) <-chan Result[Message]
// Ping verifies broker connectivity.
// Returns nil if the connection is healthy, error otherwise.
// Use this for health checks and readiness probes.
Ping(ctx context.Context) error
// Close releases broker resources.
Close() error
}
Provider defines the interface for message broker implementations. Each provider handles broker-specific connection and message semantics.
Message ordering depends on the underlying broker implementation. Most brokers (Kafka, NATS, etc.) provide ordering guarantees within a partition or subject, but not globally. Consult your provider's documentation for specifics.
type Publisher ¶
type Publisher[T any] struct { // contains filtered or unexported fields }
Publisher observes a Capitan signal and publishes events to a broker. T is the struct type representing the message contract.
func NewPublisher ¶
func NewPublisher[T any](provider Provider, signal capitan.Signal, key capitan.GenericKey[T], pipelineOpts []Option[T], opts ...PublisherOption[T]) *Publisher[T]
NewPublisher creates a Publisher that observes the given signal and publishes T to the broker.
Parameters:
- provider: broker implementation (kafka, nats, sqs, etc.)
- signal: capitan signal to observe for events
- key: typed key for extracting T from events
- pipelineOpts: reliability middleware (retry, timeout, circuit breaker); nil for none
- opts: publisher configuration (custom codec, custom capitan instance)
type PublisherOption ¶
PublisherOption configures a Publisher.
func WithPublisherCapitan ¶
func WithPublisherCapitan[T any](c *capitan.Capitan) PublisherOption[T]
WithPublisherCapitan sets a custom Capitan instance for the publisher.
func WithPublisherCodec ¶
func WithPublisherCodec[T any](c Codec) PublisherOption[T]
WithPublisherCodec sets a custom codec for the publisher. If not specified, JSONCodec is used.
type Result ¶
type Result[T any] struct { // contains filtered or unexported fields }
Result represents either a successful value or an error. Used for stream-based message consumption where errors and values flow through the same channel.
func NewSuccess ¶
NewSuccess creates a successful Result containing the given value.
type Subscriber ¶
type Subscriber[T any] struct { // contains filtered or unexported fields }
Subscriber consumes from a broker and emits events to Capitan. T is the struct type representing the message contract.
func NewSubscriber ¶
func NewSubscriber[T any](provider Provider, signal capitan.Signal, key capitan.GenericKey[T], pipelineOpts []Option[T], opts ...SubscriberOption[T]) *Subscriber[T]
NewSubscriber creates a Subscriber that consumes from the broker and emits T to the given signal.
Parameters:
- provider: broker implementation (kafka, nats, sqs, etc.)
- signal: capitan signal to emit events to
- key: typed key for creating fields from T
- pipelineOpts: reliability middleware (retry, timeout, circuit breaker); nil for none
- opts: subscriber configuration (custom codec, custom capitan instance)
func (*Subscriber[T]) Close ¶
func (s *Subscriber[T]) Close() error
Close stops the subscriber and waits for the goroutine to exit.
func (*Subscriber[T]) Start ¶
func (s *Subscriber[T]) Start(ctx context.Context)
Start begins consuming from the broker and emitting to Capitan. The provided context controls the subscriber's lifetime; canceling it stops consumption.
type SubscriberOption ¶
type SubscriberOption[T any] func(*Subscriber[T])
SubscriberOption configures a Subscriber.
func WithSubscriberCapitan ¶
func WithSubscriberCapitan[T any](c *capitan.Capitan) SubscriberOption[T]
WithSubscriberCapitan sets a custom Capitan instance for the subscriber.
func WithSubscriberCodec ¶
func WithSubscriberCodec[T any](c Codec) SubscriberOption[T]
WithSubscriberCodec sets a custom codec for the subscriber. If not specified, JSONCodec is used.