Documentation
¶
Overview ¶
Package xkafka provides Sarama-based Group/Partition consumers and producer helpers with middleware pipelines, slog logging, sharded key ordering, and at-least-once semantics.
Index ¶
- Constants
- Variables
- type BackoffConfig
- type ChainMode
- type ConsumeTopicHandlers
- type DLQConfig
- type ExhaustedPolicy
- type FailureEvent
- type FailureHook
- type FailureStage
- type GroupConsumer
- type GroupConsumerConfig
- type KeyExtractor
- type MemoryOffsetStore
- type OffsetStore
- type PartitionConsumer
- type PartitionConsumerConfig
- type ProduceTopicHandlers
- type Producer
- func (p *Producer) Close() error
- func (p *Producer) Produce(ctx context.Context, msg *produce.Message) (*produce.Result, error)
- func (p *Producer) ProduceAsync(ctx context.Context, msg *produce.Message) (produce.Future, error)
- func (p *Producer) ProduceBatch(ctx context.Context, msgs ...*produce.Message) ([]*produce.Result, error)
- func (p *Producer) ProduceBatchReport(ctx context.Context, msgs ...*produce.Message) ([]produce.BatchItemResult, error)
- type ProducerConfig
- type ProducerDispatchConfig
- type ProducerDispatchMode
- type ProducerExhaustedPolicy
- type ProducerFailureEvent
- type ProducerFailureHook
- type ProducerFailureStage
- type RetryConfig
Constants ¶
const ( // DefaultShardCount is the default count of key-hash shards. DefaultShardCount = 32 // DefaultShardQueueSize is the default queue size of one shard worker. DefaultShardQueueSize = 1024 // DefaultRetryInitialBackoff is the first retry wait duration. DefaultRetryInitialBackoff = cconsume.DefaultInitialBackoff // DefaultRetryMaxBackoff is the max retry wait duration. DefaultRetryMaxBackoff = cconsume.DefaultMaxBackoff // DefaultRetryMultiplier is the exponential retry multiplier. DefaultRetryMultiplier = cconsume.DefaultMultiplier // InfiniteRetries means retry forever. InfiniteRetries = pretry.InfiniteRetries // DefaultPartitionReconnectInitialBackoff is the first reconnect wait duration. DefaultPartitionReconnectInitialBackoff = 200 * time.Millisecond // DefaultPartitionReconnectMaxBackoff is the max reconnect wait duration. DefaultPartitionReconnectMaxBackoff = 5 * time.Second // DefaultPartitionReconnectMultiplier is reconnect exponential multiplier. DefaultPartitionReconnectMultiplier = 2.0 // DefaultProducerWorkerCount is default worker count for parallel dispatch. DefaultProducerWorkerCount = 4 )
const ( // ExhaustedPolicyBlock keeps retrying and blocks the shard. ExhaustedPolicyBlock = cconsume.ExhaustedPolicyBlock // ExhaustedPolicyDLQCommit publishes to DLQ then marks offset as done. ExhaustedPolicyDLQCommit = cconsume.ExhaustedPolicyDLQCommit // ExhaustedPolicyStop stops consumption and returns an error. ExhaustedPolicyStop = cconsume.ExhaustedPolicyStop )
const ( // FailureStageRetry means a normal retry failure. FailureStageRetry = cconsume.FailureStageRetry // FailureStageExhausted means finite retries are exhausted. FailureStageExhausted = cconsume.FailureStageExhausted // FailureStageDLQ means message is being or has been handled by DLQ flow. FailureStageDLQ = cconsume.FailureStageDLQ // FailureStageStop means consumer stops due to policy. FailureStageStop = cconsume.FailureStageStop )
const ( // ProducerExhaustedPolicyBlock keeps retrying and blocks the pipeline. ProducerExhaustedPolicyBlock = pproduce.ExhaustedPolicyBlock // ProducerExhaustedPolicyStop returns error and stops current call. ProducerExhaustedPolicyStop = pproduce.ExhaustedPolicyStop // ProducerExhaustedPolicyDrop drops message and returns dropped error. ProducerExhaustedPolicyDrop = pproduce.ExhaustedPolicyDrop )
const ( // ProducerFailureStageRetry means a normal retry failure. ProducerFailureStageRetry = pproduce.FailureStageRetry // ProducerFailureStageExhausted means finite retries are exhausted. ProducerFailureStageExhausted = pproduce.FailureStageExhausted // ProducerFailureStageStop means call stops due to policy. ProducerFailureStageStop = pproduce.FailureStageStop // ProducerFailureStageDrop means message is dropped due to policy. ProducerFailureStageDrop = pproduce.FailureStageDrop )
Variables ¶
var ( // ErrProducerClosed indicates producer is already closed. ErrProducerClosed = errors.New("producer is closed") // ErrNilProducerMessage indicates produce message is nil. ErrNilProducerMessage = errors.New("producer message is nil") // ErrProducerTopicRequired indicates topic cannot be resolved. ErrProducerTopicRequired = errors.New("producer topic is required") // ErrProducerDropped indicates retry policy dropped one message. ErrProducerDropped = pretry.ErrMessageDropped )
Functions ¶
This section is empty.
Types ¶
type BackoffConfig ¶
type BackoffConfig struct {
// InitialBackoff 是首次重连等待时长。
InitialBackoff time.Duration
// MaxBackoff 是重连等待的最大时长上限。
MaxBackoff time.Duration
// Multiplier 是指数退避的乘数因子。
Multiplier float64
}
BackoffConfig controls partition reconnect backoff strategy. 控制分区消费者的重连退避策略。
type ChainMode ¶
type ChainMode string
ChainMode controls how topic handlers are combined with global handlers.
type ConsumeTopicHandlers ¶
type ConsumeTopicHandlers struct {
// Mode 决定 topic 处理器与全局处理器的组合模式(追加或替换)。
Mode ChainMode
// Handlers 是该 topic 专属的中间件处理器列表。
Handlers []consume.Handler
}
ConsumeTopicHandlers describes topic-specific consume middleware composition. 描述特定 topic 的消费者中间件组合方式。
type DLQConfig ¶
type DLQConfig struct {
// Topic is the destination topic for dead-letter messages.
Topic string
// Producer is optional. If nil, xkafka creates and owns a SyncProducer.
Producer sarama.SyncProducer
}
DLQConfig configures dead-letter publishing when retries are exhausted.
type ExhaustedPolicy ¶
type ExhaustedPolicy = cconsume.ExhaustedPolicy
ExhaustedPolicy controls action when finite retries are exhausted.
type FailureHook ¶
type FailureHook = cconsume.FailureHook
FailureHook is called on retry/exhausted failure events.
type FailureStage ¶
type FailureStage = cconsume.FailureStage
FailureStage marks current failure lifecycle stage.
type GroupConsumer ¶
type GroupConsumer struct {
// contains filtered or unexported fields
}
GroupConsumer wraps a Sarama consumer group with ordered shard processing. GroupConsumer 封装 Sarama 消费者组,提供有序分片处理能力。
func NewGroupConsumer ¶
func NewGroupConsumer(cfg GroupConsumerConfig) (*GroupConsumer, error)
NewGroupConsumer creates a configured Sarama consumer-group wrapper. 根据配置创建 GroupConsumer 实例,包括验证配置、创建消费者组、创建 DLQ 写入器。
func (*GroupConsumer) Close ¶
func (c *GroupConsumer) Close() error
Close releases consumer-group and owned DLQ producer. 依次关闭消费者组和 DLQ 写入器,使用 sync.Once 保证只关闭一次。
func (*GroupConsumer) Consume ¶
func (c *GroupConsumer) Consume(ctx context.Context, business consume.HandlerFunc) error
Consume starts consuming in a rebalance-safe loop. 在 rebalance 安全的循环中消费消息,每次 rebalance 创建新的 Handler。
type GroupConsumerConfig ¶
type GroupConsumerConfig struct {
// Brokers 是 Kafka 集群地址列表。
Brokers []string
// GroupID 是消费者组 ID。
GroupID string
// Topics 是订阅的 topic 列表。
Topics []string
// SaramaConfig 是底层 Sarama 配置,nil 时使用默认值。
SaramaConfig *sarama.Config
// ShardCount 是分片数量,用于按键有序处理。
ShardCount int
// ShardQueueSize 是每个分片队列的缓冲区大小。
ShardQueueSize int
// GlobalHandlers 是所有 topic 共享的中间件处理器。
GlobalHandlers []consume.Handler
// TopicHandlers 是按 topic 名字索引的专属中间件配置。
TopicHandlers map[string]ConsumeTopicHandlers
// KeyExtractor 从消息中提取用于分片路由的逻辑键。
KeyExtractor KeyExtractor
// Logger 是结构化日志记录器。
Logger *slog.Logger
// LoggerHandlerEnabled 控制是否启用日志中间件,nil 表示启用。
LoggerHandlerEnabled *bool
// RetryConfig 控制重试行为。
RetryConfig RetryConfig
// ExhaustedPolicy 控制有限重试耗尽后的策略。
ExhaustedPolicy ExhaustedPolicy
// DLQ 是死信队列配置,仅在耗尽策略为 DLQ 时必需。
DLQ *DLQConfig
// FailureHook 是失败事件回调函数。
FailureHook FailureHook
}
GroupConsumerConfig configures GroupConsumer. 消费者组的完整配置。
func (*GroupConsumerConfig) Validate ¶
func (cfg *GroupConsumerConfig) Validate() error
Validate normalizes and validates group consumer config. 规范化并验证消费者组配置,依次执行:补默认值、规范化输入、校验必填字段、 确保依赖、校验耗尽策略、校验重试配置、校验 DLQ、校验 topic 处理器。
type KeyExtractor ¶
type KeyExtractor func(*sarama.ConsumerMessage) (string, error)
KeyExtractor derives logical key for shard routing.
type MemoryOffsetStore ¶
type MemoryOffsetStore = xstore.MemoryOffsetStore
MemoryOffsetStore keeps offsets in-process only.
func NewMemoryOffsetStore ¶
func NewMemoryOffsetStore() *MemoryOffsetStore
NewMemoryOffsetStore creates default in-memory offset storage.
type OffsetStore ¶
type OffsetStore interface {
Load(
ctx context.Context,
topic string,
partition int32,
) (nextOffset int64, found bool, err error)
Save(ctx context.Context, topic string, partition int32, nextOffset int64) error
}
OffsetStore persists per-partition next offsets for partition mode.
type PartitionConsumer ¶
type PartitionConsumer struct {
// contains filtered or unexported fields
}
PartitionConsumer wraps one Sarama partition consumer with ordered shard processing. PartitionConsumer 封装 Sarama 分区消费者,提供有序分片处理和自动重连能力。
func NewPartitionConsumer ¶
func NewPartitionConsumer(cfg PartitionConsumerConfig) (*PartitionConsumer, error)
NewPartitionConsumer creates a configured partition-mode Kafka consumer. 根据配置创建 PartitionConsumer 实例,包括验证配置、创建消费者、创建 DLQ 写入器。
func (*PartitionConsumer) Close ¶
func (c *PartitionConsumer) Close() error
Close releases partition consumer and owned DLQ producer. 依次关闭分区消费者和 DLQ 写入器,使用 sync.Once 保证只关闭一次。
func (*PartitionConsumer) Consume ¶
func (c *PartitionConsumer) Consume(ctx context.Context, business consume.HandlerFunc) error
Consume starts partition-mode consume loop with auto reconnect. 启动分区消费循环,支持自动重连。委托 Runner 处理实际消费逻辑。
func (*PartitionConsumer) String ¶
func (c *PartitionConsumer) String() string
String 返回分区消费者的可读标识,格式为 "partition-consumer(topic:partition)"。
type PartitionConsumerConfig ¶
type PartitionConsumerConfig struct {
// Brokers 是 Kafka 集群地址列表。
Brokers []string
// Topic 是消费的目标 topic。
Topic string
// Partition 是消费的目标分区号。
Partition int32
// SaramaConfig 是底层 Sarama 配置,nil 时使用默认值。
SaramaConfig *sarama.Config
// ShardCount 是分片数量,用于按键有序处理。
ShardCount int
// ShardQueueSize 是每个分片队列的缓冲区大小。
ShardQueueSize int
// GlobalHandlers 是所有消息共享的中间件处理器。
GlobalHandlers []consume.Handler
// KeyExtractor 从消息中提取用于分片路由的逻辑键。
KeyExtractor KeyExtractor
// Logger 是结构化日志记录器。
Logger *slog.Logger
// LoggerHandlerEnabled 控制是否启用日志中间件,nil 表示启用。
LoggerHandlerEnabled *bool
// RetryConfig 控制重试行为。
RetryConfig RetryConfig
// ExhaustedPolicy 控制有限重试耗尽后的策略。
ExhaustedPolicy ExhaustedPolicy
// DLQ 是死信队列配置。
DLQ *DLQConfig
// FailureHook 是失败事件回调函数。
FailureHook FailureHook
// OffsetStore 是分区 offset 持久化存储。
OffsetStore OffsetStore
// InitialOffset 是首次消费时的起始 offset。
InitialOffset int64
// Reconnect 控制重连退避策略。
Reconnect BackoffConfig
}
PartitionConsumerConfig configures PartitionConsumer. 分区消费者的完整配置。
func (*PartitionConsumerConfig) Validate ¶
func (cfg *PartitionConsumerConfig) Validate() error
Validate normalizes and validates partition consumer config. 规范化并验证分区消费者配置,依次执行:补默认值、规范化输入、校验必填字段、 确保依赖、校验耗尽策略、校验重试配置、校验 DLQ、校验重连配置。
type ProduceTopicHandlers ¶
type ProduceTopicHandlers struct {
// Mode 决定 topic 处理器与全局处理器的组合模式(追加或替换)。
Mode ChainMode
// Handlers 是该 topic 专属的中间件处理器列表。
Handlers []produce.Handler
}
ProduceTopicHandlers describes topic-specific producer middleware composition. 描述特定 topic 的生产者中间件组合方式。
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer wraps one Sarama SyncProducer with sync/batch/async capabilities. Producer 封装了 Sarama SyncProducer,提供同步、批量、异步发送能力。
func NewProducer ¶
func NewProducer(cfg ProducerConfig) (*Producer, error)
NewProducer creates a configured producer wrapper. 根据配置创建 Producer 实例,包括验证配置、创建发送器、初始化异步运行时。
func (*Producer) Close ¶
Close stops runtime and closes owned producer. 关闭异步运行时和发送器,使用 sync.Once 保证只关闭一次。
func (*Producer) ProduceAsync ¶
ProduceAsync queues one message into async runtime. 异步发送一条消息,消息进入运行时队列后立即返回 Future。
func (*Producer) ProduceBatch ¶
func (p *Producer) ProduceBatch( ctx context.Context, msgs ...*produce.Message, ) ([]*produce.Result, error)
ProduceBatch sends messages sequentially and fails fast on first error. It is kept for compatibility and is not suitable for per-item acknowledgement flows such as xevent outbox relays. 按顺序逐条发送消息,遇到第一条错误立即返回(快速失败)。
func (*Producer) ProduceBatchReport ¶
func (p *Producer) ProduceBatchReport( ctx context.Context, msgs ...*produce.Message, ) ([]produce.BatchItemResult, error)
ProduceBatchReport sends messages and returns a per-item outcome vector. A top-level error is returned only for call-level failures such as a nil producer or a context that is already canceled before the call starts. 逐条发送消息并返回每条的独立结果,仅当调用级别出错时返回顶层错误。
type ProducerConfig ¶
type ProducerConfig struct {
// Brokers 是 Kafka 集群地址列表。
Brokers []string
// DefaultTopic 是消息未指定 topic 时的默认目标 topic。
DefaultTopic string
// SaramaConfig 是底层 Sarama 配置,nil 时使用默认值。
SaramaConfig *sarama.Config
// SyncProducer 是外部传入的同步生产者,nil 时自动创建。
SyncProducer sarama.SyncProducer
// Dispatch 控制异步消息分发的路由策略。
Dispatch ProducerDispatchConfig
// GlobalHandlers 是所有 topic 共享的中间件处理器。
GlobalHandlers []produce.Handler
// TopicHandlers 是按 topic 名字索引的专属中间件配置。
TopicHandlers map[string]ProduceTopicHandlers
// Logger 是结构化日志记录器。
Logger *slog.Logger
// LoggerHandlerEnabled 控制是否启用日志中间件,nil 表示启用。
LoggerHandlerEnabled *bool
// RetryConfig 控制重试行为。
RetryConfig RetryConfig
// ExhaustedPolicy 控制有限重试耗尽后的策略。
ExhaustedPolicy ProducerExhaustedPolicy
// FailureHook 是失败事件回调函数。
FailureHook ProducerFailureHook
}
ProducerConfig configures Producer. Producer 的完整配置,涵盖 broker、发送器、分发、中间件、重试等。
func (*ProducerConfig) Validate ¶
func (cfg *ProducerConfig) Validate() error
Validate normalizes and validates producer config. 规范化并验证生产者配置,依次执行:补默认值、规范化输入、校验必填字段、 校验分发配置、确保依赖、校验重试配置、校验耗尽策略、校验 topic 处理器。
type ProducerDispatchConfig ¶
type ProducerDispatchConfig struct {
// Mode 指定消息分发模式:串行、按键分片或并行轮询。
Mode ProducerDispatchMode
// ShardCount 是按键分片模式下的分片数量。
ShardCount int
// WorkerCount 是并行轮询模式下的工作协程数量。
WorkerCount int
// QueueSize 是每个工作队列的缓冲区大小。
QueueSize int
}
ProducerDispatchConfig controls async runtime queueing and routing. 控制异步生产者运行时的队列路由策略。
type ProducerDispatchMode ¶
type ProducerDispatchMode string
ProducerDispatchMode controls async dispatch behavior.
const ( // ProducerDispatchModeSerial routes all messages to one worker. ProducerDispatchModeSerial ProducerDispatchMode = "serial" // ProducerDispatchModeKeySharded routes by key hash modulo shard count. ProducerDispatchModeKeySharded ProducerDispatchMode = "key_sharded" // ProducerDispatchModeParallel routes by round-robin across workers. ProducerDispatchModeParallel ProducerDispatchMode = "parallel" // DefaultProducerDispatchMode is default async dispatch mode. DefaultProducerDispatchMode = ProducerDispatchModeKeySharded )
type ProducerExhaustedPolicy ¶
type ProducerExhaustedPolicy = pproduce.ExhaustedPolicy
ProducerExhaustedPolicy controls action when finite retries are exhausted.
type ProducerFailureEvent ¶
ProducerFailureEvent is emitted to ProducerFailureHook.
type ProducerFailureHook ¶
type ProducerFailureHook = pproduce.FailureHook
ProducerFailureHook is called on producer retry/exhausted failure events.
type ProducerFailureStage ¶
type ProducerFailureStage = pproduce.FailureStage
ProducerFailureStage marks producer failure lifecycle stage.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
internal
|
|
|
middleware
|
|
|
consume
Package consume provides consumer-side middleware types and composition helpers.
|
Package consume provides consumer-side middleware types and composition helpers. |
|
consume/trace
Package trace provides consume-side OpenTelemetry tracing middleware.
|
Package trace provides consume-side OpenTelemetry tracing middleware. |
|
produce
Package produce provides producer-side middleware types and composition helpers.
|
Package produce provides producer-side middleware types and composition helpers. |
|
produce/trace
Package trace provides produce-side OpenTelemetry tracing middleware.
|
Package trace provides produce-side OpenTelemetry tracing middleware. |