fkafka

package
v1.1.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 4, 2023 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
// Name identifiers for the two fkafka components (producer and consumer group).
// NOTE(review): presumably used to tag logs/metrics per component — confirm against usage.
const (
	PackageNameProducer      = "fkafka.producer"
	PackageNameConsumerGroup = "fkafka.consumerGroup"
)
View Source
// Result code strings.
// NOTE(review): presumably label the outcome of a publish/consume operation — confirm against usage.
const (
	CodeOK    = "OK"
	CodeError = "Error"
)

Variables

This section is empty.

Functions

func ConsumerMessageToMap

func ConsumerMessageToMap(m *sarama.ConsumerMessage) map[string]interface{}

Types

type ClientConfig

// ClientConfig holds the Kafka connection settings: broker addresses,
// authentication and protocol version.
type ClientConfig struct {
	Brokers      []string // Broker addresses.
	AuthType     string   // Authentication mode: password / none. Defaults to none.
	SaslUsername string   // Required when AuthType is password.
	SaslPassword string   // Required when AuthType is password.
	Version      string   // Kafka version; defaults to 2.0.0.0.
}

type Config

// Config is the top-level fkafka configuration. It is the type accepted by
// NewProducer and NewConsumerGroup and bundles the client, producer and
// consumer-group settings.
type Config struct {
	Debug                      bool
	EnableAccessInterceptorReq bool // Whether to log published messages; enabled by default.
	EnableAccessInterceptorRes bool // Whether to log messages consumed by consumers; enabled by default.

	ClientConfig         ClientConfig
	ProducerConfig       ProducerConfig
	// NOTE(review): presumably keyed by consumer group name — confirm against callers.
	ConsumerGroupConfigs map[string]ConsumerGroupConfig
}

func DefaultConfig

func DefaultConfig() *Config

type ConsumerGroup

// ConsumerGroup is the consumer-group type returned by NewConsumerGroup.
// Its Setup, Cleanup and ConsumeClaim methods take sarama session/claim types.
// NOTE(review): presumably implements sarama.ConsumerGroupHandler — confirm.
type ConsumerGroup struct {
	// contains filtered or unexported fields
}

func NewConsumerGroup

func NewConsumerGroup(name string, config *Config, groupConfig *ConsumerGroupConfig) (*ConsumerGroup, error)

func (*ConsumerGroup) Cleanup

func (s *ConsumerGroup) Cleanup(session sarama.ConsumerGroupSession) error

func (*ConsumerGroup) Close

func (s *ConsumerGroup) Close() error

func (*ConsumerGroup) ConsumeClaim

func (s *ConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

func (*ConsumerGroup) SetHandler

func (s *ConsumerGroup) SetHandler(h Handler)

func (*ConsumerGroup) Setup

func (s *ConsumerGroup) Setup(session sarama.ConsumerGroupSession) error

func (*ConsumerGroup) Start

func (s *ConsumerGroup) Start()

type ConsumerGroupConfig

// ConsumerGroupConfig describes a single consumer group: its topics,
// group ID, starting offset and retry settings.
type ConsumerGroupConfig struct {
	Topics        []string
	GroupID       string
	InitialOffset string // Initial offset: oldest / newest. Defaults to oldest.
	RetryConfig   RetryConfig
}

type Handler

// Handler is the callback signature for a consumed message: it receives a
// context and the sarama.ConsumerMessage and returns an error.
// NOTE(review): presumably a non-nil error triggers the configured retries — confirm.
type Handler func(ctx context.Context, message *sarama.ConsumerMessage) error

type Message

// Message is the payload type accepted by Producer.SendMessage.
type Message struct {
	Topic string
	Key   []byte
	Value []byte
	// The headers are key-value pairs that are transparently passed
	// by Kafka between producers and consumers.
	Headers []sarama.RecordHeader

	// This field is used to hold arbitrary data you wish to include, so it
	// will be available when receiving on the Successes and Errors channels.
	// Sarama completely ignores this field and is only to be used for
	// pass-through data.
	Metadata interface{}
}

Message is the fkafka counterpart of sarama.ProducerMessage, used when publishing to Kafka.

func (*Message) ToMap

func (m *Message) ToMap() map[string]interface{}

type Producer

// Producer is the producer type returned by NewProducer. It exposes
// SendMessage for publishing a Message and Close.
type Producer struct {
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(name string, config *Config) (*Producer, error)

func (*Producer) Close

func (c *Producer) Close() error

func (*Producer) SendMessage

func (c *Producer) SendMessage(ctx context.Context, msg *Message) error

type ProducerConfig

// ProducerConfig holds producer-specific settings.
type ProducerConfig struct {
	// NOTE(review): presumably the maximum permitted message size in bytes,
	// forwarded to the underlying sarama producer config — confirm.
	MaxMessageBytes int
}

type RetryConfig

// RetryConfig controls retry behavior for message consumption.
type RetryConfig struct {
	MaxRetries int64 // Number of retries when consuming a message; defaults to 0 (no retry).
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL