Documentation ¶
Overview ¶
Package core provides the fundamental interfaces, types, and errors for Weave.
Index ¶
- Constants
- Variables
- func AvailableBackends() []string
- func EncodeErrorPayload(payload ErrorPayload) ([]byte, error)
- func IsBackendAvailable(name string) bool
- func IsCircuitOpen(err error) bool
- func IsConnectionLost(err error) bool
- func IsNotConnected(err error) bool
- func IsTimeout(err error) bool
- func IsUnknownBackend(err error) bool
- func IsUnsupportedOperation(err error) bool
- func Register(name string, factory BrokerFactory)
- func RetryAttempt(ctx context.Context) int
- type AMQPConfig
- type BackoffStrategy
- type BrokerFactory
- type Caller
- type Client
- type Config
- func (c *Config) EmitCounter(name string, value float64, labels map[string]string)
- func (c *Config) EmitDuration(name string, value time.Duration, labels map[string]string)
- func (c *Config) EmitEvent(ctx context.Context, event Event)
- func (c *Config) EmitHealth(ctx context.Context, report HealthReport)
- func (c *Config) StartSpan(ctx context.Context, span TraceSpanStart) (context.Context, TraceSpan)
- func (c *Config) WithAMQP(amqp *AMQPConfig) *Config
- func (c *Config) WithBackend(backend string) *Config
- func (c *Config) WithKafka(kafka *KafkaConfig) *Config
- type Connector
- type DeadLetterEnvelope
- type DeadLetterMessage
- type DeadLetterOptions
- type ErrCircuitOpen
- type ErrConnectionFailed
- type ErrConnectionLost
- type ErrNotConnected
- type ErrPublishFailed
- type ErrSubscribeFailed
- type ErrTimeout
- type ErrUnknownBackend
- type ErrUnsupportedOperation
- type ErrorPayload
- type Event
- type EventHook
- type EventLevel
- type EventLogger
- type Handler
- type HandlerErrorPolicy
- type HealthHook
- type HealthReport
- type HealthReporter
- type HealthStatus
- type KafkaConfig
- type Message
- func (m *Message) BodyString() string
- func (m *Message) Clone() *Message
- func (m *Message) GetHeader(key string) string
- func (m *Message) WithContentType(contentType string) *Message
- func (m *Message) WithCorrelationID(id string) *Message
- func (m *Message) WithHeader(key, value string) *Message
- func (m *Message) WithReplyTo(replyTo string) *Message
- func (m *Message) WithSubject(subject string) *Message
- type MessageBroker
- type MetricsHook
- type PublishOption
- func WithExchange(exchange string) PublishOption
- func WithExpiration(ttl string) PublishOption
- func WithKey(key string) PublishOption
- func WithMandatory() PublishOption
- func WithPartition(partition int) PublishOption
- func WithPersistent() PublishOption
- func WithPriority(priority uint8) PublishOption
- func WithTimeout(timeout time.Duration) PublishOption
- type PublishOptions
- type Publisher
- type RetryHook
- type RetryPolicy
- type RetryPredicate
- type SASLConfig
- type Server
- type SubscribeOption
- func WithAutoAck() SubscribeOption
- func WithConsumerGroup(group string) SubscribeOption
- func WithConsumerTag(tag string) SubscribeOption
- func WithExclusive() SubscribeOption
- func WithHandlerErrorNoRetry() SubscribeOption
- func WithHandlerErrorRetry() SubscribeOption
- func WithPrefetchCount(count int) SubscribeOption
- func WithQueueBind(exchange, routingKey string) SubscribeOption
- func WithStartFromBeginning() SubscribeOption
- type SubscribeOptions
- type Subscriber
- type TLSConfig
- type TraceSpan
- type TraceSpanFinish
- type TraceSpanStart
- type TracingHook
Constants ¶
const ( // ErrorContentType is the standard content type for structured error payloads. ErrorContentType = "application/vnd.weave.error+json" // DeadLetterContentType is the standard content type for dead-letter envelopes. DeadLetterContentType = "application/vnd.weave.dead-letter+json" )
const ( EventConnect = "connect" EventDisconnect = "disconnect" EventPublishFailed = "publish_failed" EventSubscribeFailed = "subscribe_failed" EventTimeout = "timeout" )
Variables ¶
var ( ErrClosed = errors.New("broker is closed") ErrNoReplyTo = errors.New("no reply-to destination specified") ErrAlreadyConnected = errors.New("broker is already connected") ErrInvalidConfig = errors.New("invalid configuration") )
Sentinel errors for common conditions.
Functions ¶
func AvailableBackends ¶
func AvailableBackends() []string
AvailableBackends returns a sorted list of registered backend names.
func EncodeErrorPayload ¶
func EncodeErrorPayload(payload ErrorPayload) ([]byte, error)
EncodeErrorPayload serializes a structured error payload.
func IsBackendAvailable ¶
IsBackendAvailable checks if a backend is registered.
func IsCircuitOpen ¶
IsCircuitOpen returns true if the error indicates an open circuit breaker.
func IsConnectionLost ¶
IsConnectionLost returns true if the error indicates the connection was lost.
func IsNotConnected ¶
IsNotConnected returns true if the error indicates the broker is not connected.
func IsUnknownBackend ¶
IsUnknownBackend returns true if the error indicates an unknown backend.
func IsUnsupportedOperation ¶
IsUnsupportedOperation returns true if the error indicates an unsupported operation.
func Register ¶
func Register(name string, factory BrokerFactory)
Register registers a broker factory for a backend name.
func RetryAttempt ¶
RetryAttempt returns the current attempt number stored in the context. The first attempt is 1. If the context carries no attempt value, RetryAttempt also returns 1.
Types ¶
type AMQPConfig ¶
type AMQPConfig struct {
Host string
Port int
Username string
Password string
VHost string
Heartbeat time.Duration
TLS *TLSConfig
Exchange string
ExchangeType string
QueueDurable bool
QueueAutoDelete bool
QueueExclusive bool
}
AMQPConfig holds AMQP/RabbitMQ-specific configuration.
func DefaultAMQPConfig ¶
func DefaultAMQPConfig() *AMQPConfig
DefaultAMQPConfig returns default AMQP configuration.
type BackoffStrategy ¶
BackoffStrategy returns the delay to wait before the next retry attempt. The attempt argument is one-based and counts retries only: 1 is the first retry, made after the initial attempt has already failed.
func ExponentialBackoff ¶
func ExponentialBackoff(initialDelay, maxDelay time.Duration, multiplier float64) BackoffStrategy
ExponentialBackoff returns an exponential backoff strategy capped at maxDelay.
func FixedBackoff ¶
func FixedBackoff(delay time.Duration) BackoffStrategy
FixedBackoff returns a backoff strategy that always returns the same delay.
type BrokerFactory ¶
type BrokerFactory func(config *Config) (MessageBroker, error)
BrokerFactory is a function that creates a new MessageBroker instance.
type Caller ¶
type Caller interface {
// Call performs a synchronous request-reply operation.
Call(ctx context.Context, destination string, message *Message, opts ...PublishOption) (*Message, error)
}
Caller defines request-reply operations (synchronous RPC).
type Client ¶
Client is a subset of MessageBroker for client-only operations. Use this interface when you only need to publish messages or make RPC calls and do not need to subscribe to messages (subscribing is a server-side operation).
type Config ¶
type Config struct {
Backend string
ConnectionName string
ConnectionRetry int
RetryDelay time.Duration
// Logger receives structured runtime and transport events for logging.
Logger EventLogger
// EventHook receives structured runtime and transport events.
EventHook EventHook
// Metrics receives basic counters and duration measurements.
Metrics MetricsHook
// Tracing receives span lifecycle callbacks for high-level runtime operations.
Tracing TracingHook
// HealthReporter receives runtime health snapshots.
HealthReporter HealthReporter
// HealthHook receives runtime health snapshots as a callback.
HealthHook HealthHook
AMQP *AMQPConfig
Kafka *KafkaConfig
// Shorthand for AMQP
Host string
Port int
Username string
Password string
VHost string
}
Config holds the configuration for a message broker.
func DefaultConfig ¶
func DefaultConfig() *Config
DefaultConfig returns a Config with sensible defaults for AMQP.
func (*Config) EmitCounter ¶
EmitCounter records a counter metric if a metrics hook is configured.
func (*Config) EmitDuration ¶
EmitDuration records a duration metric if a metrics hook is configured.
func (*Config) EmitHealth ¶
func (c *Config) EmitHealth(ctx context.Context, report HealthReport)
EmitHealth sends a health report to all configured health sinks.
func (*Config) WithAMQP ¶
func (c *Config) WithAMQP(amqp *AMQPConfig) *Config
WithAMQP sets AMQP configuration and returns the config for chaining.
func (*Config) WithBackend ¶
WithBackend sets the backend and returns the config for chaining.
func (*Config) WithKafka ¶
func (c *Config) WithKafka(kafka *KafkaConfig) *Config
WithKafka sets Kafka configuration and returns the config for chaining.
type Connector ¶
type Connector interface {
// Connect establishes a connection to the message broker.
Connect(ctx context.Context) error
// Close gracefully closes the connection to the broker.
Close() error
// IsConnected returns true if the broker connection is active.
IsConnected() bool
// IsRecovering returns true if the broker is actively recovering from a connection loss.
// While the broker is recovering, publish/call/subscribe operations will fail.
IsRecovering() bool
// Backend returns the name of the backend (e.g., "amqp", "kafka").
Backend() string
}
Connector defines connection lifecycle operations.
type DeadLetterEnvelope ¶
type DeadLetterEnvelope struct {
FailedAt time.Time `json:"failed_at"`
Backend string `json:"backend,omitempty"`
Destination string `json:"destination,omitempty"`
Handler string `json:"handler,omitempty"`
Attempt int `json:"attempt,omitempty"`
Error ErrorPayload `json:"error"`
Original DeadLetterMessage `json:"original"`
}
DeadLetterEnvelope is a transport-agnostic dead-letter payload format.
func DecodeDeadLetterMessage ¶
func DecodeDeadLetterMessage(msg *Message) (DeadLetterEnvelope, error)
DecodeDeadLetterMessage decodes a dead-letter envelope from a message body.
func NewDeadLetterEnvelope ¶
func NewDeadLetterEnvelope(msg *Message, err error, opts DeadLetterOptions) DeadLetterEnvelope
NewDeadLetterEnvelope builds a dead-letter envelope for a failed message.
func (DeadLetterEnvelope) Message ¶
func (d DeadLetterEnvelope) Message() (*Message, error)
Message encodes the dead-letter envelope as a Weave message.
type DeadLetterMessage ¶
type DeadLetterMessage struct {
Body []byte `json:"body"`
CorrelationID string `json:"correlation_id,omitempty"`
ReplyTo string `json:"reply_to,omitempty"`
Headers map[string]string `json:"headers,omitempty"`
ContentType string `json:"content_type,omitempty"`
MessageID string `json:"message_id,omitempty"`
Timestamp time.Time `json:"timestamp,omitempty"`
Subject string `json:"subject,omitempty"`
Partition int32 `json:"partition,omitempty"`
Offset int64 `json:"offset,omitempty"`
}
DeadLetterMessage captures the original message metadata for dead-lettering.
type DeadLetterOptions ¶
type DeadLetterOptions struct {
Backend string
Destination string
Handler string
Attempt int
Code string
Retryable bool
Details map[string]any
}
DeadLetterOptions annotates dead-letter envelopes with failure context.
type ErrCircuitOpen ¶
ErrCircuitOpen is returned when RPC retries are blocked by an open circuit breaker.
func (*ErrCircuitOpen) Error ¶
func (e *ErrCircuitOpen) Error() string
type ErrConnectionFailed ¶
ErrConnectionFailed is returned when initial connection fails.
func (*ErrConnectionFailed) Error ¶
func (e *ErrConnectionFailed) Error() string
func (*ErrConnectionFailed) Unwrap ¶
func (e *ErrConnectionFailed) Unwrap() error
type ErrConnectionLost ¶
ErrConnectionLost is returned when the connection to the broker is lost.
func (*ErrConnectionLost) Error ¶
func (e *ErrConnectionLost) Error() string
func (*ErrConnectionLost) Unwrap ¶
func (e *ErrConnectionLost) Unwrap() error
type ErrNotConnected ¶
type ErrNotConnected struct {
Backend string
}
ErrNotConnected is returned when an operation is attempted on a disconnected broker.
func (*ErrNotConnected) Error ¶
func (e *ErrNotConnected) Error() string
type ErrPublishFailed ¶
ErrPublishFailed is returned when message publishing fails.
func (*ErrPublishFailed) Error ¶
func (e *ErrPublishFailed) Error() string
func (*ErrPublishFailed) Unwrap ¶
func (e *ErrPublishFailed) Unwrap() error
type ErrSubscribeFailed ¶
ErrSubscribeFailed is returned when subscription fails.
func (*ErrSubscribeFailed) Error ¶
func (e *ErrSubscribeFailed) Error() string
func (*ErrSubscribeFailed) Unwrap ¶
func (e *ErrSubscribeFailed) Unwrap() error
type ErrTimeout ¶
ErrTimeout is returned when an operation times out.
func (*ErrTimeout) Error ¶
func (e *ErrTimeout) Error() string
type ErrUnknownBackend ¶
type ErrUnknownBackend struct {
Backend string
}
ErrUnknownBackend is returned when an unknown backend is requested.
func (*ErrUnknownBackend) Error ¶
func (e *ErrUnknownBackend) Error() string
type ErrUnsupportedOperation ¶
ErrUnsupportedOperation is returned when a backend doesn't support an operation.
func (*ErrUnsupportedOperation) Error ¶
func (e *ErrUnsupportedOperation) Error() string
type ErrorPayload ¶
type ErrorPayload struct {
Code string `json:"code,omitempty"`
Message string `json:"message"`
Retryable bool `json:"retryable,omitempty"`
CorrelationID string `json:"correlation_id,omitempty"`
Details map[string]any `json:"details,omitempty"`
Timestamp time.Time `json:"timestamp"`
}
ErrorPayload is a standardized structured error body for RPC and dead-letter usage.
func DecodeErrorMessage ¶
func DecodeErrorMessage(msg *Message) (ErrorPayload, error)
DecodeErrorMessage decodes a standardized structured error message.
func DecodeErrorPayload ¶
func DecodeErrorPayload(data []byte) (ErrorPayload, error)
DecodeErrorPayload deserializes a structured error payload.
func NewErrorPayload ¶
func NewErrorPayload(code string, err error) ErrorPayload
NewErrorPayload constructs a structured error payload with a stable timestamp.
type Event ¶
type Event struct {
Timestamp time.Time
Level EventLevel
Name string
Backend string
Component string
Operation string
Destination string
CorrelationID string
Err error
Fields map[string]any
}
Event is a structured signal emitted by runtime and transport layers.
type EventLevel ¶
type EventLevel string
EventLevel indicates the severity of an emitted event.
const ( EventLevelDebug EventLevel = "debug" EventLevelInfo EventLevel = "info" EventLevelWarn EventLevel = "warn" EventLevelError EventLevel = "error" )
type EventLogger ¶
EventLogger receives structured events for logging.
type Handler ¶
Handler is a function that processes incoming messages.
func RetryHandler ¶
func RetryHandler(handler Handler, policy RetryPolicy) Handler
RetryHandler wraps a handler with bounded in-process retries. Use it when you want consistent application-level retry semantics across transports.
type HandlerErrorPolicy ¶
type HandlerErrorPolicy string
HandlerErrorPolicy controls what happens when a handler returns an error.
const ( // HandlerErrorNoRetry does not request transport-level retries. HandlerErrorNoRetry HandlerErrorPolicy = "no_retry" // HandlerErrorRetry requests transport-level retry behavior where supported. HandlerErrorRetry HandlerErrorPolicy = "retry" )
type HealthHook ¶
type HealthHook func(ctx context.Context, report HealthReport)
HealthHook receives health snapshots as a lightweight callback.
type HealthReport ¶
type HealthReport struct {
Timestamp time.Time
Status HealthStatus
Backend string
Component string
Connected bool
Started bool
Details map[string]any
}
HealthReport captures a runtime health snapshot.
type HealthReporter ¶
type HealthReporter interface {
ReportHealth(ctx context.Context, report HealthReport)
}
HealthReporter receives health snapshots for external reporting.
type HealthStatus ¶
type HealthStatus string
HealthStatus represents coarse-grained component health.
const ( HealthStatusHealthy HealthStatus = "healthy" HealthStatusDegraded HealthStatus = "degraded" HealthStatusUnhealthy HealthStatus = "unhealthy" )
type KafkaConfig ¶
type KafkaConfig struct {
Brokers []string
ClientID string
ConsumerGroup string
TLS *TLSConfig
RequiredAcks int
MaxRetries int
RetryBackoff time.Duration
CompressionType string
AutoOffsetReset string
SessionTimeout time.Duration
HeartbeatInterval time.Duration
SASL *SASLConfig
}
KafkaConfig holds Apache Kafka-specific configuration.
func DefaultKafkaConfig ¶
func DefaultKafkaConfig() *KafkaConfig
DefaultKafkaConfig returns default Kafka configuration.
type Message ¶
type Message struct {
Body []byte
CorrelationID string
ReplyTo string
Headers map[string]string
ContentType string
MessageID string
Timestamp time.Time
Subject string
Partition int32
Offset int64
}
Message represents a message to be sent or received from a message broker.
func NewErrorMessage ¶
func NewErrorMessage(payload ErrorPayload) (*Message, error)
NewErrorMessage creates a message with the standardized structured error body.
func NewMessage ¶
NewMessage creates a new Message with the given body.
func NewTextMessage ¶
NewTextMessage creates a new Message with a string body.
func (*Message) BodyString ¶
BodyString returns the body as a string.
func (*Message) WithContentType ¶
WithContentType sets the content type and returns the message for chaining.
func (*Message) WithCorrelationID ¶
WithCorrelationID sets the correlation ID and returns the message for chaining.
func (*Message) WithHeader ¶
WithHeader adds a header and returns the message for chaining.
func (*Message) WithReplyTo ¶
WithReplyTo sets the reply destination and returns the message for chaining.
func (*Message) WithSubject ¶
WithSubject sets the subject/routing key and returns the message for chaining.
type MessageBroker ¶
type MessageBroker interface {
Connector
Publisher
Caller
Subscriber
}
MessageBroker defines the interface that all message broker backends must implement. It combines all broker capabilities: connection management, publishing, subscribing, and RPC.
func MustNew ¶
func MustNew(config *Config) MessageBroker
MustNew creates a new MessageBroker or panics if creation fails.
func New ¶
func New(config *Config) (MessageBroker, error)
New creates a new MessageBroker using the backend specified in config.
func NewWithBackend ¶
func NewWithBackend(backend string, config *Config) (MessageBroker, error)
NewWithBackend creates a new MessageBroker with explicit backend name.
type MetricsHook ¶
type MetricsHook interface {
IncrementCounter(name string, value float64, labels map[string]string)
ObserveDuration(name string, value time.Duration, labels map[string]string)
}
MetricsHook is an extension point for counters and duration measurements.
type PublishOption ¶
type PublishOption func(*PublishOptions)
PublishOption is a functional option for configuring publish operations.
func WithExchange ¶
func WithExchange(exchange string) PublishOption
WithExchange specifies the target exchange (AMQP).
func WithExpiration ¶
func WithExpiration(ttl string) PublishOption
WithExpiration sets the message TTL.
func WithKey ¶
func WithKey(key string) PublishOption
WithKey sets the message key for partitioning (Kafka).
func WithMandatory ¶
func WithMandatory() PublishOption
WithMandatory sets the mandatory flag, requiring that the message be routable to a queue.
func WithPartition ¶
func WithPartition(partition int) PublishOption
WithPartition specifies the target partition (Kafka).
func WithPersistent ¶
func WithPersistent() PublishOption
WithPersistent makes the message persistent (AMQP delivery mode 2).
func WithPriority ¶
func WithPriority(priority uint8) PublishOption
WithPriority sets the message priority (0-9).
func WithTimeout ¶
func WithTimeout(timeout time.Duration) PublishOption
WithTimeout sets the publish timeout.
type PublishOptions ¶
type PublishOptions struct {
Timeout time.Duration
Mandatory bool
Immediate bool
Exchange string
Partition int
Key string
DeliveryMode uint8
Priority uint8
Expiration string
}
PublishOptions holds configuration for publish operations.
func ApplyPublishOptions ¶
func ApplyPublishOptions(opts ...PublishOption) *PublishOptions
ApplyPublishOptions applies all functional options and returns the result.
type Publisher ¶
type Publisher interface {
// Publish sends a message to a destination (queue/topic/stream).
Publish(ctx context.Context, destination string, message *Message, opts ...PublishOption) error
}
Publisher defines message publishing operations (fire-and-forget).
type RetryPolicy ¶
type RetryPolicy struct {
// MaxAttempts is the total number of attempts including the initial call.
MaxAttempts int
// Backoff calculates the delay before each retry attempt.
Backoff BackoffStrategy
// Retryable decides whether an error is retryable.
Retryable RetryPredicate
// OnRetry is called whenever another retry attempt is scheduled.
OnRetry RetryHook
}
RetryPolicy controls transport-agnostic in-process retry behavior.
func DefaultRetryPolicy ¶
func DefaultRetryPolicy() RetryPolicy
DefaultRetryPolicy returns a conservative retry policy suitable for transient handler failures.
type RetryPredicate ¶
RetryPredicate decides whether another retry should be attempted.
type SASLConfig ¶
SASLConfig holds SASL authentication configuration (for Kafka).
type Server ¶
type Server interface {
Connector
Subscriber
}
Server is a subset of MessageBroker for server-only operations. Use this interface when you only need to subscribe to messages and do not need to publish (publishing is a client-side operation).
type SubscribeOption ¶
type SubscribeOption func(*SubscribeOptions)
SubscribeOption is a functional option for configuring subscriptions.
func WithAutoAck ¶
func WithAutoAck() SubscribeOption
WithAutoAck enables automatic message acknowledgment.
func WithConsumerGroup ¶
func WithConsumerGroup(group string) SubscribeOption
WithConsumerGroup sets the consumer group (Kafka/Redis/NATS).
func WithConsumerTag ¶
func WithConsumerTag(tag string) SubscribeOption
WithConsumerTag sets the consumer identifier.
func WithExclusive ¶
func WithExclusive() SubscribeOption
WithExclusive makes the subscription exclusive to this consumer.
func WithHandlerErrorNoRetry ¶
func WithHandlerErrorNoRetry() SubscribeOption
WithHandlerErrorNoRetry disables transport-level retry when a handler returns an error.
func WithHandlerErrorRetry ¶
func WithHandlerErrorRetry() SubscribeOption
WithHandlerErrorRetry requests transport-level retry when a handler returns an error.
func WithPrefetchCount ¶
func WithPrefetchCount(count int) SubscribeOption
WithPrefetchCount sets the prefetch limit.
func WithQueueBind ¶
func WithQueueBind(exchange, routingKey string) SubscribeOption
WithQueueBind configures exchange binding (AMQP-specific).
func WithStartFromBeginning ¶
func WithStartFromBeginning() SubscribeOption
WithStartFromBeginning starts consuming from the earliest offset.
type SubscribeOptions ¶
type SubscribeOptions struct {
AutoAck bool
Exclusive bool
ConsumerTag string
PrefetchCount int
QueueBind bool
Exchange string
RoutingKey string
ConsumerGroup string
StartFromBeginning bool
HandlerErrorPolicy HandlerErrorPolicy
}
SubscribeOptions holds configuration for subscription operations.
func ApplySubscribeOptions ¶
func ApplySubscribeOptions(opts ...SubscribeOption) *SubscribeOptions
ApplySubscribeOptions applies all functional options and returns the result.
type Subscriber ¶
type Subscriber interface {
// Subscribe starts consuming messages from a destination.
Subscribe(ctx context.Context, destination string, handler Handler, opts ...SubscribeOption) error
}
Subscriber defines message subscription operations (server-side consumption).
type TLSConfig ¶
type TLSConfig struct {
Enable bool
CertFile string
KeyFile string
CAFile string
InsecureSkipVerify bool
}
TLSConfig holds TLS/SSL configuration.
type TraceSpan ¶
type TraceSpan interface {
Finish(finish TraceSpanFinish)
}
TraceSpan represents an in-flight span.
type TraceSpanFinish ¶
TraceSpanFinish describes how a span completed.
type TraceSpanStart ¶
type TraceSpanStart struct {
Name string
Backend string
Component string
Operation string
Destination string
CorrelationID string
Attributes map[string]string
}
TraceSpanStart describes a span that is about to begin.
type TracingHook ¶
type TracingHook interface {
StartSpan(ctx context.Context, span TraceSpanStart) (context.Context, TraceSpan)
}
TracingHook is an extension point for tracing span lifecycle callbacks.