mq

package
v0.0.0-...-0098b58 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 27, 2025 License: Apache-2.0 Imports: 11 Imported by: 2

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConsumeMessageFunc

// ConsumeMessageFunc is a callback that receives a single Message.
// Per the package documentation, the message is not acked by default.
type ConsumeMessageFunc func(Message)

ConsumeMessageFunc 处理消息方法,默认不应答ack.

type ConsumeMsgOption

// ConsumeMsgOption holds the options for consuming messages: a pool size and
// a context.
type ConsumeMsgOption struct {
	// Poolsize: see WithPoolsize. NOTE(review): presumably a worker-pool size — confirm.
	Poolsize int
	// Ctx: see WithContext.
	Ctx      context.Context
}

func MergeConsumeMsgOptions

func MergeConsumeMsgOptions(opts []ConsumeMsgOption) ConsumeMsgOption

func NewConsumeMsgOption

func NewConsumeMsgOption() *ConsumeMsgOption

func (*ConsumeMsgOption) WithContext

func (opt *ConsumeMsgOption) WithContext(ctx context.Context) *ConsumeMsgOption

func (*ConsumeMsgOption) WithPoolsize

func (opt *ConsumeMsgOption) WithPoolsize(size int) *ConsumeMsgOption

type KafkaConfig

// KafkaConfig groups the Kafka settings for this package: topics, consumer
// group, partition/replica counts, commit and producer tuning, and client
// identity/version.
type KafkaConfig struct {
	Topics             []string // @Description: topics to receive messages from
	ConsumerGroup      string   // @Description: consumer group name
	NumOfPartition     int
	NumOfReplica       int
	AutoCommitSecond   int
	ProducerBufferSize int
	ProducerFrequency  time.Duration // how often the producer flushes
	Initial            int64         // "latest offset message" (literal translation); presumably the initial consumer offset — TODO confirm
	Version            sarama.KafkaVersion
	ClientID           string
}

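KafkaConfig holds the Kafka settings: topics, consumer group, partition and replica counts, auto-commit interval, producer buffer size and flush frequency, initial offset, Kafka version, and client ID.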

func NewDefaultKafkaConfig

func NewDefaultKafkaConfig() *KafkaConfig

func ParseKafkaConfig

func ParseKafkaConfig(param map[string]string) (*KafkaConfig, error)

func (*KafkaConfig) GenConfig

func (c *KafkaConfig) GenConfig() *sarama.Config

type KafkaMessage

// KafkaMessage is the Kafka message type; it exposes ID, Body, Ack and Nack
// methods.
type KafkaMessage struct {
	// Marked: NOTE(review) meaning is not shown here — presumably records
	// whether the message has been marked/acked; confirm.
	Marked bool
	// contains filtered or unexported fields
}

KafkaMessage is the Kafka message type.

func (*KafkaMessage) Ack

func (msg *KafkaMessage) Ack() error

Ack acknowledges the message.

func (*KafkaMessage) Body

func (msg *KafkaMessage) Body() []byte

Body returns the message content.

func (*KafkaMessage) ID

func (msg *KafkaMessage) ID() string

ID returns the message ID (partition and offset).

func (*KafkaMessage) Nack

func (msg *KafkaMessage) Nack() error

Nack signals that the message is not acknowledged.

type KafkaMessageQueue

// KafkaMessageQueue is a message queue implemented on Kafka.
type KafkaMessageQueue struct {
	// Source: NOTE(review) presumably the source string passed to
	// NewKafkaMessageQueue — confirm.
	Source string
	// contains filtered or unexported fields
}

KafkaMessageQueue kafka实现的队列

func NewKafkaMessageQueue

func NewKafkaMessageQueue(source string) (*KafkaMessageQueue, error)

NewKafkaMessageQueue creates a new KafkaMessageQueue from source.

func (*KafkaMessageQueue) Close

func (mq *KafkaMessageQueue) Close() error

Close closes the message queue.

func (*KafkaMessageQueue) CreateTopic

func (mq *KafkaMessageQueue) CreateTopic(topic string) error

CreateTopic creates the topic if it does not exist. The topic parameter is the topic name.

func (*KafkaMessageQueue) CreateTopics

func (mq *KafkaMessageQueue) CreateTopics() error

CreateTopics create topics

func (*KafkaMessageQueue) GenConfig

func (mq *KafkaMessageQueue) GenConfig() *sarama.Config

func (*KafkaMessageQueue) ReceiveMessage

func (mq *KafkaMessageQueue) ReceiveMessage() (<-chan Message, error)

ReceiveMessage receive message

func (*KafkaMessageQueue) SendMessage

func (mq *KafkaMessageQueue) SendMessage(msg []byte, opts ...*SendMsgOption) error

SendMessage implements MessageQueue.SendMessage.

func (*KafkaMessageQueue) SetLogger

func (mq *KafkaMessageQueue) SetLogger(l *slog.Logger)

SetLogger sets the logger for the message queue.

func (*KafkaMessageQueue) SyncSchema

func (mq *KafkaMessageQueue) SyncSchema() error

SyncSchema implements MessageQueue.SyncSchema by creating the topics.

type MQDriver

// MQDriver is a topic-based publish/subscribe interface.
type MQDriver interface {
	// Subscribe subscribes to topic; handler receives the raw message bytes.
	Subscribe(topic string, handler func(msg []byte)) error
	// Publish publishes msg to topic.
	Publish(topic string, msg []byte) error
}

type Message

// Message is the message content interface.
type Message interface {
	// ID returns the message identifier.
	ID() string
	// Body returns the message content.
	Body() []byte
	// Ack acknowledges the message.
	Ack() error
	// Nack signals that the message is not acknowledged.
	Nack() error
}

Message message content interface.

type MessageQueue

// MessageQueue is the message queue interface specification.
type MessageQueue interface {
	// SyncSchema creates the topic(s).
	SyncSchema() error
	// SendMessage sends the message bytes b, with optional send options.
	SendMessage(b []byte, opts ...*SendMsgOption) error
	// ReceiveMessage returns a receive-only channel of Message values.
	ReceiveMessage() (<-chan Message, error)
	// Close closes the message queue.
	Close() error
}

MessageQueue 消息队列接口规范.

func NewMessageQueue

func NewMessageQueue(source string) (MessageQueue, error)

NewMessageQueue creates a MessageQueue from source.

type SendMsgOption

// SendMsgOption holds the options for sending a message: a send time and a key.
type SendMsgOption struct {
	// Sendtime: see WithSendtime. NOTE(review): presumably the send timestamp — confirm.
	Sendtime time.Time
	// Key: see WithKey. NOTE(review): presumably the message key — confirm.
	Key      string
}

func NewSendMsgOption

func NewSendMsgOption() *SendMsgOption

func (*SendMsgOption) WithKey

func (opt *SendMsgOption) WithKey(key string) *SendMsgOption

func (*SendMsgOption) WithSendtime

func (opt *SendMsgOption) WithSendtime(t time.Time) *SendMsgOption

type SlideWindow

// SlideWindow is a generic sliding window; per the package documentation it
// is used for committing Kafka consumer messages.
type SlideWindow[T any] struct {
	// WindowSize is the window size.
	WindowSize int
	// Window holds the window data.
	Window []T
	// contains filtered or unexported fields
}

SlideWindow is a sliding window used for committing Kafka consumer messages.

func NewSlideWindow

func NewSlideWindow[T any](size int) (*SlideWindow[T], error)

func (*SlideWindow[T]) Add

func (w *SlideWindow[T]) Add(data T)

Add adds data to the window. If the window is full, it blocks until the window has space.

func (*SlideWindow[T]) SlideWihFunc

func (w *SlideWindow[T]) SlideWihFunc(f func(T) bool) (T, int)

SlideWihFunc slides the window using f: while f returns true, the window slides to the next element, and it stops when f returns false. It returns the last element that could be slid, and the total number of elements that could be slid.

type SlideWindowsMap

// SlideWindowsMap has Add and SlideWihFunc methods that take a string key.
// NOTE(review): presumably it holds one SlideWindow per key — confirm.
type SlideWindowsMap[T any] struct {
	// contains filtered or unexported fields
}

func NewSlideWindows

func NewSlideWindows[T any](size int) (*SlideWindowsMap[T], error)

func (*SlideWindowsMap[T]) Add

func (s *SlideWindowsMap[T]) Add(key string, data T)

func (*SlideWindowsMap[T]) SlideWihFunc

func (s *SlideWindowsMap[T]) SlideWihFunc(key string, f func(T) bool) (T, int)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL