weave

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 22, 2026 License: Apache-2.0 Imports: 3 Imported by: 0

README

Weave

Go Reference License Go Report Card

A unified abstraction layer for building message-driven microservices across message queue backends. Today, the implemented transports are RabbitMQ/AMQP and Apache Kafka.

Feature Status

Feature Status Details
Client & Server abstractions ✅ Stable Documented API with automated unit and integration-style coverage
AMQP (RabbitMQ) transport ✅ Stable Covered by automated transport, reconnect, and race-detector tests
Kafka transport ✅ Stable Covered by automated transport, reconnect, and race-detector tests
Connection retry (basic) ✅ Stable Implemented for both backends
Observability hooks ✅ Stable Logging, metrics, and tracing extension points
Request-reply RPC ✅ Stable Core feature
Timeout support ✅ Stable Context-based
Testing utilities ✅ Stable Mock broker available
Connection recovery & reconnect ✅ Stable Recovery paths are covered by automated reconnect tests; semantics differ by backend
Protobuf integration ✅ Stable Built-in protobuf codec and codec-aware helpers; no typed APIs
Dead-letter handling ✅ Stable Standard dead-letter envelope helpers are implemented; broker-native routing remains backend-specific
Worker pool abstractions 📋 Planned Future enhancement
Schema validation helpers 📋 Planned Future enhancement
Tracing integration ✅ Stable Hook-based span integration with adapter examples
Health check endpoints 📋 Planned Future enhancement
Additional backends (NATS, Redis, etc.) 📋 Reserved Design-phase only, not under development

Status Legend:

  • ✅ Stable: Implemented, documented, and covered by automated validation for the currently supported surface
  • 🔶 In Progress: Partially implemented, testing in progress
  • 📋 Planned: Designed but not yet implemented
  • 📦 Reserved: Considered for future work but not yet designed

Features (Stable)

  • 🔌 Multi-Backend - Single API for AMQP (RabbitMQ) and Apache Kafka
  • 🖥️ Client & Server - Dedicated abstractions for both client and server roles
  • 🚀 Simple API - REST-like request-response pattern over any message queue
  • ⏱️ Timeout Support - Context-based timeout handling for all requests
  • 📊 Connection Management - Automatic connection retry (basic) and monitoring
  • 🎯 Flexible Payloads - Works with JSON, Protocol Buffers, and other payload formats via Message.Body
  • 🧪 Testable - Built-in mock broker for unit testing
  • 📊 Concurrent - Handles multiple concurrent requests efficiently
  • 🧰 Extensible - Registry-based transport design for adding future backends
  • 🔍 Observability - Structured logging, metrics, and tracing hooks

Installation

go get github.com/prabhatdotdev/weave@v0.1.0

For release history and support expectations, see CHANGELOG.md and COMPATIBILITY.md.

Import the transport(s) you need:

import (
    "github.com/prabhatdotdev/weave"
    
    // Import one or more implemented transports
    _ "github.com/prabhatdotdev/weave/transport/amqp"   // RabbitMQ
    _ "github.com/prabhatdotdev/weave/transport/kafka"  // Apache Kafka
)

Quick Start

Weave provides two high-level abstractions:

  • Client - For sending messages and making RPC calls (no subscriptions)
  • Server - For subscribing to queues/topics and handling incoming messages

Payload format is intentionally transport-agnostic. Weave now ships built-in JSON and Protocol Buffers codecs, plus helpers like weave.MarshalMessage(...), weave.UnmarshalMessage(...), Client.PublishWithCodec(...), and Client.CallWithCodec(...) so you can keep serialization concerns out of most call sites.

Client Example (Sending Messages / RPC)

Use Client when your application only needs to send messages:

package main

import (
    "context"
    "fmt"
    "log"
    "time"
    
    "github.com/prabhatdotdev/weave"
    _ "github.com/prabhatdotdev/weave/transport/amqp"
)

func main() {
    // Create client
    client, err := weave.NewClient(weave.DefaultConfig())
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()
    
    ctx := context.Background()
    if err := client.Connect(ctx); err != nil {
        log.Fatal(err)
    }

    // Fire-and-forget publish
    msg := weave.NewMessage([]byte(`{"order_id": 123}`))
    if err := client.Publish(ctx, "orders", msg); err != nil {
        log.Fatal(err)
    }
    fmt.Println("Order sent!")

    // Request-reply RPC call
    request := weave.NewMessage([]byte(`{"user_id": 456}`))
    response, err := client.Call(ctx, "users.get", request, 
        weave.WithTimeout(5*time.Second))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("User data: %s\n", string(response.Body))
}
Server Example (Handling Messages)

Use Server when building message-driven services:

package main

import (
    "context"
    "fmt"
    "log"
    
    "github.com/prabhatdotdev/weave"
    _ "github.com/prabhatdotdev/weave/transport/amqp"
)

func main() {
    // Create server
    server, err := weave.NewServer(weave.DefaultConfig())
    if err != nil {
        log.Fatal(err)
    }
    
    // Register handlers
    server.Handle("orders", func(ctx context.Context, msg *weave.Message) error {
        fmt.Printf("Processing order: %s\n", string(msg.Body))
        return nil
    })
    
    server.Handle("users.get", func(ctx context.Context, msg *weave.Message) error {
        fmt.Printf("User request: %s\n", string(msg.Body))
        // Reply is handled automatically via ReplyTo field
        return nil
    })

    // Start server (connects and subscribes to all handlers)
    if err := server.Start(context.Background()); err != nil {
        log.Fatal(err)
    }
    defer server.Stop()
    
    fmt.Println("Server running...")
    select {} // Keep running
}

Architecture

Client vs Server
Feature Client Server
Connect()
Close() ✅ (via Stop())
Publish()
Call() (RPC)
Subscribe() ✅ (via Handle())
Use case Send messages, make RPC calls Handle incoming messages
When to Use What
  • Use Client when your application:

    • Only sends messages (e.g., a web API publishing events)
    • Makes RPC calls to other services
    • Doesn't need to handle incoming messages
  • Use Server when your application:

    • Processes incoming messages from queues/topics
    • Implements a microservice that handles requests
    • Needs to subscribe to multiple destinations
  • Use raw MessageBroker when you need:

    • Full control over both client and server operations
    • Direct access to all broker methods
    • Custom connection management

Package Structure

weave/
├── weave.go           # Root package - re-exports core types
├── core/              # Core interfaces and types
│   ├── broker.go      # MessageBroker interface (Connector, Publisher, Caller, Subscriber)
│   ├── message.go     # Message type
│   ├── config.go      # Configuration types
│   ├── options.go     # Functional options
│   ├── errors.go      # Error types
│   └── registry.go    # Backend registry
├── transport/         # Transport implementations
│   ├── amqp/          # RabbitMQ/AMQP transport
│   └── kafka/         # Apache Kafka transport
├── runtime/           # High-level abstractions
│   ├── client.go      # Client (publish, call only)
│   └── service.go     # Server (subscribe, handle)
├── codec/             # Message encoding/decoding
│   └── codec.go       # JSON + protobuf codecs
└── testkit/           # Testing utilities
    └── mock.go        # Mock broker for testing

Interface Composition

Weave uses composable interfaces for flexibility:

// Connector - connection lifecycle
type Connector interface {
    Connect(ctx context.Context) error
    Close() error
    IsConnected() bool
    Backend() string
}

// Publisher - fire-and-forget messaging
type Publisher interface {
    Publish(ctx context.Context, destination string, message *Message, opts ...PublishOption) error
}

// Caller - request-reply RPC
type Caller interface {
    Call(ctx context.Context, destination string, message *Message, opts ...PublishOption) (*Message, error)
}

// Subscriber - message consumption
type Subscriber interface {
    Subscribe(ctx context.Context, destination string, handler Handler, opts ...SubscribeOption) error
}

// MessageBroker combines all interfaces
type MessageBroker interface {
    Connector
    Publisher
    Caller
    Subscriber
}

// Client interface (no Subscribe)
type Client interface {
    Connector
    Publisher
    Caller
}

// Server interface (no Publish/Call)
type Server interface {
    Connector
    Subscriber
}

This allows type-safe function signatures:

// Function that only needs to publish
func SendNotification(pub weave.Publisher, msg *weave.Message) error {
    return pub.Publish(ctx, "notifications", msg)
}

// Function that only needs to make RPC calls
func GetUser(caller weave.Caller, userID string) (*User, error) {
    resp, err := caller.Call(ctx, "users.get", weave.NewTextMessage(userID))
    // ...
}

Configuration

Observability Hooks

Weave emits structured runtime and transport events through optional hooks on Config.

type Config struct {
    Logger    weave.EventLogger
    EventHook weave.EventHook
    Metrics   weave.MetricsHook
    Tracing   weave.TracingHook
}

Standard event names include:

  • weave.EventConnect
  • weave.EventDisconnect
  • weave.EventPublishFailed
  • weave.EventSubscribeFailed
  • weave.EventTimeout

Use Logger when you want a consistent structured logging sink, EventHook for lightweight callbacks, and Metrics to bridge counters/durations into your monitoring system. Use Tracing to start spans for high-level Client and Server operations and bridge them into OpenTelemetry or another tracing backend.

For production wiring examples with slog, Prometheus-style metrics, and OpenTelemetry adapters, see docs/OBSERVABILITY.md.

Transport Capabilities

Each backend provides different guarantees. See Transport Capability Matrix for detailed comparison of:

  • Message ordering and delivery guarantees
  • Dead-letter and error handling
  • Consumer group and partition behavior
  • Connection recovery semantics

Choose your backend based on whether you need AMQP's simplicity and priority support or Kafka's horizontal scaling and topic retention.

AMQP (RabbitMQ)
config := &weave.Config{
    Backend: "amqp",
    AMQP: &weave.AMQPConfig{
        Host:         "localhost",
        Port:         5672,
        Username:     "guest",
        Password:     "guest",
        VHost:        "/",
        Heartbeat:    10 * time.Second,
        Exchange:     "",           // Default exchange
        QueueDurable: true,
    },
    ConnectionRetry: 3,
    RetryDelay:      2 * time.Second,
}
Kafka
config := &weave.Config{
    Backend: "kafka",
    Kafka: &weave.KafkaConfig{
        Brokers:       []string{"localhost:9092"},
        ConsumerGroup: "my-service",
        ClientID:      "my-client",
        RequiredAcks:  1,
        AutoOffsetReset: "latest",
    },
}

Publish Options

// Set timeout
client.Publish(ctx, "queue", msg, weave.WithTimeout(5*time.Second))

// Make message persistent (AMQP)
client.Publish(ctx, "queue", msg, weave.WithPersistent())

// Set partition key (Kafka)
client.Publish(ctx, "topic", msg, weave.WithKey("user-123"))

// Set priority (AMQP)
client.Publish(ctx, "queue", msg, weave.WithPriority(5))

Subscribe Options (Server)

// Auto-acknowledge messages
server.Handle("queue", handler) // Then configure via broker

// Via raw broker:
broker.Subscribe(ctx, "queue", handler, weave.WithAutoAck())
broker.Subscribe(ctx, "queue", handler, weave.WithPrefetchCount(10))
broker.Subscribe(ctx, "topic", handler, weave.WithConsumerGroup("my-group"))
broker.Subscribe(ctx, "topic", handler, weave.WithStartFromBeginning())

Testing

Testing Client Code
import (
    "testing"
    "context"
    
    "github.com/prabhatdotdev/weave/testkit"
    "github.com/prabhatdotdev/weave/runtime"
)

func TestOrderService(t *testing.T) {
    // Create mock broker
    broker := testkit.NewMockBroker()
    broker.Connect(context.Background())
    
    // Create client with mock
    client := runtime.NewClientWithBroker(broker, nil)
    
    // Test your code
    orderService := NewOrderService(client)
    err := orderService.PlaceOrder(ctx, order)
    
    // Verify publish was called
    broker.AssertPublished(t, "orders")
    broker.AssertPublishCount(t, "orders", 1)
}
Testing RPC Calls
func TestUserService(t *testing.T) {
    broker := testkit.NewMockBroker()
    broker.Connect(context.Background())
    
    // Set up mock response
    broker.SetCallResponse("users.get", &core.Message{
        Body: []byte(`{"id": 1, "name": "John"}`),
    })
    
    client := runtime.NewClientWithBroker(broker, nil)
    userService := NewUserService(client)
    
    user, err := userService.GetUser(ctx, "1")
    if err != nil {
        t.Fatal(err)
    }
    if user.Name != "John" {
        t.Errorf("expected John, got %s", user.Name)
    }
}
Testing Server Handlers
func TestOrderHandler(t *testing.T) {
    broker := testkit.NewMockBroker()
    broker.Connect(context.Background())
    
    server := runtime.NewServerWithBroker(broker, nil)
    server.Handle("orders", orderHandler)
    server.Start(context.Background())
    
    // Simulate incoming message
    err := broker.SimulateMessage(ctx, "orders", &core.Message{
        Body: []byte(`{"order_id": 123}`),
    })
    if err != nil {
        t.Fatal(err)
    }
}

Error Handling

import "github.com/prabhatdotdev/weave"

err := client.Publish(ctx, "queue", msg)
if err != nil {
    if weave.IsNotConnected(err) {
        // Handle disconnection - reconnect or retry
    }
    if weave.IsTimeout(err) {
        // Handle timeout - retry with backoff
    }
}

Migration from Service to Server

If you were using runtime.Service, rename to runtime.Server:

// Before (still works, but deprecated)
service, _ := weave.NewService(config)
service.Handle("queue", handler)
service.Start(ctx)

// After (preferred)
server, _ := weave.NewServer(config)
server.Handle("queue", handler)
server.Start(ctx)

Docker Compose

Start local message brokers for development:

# Start RabbitMQ
make rabbitmq-start

# Start Kafka
make kafka-start

Contributing

See CONTRIBUTING.md for guidelines.

License

Apache License 2.0 - see LICENSE for details.

Documentation

Overview

Package weave provides a unified message broker abstraction for Go applications.

Weave supports multiple message queue backends through a pluggable transport architecture. Currently supported backends include:

  • AMQP (RabbitMQ) - github.com/prabhatdotdev/weave/transport/amqp
  • Apache Kafka - github.com/prabhatdotdev/weave/transport/kafka

Quick Start

Import Weave and the desired transport:

import (
    "github.com/prabhatdotdev/weave"
    _ "github.com/prabhatdotdev/weave/transport/amqp"
)

Create a broker and start using it:

config := weave.DefaultConfig()
broker, err := weave.New(config)
if err != nil {
    log.Fatal(err)
}
defer broker.Close()

if err := broker.Connect(ctx); err != nil {
    log.Fatal(err)
}

// Publish a message
msg := weave.NewMessage([]byte("Hello, World!"))
broker.Publish(ctx, "my-queue", msg)

// Subscribe to messages
broker.Subscribe(ctx, "my-queue", func(ctx context.Context, msg *weave.Message) error {
    fmt.Println("Received:", string(msg.Body))
    return nil
})

Package Structure

Weave is organized into several packages:

  • weave (this package) - Re-exports core types for convenience
  • weave/core - Core interfaces, types, and errors
  • weave/transport - Transport implementations (amqp, kafka)
  • weave/runtime - High-level service abstractions
  • weave/codec - Message encoding/decoding utilities
  • weave/testkit - Testing utilities and mocks

Using Different Backends

To use a specific backend, import its transport package:

// For RabbitMQ/AMQP
import _ "github.com/prabhatdotdev/weave/transport/amqp"

// For Apache Kafka
import _ "github.com/prabhatdotdev/weave/transport/kafka"

Then configure the backend:

// AMQP
config := &weave.Config{
    Backend: "amqp",
    AMQP: &weave.AMQPConfig{
        Host: "localhost",
        Port: 5672,
    },
}

// Kafka
config := &weave.Config{
    Backend: "kafka",
    Kafka: &weave.KafkaConfig{
        Brokers: []string{"localhost:9092"},
        ConsumerGroup: "my-group",
    },
}

Payload Formats and Serialization

Weave is transport-agnostic regarding payload encoding. The library provides generic codec helpers for JSON and Protocol Buffers while still treating message bodies as opaque byte slices on the wire.

To use Protocol Buffers or JSON:

// Marshal to bytes before publishing
data, _ := proto.Marshal(&myMessage)
msg := weave.NewMessage(data)
broker.Publish(ctx, "topic", msg)

// Unmarshal bytes after receiving
received, _ := broker.Call(ctx, "topic", msg)
proto.Unmarshal(received.Body, &result)

See docs/PROTOBUF.md for detailed protobuf examples and patterns.

Index

Constants

This section is empty.

Variables

View Source
var (
	// Event level constants.
	EventLevelDebug = core.EventLevelDebug
	EventLevelInfo  = core.EventLevelInfo
	EventLevelWarn  = core.EventLevelWarn
	EventLevelError = core.EventLevelError

	// Structured payload content types.
	ErrorContentType      = core.ErrorContentType
	DeadLetterContentType = core.DeadLetterContentType

	HealthStatusHealthy   = core.HealthStatusHealthy
	HealthStatusDegraded  = core.HealthStatusDegraded
	HealthStatusUnhealthy = core.HealthStatusUnhealthy

	// Standard event names.
	EventConnect         = core.EventConnect
	EventDisconnect      = core.EventDisconnect
	EventPublishFailed   = core.EventPublishFailed
	EventSubscribeFailed = core.EventSubscribeFailed
	EventTimeout         = core.EventTimeout

	// Handler error policy constants.
	HandlerErrorNoRetry = core.HandlerErrorNoRetry
	HandlerErrorRetry   = core.HandlerErrorRetry

	// New creates a new MessageBroker from configuration.
	New = core.New

	// NewWithBackend creates a new MessageBroker with explicit backend name.
	NewWithBackend = core.NewWithBackend

	// MustNew creates a new MessageBroker or panics.
	MustNew = core.MustNew

	// Register registers a broker factory.
	Register = core.Register

	// AvailableBackends returns registered backend names.
	AvailableBackends = core.AvailableBackends

	// IsBackendAvailable checks if a backend is registered.
	IsBackendAvailable = core.IsBackendAvailable

	// NewClient creates a new Client for publishing and RPC calls.
	NewClient = runtime.NewClient

	// NewClientWithBroker creates a new Client with an existing broker.
	NewClientWithBroker = runtime.NewClientWithBroker

	// NewServer creates a new Server for handling incoming messages.
	NewServer = runtime.NewServer

	// NewServerWithBroker creates a new Server with an existing broker.
	NewServerWithBroker = runtime.NewServerWithBroker

	// NewCircuitBreaker creates a circuit breaker for RPC call policies.
	NewCircuitBreaker = runtime.NewCircuitBreaker

	// NewService creates a new Server (deprecated, use NewServer).
	// Deprecated: Use NewServer instead.
	NewService = runtime.NewService

	// NewServiceWithBroker creates a new Server with an existing broker (deprecated).
	// Deprecated: Use NewServerWithBroker instead.
	NewServiceWithBroker = runtime.NewServiceWithBroker
)

Re-export constructor functions.

View Source
var (
	// NewMessage creates a new message with the given body.
	NewMessage = core.NewMessage

	// NewTextMessage creates a new message with a string body.
	NewTextMessage = core.NewTextMessage

	// JSON is the built-in JSON codec.
	JSON = codec.JSON

	// Protobuf is the built-in Protocol Buffers codec.
	Protobuf = codec.Protobuf

	// EncodeJSON serializes a value as JSON.
	EncodeJSON = codec.EncodeJSON

	// DecodeJSON deserializes a JSON payload.
	DecodeJSON = codec.DecodeJSON

	// EncodeProtobuf serializes a Protocol Buffers message.
	EncodeProtobuf = codec.EncodeProtobuf

	// DecodeProtobuf deserializes a Protocol Buffers payload.
	DecodeProtobuf = codec.DecodeProtobuf

	// MarshalMessage encodes a value into a message and sets ContentType.
	MarshalMessage = codec.MarshalMessage

	// UnmarshalMessage decodes a message body with the provided codec.
	UnmarshalMessage = codec.UnmarshalMessage

	// RetryAttempt returns the current in-process retry attempt from context.
	RetryAttempt = core.RetryAttempt

	// NewErrorPayload creates a structured error payload.
	NewErrorPayload = core.NewErrorPayload

	// EncodeErrorPayload serializes a structured error payload.
	EncodeErrorPayload = core.EncodeErrorPayload

	// DecodeErrorPayload deserializes a structured error payload.
	DecodeErrorPayload = core.DecodeErrorPayload

	// NewErrorMessage creates a message containing a structured error payload.
	NewErrorMessage = core.NewErrorMessage

	// DecodeErrorMessage decodes a structured error payload from a message.
	DecodeErrorMessage = core.DecodeErrorMessage

	// NewDeadLetterEnvelope creates a standardized dead-letter envelope.
	NewDeadLetterEnvelope = core.NewDeadLetterEnvelope

	// DecodeDeadLetterMessage decodes a dead-letter envelope from a message.
	DecodeDeadLetterMessage = core.DecodeDeadLetterMessage
)

Re-export message constructors.

View Source
var (
	// DefaultConfig returns default configuration.
	DefaultConfig = core.DefaultConfig

	// DefaultAMQPConfig returns default AMQP configuration.
	DefaultAMQPConfig = core.DefaultAMQPConfig

	// DefaultKafkaConfig returns default Kafka configuration.
	DefaultKafkaConfig = core.DefaultKafkaConfig

	// DefaultRetryPolicy returns a conservative retry policy for handlers.
	DefaultRetryPolicy = core.DefaultRetryPolicy

	// DefaultCallPolicy returns a conservative retry policy for RPC calls.
	DefaultCallPolicy = runtime.DefaultCallPolicy
)

Re-export configuration defaults.

View Source
var (
	// WithAutoAck enables automatic message acknowledgment.
	WithAutoAck = core.WithAutoAck

	// WithExclusive makes the subscription exclusive.
	WithExclusive = core.WithExclusive

	// WithConsumerTag sets the consumer identifier.
	WithConsumerTag = core.WithConsumerTag

	// WithPrefetchCount sets the prefetch limit.
	WithPrefetchCount = core.WithPrefetchCount

	// WithQueueBind configures exchange binding (AMQP).
	WithQueueBind = core.WithQueueBind

	// WithConsumerGroup sets the consumer group.
	WithConsumerGroup = core.WithConsumerGroup

	// WithStartFromBeginning starts from earliest offset.
	WithStartFromBeginning = core.WithStartFromBeginning

	// WithHandlerErrorRetry enables transport-level retry on handler error.
	WithHandlerErrorRetry = core.WithHandlerErrorRetry

	// WithHandlerErrorNoRetry disables transport-level retry on handler error.
	WithHandlerErrorNoRetry = core.WithHandlerErrorNoRetry

	// RetryHandler wraps a handler with bounded in-process retries.
	RetryHandler = core.RetryHandler
)

Re-export subscribe options.

View Source
var (
	// WithTimeout sets the publish timeout.
	WithTimeout = core.WithTimeout

	// WithMandatory requires message routing (AMQP).
	WithMandatory = core.WithMandatory

	// WithExchange specifies target exchange (AMQP).
	WithExchange = core.WithExchange

	// WithPartition specifies target partition (Kafka).
	WithPartition = core.WithPartition

	// WithKey sets the message key (Kafka).
	WithKey = core.WithKey

	// WithPersistent makes the message persistent (AMQP).
	WithPersistent = core.WithPersistent

	// WithPriority sets message priority.
	WithPriority = core.WithPriority

	// WithExpiration sets message TTL.
	WithExpiration = core.WithExpiration

	// FixedBackoff returns a constant backoff strategy.
	FixedBackoff = core.FixedBackoff

	// ExponentialBackoff returns an exponential backoff strategy.
	ExponentialBackoff = core.ExponentialBackoff
)

Re-export publish options.

View Source
var (
	// ErrClosed is returned when operating on a closed broker.
	ErrClosed = core.ErrClosed

	// ErrNoReplyTo is returned when Call() has no reply mechanism.
	ErrNoReplyTo = core.ErrNoReplyTo

	// ErrAlreadyConnected is returned when Connect() is called twice.
	ErrAlreadyConnected = core.ErrAlreadyConnected

	// ErrInvalidConfig is returned for invalid configuration.
	ErrInvalidConfig = core.ErrInvalidConfig
)

Re-export common errors.

View Source
var (
	// IsNotConnected returns true if the error indicates not connected.
	IsNotConnected = core.IsNotConnected

	// IsConnectionLost returns true if the error indicates connection lost.
	IsConnectionLost = core.IsConnectionLost

	// IsTimeout returns true if the error indicates a timeout.
	IsTimeout = core.IsTimeout

	// IsCircuitOpen returns true if the error indicates an open circuit breaker.
	IsCircuitOpen = core.IsCircuitOpen

	// IsUnknownBackend returns true if the error indicates unknown backend.
	IsUnknownBackend = core.IsUnknownBackend

	// IsUnsupportedOperation returns true if operation is not supported.
	IsUnsupportedOperation = core.IsUnsupportedOperation
)

Re-export error checking functions.

Functions

This section is empty.

Types

type AMQPConfig

type AMQPConfig = core.AMQPConfig

AMQPConfig holds AMQP-specific configuration.

type BackoffStrategy

type BackoffStrategy = core.BackoffStrategy

BackoffStrategy calculates retry backoff delays.

type CallPolicy

type CallPolicy = runtime.CallPolicy

CallPolicy controls RPC retries and circuit-breaking behavior.

type CallRetryHook

type CallRetryHook = runtime.CallRetryHook

CallRetryHook observes scheduled RPC retries.

type Caller

type Caller = core.Caller

Caller defines request-reply operations (synchronous RPC).

type CircuitBreaker

type CircuitBreaker = runtime.CircuitBreaker

CircuitBreaker manages RPC circuit-breaking state.

type CircuitBreakerOptions

type CircuitBreakerOptions = runtime.CircuitBreakerOptions

CircuitBreakerOptions configures a circuit breaker.

type CircuitState

type CircuitState = runtime.CircuitState

CircuitState describes the state of a circuit breaker.

type Client

type Client = runtime.Client

Client provides a client-only interface for publishing and RPC calls. Use Client when your application only needs to send messages.

type ClientBroker

type ClientBroker = core.Client

ClientBroker is a subset of MessageBroker for client-only operations. Use when you only need to publish or make RPC calls.

type Codec

type Codec = codec.Codec

Codec encodes and decodes message payloads.

type Config

type Config = core.Config

Config holds broker configuration.

type Connector

type Connector = core.Connector

Connector defines connection lifecycle operations.

type DeadLetterEnvelope

type DeadLetterEnvelope = core.DeadLetterEnvelope

DeadLetterEnvelope is the standardized dead-letter payload.

type DeadLetterMessage

type DeadLetterMessage = core.DeadLetterMessage

DeadLetterMessage captures the original failed message.

type DeadLetterOptions

type DeadLetterOptions = core.DeadLetterOptions

DeadLetterOptions annotates a dead-letter envelope.

type ErrorPayload

type ErrorPayload = core.ErrorPayload

ErrorPayload is the standardized structured error body.

type Event

type Event = core.Event

Event describes a structured observability signal.

type EventHook

type EventHook = core.EventHook

EventHook receives structured events as a callback.

type EventLevel

type EventLevel = core.EventLevel

EventLevel indicates event severity.

type EventLogger

type EventLogger = core.EventLogger

EventLogger receives structured events for logging.

type Handler

type Handler = core.Handler

Handler is a function that processes incoming messages.

type HandlerErrorPolicy

type HandlerErrorPolicy = core.HandlerErrorPolicy

HandlerErrorPolicy controls retry behavior after handler failures.

type HealthHook

type HealthHook = core.HealthHook

HealthHook receives health snapshots as a callback.

type HealthReport

type HealthReport = core.HealthReport

HealthReport captures a runtime health snapshot.

type HealthReporter

type HealthReporter = core.HealthReporter

HealthReporter receives health snapshots for reporting sinks.

type HealthStatus

type HealthStatus = core.HealthStatus

HealthStatus represents coarse-grained component health.

type KafkaConfig

type KafkaConfig = core.KafkaConfig

KafkaConfig holds Kafka-specific configuration.

type Message

type Message = core.Message

Message represents a message to be sent or received.

type MessageBroker

type MessageBroker = core.MessageBroker

MessageBroker is the main interface for message broker operations. It combines all broker capabilities: connection, publishing, subscribing, and RPC.

type MetricsHook

type MetricsHook = core.MetricsHook

MetricsHook receives counters and durations as extension points.

type PublishOption

type PublishOption = core.PublishOption

PublishOption is a functional option for publishing.

type Publisher

type Publisher = core.Publisher

Publisher defines message publishing operations (fire-and-forget).

type RetryHook

type RetryHook = core.RetryHook

RetryHook observes scheduled retries.

type RetryPolicy

type RetryPolicy = core.RetryPolicy

RetryPolicy controls bounded in-process retry behavior.

type RetryPredicate

type RetryPredicate = core.RetryPredicate

RetryPredicate decides whether another retry should happen.

type Server

type Server = runtime.Server

Server provides a server interface for handling incoming messages. Use Server when building message-driven services.

type ServerBroker

type ServerBroker = core.Server

ServerBroker is a subset of MessageBroker for server-only operations. Use when you only need to subscribe to messages.

type Service

type Service = runtime.Service

Service is an alias for Server for backward compatibility. Deprecated: Use Server instead.

type SubscribeOption

type SubscribeOption = core.SubscribeOption

SubscribeOption is a functional option for subscriptions.

type Subscriber

type Subscriber = core.Subscriber

Subscriber defines message subscription operations (server-side).

type TLSConfig

type TLSConfig = core.TLSConfig

TLSConfig holds TLS configuration.

type TraceSpan

type TraceSpan = core.TraceSpan

TraceSpan represents an in-flight trace span.

type TraceSpanFinish

type TraceSpanFinish = core.TraceSpanFinish

TraceSpanFinish describes how a span completed.

type TraceSpanStart

type TraceSpanStart = core.TraceSpanStart

TraceSpanStart describes a span that is about to begin.

type TracingHook

type TracingHook = core.TracingHook

TracingHook receives span lifecycle callbacks as an extension point.

Directories

Path Synopsis
Package codec provides encoding and decoding utilities for message payloads.
Package codec provides encoding and decoding utilities for message payloads.
Package core provides the fundamental interfaces, types, and errors for Weave.
Package core provides the fundamental interfaces, types, and errors for Weave.
examples
json/client command
protobuf/client command
Package runtime provides higher-level service abstractions for Weave.
Package runtime provides higher-level service abstractions for Weave.
Package testkit provides testing utilities for Weave.
Package testkit provides testing utilities for Weave.
transport
amqp
Package amqp provides an AMQP/RabbitMQ transport implementation for Weave.
Package amqp provides an AMQP/RabbitMQ transport implementation for Weave.
kafka
Package kafka provides an Apache Kafka transport implementation for Weave.
Package kafka provides an Apache Kafka transport implementation for Weave.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL