Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BuildChainFunc ¶
type BuildChainFunc func(topic string, business consume.HandlerFunc) consume.HandlerFunc
BuildChainFunc builds the consume chain for one topic, wrapping the business handler.
type Config ¶
type Config struct {
	// ShardCount is the number of shards.
	ShardCount int
	// ShardQueueSize is the buffer size of each shard queue.
	ShardQueueSize int
	// ExtractLogicalKey extracts the logical key from a message.
	ExtractLogicalKey router.ConsumeKeyExtractor
	// ShardForKey computes the shard index for a logical key.
	ShardForKey ShardRouter
	// BuildChain builds the middleware chain for a given topic.
	BuildChain BuildChainFunc
	// Business is the final business handler.
	Business consume.HandlerFunc
}
Config describes the runtime dependencies of a consumer group session.
type Handler ¶
type Handler struct {
// contains filtered or unexported fields
}
Handler implements the Sarama ConsumerGroupHandler interface and uses sharded queues to preserve message order.
func NewHandler ¶
NewHandler creates a runtime-backed consumer group handler.
func (*Handler) Cleanup ¶
func (h *Handler) Cleanup(_ sarama.ConsumerGroupSession) error
Cleanup waits for the runtime workers to finish and returns a fatal error if one occurred. It runs during the Cleanup phase of a Sarama rebalance, where it shuts down the runtime and checks for fatal errors.
func (*Handler) ConsumeClaim ¶
func (h *Handler) ConsumeClaim(_ sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
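ConsumeClaim routes each message to a shard worker. For each message in the consume loop, it observes the offset, extracts the logical key, computes the shard, builds the context, and enqueues the message.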
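To make the routing idea concrete, here is a minimal sketch of key-based dispatch into per-shard queues, with one worker per shard so that messages sharing a key are handled in arrival order. The `message` type, `shardOf`, and `dispatch` are hypothetical names; the sketch omits offset tracking and context building, and it assumes `shards` is positive.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// message is a hypothetical stand-in for a Kafka record.
type message struct {
	Key    string
	Offset int64
}

// shardOf maps a key to a shard index with FNV-1a (an assumed routing rule).
func shardOf(key string, shards int) int {
	h := fnv.New32a()
	_, _ = h.Write([]byte(key))
	return int(h.Sum32() % uint32(shards))
}

// dispatch routes msgs into per-shard queues and returns, for each key,
// the offsets in the order the shard workers handled them.
func dispatch(msgs []message, shards, queueSize int) map[string][]int64 {
	queues := make([]chan message, shards)
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		seen = make(map[string][]int64)
	)
	for i := range queues {
		queues[i] = make(chan message, queueSize)
		wg.Add(1)
		go func(q <-chan message) {
			defer wg.Done()
			for m := range q {
				mu.Lock()
				seen[m.Key] = append(seen[m.Key], m.Offset)
				mu.Unlock()
			}
		}(queues[i])
	}
	// Same key -> same shard -> same single worker, so per-key order holds.
	for _, m := range msgs {
		queues[shardOf(m.Key, shards)] <- m
	}
	for _, q := range queues {
		close(q)
	}
	wg.Wait()
	return seen
}

func main() {
	msgs := []message{{"a", 0}, {"b", 1}, {"a", 2}, {"b", 3}, {"a", 4}}
	seen := dispatch(msgs, 4, 8)
	fmt.Println("a:", seen["a"])
	fmt.Println("b:", seen["b"])
}
```

Ordering is guaranteed only within a key; messages with different keys may be processed in any relative order, even when they land on the same partition.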
type ShardRouter ¶
ShardRouter maps one logical key to a shard index.
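One plausible way to implement such a router is to hash the key and reduce it modulo the shard count. The sketch below assumes a `func(key string, shardCount int) int` shape for ShardRouter, which this page does not confirm, and `fnvShard` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// ShardRouter here uses an assumed signature; the page does not show the real one.
type ShardRouter func(key string, shardCount int) int

// fnvShard hashes key with FNV-1a and reduces it modulo shardCount.
// It returns 0 when shardCount is not positive so callers never get
// a negative index or a division by zero.
func fnvShard(key string, shardCount int) int {
	if shardCount <= 0 {
		return 0
	}
	h := fnv.New32a()
	_, _ = h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(shardCount))
}

func main() {
	var route ShardRouter = fnvShard
	for _, key := range []string{"user-1", "user-2", "user-1"} {
		fmt.Printf("%s -> shard %d\n", key, route(key, 4))
	}
}
```

Because the mapping is a pure function of the key and the shard count, the same key always lands on the same shard for as long as ShardCount stays fixed.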