Documentation
¶
Index ¶
- func SetKafkaScramUser(user, pwd string, algo scram.Algorithm) option
- func SetKafkaUser(user, pwd string) option
- type KafkaSendMgr
- func (this *KafkaSendMgr) Completion(messages []kafkago.Message, err error)
- func (this *KafkaSendMgr) DebugLogger(format string, param ...interface{})
- func (this *KafkaSendMgr) ErrorLogger(format string, param ...interface{})
- func (this *KafkaSendMgr) Send(index int, key string, value []byte)
- func (this *KafkaSendMgr) SendWithTopic(topic string, key string, value []byte)
- func (this *KafkaSendMgr) Start(ctx context.Context)
- func (this *KafkaSendMgr) Stop()
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func SetKafkaScramUser ¶
SetKafkaScramUser 设置Kafka的SCRAM用户 如果用户名或密码为空,则不设置 该函数用于配置Kafka的SASL认证机制,使用SCRAM机制传输用户名和密码 参数algo指定SCRAM算法类型,如scram.SHA256或scram.SHA512
func SetKafkaUser ¶
func SetKafkaUser(user, pwd string) option
SetKafkaUser 设置Kafka的用户名和密码 如果用户名或密码为空,则不设置 该函数用于配置Kafka的SASL认证机制,使用PLAIN机制传输用户名和密码
Types ¶
type KafkaSendMgr ¶
type KafkaSendMgr struct {
Writer kafkago.Writer // Kafka Writer 实例
// contains filtered or unexported fields
}
KafkaSendMgr 是 Kafka 消息发送管理器,负责管理 Kafka Writer、上下文、主题和地址等信息。 通过该结构体可以实现 Kafka 消息的异步发送、认证配置、启动与关闭等功能。
func NewKafkaMgr ¶
func NewKafkaMgr(topics []string, addrs []string, compf func(messages []kafkago.Message, err error), opts ...option) (result *KafkaSendMgr)
NewKafkaMgr 创建并初始化一个 KafkaSendMgr 实例。
参数: - topics: 主题列表 - addrs: Broker 地址列表 - compf: 消息发送完成回调函数(可为 nil,默认为内部 Completion) - opts: 其他可选配置(如 SASL 认证)
返回: - KafkaSendMgr 实例指针
func (*KafkaSendMgr) Completion ¶
func (this *KafkaSendMgr) Completion(messages []kafkago.Message, err error)
Completion 默认的消息发送失败回调。
参数: - messages: 发送失败的消息 - err: 错误信息
func (*KafkaSendMgr) DebugLogger ¶
func (this *KafkaSendMgr) DebugLogger(format string, param ...interface{})
DebugLogger Kafka Writer 的调试日志回调。
参数: - format: 日志格式 - param: 日志参数
func (*KafkaSendMgr) ErrorLogger ¶
func (this *KafkaSendMgr) ErrorLogger(format string, param ...interface{})
ErrorLogger Kafka Writer 的错误日志回调。
参数: - format: 日志格式 - param: 日志参数
func (*KafkaSendMgr) Send ¶
func (this *KafkaSendMgr) Send(index int, key string, value []byte)
Send 发送消息到 topics[index] 指定的主题。
参数: - index: 主题在 topics 列表中的索引 - key: 消息 key - value: 消息内容
func (*KafkaSendMgr) SendWithTopic ¶
func (this *KafkaSendMgr) SendWithTopic(topic string, key string, value []byte)
SendWithTopic 发送消息到指定的 topic。
参数: - topic: 目标主题 - key: 消息 key - value: 消息内容
func (*KafkaSendMgr) Start ¶
func (this *KafkaSendMgr) Start(ctx context.Context)
Start 启动 KafkaSendMgr,初始化上下文。
参数: - ctx: 父级上下文
func (*KafkaSendMgr) Stop ¶
func (this *KafkaSendMgr) Stop()
Stop 安全关闭 KafkaSendMgr,关闭 Writer 并取消上下文。