core

package
v0.1.0 Latest
Warning

This package is not in the latest version of its module.
Published: Apr 22, 2026 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Overview

Package core provides the fundamental interfaces, types, and errors for Weave.

Constants

const (
	// ErrorContentType is the standard content type for structured error payloads.
	ErrorContentType = "application/vnd.weave.error+json"
	// DeadLetterContentType is the standard content type for dead-letter envelopes.
	DeadLetterContentType = "application/vnd.weave.dead-letter+json"
)
const (
	EventConnect         = "connect"
	EventDisconnect      = "disconnect"
	EventPublishFailed   = "publish_failed"
	EventSubscribeFailed = "subscribe_failed"
	EventTimeout         = "timeout"
)

Variables

var (
	ErrClosed           = errors.New("broker is closed")
	ErrNoReplyTo        = errors.New("no reply-to destination specified")
	ErrAlreadyConnected = errors.New("broker is already connected")
	ErrInvalidConfig    = errors.New("invalid configuration")
)

Sentinel errors for common conditions.

Functions

func AvailableBackends

func AvailableBackends() []string

AvailableBackends returns a sorted list of registered backend names.

func EncodeErrorPayload

func EncodeErrorPayload(payload ErrorPayload) ([]byte, error)

EncodeErrorPayload serializes a structured error payload.

func IsBackendAvailable

func IsBackendAvailable(name string) bool

IsBackendAvailable checks if a backend is registered.

func IsCircuitOpen

func IsCircuitOpen(err error) bool

IsCircuitOpen returns true if the error indicates an open circuit breaker.

func IsConnectionLost

func IsConnectionLost(err error) bool

IsConnectionLost returns true if the error indicates the connection was lost.

func IsNotConnected

func IsNotConnected(err error) bool

IsNotConnected returns true if the error indicates the broker is not connected.

func IsTimeout

func IsTimeout(err error) bool

IsTimeout returns true if the error indicates a timeout.

func IsUnknownBackend

func IsUnknownBackend(err error) bool

IsUnknownBackend returns true if the error indicates an unknown backend.

func IsUnsupportedOperation

func IsUnsupportedOperation(err error) bool

IsUnsupportedOperation returns true if the error indicates an unsupported operation.

func Register

func Register(name string, factory BrokerFactory)

Register registers a broker factory for a backend name.
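
Register, AvailableBackends, and IsBackendAvailable together describe a name-to-factory registry. The following is a minimal sketch of such a registry, not the package's actual implementation: the factory type is simplified to a function returning a string, and all lowercase names are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// factory is a simplified stand-in for BrokerFactory.
type factory func() (string, error)

var (
	mu       sync.RWMutex
	registry = map[string]factory{}
)

// register stores a factory under a backend name, replacing any earlier one.
func register(name string, f factory) {
	mu.Lock()
	defer mu.Unlock()
	registry[name] = f
}

// availableBackends returns the registered names in sorted order.
func availableBackends() []string {
	mu.RLock()
	defer mu.RUnlock()
	names := make([]string, 0, len(registry))
	for name := range registry {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}

// isBackendAvailable reports whether a factory exists for name.
func isBackendAvailable(name string) bool {
	mu.RLock()
	defer mu.RUnlock()
	_, ok := registry[name]
	return ok
}

func main() {
	register("kafka", func() (string, error) { return "kafka broker", nil })
	register("amqp", func() (string, error) { return "amqp broker", nil })
	fmt.Println(availableBackends(), isBackendAvailable("nats"))
}
```

Backends in this style typically call the registration function from an init function, so importing the backend package is enough to make it available.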

func RetryAttempt

func RetryAttempt(ctx context.Context) int

RetryAttempt returns the current attempt number from context. The first attempt is 1. A missing value also reports 1.

Types

type AMQPConfig

type AMQPConfig struct {
	Host            string
	Port            int
	Username        string
	Password        string
	VHost           string
	Heartbeat       time.Duration
	TLS             *TLSConfig
	Exchange        string
	ExchangeType    string
	QueueDurable    bool
	QueueAutoDelete bool
	QueueExclusive  bool
}

AMQPConfig holds AMQP/RabbitMQ-specific configuration.

func DefaultAMQPConfig

func DefaultAMQPConfig() *AMQPConfig

DefaultAMQPConfig returns default AMQP configuration.

type BackoffStrategy

type BackoffStrategy func(attempt int) time.Duration

BackoffStrategy returns the delay to wait before the next retry attempt. The attempt argument is one-based and represents the retry count after the initial attempt has already failed.

func ExponentialBackoff

func ExponentialBackoff(initialDelay, maxDelay time.Duration, multiplier float64) BackoffStrategy

ExponentialBackoff returns an exponential backoff strategy capped at maxDelay.

func FixedBackoff

func FixedBackoff(delay time.Duration) BackoffStrategy

FixedBackoff returns a backoff strategy that always returns the same delay.

type BrokerFactory

type BrokerFactory func(config *Config) (MessageBroker, error)

BrokerFactory is a function that creates a new MessageBroker instance.

type Caller

type Caller interface {
	// Call performs a synchronous request-reply operation.
	Call(ctx context.Context, destination string, message *Message, opts ...PublishOption) (*Message, error)
}

Caller defines request-reply operations (synchronous RPC).

type Client

type Client interface {
	Connector
	Publisher
	Caller
}

Client is a subset of MessageBroker for client-only operations. Use this interface when you only need to publish messages or make RPC calls, without subscribing to messages (server-side).

type Config

type Config struct {
	Backend         string
	ConnectionName  string
	ConnectionRetry int
	RetryDelay      time.Duration

	// Logger receives structured runtime and transport events.
	Logger EventLogger
	// EventHook receives structured runtime and transport events.
	EventHook EventHook
	// Metrics receives basic counters and duration measurements.
	Metrics MetricsHook
	// Tracing receives span lifecycle callbacks for high-level runtime operations.
	Tracing TracingHook
	// HealthReporter receives runtime health snapshots.
	HealthReporter HealthReporter
	// HealthHook receives runtime health snapshots as a callback.
	HealthHook HealthHook

	AMQP  *AMQPConfig
	Kafka *KafkaConfig

	// Shorthand for AMQP
	Host     string
	Port     int
	Username string
	Password string
	VHost    string
}

Config holds the configuration for a message broker.

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig returns a Config with sensible defaults for AMQP.

func (*Config) EmitCounter

func (c *Config) EmitCounter(name string, value float64, labels map[string]string)

EmitCounter records a counter metric if a metrics hook is configured.

func (*Config) EmitDuration

func (c *Config) EmitDuration(name string, value time.Duration, labels map[string]string)

EmitDuration records a duration metric if a metrics hook is configured.

func (*Config) EmitEvent

func (c *Config) EmitEvent(ctx context.Context, event Event)

EmitEvent sends an event to all configured observability sinks.

func (*Config) EmitHealth

func (c *Config) EmitHealth(ctx context.Context, report HealthReport)

EmitHealth sends a health report to all configured health sinks.

func (*Config) StartSpan

func (c *Config) StartSpan(ctx context.Context, span TraceSpanStart) (context.Context, TraceSpan)

StartSpan starts a trace span if tracing is configured.

func (*Config) WithAMQP

func (c *Config) WithAMQP(amqp *AMQPConfig) *Config

WithAMQP sets AMQP configuration and returns the config for chaining.

func (*Config) WithBackend

func (c *Config) WithBackend(backend string) *Config

WithBackend sets the backend and returns the config for chaining.

func (*Config) WithKafka

func (c *Config) WithKafka(kafka *KafkaConfig) *Config

WithKafka sets Kafka configuration and returns the config for chaining.
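
The With* methods return the receiver so that calls can be chained. The sketch below uses trimmed local copies of Config and KafkaConfig to show the pattern; it assumes the methods mutate the receiver in place, which the documentation does not state explicitly. The broker address and group name are illustrative.

```go
package main

import "fmt"

// KafkaConfig is a trimmed local copy with only the fields used here.
type KafkaConfig struct {
	Brokers       []string
	ConsumerGroup string
}

// Config is a trimmed local copy with only the fields used here.
type Config struct {
	Backend string
	Kafka   *KafkaConfig
}

// WithBackend sets the backend and returns the same config (assumed behavior).
func (c *Config) WithBackend(backend string) *Config {
	c.Backend = backend
	return c
}

// WithKafka sets the Kafka section and returns the same config (assumed behavior).
func (c *Config) WithKafka(kafka *KafkaConfig) *Config {
	c.Kafka = kafka
	return c
}

func main() {
	cfg := (&Config{}).
		WithBackend("kafka").
		WithKafka(&KafkaConfig{Brokers: []string{"localhost:9092"}, ConsumerGroup: "orders"})
	fmt.Println(cfg.Backend, cfg.Kafka.Brokers, cfg.Kafka.ConsumerGroup)
}
```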

type Connector

type Connector interface {
	// Connect establishes a connection to the message broker.
	Connect(ctx context.Context) error

	// Close gracefully closes the connection to the broker.
	Close() error

	// IsConnected returns true if the broker connection is active.
	IsConnected() bool

	// IsRecovering returns true if the broker is actively recovering from a connection loss.
	// When recovering is true, publish/call/subscribe operations will fail.
	IsRecovering() bool

	// Backend returns the name of the backend (e.g., "amqp", "kafka").
	Backend() string
}

Connector defines connection lifecycle operations.

type DeadLetterEnvelope

type DeadLetterEnvelope struct {
	FailedAt    time.Time         `json:"failed_at"`
	Backend     string            `json:"backend,omitempty"`
	Destination string            `json:"destination,omitempty"`
	Handler     string            `json:"handler,omitempty"`
	Attempt     int               `json:"attempt,omitempty"`
	Error       ErrorPayload      `json:"error"`
	Original    DeadLetterMessage `json:"original"`
}

DeadLetterEnvelope is a transport-agnostic dead-letter payload format.

func DecodeDeadLetterMessage

func DecodeDeadLetterMessage(msg *Message) (DeadLetterEnvelope, error)

DecodeDeadLetterMessage decodes a dead-letter envelope from a message body.

func NewDeadLetterEnvelope

func NewDeadLetterEnvelope(msg *Message, err error, opts DeadLetterOptions) DeadLetterEnvelope

NewDeadLetterEnvelope builds a dead-letter envelope for a failed message.

func (DeadLetterEnvelope) Message

func (d DeadLetterEnvelope) Message() (*Message, error)

Message encodes the dead-letter envelope as a Weave message.

type DeadLetterMessage

type DeadLetterMessage struct {
	Body          []byte            `json:"body"`
	CorrelationID string            `json:"correlation_id,omitempty"`
	ReplyTo       string            `json:"reply_to,omitempty"`
	Headers       map[string]string `json:"headers,omitempty"`
	ContentType   string            `json:"content_type,omitempty"`
	MessageID     string            `json:"message_id,omitempty"`
	Timestamp     time.Time         `json:"timestamp,omitempty"`
	Subject       string            `json:"subject,omitempty"`
	Partition     int32             `json:"partition,omitempty"`
	Offset        int64             `json:"offset,omitempty"`
}

DeadLetterMessage captures the original message metadata for dead-lettering.

type DeadLetterOptions

type DeadLetterOptions struct {
	Backend     string
	Destination string
	Handler     string
	Attempt     int
	Code        string
	Retryable   bool
	Details     map[string]any
}

DeadLetterOptions annotates dead-letter envelopes with failure context.

type ErrCircuitOpen

type ErrCircuitOpen struct {
	Operation  string
	RetryAfter string
}

ErrCircuitOpen is returned when RPC retries are blocked by an open circuit breaker.

func (*ErrCircuitOpen) Error

func (e *ErrCircuitOpen) Error() string

type ErrConnectionFailed

type ErrConnectionFailed struct {
	Backend string
	Address string
	Cause   error
}

ErrConnectionFailed is returned when the initial connection fails.

func (*ErrConnectionFailed) Error

func (e *ErrConnectionFailed) Error() string

func (*ErrConnectionFailed) Unwrap

func (e *ErrConnectionFailed) Unwrap() error

type ErrConnectionLost

type ErrConnectionLost struct {
	Backend string
	Cause   error
}

ErrConnectionLost is returned when the connection to the broker is lost.

func (*ErrConnectionLost) Error

func (e *ErrConnectionLost) Error() string

func (*ErrConnectionLost) Unwrap

func (e *ErrConnectionLost) Unwrap() error

type ErrNotConnected

type ErrNotConnected struct {
	Backend string
}

ErrNotConnected is returned when an operation is attempted on a disconnected broker.

func (*ErrNotConnected) Error

func (e *ErrNotConnected) Error() string

type ErrPublishFailed

type ErrPublishFailed struct {
	Backend     string
	Destination string
	Cause       error
}

ErrPublishFailed is returned when message publishing fails.

func (*ErrPublishFailed) Error

func (e *ErrPublishFailed) Error() string

func (*ErrPublishFailed) Unwrap

func (e *ErrPublishFailed) Unwrap() error

type ErrSubscribeFailed

type ErrSubscribeFailed struct {
	Backend     string
	Destination string
	Cause       error
}

ErrSubscribeFailed is returned when subscription fails.

func (*ErrSubscribeFailed) Error

func (e *ErrSubscribeFailed) Error() string

func (*ErrSubscribeFailed) Unwrap

func (e *ErrSubscribeFailed) Unwrap() error

type ErrTimeout

type ErrTimeout struct {
	Operation string
	Duration  string
}

ErrTimeout is returned when an operation times out.

func (*ErrTimeout) Error

func (e *ErrTimeout) Error() string

type ErrUnknownBackend

type ErrUnknownBackend struct {
	Backend string
}

ErrUnknownBackend is returned when an unknown backend is requested.

func (*ErrUnknownBackend) Error

func (e *ErrUnknownBackend) Error() string

type ErrUnsupportedOperation

type ErrUnsupportedOperation struct {
	Backend   string
	Operation string
}

ErrUnsupportedOperation is returned when a backend doesn't support an operation.

func (*ErrUnsupportedOperation) Error

func (e *ErrUnsupportedOperation) Error() string

type ErrorPayload

type ErrorPayload struct {
	Code          string         `json:"code,omitempty"`
	Message       string         `json:"message"`
	Retryable     bool           `json:"retryable,omitempty"`
	CorrelationID string         `json:"correlation_id,omitempty"`
	Details       map[string]any `json:"details,omitempty"`
	Timestamp     time.Time      `json:"timestamp"`
}

ErrorPayload is a standardized structured error body for RPC and dead-letter usage.

func DecodeErrorMessage

func DecodeErrorMessage(msg *Message) (ErrorPayload, error)

DecodeErrorMessage decodes a standardized structured error message.

func DecodeErrorPayload

func DecodeErrorPayload(data []byte) (ErrorPayload, error)

DecodeErrorPayload deserializes a structured error payload.

func NewErrorPayload

func NewErrorPayload(code string, err error) ErrorPayload

NewErrorPayload constructs a structured error payload with a stable timestamp.

type Event

type Event struct {
	Timestamp     time.Time
	Level         EventLevel
	Name          string
	Backend       string
	Component     string
	Operation     string
	Destination   string
	CorrelationID string
	Err           error
	Fields        map[string]any
}

Event is a structured signal emitted by runtime and transport layers.

type EventHook

type EventHook func(ctx context.Context, event Event)

EventHook receives structured events as a lightweight callback.

type EventLevel

type EventLevel string

EventLevel indicates the severity of an emitted event.

const (
	EventLevelDebug EventLevel = "debug"
	EventLevelInfo  EventLevel = "info"
	EventLevelWarn  EventLevel = "warn"
	EventLevelError EventLevel = "error"
)

type EventLogger

type EventLogger interface {
	LogEvent(ctx context.Context, event Event)
}

EventLogger receives structured events for logging.

type Handler

type Handler func(ctx context.Context, msg *Message) error

Handler is a function that processes incoming messages.

func RetryHandler

func RetryHandler(handler Handler, policy RetryPolicy) Handler

RetryHandler wraps a handler with bounded in-process retries. Use it when you want consistent application-level retry semantics across transports.

type HandlerErrorPolicy

type HandlerErrorPolicy string

HandlerErrorPolicy controls what happens when a handler returns an error.

const (
	// HandlerErrorNoRetry does not request transport-level retries.
	HandlerErrorNoRetry HandlerErrorPolicy = "no_retry"
	// HandlerErrorRetry requests transport-level retry behavior where supported.
	HandlerErrorRetry HandlerErrorPolicy = "retry"
)

type HealthHook

type HealthHook func(ctx context.Context, report HealthReport)

HealthHook receives health snapshots as a lightweight callback.

type HealthReport

type HealthReport struct {
	Timestamp time.Time
	Status    HealthStatus
	Backend   string
	Component string
	Connected bool
	Started   bool
	Details   map[string]any
}

HealthReport captures a runtime health snapshot.

type HealthReporter

type HealthReporter interface {
	ReportHealth(ctx context.Context, report HealthReport)
}

HealthReporter receives health snapshots for external reporting.

type HealthStatus

type HealthStatus string

HealthStatus represents coarse-grained component health.

const (
	HealthStatusHealthy   HealthStatus = "healthy"
	HealthStatusDegraded  HealthStatus = "degraded"
	HealthStatusUnhealthy HealthStatus = "unhealthy"
)

type KafkaConfig

type KafkaConfig struct {
	Brokers           []string
	ClientID          string
	ConsumerGroup     string
	TLS               *TLSConfig
	RequiredAcks      int
	MaxRetries        int
	RetryBackoff      time.Duration
	CompressionType   string
	AutoOffsetReset   string
	SessionTimeout    time.Duration
	HeartbeatInterval time.Duration
	SASL              *SASLConfig
}

KafkaConfig holds Apache Kafka-specific configuration.

func DefaultKafkaConfig

func DefaultKafkaConfig() *KafkaConfig

DefaultKafkaConfig returns default Kafka configuration.

type Message

type Message struct {
	Body          []byte
	CorrelationID string
	ReplyTo       string
	Headers       map[string]string
	ContentType   string
	MessageID     string
	Timestamp     time.Time
	Subject       string
	Partition     int32
	Offset        int64
}

Message represents a message to be sent to or received from a message broker.

func NewErrorMessage

func NewErrorMessage(payload ErrorPayload) (*Message, error)

NewErrorMessage creates a message with the standardized structured error body.

func NewMessage

func NewMessage(body []byte) *Message

NewMessage creates a new Message with the given body.

func NewTextMessage

func NewTextMessage(body string) *Message

NewTextMessage creates a new Message with a string body.

func (*Message) BodyString

func (m *Message) BodyString() string

BodyString returns the body as a string.

func (*Message) Clone

func (m *Message) Clone() *Message

Clone creates a deep copy of the message.
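
A deep copy matters here because Body is a slice and Headers is a map, both of which would otherwise be shared between the original and the copy. The sketch below shows one way to copy them, using a trimmed local Message and a hypothetical clone method; the package's own Clone may differ in detail.

```go
package main

import "fmt"

// Message is a trimmed local copy with the reference-typed fields.
type Message struct {
	Body    []byte
	Headers map[string]string
	Subject string
}

// clone copies the struct, then duplicates the slice and map so that
// later changes to the copy do not affect the original.
func (m *Message) clone() *Message {
	if m == nil {
		return nil
	}
	c := *m
	if m.Body != nil {
		c.Body = append([]byte(nil), m.Body...)
	}
	if m.Headers != nil {
		c.Headers = make(map[string]string, len(m.Headers))
		for k, v := range m.Headers {
			c.Headers[k] = v
		}
	}
	return &c
}

func main() {
	orig := &Message{Body: []byte("hello"), Headers: map[string]string{"trace": "abc"}}
	dup := orig.clone()
	dup.Body[0] = 'j'
	dup.Headers["trace"] = "xyz"
	fmt.Println(string(orig.Body), orig.Headers["trace"])
}
```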

func (*Message) GetHeader

func (m *Message) GetHeader(key string) string

GetHeader returns a header value, or empty string if not found.

func (*Message) WithContentType

func (m *Message) WithContentType(contentType string) *Message

WithContentType sets the content type and returns the message for chaining.

func (*Message) WithCorrelationID

func (m *Message) WithCorrelationID(id string) *Message

WithCorrelationID sets the correlation ID and returns the message for chaining.

func (*Message) WithHeader

func (m *Message) WithHeader(key, value string) *Message

WithHeader adds a header and returns the message for chaining.

func (*Message) WithReplyTo

func (m *Message) WithReplyTo(replyTo string) *Message

WithReplyTo sets the reply destination and returns the message for chaining.

func (*Message) WithSubject

func (m *Message) WithSubject(subject string) *Message

WithSubject sets the subject/routing key and returns the message for chaining.

type MessageBroker

type MessageBroker interface {
	Connector
	Publisher
	Caller
	Subscriber
}

MessageBroker defines the interface that all message broker backends must implement. It combines all broker capabilities: connection management, publishing, subscribing, and RPC.

func MustNew

func MustNew(config *Config) MessageBroker

MustNew creates a new MessageBroker or panics if creation fails.

func New

func New(config *Config) (MessageBroker, error)

New creates a new MessageBroker using the backend specified in config.

func NewWithBackend

func NewWithBackend(backend string, config *Config) (MessageBroker, error)

NewWithBackend creates a new MessageBroker with an explicit backend name.

type MetricsHook

type MetricsHook interface {
	IncrementCounter(name string, value float64, labels map[string]string)
	ObserveDuration(name string, value time.Duration, labels map[string]string)
}

MetricsHook is an extension point for counters and duration measurements.

type PublishOption

type PublishOption func(*PublishOptions)

PublishOption is a functional option for configuring publish operations.

func WithExchange

func WithExchange(exchange string) PublishOption

WithExchange specifies the target exchange (AMQP).

func WithExpiration

func WithExpiration(ttl string) PublishOption

WithExpiration sets the message TTL.

func WithKey

func WithKey(key string) PublishOption

WithKey sets the message key for partitioning (Kafka).

func WithMandatory

func WithMandatory() PublishOption

WithMandatory requires message routing to a queue.

func WithPartition

func WithPartition(partition int) PublishOption

WithPartition specifies the target partition (Kafka).

func WithPersistent

func WithPersistent() PublishOption

WithPersistent makes the message persistent (AMQP delivery mode 2).

func WithPriority

func WithPriority(priority uint8) PublishOption

WithPriority sets the message priority (0-9).

func WithTimeout

func WithTimeout(timeout time.Duration) PublishOption

WithTimeout sets the publish timeout.

type PublishOptions

type PublishOptions struct {
	Timeout      time.Duration
	Mandatory    bool
	Immediate    bool
	Exchange     string
	Partition    int
	Key          string
	DeliveryMode uint8
	Priority     uint8
	Expiration   string
}

PublishOptions holds configuration for publish operations.

func ApplyPublishOptions

func ApplyPublishOptions(opts ...PublishOption) *PublishOptions

ApplyPublishOptions applies all functional options and returns the result.

type Publisher

type Publisher interface {
	// Publish sends a message to a destination (queue/topic/stream).
	Publish(ctx context.Context, destination string, message *Message, opts ...PublishOption) error
}

Publisher defines message publishing operations (fire-and-forget).

type RetryHook

type RetryHook func(ctx context.Context, attempt int, err error, delay time.Duration)

RetryHook observes each scheduled retry.

type RetryPolicy

type RetryPolicy struct {
	// MaxAttempts is the total number of attempts including the initial call.
	MaxAttempts int
	// Backoff calculates the delay before each retry attempt.
	Backoff BackoffStrategy
	// Retryable decides whether an error is retryable.
	Retryable RetryPredicate
	// OnRetry is called whenever another retry attempt is scheduled.
	OnRetry RetryHook
}

RetryPolicy controls transport-agnostic in-process retry behavior.

func DefaultRetryPolicy

func DefaultRetryPolicy() RetryPolicy

DefaultRetryPolicy returns a conservative retry policy suitable for transient handler failures.

type RetryPredicate

type RetryPredicate func(error) bool

RetryPredicate decides whether another retry should be attempted.

type SASLConfig

type SASLConfig struct {
	Enable    bool
	Mechanism string
	Username  string
	Password  string
}

SASLConfig holds SASL authentication configuration (for Kafka).

type Server

type Server interface {
	Connector
	Subscriber
}

Server is a subset of MessageBroker for server-only operations. Use this interface when you only need to subscribe to messages, without publishing (client-side).

type SubscribeOption

type SubscribeOption func(*SubscribeOptions)

SubscribeOption is a functional option for configuring subscriptions.

func WithAutoAck

func WithAutoAck() SubscribeOption

WithAutoAck enables automatic message acknowledgment.

func WithConsumerGroup

func WithConsumerGroup(group string) SubscribeOption

WithConsumerGroup sets the consumer group (Kafka/Redis/NATS).

func WithConsumerTag

func WithConsumerTag(tag string) SubscribeOption

WithConsumerTag sets the consumer identifier.

func WithExclusive

func WithExclusive() SubscribeOption

WithExclusive makes the subscription exclusive to this consumer.

func WithHandlerErrorNoRetry

func WithHandlerErrorNoRetry() SubscribeOption

WithHandlerErrorNoRetry disables transport-level retry when a handler returns an error.

func WithHandlerErrorRetry

func WithHandlerErrorRetry() SubscribeOption

WithHandlerErrorRetry requests transport-level retry when a handler returns an error.

func WithPrefetchCount

func WithPrefetchCount(count int) SubscribeOption

WithPrefetchCount sets the prefetch limit.

func WithQueueBind

func WithQueueBind(exchange, routingKey string) SubscribeOption

WithQueueBind configures exchange binding (AMQP-specific).

func WithStartFromBeginning

func WithStartFromBeginning() SubscribeOption

WithStartFromBeginning starts consuming from the earliest offset.

type SubscribeOptions

type SubscribeOptions struct {
	AutoAck            bool
	Exclusive          bool
	ConsumerTag        string
	PrefetchCount      int
	QueueBind          bool
	Exchange           string
	RoutingKey         string
	ConsumerGroup      string
	StartFromBeginning bool
	HandlerErrorPolicy HandlerErrorPolicy
}

SubscribeOptions holds configuration for subscription operations.

func ApplySubscribeOptions

func ApplySubscribeOptions(opts ...SubscribeOption) *SubscribeOptions

ApplySubscribeOptions applies all functional options and returns the result.

type Subscriber

type Subscriber interface {
	// Subscribe starts consuming messages from a destination.
	Subscribe(ctx context.Context, destination string, handler Handler, opts ...SubscribeOption) error
}

Subscriber defines message subscription operations (server-side consumption).

type TLSConfig

type TLSConfig struct {
	Enable             bool
	CertFile           string
	KeyFile            string
	CAFile             string
	InsecureSkipVerify bool
}

TLSConfig holds TLS/SSL configuration.

type TraceSpan

type TraceSpan interface {
	Finish(finish TraceSpanFinish)
}

TraceSpan represents an in-flight span.

type TraceSpanFinish

type TraceSpanFinish struct {
	Err        error
	Attributes map[string]string
}

TraceSpanFinish describes how a span completed.

type TraceSpanStart

type TraceSpanStart struct {
	Name          string
	Backend       string
	Component     string
	Operation     string
	Destination   string
	CorrelationID string
	Attributes    map[string]string
}

TraceSpanStart describes a span that is about to begin.

type TracingHook

type TracingHook interface {
	StartSpan(ctx context.Context, span TraceSpanStart) (context.Context, TraceSpan)
}

TracingHook is an extension point for tracing span lifecycle callbacks.
