kafka

package
v0.1.19 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 30, 2023 License: Apache-2.0 Imports: 20 Imported by: 3

Documentation

Index

Constants

View Source
const (
	ConsumerMsgs        = 3
	AggregationMessages = 4
	ChannelNum          = 100
)

Variables

This section is empty.

Functions

func OperationIDGenerator added in v0.1.14

func OperationIDGenerator() string

Types

type BatchAggregationIdListHandlerF added in v0.1.14

type BatchAggregationIdListHandlerF func(triggerID string, idList []string)

type BatchConsumerGroup added in v0.1.14

type BatchConsumerGroup struct {
	sarama.ConsumerGroup
	// contains filtered or unexported fields
}

BatchConsumerGroup is a Kafka consumer.

func MustKafkaBatchConsumer added in v0.1.14

func MustKafkaBatchConsumer(c *KafkaConsumerConf) *BatchConsumerGroup

func (*BatchConsumerGroup) Cleanup added in v0.1.14

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*BatchConsumerGroup) ConsumeClaim added in v0.1.14

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (*BatchConsumerGroup) Group added in v0.1.14

func (c *BatchConsumerGroup) Group() string

func (*BatchConsumerGroup) MessagesDistributionHandle added in v0.1.14

func (c *BatchConsumerGroup) MessagesDistributionHandle()

func (*BatchConsumerGroup) RegisterHandlers added in v0.1.14

func (*BatchConsumerGroup) Run added in v0.1.14

func (c *BatchConsumerGroup) Run(channelID int)

func (*BatchConsumerGroup) Setup added in v0.1.14

Setup is run at the beginning of a new session, before ConsumeClaim

func (*BatchConsumerGroup) Start added in v0.1.14

func (c *BatchConsumerGroup) Start()

Start starts consuming messages and watches for signals.

func (*BatchConsumerGroup) Stop added in v0.1.14

func (c *BatchConsumerGroup) Stop()

Stop stops consuming messages and watching for signals.

func (*BatchConsumerGroup) Topics added in v0.1.14

func (c *BatchConsumerGroup) Topics() []string

type BatchMessageHandlerF added in v0.1.14

type BatchMessageHandlerF func(ctx context.Context, msgs MsgChannelValue)

type CMessage added in v0.1.17

type CMessage struct {
	Message     *sarama.ConsumerMessage
	MarkMessage func()
}

type Cmd2Value added in v0.1.14

type Cmd2Value struct {
	Cmd   int
	Value interface{}
}

type ConsumerGroup

type ConsumerGroup struct {
	sarama.ConsumerGroup
	// contains filtered or unexported fields
}

ConsumerGroup is a Kafka consumer.

func MustKafkaConsumer

func MustKafkaConsumer(c *KafkaConsumerConf) *ConsumerGroup

func (*ConsumerGroup) Cleanup

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*ConsumerGroup) ConsumeClaim

func (c *ConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (*ConsumerGroup) Group

func (c *ConsumerGroup) Group() string

func (*ConsumerGroup) RegisterHandlers

func (c *ConsumerGroup) RegisterHandlers(topic string, cb MessageHandlerF)

func (*ConsumerGroup) Setup

Setup is run at the beginning of a new session, before ConsumeClaim

func (*ConsumerGroup) Start

func (c *ConsumerGroup) Start()

Start starts consuming messages and watches for signals.

func (*ConsumerGroup) Stop

func (c *ConsumerGroup) Stop()

Stop stops consuming messages and watching for signals.

func (*ConsumerGroup) Topics

func (c *ConsumerGroup) Topics() []string

type KafkaBatchConsumerConf added in v0.1.14

type KafkaBatchConsumerConf struct {
	KafkaConsumerConf
	Duration   int `json:",default=100"`
	ChannelNum int `json:"default=50"`
}

KafkaBatchConsumerConf holds the Kafka client settings.

type KafkaConsumerConf

type KafkaConsumerConf struct {
	Brokers []string
	Topics  []string
	Group   string
}

KafkaConsumerConf holds the Kafka client settings.

type KafkaProducerConf

type KafkaProducerConf struct {
	Brokers []string
	Topic   string
}

KafkaProducerConf holds the Kafka producer settings.

type KafkaShardingConsumerConf added in v0.1.17

type KafkaShardingConsumerConf struct {
	KafkaConsumerConf
	Concurrency int    `json:",default=2048"`
	QueueBuffer int    `json:",default=128"`
	ClientId    string `json:",default=sarama"`
}

type MessageHandlerF

type MessageHandlerF func(ctx context.Context, key string, value []byte)

type MsgChannelValue added in v0.1.14

type MsgChannelValue struct {
	AggregationID string //maybe userID or super groupID
	TriggerID     string
	MsgList       []*MsgDataToMQ
}

type MsgDataToMQ added in v0.1.14

type MsgDataToMQ struct {
	TraceId string
	MsgType string
	MsgData []byte
}

type None added in v0.1.17

type None struct{}

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func GetCachedMQClient

func GetCachedMQClient(c *KafkaProducerConf) *Producer

func MustKafkaProducer

func MustKafkaProducer(c *KafkaProducerConf) *Producer

func (*Producer) Close

func (p *Producer) Close() (err error)

func (*Producer) SendMessage

func (p *Producer) SendMessage(ctx context.Context, key string, value []byte) (partition int32, offset int64, err error)

SendMessage sends a message to Kafka. NOTE: If the producer failed to be created, the message will be lost.

func (*Producer) Topic

func (p *Producer) Topic() string

type ShardingConsumerGroup added in v0.1.17

type ShardingConsumerGroup struct {
	sarama.ConsumerGroup
	// contains filtered or unexported fields
}

ShardingConsumerGroup represents a Sarama consumer-group consumer.

func MustShardingConsumerGroup added in v0.1.17

func MustShardingConsumerGroup(c *KafkaShardingConsumerConf) *ShardingConsumerGroup

func (*ShardingConsumerGroup) Cleanup added in v0.1.17

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*ShardingConsumerGroup) ConsumeClaim added in v0.1.17

func (c *ShardingConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) (err error)

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (*ShardingConsumerGroup) Group added in v0.1.17

func (c *ShardingConsumerGroup) Group() string

func (*ShardingConsumerGroup) RegisterHandler added in v0.1.17

func (c *ShardingConsumerGroup) RegisterHandler(cb MessageHandlerF)

func (*ShardingConsumerGroup) Run added in v0.1.17

func (c *ShardingConsumerGroup) Run(channelID int)

func (*ShardingConsumerGroup) Setup added in v0.1.17

Setup is run at the beginning of a new session, before ConsumeClaim

func (*ShardingConsumerGroup) Sharding added in v0.1.17

func (c *ShardingConsumerGroup) Sharding(message *sarama.ConsumerMessage) int

func (*ShardingConsumerGroup) Start added in v0.1.17

func (c *ShardingConsumerGroup) Start()

Start starts consuming messages and watches for signals.

func (*ShardingConsumerGroup) Stop added in v0.1.17

func (c *ShardingConsumerGroup) Stop()

Stop stops consuming messages and watching for signals.

func (*ShardingConsumerGroup) Topics added in v0.1.17

func (c *ShardingConsumerGroup) Topics() []string

type TriggerChannelValue added in v0.1.14

type TriggerChannelValue struct {
	// contains filtered or unexported fields
}

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL