xkafka

v0.0.0-...-8c522c3
Published: May 14, 2026 License: Apache-2.0 Imports: 21 Imported by: 1

README

xkafka

Sarama-based Kafka extension library with three facades:

  • GroupConsumer (ConsumerGroup mode)
  • PartitionConsumer (single topic+partition mode with checkpoint store)
  • Producer (sync + batch + async)

It provides middleware pipelines (logger -> retry -> user -> transport), sharded same-key ordering, and at-least-once semantics.

Installation

go get github.com/codesjoy/pkg/basic/xkafka

Examples

Runnable examples live in examples/.

From that directory:

go run ./group
go run ./partition
go run ./producer
go run ./trace

Supported environment variables:

  • XKAFKA_BROKERS (default 127.0.0.1:9092)
  • XKAFKA_TOPIC (default xkafka-example)
  • XKAFKA_GROUP_ID (default xkafka-example-group)
  • XKAFKA_PARTITION (default 0)
  • XKAFKA_TIMEOUT (default 30s)

Layered Architecture

xkafka/
├── group_consumer.go
├── partition_consumer.go
├── producer.go
├── group_consumer_config.go
├── partition_consumer_config.go
├── producer_config.go
├── types.go
├── errors.go
├── middleware/
│   ├── consume/
│   │   ├── chain.go
│   │   ├── logger/
│   │   ├── retry/
│   │   └── trace/
│   └── produce/
│       ├── chain.go
│       ├── logger/
│       ├── retry/
│       └── trace/
└── internal/
    ├── primitives/
    ├── runtime/
    │   ├── group/
    │   ├── partition/
    │   └── producer/
    ├── transport/sarama/
    └── store/

Breaking Changes

  • Removed all Option APIs (WithXxx, Option, PartitionOption, ProducerOption).
  • New explicit constructors:
    • NewGroupConsumer(GroupConsumerConfig)
    • NewPartitionConsumer(PartitionConsumerConfig)
    • NewProducer(ProducerConfig)
  • Middleware imports moved to:
    • consume: github.com/codesjoy/pkg/basic/xkafka/middleware/consume
    • produce: github.com/codesjoy/pkg/basic/xkafka/middleware/produce

Group Consumer Example

package main

import (
	"context"
	"log/slog"

	"github.com/codesjoy/pkg/basic/xkafka"
	"github.com/codesjoy/pkg/basic/xkafka/middleware/consume"
)

func main() {
	consumer, err := xkafka.NewGroupConsumer(xkafka.GroupConsumerConfig{
		Brokers: []string{"127.0.0.1:9092"},
		GroupID: "demo-group",
		Topics:  []string{"orders"},
		Logger:  slog.Default(),
	})
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	err = consumer.Consume(context.Background(), func(ctx context.Context, msg *consume.MessageContext) error {
		return nil
	})
	if err != nil {
		panic(err)
	}
}

Partition Consumer Example

package main

import (
	"context"
	"log/slog"

	"github.com/IBM/sarama"
	"github.com/codesjoy/pkg/basic/xkafka"
	"github.com/codesjoy/pkg/basic/xkafka/middleware/consume"
)

func main() {
	store := xkafka.NewMemoryOffsetStore()
	consumer, err := xkafka.NewPartitionConsumer(xkafka.PartitionConsumerConfig{
		Brokers:       []string{"127.0.0.1:9092"},
		Topic:         "orders",
		Partition:     0,
		OffsetStore:   store,
		InitialOffset: sarama.OffsetOldest,
		Logger:        slog.Default(),
	})
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	err = consumer.Consume(context.Background(), func(ctx context.Context, msg *consume.MessageContext) error {
		return nil
	})
	if err != nil {
		panic(err)
	}
}

Producer Example

package main

import (
	"context"
	"time"

	"github.com/codesjoy/pkg/basic/xkafka"
	"github.com/codesjoy/pkg/basic/xkafka/middleware/produce"
)

func main() {
	producer, err := xkafka.NewProducer(xkafka.ProducerConfig{
		Brokers:      []string{"127.0.0.1:9092"},
		DefaultTopic: "orders",
		Dispatch: xkafka.ProducerDispatchConfig{
			Mode:       xkafka.ProducerDispatchModeKeySharded,
			ShardCount: 32,
			QueueSize:  1024,
		},
	})
	if err != nil {
		panic(err)
	}
	defer producer.Close()

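	ctx := context.Background()

	// Synchronous send of a single message.
	if _, err := producer.Produce(ctx, &produce.Message{Value: []byte("one")}); err != nil {
		panic(err)
	}

	// Sequential batch send; fails fast on the first error.
	if _, err := producer.ProduceBatch(ctx,
		&produce.Message{Value: []byte("a")},
		&produce.Message{Value: []byte("b")},
	); err != nil {
		panic(err)
	}

	// Async send with a 2-second context, then wait for the outcome.
	asyncCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
	defer cancel()
	future, err := producer.ProduceAsync(asyncCtx, &produce.Message{Value: []byte("async")})
	if err != nil {
		panic(err)
	}
	if _, err := future.Await(ctx); err != nil {
		panic(err)
	}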
}

Default Pipelines

  • Consume: consume/logger -> consume/retry -> user handlers -> business
  • Produce: produce/logger -> produce/retry -> user handlers -> transport send

Retry middleware wraps downstream handlers, so user handlers and business logic may run multiple times. Keep handlers idempotent.
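
The effect of retry wrapping the downstream handlers can be illustrated with a small standard-library sketch. The HandlerFunc and Middleware types, and the chain, retry, and counting helpers below, are hypothetical stand-ins written for this illustration; they are not the xkafka middleware API, and the logger stage is left out for brevity.

```go
package main

import (
	"errors"
	"fmt"
)

// HandlerFunc is a hypothetical stand-in for the terminal business function.
type HandlerFunc func(msg string) error

// Middleware wraps a HandlerFunc and returns a new one.
type Middleware func(next HandlerFunc) HandlerFunc

// chain applies middlewares so that the first one listed is outermost.
func chain(business HandlerFunc, mws ...Middleware) HandlerFunc {
	h := business
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// retry re-invokes everything downstream up to maxAttempts times.
func retry(maxAttempts int) Middleware {
	return func(next HandlerFunc) HandlerFunc {
		return func(msg string) error {
			var err error
			for attempt := 1; attempt <= maxAttempts; attempt++ {
				if err = next(msg); err == nil {
					return nil
				}
			}
			return err
		}
	}
}

// counting plays the role of a user handler and records each entry.
func counting(calls *int) Middleware {
	return func(next HandlerFunc) HandlerFunc {
		return func(msg string) error {
			*calls++
			return next(msg)
		}
	}
}

// runDemo fails the first two attempts and reports how often the user
// handler ran before the business function succeeded.
func runDemo() (int, error) {
	userCalls := 0
	failuresLeft := 2
	business := func(msg string) error {
		if failuresLeft > 0 {
			failuresLeft--
			return errors.New("transient failure")
		}
		return nil
	}
	h := chain(business, retry(5), counting(&userCalls))
	err := h("order-1")
	return userCalls, err
}

func main() {
	calls, err := runDemo()
	fmt.Println(calls, err) // 3 <nil>
}
```

In this sketch the user handler runs three times for one message, which is why any side effect in a handler or in business logic should tolerate repetition.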

OpenTelemetry Trace Middleware

Trace middleware is opt-in plugin middleware and is not injected by default.

  • Consume middleware: github.com/codesjoy/pkg/basic/xkafka/middleware/consume/trace
  • Produce middleware: github.com/codesjoy/pkg/basic/xkafka/middleware/produce/trace

The consume middleware extracts the parent context from Kafka headers (traceparent/tracestate/...); the produce middleware injects the current context into Kafka headers.

Example (consume):

import (
	"github.com/codesjoy/pkg/basic/xkafka"
	"github.com/codesjoy/pkg/basic/xkafka/middleware/consume"
	ctrace "github.com/codesjoy/pkg/basic/xkafka/middleware/consume/trace"
)

consumer, err := xkafka.NewGroupConsumer(xkafka.GroupConsumerConfig{
	Brokers: []string{"127.0.0.1:9092"},
	GroupID: "demo-group",
	Topics:  []string{"orders"},
	GlobalHandlers: []consume.Handler{
		ctrace.New(ctrace.Config{}),
	},
})

Example (produce):

import (
	"github.com/codesjoy/pkg/basic/xkafka"
	"github.com/codesjoy/pkg/basic/xkafka/middleware/produce"
	ptrace "github.com/codesjoy/pkg/basic/xkafka/middleware/produce/trace"
)

producer, err := xkafka.NewProducer(xkafka.ProducerConfig{
	Brokers:      []string{"127.0.0.1:9092"},
	DefaultTopic: "orders",
	GlobalHandlers: []produce.Handler{
		ptrace.New(ptrace.Config{}),
	},
})

Trace span granularity is per-attempt. Since trace middleware is user middleware inside retry middleware, each retry attempt creates a new span.

Integration Tests (Docker)

The integration suite is Docker-backed and uses Testcontainers to start Kafka.

Run from the module root:

go test -tags=integration ./testing/integration -v

The default go test ./... and make MODULES="basic/xkafka" test remain unit-test focused.

Semantics

  • At-least-once delivery (duplicates are possible).
  • Group mode commits only contiguous completed offsets.
  • Partition mode persists contiguous nextOffset checkpoints.
  • Async producer modes:
    • serial: global order
    • key_sharded: same-key order
    • parallel: no ordering guarantee

Documentation

Overview

Package xkafka provides Sarama-based Group/Partition consumers and producer helpers with middleware pipelines, slog logging, sharded key ordering, and at-least-once semantics.

Index

Constants

const (
	// DefaultShardCount is the default count of key-hash shards.
	DefaultShardCount = 32
	// DefaultShardQueueSize is the default queue size of one shard worker.
	DefaultShardQueueSize = 1024

	// DefaultRetryInitialBackoff is the first retry wait duration.
	DefaultRetryInitialBackoff = cconsume.DefaultInitialBackoff
	// DefaultRetryMaxBackoff is the max retry wait duration.
	DefaultRetryMaxBackoff = cconsume.DefaultMaxBackoff
	// DefaultRetryMultiplier is the exponential retry multiplier.
	DefaultRetryMultiplier = cconsume.DefaultMultiplier
	// InfiniteRetries means retry forever.
	InfiniteRetries = pretry.InfiniteRetries

	// DefaultPartitionReconnectInitialBackoff is the first reconnect wait duration.
	DefaultPartitionReconnectInitialBackoff = 200 * time.Millisecond
	// DefaultPartitionReconnectMaxBackoff is the max reconnect wait duration.
	DefaultPartitionReconnectMaxBackoff = 5 * time.Second
	// DefaultPartitionReconnectMultiplier is reconnect exponential multiplier.
	DefaultPartitionReconnectMultiplier = 2.0

	// DefaultProducerWorkerCount is default worker count for parallel dispatch.
	DefaultProducerWorkerCount = 4
)
const (
	// ExhaustedPolicyBlock keeps retrying and blocks the shard.
	ExhaustedPolicyBlock = cconsume.ExhaustedPolicyBlock
	// ExhaustedPolicyDLQCommit publishes to DLQ then marks offset as done.
	ExhaustedPolicyDLQCommit = cconsume.ExhaustedPolicyDLQCommit
	// ExhaustedPolicyStop stops consumption and returns an error.
	ExhaustedPolicyStop = cconsume.ExhaustedPolicyStop
)
const (
	// FailureStageRetry means a normal retry failure.
	FailureStageRetry = cconsume.FailureStageRetry
	// FailureStageExhausted means finite retries are exhausted.
	FailureStageExhausted = cconsume.FailureStageExhausted
	// FailureStageDLQ means message is being or has been handled by DLQ flow.
	FailureStageDLQ = cconsume.FailureStageDLQ
	// FailureStageStop means consumer stops due to policy.
	FailureStageStop = cconsume.FailureStageStop
)
const (
	// ProducerExhaustedPolicyBlock keeps retrying and blocks the pipeline.
	ProducerExhaustedPolicyBlock = pproduce.ExhaustedPolicyBlock
	// ProducerExhaustedPolicyStop returns error and stops current call.
	ProducerExhaustedPolicyStop = pproduce.ExhaustedPolicyStop
	// ProducerExhaustedPolicyDrop drops message and returns dropped error.
	ProducerExhaustedPolicyDrop = pproduce.ExhaustedPolicyDrop
)
const (
	// ProducerFailureStageRetry means a normal retry failure.
	ProducerFailureStageRetry = pproduce.FailureStageRetry
	// ProducerFailureStageExhausted means finite retries are exhausted.
	ProducerFailureStageExhausted = pproduce.FailureStageExhausted
	// ProducerFailureStageStop means call stops due to policy.
	ProducerFailureStageStop = pproduce.FailureStageStop
	// ProducerFailureStageDrop means message is dropped due to policy.
	ProducerFailureStageDrop = pproduce.FailureStageDrop
)

Variables

var (
	// ErrProducerClosed indicates producer is already closed.
	ErrProducerClosed = errors.New("producer is closed")
	// ErrNilProducerMessage indicates produce message is nil.
	ErrNilProducerMessage = errors.New("producer message is nil")
	// ErrProducerTopicRequired indicates topic cannot be resolved.
	ErrProducerTopicRequired = errors.New("producer topic is required")
	// ErrProducerDropped indicates retry policy dropped one message.
	ErrProducerDropped = pretry.ErrMessageDropped
)

Functions

This section is empty.

Types

type BackoffConfig

type BackoffConfig struct {
	// InitialBackoff is the wait before the first reconnect attempt.
	InitialBackoff time.Duration
	// MaxBackoff is the upper bound on the reconnect wait.
	MaxBackoff time.Duration
	// Multiplier is the exponential backoff multiplier.
	Multiplier float64
}

BackoffConfig controls the partition consumer's reconnect backoff strategy.
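
A rough sketch of the schedule these three fields could produce, assuming a capped exponential formula (initial wait multiplied per attempt, limited to the maximum). The backoffConfig type and delay method are hypothetical; the library's actual computation, including any jitter, is not shown in this documentation. The values are the documented DefaultPartitionReconnect* defaults.

```go
package main

import (
	"fmt"
	"time"
)

// backoffConfig mirrors the three fields of BackoffConfig.
type backoffConfig struct {
	InitialBackoff time.Duration
	MaxBackoff     time.Duration
	Multiplier     float64
}

// delay returns the wait before reconnect attempt n (0-based), assuming
// InitialBackoff * Multiplier^n capped at MaxBackoff.
func (c backoffConfig) delay(n int) time.Duration {
	d := float64(c.InitialBackoff)
	// Stop multiplying once the cap is reached to avoid overflow.
	for i := 0; i < n && d < float64(c.MaxBackoff); i++ {
		d *= c.Multiplier
	}
	if d > float64(c.MaxBackoff) {
		return c.MaxBackoff
	}
	return time.Duration(d)
}

// defaultBackoff uses the documented partition reconnect defaults.
func defaultBackoff() backoffConfig {
	return backoffConfig{
		InitialBackoff: 200 * time.Millisecond,
		MaxBackoff:     5 * time.Second,
		Multiplier:     2.0,
	}
}

// delayMillis is a small helper that reports a delay in milliseconds.
func delayMillis(c backoffConfig, n int) int64 {
	return c.delay(n).Milliseconds()
}

func main() {
	cfg := defaultBackoff()
	for n := 0; n < 7; n++ {
		fmt.Println(n, cfg.delay(n))
	}
}
```

Under this assumption the waits are 200ms, 400ms, 800ms, 1.6s, 3.2s, and then 5s for every later attempt.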

type ChainMode

type ChainMode string

ChainMode controls how topic handlers are combined with global handlers.

const (
	// ChainModeAppend appends topic handlers after global handlers.
	ChainModeAppend ChainMode = "append"
	// ChainModeReplace replaces global handlers with topic handlers.
	ChainModeReplace ChainMode = "replace"
)
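
The difference between the two modes can be sketched with handler names in place of real handlers. The effectiveHandlers function is hypothetical, and treating any mode other than "replace" as append is an assumption of this sketch rather than documented behavior.

```go
package main

import "fmt"

type chainMode string

const (
	chainModeAppend  chainMode = "append"
	chainModeReplace chainMode = "replace"
)

// effectiveHandlers resolves the handler list for one topic. Handlers are
// represented by name only, to keep the sketch independent of the library.
func effectiveHandlers(global, topic []string, mode chainMode) []string {
	if mode == chainModeReplace {
		// Topic handlers take the place of the global ones.
		return append([]string(nil), topic...)
	}
	// Append: global handlers first, then topic handlers.
	out := append([]string(nil), global...)
	return append(out, topic...)
}

func main() {
	global := []string{"trace", "metrics"}
	topic := []string{"audit"}
	fmt.Println(effectiveHandlers(global, topic, chainModeAppend))  // [trace metrics audit]
	fmt.Println(effectiveHandlers(global, topic, chainModeReplace)) // [audit]
}
```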

type ConsumeTopicHandlers

type ConsumeTopicHandlers struct {
	// Mode selects how topic handlers combine with global handlers (append or replace).
	Mode ChainMode
	// Handlers is the list of middleware handlers specific to this topic.
	Handlers []consume.Handler
}

ConsumeTopicHandlers describes topic-specific consume middleware composition.

type DLQConfig

type DLQConfig struct {
	// Topic is the destination topic for dead-letter messages.
	Topic string
	// Producer is optional. If nil, xkafka creates and owns a SyncProducer.
	Producer sarama.SyncProducer
}

DLQConfig configures dead-letter publishing when retries are exhausted.

type ExhaustedPolicy

type ExhaustedPolicy = cconsume.ExhaustedPolicy

ExhaustedPolicy controls action when finite retries are exhausted.

type FailureEvent

type FailureEvent = cconsume.Event

FailureEvent is emitted to FailureHook.

type FailureHook

type FailureHook = cconsume.FailureHook

FailureHook is called on retry/exhausted failure events.

type FailureStage

type FailureStage = cconsume.FailureStage

FailureStage marks current failure lifecycle stage.

type GroupConsumer

type GroupConsumer struct {
	// contains filtered or unexported fields
}

GroupConsumer wraps a Sarama consumer group with ordered shard processing.

func NewGroupConsumer

func NewGroupConsumer(cfg GroupConsumerConfig) (*GroupConsumer, error)

NewGroupConsumer creates a configured Sarama consumer-group wrapper. It validates the config, creates the consumer group, and creates the DLQ writer.

func (*GroupConsumer) Close

func (c *GroupConsumer) Close() error

Close releases the consumer group and the owned DLQ producer. It closes the consumer group and then the DLQ writer, using sync.Once so that the shutdown runs only once.
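
The close-once behavior described for Close (sync.Once guarding an ordered shutdown) can be sketched like this. The resource type and its closers field are hypothetical and only illustrate the pattern; errors.Join requires Go 1.20 or newer.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// resource is a hypothetical owner of several closable dependencies.
type resource struct {
	closeOnce sync.Once
	closeErr  error
	closers   []func() error
}

// Close runs every closer exactly once, in order, and returns the joined
// error. Later calls return the same result without closing again.
func (r *resource) Close() error {
	r.closeOnce.Do(func() {
		var errs []error
		for _, c := range r.closers {
			if err := c(); err != nil {
				errs = append(errs, err)
			}
		}
		r.closeErr = errors.Join(errs...) // nil when errs is empty
	})
	return r.closeErr
}

// demo calls Close twice and reports how many closers actually ran.
func demo() (int, error, error) {
	closes := 0
	r := &resource{closers: []func() error{
		func() error { closes++; return nil }, // stands in for the consumer group
		func() error { closes++; return nil }, // stands in for the DLQ producer
	}}
	first := r.Close()
	second := r.Close()
	return closes, first, second
}

func main() {
	fmt.Println(demo()) // 2 <nil> <nil>
}
```

With this pattern a deferred Close and an explicit Close on a shutdown path can coexist without double-closing the underlying client.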

func (*GroupConsumer) Consume

func (c *GroupConsumer) Consume(ctx context.Context, business consume.HandlerFunc) error

Consume starts consuming in a rebalance-safe loop. A new Handler is created on each rebalance.

type GroupConsumerConfig

type GroupConsumerConfig struct {
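	// Brokers is the list of Kafka cluster addresses.
	Brokers []string
	// GroupID is the consumer group ID.
	GroupID string
	// Topics is the list of subscribed topics.
	Topics []string

	// SaramaConfig is the underlying Sarama config; defaults are used when nil.
	SaramaConfig *sarama.Config

	// ShardCount is the number of shards used for per-key ordered processing.
	ShardCount int
	// ShardQueueSize is the buffer size of each shard queue.
	ShardQueueSize int

	// GlobalHandlers are middleware handlers shared by all topics.
	GlobalHandlers []consume.Handler
	// TopicHandlers holds topic-specific middleware config, keyed by topic name.
	TopicHandlers map[string]ConsumeTopicHandlers

	// KeyExtractor derives the logical key used for shard routing from a message.
	KeyExtractor KeyExtractor

	// Logger is the structured logger.
	Logger *slog.Logger
	// LoggerHandlerEnabled controls whether the logger middleware is enabled; nil means enabled.
	LoggerHandlerEnabled *bool

	// RetryConfig controls retry behavior.
	RetryConfig RetryConfig
	// ExhaustedPolicy controls what happens once finite retries are exhausted.
	ExhaustedPolicy ExhaustedPolicy
	// DLQ is the dead-letter queue config; required only when the exhausted policy is DLQ.
	DLQ *DLQConfig
	// FailureHook is the callback for failure events.
	FailureHook FailureHook
}

GroupConsumerConfig is the complete configuration for GroupConsumer.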

func (*GroupConsumerConfig) Validate

func (cfg *GroupConsumerConfig) Validate() error

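Validate normalizes and validates group consumer config. It runs, in order: fill defaults, normalize inputs, check required fields, ensure dependencies, validate the exhausted policy, validate the retry config, validate the DLQ config, and validate topic handlers.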

type KeyExtractor

type KeyExtractor func(*sarama.ConsumerMessage) (string, error)

KeyExtractor derives logical key for shard routing.

type MemoryOffsetStore

type MemoryOffsetStore = xstore.MemoryOffsetStore

MemoryOffsetStore keeps offsets in-process only.

func NewMemoryOffsetStore

func NewMemoryOffsetStore() *MemoryOffsetStore

NewMemoryOffsetStore creates default in-memory offset storage.

type OffsetStore

type OffsetStore interface {
	Load(
		ctx context.Context,
		topic string,
		partition int32,
	) (nextOffset int64, found bool, err error)
	Save(ctx context.Context, topic string, partition int32, nextOffset int64) error
}

OffsetStore persists per-partition next offsets for partition mode.
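
A custom store only needs to satisfy the two methods above. The sketch below copies the interface locally so that it compiles without the library and implements it with a mutex-guarded map; mapOffsetStore, roundTrip, and lookup are hypothetical names. A durable store would write to a file or database instead of a map.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// OffsetStore is copied from the interface documented above.
type OffsetStore interface {
	Load(ctx context.Context, topic string, partition int32) (nextOffset int64, found bool, err error)
	Save(ctx context.Context, topic string, partition int32, nextOffset int64) error
}

type partitionKey struct {
	topic     string
	partition int32
}

// mapOffsetStore is a hypothetical in-process implementation.
type mapOffsetStore struct {
	mu      sync.Mutex
	offsets map[partitionKey]int64
}

func newMapOffsetStore() *mapOffsetStore {
	return &mapOffsetStore{offsets: make(map[partitionKey]int64)}
}

// Load returns the stored next offset, with found=false when none exists.
func (s *mapOffsetStore) Load(ctx context.Context, topic string, partition int32) (int64, bool, error) {
	if err := ctx.Err(); err != nil {
		return 0, false, err
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	off, ok := s.offsets[partitionKey{topic, partition}]
	return off, ok, nil
}

// Save overwrites the next offset for one topic partition.
func (s *mapOffsetStore) Save(ctx context.Context, topic string, partition int32, nextOffset int64) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.offsets[partitionKey{topic, partition}] = nextOffset
	return nil
}

// Compile-time check that the sketch satisfies the interface.
var _ OffsetStore = (*mapOffsetStore)(nil)

// roundTrip saves one checkpoint and reads it back.
func roundTrip(store OffsetStore, topic string, partition int32, next int64) (int64, bool, error) {
	ctx := context.Background()
	if err := store.Save(ctx, topic, partition, next); err != nil {
		return 0, false, err
	}
	return store.Load(ctx, topic, partition)
}

// lookup reads a checkpoint without writing one first.
func lookup(store OffsetStore, topic string, partition int32) (int64, bool, error) {
	return store.Load(context.Background(), topic, partition)
}

func main() {
	store := newMapOffsetStore()
	fmt.Println(roundTrip(store, "orders", 0, 42)) // 42 true <nil>
	fmt.Println(lookup(store, "orders", 1))        // 0 false <nil>
}
```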

type PartitionConsumer

type PartitionConsumer struct {
	// contains filtered or unexported fields
}

PartitionConsumer wraps one Sarama partition consumer with ordered shard processing and automatic reconnect.

func NewPartitionConsumer

func NewPartitionConsumer(cfg PartitionConsumerConfig) (*PartitionConsumer, error)

NewPartitionConsumer creates a configured partition-mode Kafka consumer. It validates the config, creates the consumer, and creates the DLQ writer.

func (*PartitionConsumer) Close

func (c *PartitionConsumer) Close() error

Close releases the partition consumer and the owned DLQ producer. It closes the partition consumer and then the DLQ writer, using sync.Once so that the shutdown runs only once.

func (*PartitionConsumer) Consume

func (c *PartitionConsumer) Consume(ctx context.Context, business consume.HandlerFunc) error

Consume starts the partition-mode consume loop with automatic reconnect. The actual consume logic is delegated to a Runner.

func (*PartitionConsumer) String

func (c *PartitionConsumer) String() string

String returns a readable identifier for the partition consumer, in the format "partition-consumer(topic:partition)".

type PartitionConsumerConfig

type PartitionConsumerConfig struct {
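	// Brokers is the list of Kafka cluster addresses.
	Brokers []string
	// Topic is the topic to consume.
	Topic string
	// Partition is the partition number to consume.
	Partition int32

	// SaramaConfig is the underlying Sarama config; defaults are used when nil.
	SaramaConfig *sarama.Config

	// ShardCount is the number of shards used for per-key ordered processing.
	ShardCount int
	// ShardQueueSize is the buffer size of each shard queue.
	ShardQueueSize int

	// GlobalHandlers are middleware handlers shared by all messages.
	GlobalHandlers []consume.Handler

	// KeyExtractor derives the logical key used for shard routing from a message.
	KeyExtractor KeyExtractor

	// Logger is the structured logger.
	Logger *slog.Logger
	// LoggerHandlerEnabled controls whether the logger middleware is enabled; nil means enabled.
	LoggerHandlerEnabled *bool

	// RetryConfig controls retry behavior.
	RetryConfig RetryConfig
	// ExhaustedPolicy controls what happens once finite retries are exhausted.
	ExhaustedPolicy ExhaustedPolicy
	// DLQ is the dead-letter queue config.
	DLQ *DLQConfig
	// FailureHook is the callback for failure events.
	FailureHook FailureHook

	// OffsetStore is the persistent store for partition offsets.
	OffsetStore OffsetStore
	// InitialOffset is the starting offset for the first consumption.
	InitialOffset int64
	// Reconnect controls the reconnect backoff strategy.
	Reconnect BackoffConfig
}

PartitionConsumerConfig is the complete configuration for PartitionConsumer.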

func (*PartitionConsumerConfig) Validate

func (cfg *PartitionConsumerConfig) Validate() error

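Validate normalizes and validates partition consumer config. It runs, in order: fill defaults, normalize inputs, check required fields, ensure dependencies, validate the exhausted policy, validate the retry config, validate the DLQ config, and validate the reconnect config.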

type ProduceTopicHandlers

type ProduceTopicHandlers struct {
	// Mode selects how topic handlers combine with global handlers (append or replace).
	Mode ChainMode
	// Handlers is the list of middleware handlers specific to this topic.
	Handlers []produce.Handler
}

ProduceTopicHandlers describes topic-specific producer middleware composition.

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer wraps one Sarama SyncProducer with sync, batch, and async send capabilities.

func NewProducer

func NewProducer(cfg ProducerConfig) (*Producer, error)

NewProducer creates a configured producer wrapper. It validates the config, creates the sender, and initializes the async runtime.

func (*Producer) Close

func (p *Producer) Close() error

Close stops the runtime and closes the owned producer. It shuts down the async runtime and the sender, using sync.Once so that the shutdown runs only once.

func (*Producer) Produce

func (p *Producer) Produce(ctx context.Context, msg *produce.Message) (*produce.Result, error)

Produce sends one message synchronously. The message passes through the middleware chain.

func (*Producer) ProduceAsync

func (p *Producer) ProduceAsync(ctx context.Context, msg *produce.Message) (produce.Future, error)

ProduceAsync queues one message into the async runtime and returns a Future as soon as the message has been enqueued.

func (*Producer) ProduceBatch

func (p *Producer) ProduceBatch(
	ctx context.Context,
	msgs ...*produce.Message,
) ([]*produce.Result, error)

ProduceBatch sends messages sequentially and fails fast on the first error. It is kept for compatibility and is not suitable for per-item acknowledgement flows such as xevent outbox relays.

func (*Producer) ProduceBatchReport

func (p *Producer) ProduceBatchReport(
	ctx context.Context,
	msgs ...*produce.Message,
) ([]produce.BatchItemResult, error)

ProduceBatchReport sends messages one by one and returns a per-item outcome vector. A top-level error is returned only for call-level failures such as a nil producer or a context that is already canceled before the call starts.
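
The contrast between fail-fast batching and per-item reporting can be sketched without the library. The sendFunc and itemResult types and both helper functions are hypothetical; in particular, itemResult is not the library's produce.BatchItemResult, whose fields are not shown in this documentation, and the sketch assumes that the per-item variant keeps going after a failed item.

```go
package main

import (
	"errors"
	"fmt"
)

// sendFunc stands in for a single synchronous send.
type sendFunc func(msg string) error

// itemResult is a hypothetical per-item outcome.
type itemResult struct {
	Msg string
	Err error
}

// sendFailFast stops at the first error and reports how many messages
// were sent before it.
func sendFailFast(send sendFunc, msgs ...string) (int, error) {
	sent := 0
	for _, m := range msgs {
		if err := send(m); err != nil {
			return sent, err
		}
		sent++
	}
	return sent, nil
}

// sendWithReport attempts every message and records each outcome.
func sendWithReport(send sendFunc, msgs ...string) []itemResult {
	results := make([]itemResult, 0, len(msgs))
	for _, m := range msgs {
		results = append(results, itemResult{Msg: m, Err: send(m)})
	}
	return results
}

// failOn returns a sendFunc that rejects exactly one message value.
func failOn(bad string) sendFunc {
	return func(msg string) error {
		if msg == bad {
			return errors.New("send failed: " + msg)
		}
		return nil
	}
}

func main() {
	send := failOn("b")

	sent, err := sendFailFast(send, "a", "b", "c")
	fmt.Println(sent, err) // 1 send failed: b

	for _, r := range sendWithReport(send, "a", "b", "c") {
		fmt.Println(r.Msg, r.Err)
	}
}
```

With the fail-fast form the caller cannot tell whether "c" was ever attempted, which is why a relay that acknowledges rows individually is better served by the per-item form.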

type ProducerConfig

type ProducerConfig struct {
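	// Brokers is the list of Kafka cluster addresses.
	Brokers []string
	// DefaultTopic is the destination topic used when a message does not specify one.
	DefaultTopic string

	// SaramaConfig is the underlying Sarama config; defaults are used when nil.
	SaramaConfig *sarama.Config
	// SyncProducer is an externally supplied sync producer; one is created automatically when nil.
	SyncProducer sarama.SyncProducer

	// Dispatch controls the routing strategy for async message dispatch.
	Dispatch ProducerDispatchConfig

	// GlobalHandlers are middleware handlers shared by all topics.
	GlobalHandlers []produce.Handler
	// TopicHandlers holds topic-specific middleware config, keyed by topic name.
	TopicHandlers map[string]ProduceTopicHandlers

	// Logger is the structured logger.
	Logger *slog.Logger
	// LoggerHandlerEnabled controls whether the logger middleware is enabled; nil means enabled.
	LoggerHandlerEnabled *bool

	// RetryConfig controls retry behavior.
	RetryConfig RetryConfig
	// ExhaustedPolicy controls what happens once finite retries are exhausted.
	ExhaustedPolicy ProducerExhaustedPolicy
	// FailureHook is the callback for failure events.
	FailureHook ProducerFailureHook
}

ProducerConfig configures Producer. It covers brokers, the sender, dispatch, middleware, retry, and related settings.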

func (*ProducerConfig) Validate

func (cfg *ProducerConfig) Validate() error

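Validate normalizes and validates producer config. It runs, in order: fill defaults, normalize inputs, check required fields, validate the dispatch config, ensure dependencies, validate the retry config, validate the exhausted policy, and validate topic handlers.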

type ProducerDispatchConfig

type ProducerDispatchConfig struct {
	// Mode selects the dispatch mode: serial, key-sharded, or parallel round-robin.
	Mode ProducerDispatchMode
	// ShardCount is the number of shards in key-sharded mode.
	ShardCount int
	// WorkerCount is the number of worker goroutines in parallel round-robin mode.
	WorkerCount int
	// QueueSize is the buffer size of each worker queue.
	QueueSize int
}

ProducerDispatchConfig controls async runtime queueing and routing.

type ProducerDispatchMode

type ProducerDispatchMode string

ProducerDispatchMode controls async dispatch behavior.

const (
	// ProducerDispatchModeSerial routes all messages to one worker.
	ProducerDispatchModeSerial ProducerDispatchMode = "serial"
	// ProducerDispatchModeKeySharded routes by key hash modulo shard count.
	ProducerDispatchModeKeySharded ProducerDispatchMode = "key_sharded"
	// ProducerDispatchModeParallel routes by round-robin across workers.
	ProducerDispatchModeParallel ProducerDispatchMode = "parallel"

	// DefaultProducerDispatchMode is default async dispatch mode.
	DefaultProducerDispatchMode = ProducerDispatchModeKeySharded
)
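
The two non-trivial routing rules named in the comments above (key hash modulo shard count, and round-robin across workers) can be sketched as follows. The choice of FNV-1a is an assumption made for this illustration; the documentation does not say which hash the library uses. The roundRobin type is hypothetical and not safe for concurrent use.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shardFor maps a key to a shard index with FNV-1a modulo shardCount.
// The hash function is an assumption; the library may use a different one.
func shardFor(key string, shardCount int) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(shardCount))
}

// roundRobin hands out worker indexes 0..workers-1 in rotation.
type roundRobin struct {
	next    int
	workers int
}

func (r *roundRobin) pick() int {
	w := r.next
	r.next = (r.next + 1) % r.workers
	return w
}

func main() {
	// The same key always lands on the same shard, so one worker sees all
	// messages for that key in submission order.
	for _, key := range []string{"user-1", "user-2", "user-1"} {
		fmt.Printf("%s -> shard %d\n", key, shardFor(key, 32))
	}

	// Round-robin spreads load but gives no per-key ordering.
	rr := &roundRobin{workers: 4}
	for i := 0; i < 6; i++ {
		fmt.Print(rr.pick(), " ")
	}
	fmt.Println()
}
```

Serial mode is the degenerate case of either rule with a single worker, which is how it can provide global order.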

type ProducerExhaustedPolicy

type ProducerExhaustedPolicy = pproduce.ExhaustedPolicy

ProducerExhaustedPolicy controls action when finite retries are exhausted.

type ProducerFailureEvent

type ProducerFailureEvent = pproduce.Event

ProducerFailureEvent is emitted to ProducerFailureHook.

type ProducerFailureHook

type ProducerFailureHook = pproduce.FailureHook

ProducerFailureHook is called on producer retry/exhausted failure events.

type ProducerFailureStage

type ProducerFailureStage = pproduce.FailureStage

ProducerFailureStage marks producer failure lifecycle stage.

type RetryConfig

type RetryConfig = pretry.Config

RetryConfig controls message retry behavior.

Directories

Path                      Synopsis
internal
middleware
middleware/consume        Package consume provides consumer-side middleware types and composition helpers.
middleware/consume/trace  Package trace provides consume-side OpenTelemetry tracing middleware.
middleware/produce        Package produce provides producer-side middleware types and composition helpers.
middleware/produce/trace  Package trace provides produce-side OpenTelemetry tracing middleware.
