Documentation
¶
Index ¶
- type ConsumeMessageFunc
- type ConsumeMsgOption
- type KafkaConfig
- type KafkaMessage
- type KafkaMessageQueue
- func (mq *KafkaMessageQueue) Close() error
- func (mq *KafkaMessageQueue) CreateTopic(topic string) error
- func (mq *KafkaMessageQueue) CreateTopics() error
- func (mq *KafkaMessageQueue) GenConfig() *sarama.Config
- func (mq *KafkaMessageQueue) ReceiveMessage() (<-chan Message, error)
- func (mq *KafkaMessageQueue) SendMessage(msg []byte, opts ...*SendMsgOption) error
- func (mq *KafkaMessageQueue) SetLogger(l *slog.Logger)
- func (mq *KafkaMessageQueue) SyncSchema() error
- type MQDriver
- type Message
- type MessageQueue
- type SendMsgOption
- type SlideWindow
- type SlideWindowsMap
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ConsumeMsgOption ¶
func MergeConsumeMsgOptions ¶
func MergeConsumeMsgOptions(opts []ConsumeMsgOption) ConsumeMsgOption
func NewConsumeMsgOption ¶
func NewConsumeMsgOption() *ConsumeMsgOption
func (*ConsumeMsgOption) WithContext ¶
func (opt *ConsumeMsgOption) WithContext(ctx context.Context) *ConsumeMsgOption
func (*ConsumeMsgOption) WithPoolsize ¶
func (opt *ConsumeMsgOption) WithPoolsize(size int) *ConsumeMsgOption
type KafkaConfig ¶
type KafkaConfig struct { Topics []string // @Description: 接受消息的topic ConsumerGroup string // @Description: 消费者组名称 NumOfPartition int NumOfReplica int AutoCommitSecond int ProducerBufferSize int ProducerFrequency time.Duration // 生产者flush的频率 Initial int64 // 最新偏移消息 Version sarama.KafkaVersion ClientID string }
KafkaConfig holds the configuration for the Kafka message queue.
func NewDefaultKafkaConfig ¶
func NewDefaultKafkaConfig() *KafkaConfig
func ParseKafkaConfig ¶
func ParseKafkaConfig(param map[string]string) (*KafkaConfig, error)
func (*KafkaConfig) GenConfig ¶
func (c *KafkaConfig) GenConfig() *sarama.Config
type KafkaMessage ¶
type KafkaMessage struct { Marked bool // contains filtered or unexported fields }
KafkaMessage is a Kafka message.
type KafkaMessageQueue ¶
type KafkaMessageQueue struct { Source string // contains filtered or unexported fields }
KafkaMessageQueue is a message queue implemented on Kafka.
func NewKafkaMessageQueue ¶
func NewKafkaMessageQueue(source string) (*KafkaMessageQueue, error)
NewKafkaMessageQueue creates a new KafkaMessageQueue from source.
func (*KafkaMessageQueue) CreateTopic ¶
func (mq *KafkaMessageQueue) CreateTopic(topic string) error
CreateTopic creates the topic if it does not exist. The topic parameter is the topic name.
func (*KafkaMessageQueue) CreateTopics ¶
func (mq *KafkaMessageQueue) CreateTopics() error
CreateTopics creates the topics.
func (*KafkaMessageQueue) GenConfig ¶
func (mq *KafkaMessageQueue) GenConfig() *sarama.Config
func (*KafkaMessageQueue) ReceiveMessage ¶
func (mq *KafkaMessageQueue) ReceiveMessage() (<-chan Message, error)
ReceiveMessage receives messages and returns them on a receive-only channel of Message.
func (*KafkaMessageQueue) SendMessage ¶
func (mq *KafkaMessageQueue) SendMessage(msg []byte, opts ...*SendMsgOption) error
SendMessage implements MessageQueue.SendMessage.
func (*KafkaMessageQueue) SetLogger ¶
func (mq *KafkaMessageQueue) SetLogger(l *slog.Logger)
SetLogger sets the logger for the message queue.
func (*KafkaMessageQueue) SyncSchema ¶
func (mq *KafkaMessageQueue) SyncSchema() error
SyncSchema implements MessageQueue.SyncSchema by creating the topics.
type MessageQueue ¶
type MessageQueue interface { // SyncSchema create topic SyncSchema() error // SendMessage send message SendMessage(b []byte, opts ...*SendMsgOption) error // ReceiveMessage receive message ReceiveMessage() (<-chan Message, error) // Close mq close Close() error }
MessageQueue is the message queue interface specification.
func NewMessageQueue ¶
func NewMessageQueue(source string) (MessageQueue, error)
NewMessageQueue creates a MessageQueue from source.
type SendMsgOption ¶
func NewSendMsgOption ¶
func NewSendMsgOption() *SendMsgOption
func (*SendMsgOption) WithKey ¶
func (opt *SendMsgOption) WithKey(key string) *SendMsgOption
func (*SendMsgOption) WithSendtime ¶
func (opt *SendMsgOption) WithSendtime(t time.Time) *SendMsgOption
type SlideWindow ¶
type SlideWindow[T any] struct { // window size WindowSize int // window data Window []T // contains filtered or unexported fields }
SlideWindow is a sliding window used for committing Kafka consumer messages.
func NewSlideWindow ¶
func NewSlideWindow[T any](size int) (*SlideWindow[T], error)
func (*SlideWindow[T]) Add ¶
func (w *SlideWindow[T]) Add(data T)
Add adds data to the window. If the window is full, it blocks until the window has space.
func (*SlideWindow[T]) SlideWihFunc ¶
func (w *SlideWindow[T]) SlideWihFunc(f func(T) bool) (T, int)
SlideWihFunc slides the window using the function f: while f returns true, the window slides to the next element, and it stops when f returns false. It returns the last data that could be slid over and the total number of data that could be slid over.
type SlideWindowsMap ¶
type SlideWindowsMap[T any] struct { // contains filtered or unexported fields }
func NewSlideWindows ¶
func NewSlideWindows[T any](size int) (*SlideWindowsMap[T], error)
func (*SlideWindowsMap[T]) Add ¶
func (s *SlideWindowsMap[T]) Add(key string, data T)
func (*SlideWindowsMap[T]) SlideWihFunc ¶
func (s *SlideWindowsMap[T]) SlideWihFunc(key string, f func(T) bool) (T, int)