group

package
v0.0.0-...-8c522c3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 14, 2026 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BuildChainFunc

type BuildChainFunc func(topic string, business consume.HandlerFunc) consume.HandlerFunc

BuildChainFunc builds one topic consume chain around the business handler.

type Config

type Config struct {
	// ShardCount 是分片数量。
	ShardCount int
	// ShardQueueSize 是每个分片队列的缓冲区大小。
	ShardQueueSize int

	// ExtractLogicalKey 从消息中提取逻辑键的函数。
	ExtractLogicalKey router.ConsumeKeyExtractor
	// ShardForKey 根据逻辑键计算分片索引的函数。
	ShardForKey ShardRouter
	// BuildChain 构建指定 topic 的中间件链。
	BuildChain BuildChainFunc
	// Business 是最终的业务处理函数。
	Business consume.HandlerFunc
}

Config describes session runtime dependencies. 消费者组会话运行时的配置。

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler implements Sarama ConsumerGroupHandler with sharded ordering. 实现 Sarama ConsumerGroupHandler 接口,使用分片队列保证消息顺序。

func NewHandler

func NewHandler(cfg Config) *Handler

NewHandler creates a runtime-backed consumer group handler.

func (*Handler) Cleanup

func (h *Handler) Cleanup(_ sarama.ConsumerGroupSession) error

Cleanup waits for runtime workers and returns fatal errors if present. 在 Sarama rebalance 的 Cleanup 阶段关闭运行时并检查致命错误。

func (*Handler) ConsumeClaim

func (h *Handler) ConsumeClaim(
	_ sarama.ConsumerGroupSession,
	claim sarama.ConsumerGroupClaim,
) error

ConsumeClaim routes each message into shard workers. 消费消息循环:观察 offset、提取逻辑键、计算分片、构建上下文、入队。

func (*Handler) FatalErr

func (h *Handler) FatalErr() error

FatalErr returns fatal processing error in this consume session.

func (*Handler) Setup

func (h *Handler) Setup(session sarama.ConsumerGroupSession) error

Setup initializes one consume session runtime. 在 Sarama rebalance 的 Setup 阶段创建新的会话运行时。

type ShardRouter

type ShardRouter func(logicalKey string) int

ShardRouter maps one logical key to a shard index.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL