queue

package
v0.5.16 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 7, 2024 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

View Source
const (
	SendMsg
	ReceiveMsg
)

Variables

This section is empty.

Functions

func RegisterKafkaProducer added in v0.2.33

func RegisterKafkaProducer(connOpt KafkaConfig, mqIns *KafkaMq)

RegisterKafkaProducer 注册同步类型实例

Types

type Consumer added in v0.2.33

type Consumer struct {
	// contains filtered or unexported fields
}

func (*Consumer) Cleanup added in v0.2.33

func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*Consumer) ConsumeClaim added in v0.2.33

func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (*Consumer) Setup added in v0.2.33

func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error

Setup is run at the beginning of a new session, before ConsumeClaim

type KafkaConfig added in v0.2.33

type KafkaConfig struct {
	ClientId    string
	Brokers     []string
	GroupID     string
	Partitions  int32
	Replication int16
	Version     string
	UserName    string
	Password    string
}

type KafkaMq added in v0.2.33

type KafkaMq struct {
	Partitions int32
	// contains filtered or unexported fields
}

func (*KafkaMq) ListenReceiveMsgDo added in v0.2.33

func (r *KafkaMq) ListenReceiveMsgDo(topic string, receiveDo func(mqMsg MqMsg)) (err error)

ListenReceiveMsgDo 消费数据

func (*KafkaMq) SendByteMsg added in v0.2.33

func (r *KafkaMq) SendByteMsg(topic string, body []byte) (mqMsg MqMsg, err error)

SendByteMsg 生产数据

func (*KafkaMq) SendMsg added in v0.2.33

func (r *KafkaMq) SendMsg(topic string, body string) (mqMsg MqMsg, err error)

SendMsg 按字符串类型生产数据

type MqConsumer added in v0.2.14

type MqConsumer interface {
	ListenReceiveMsgDo(topic string, receiveDo func(mqMsg MqMsg)) (err error)
}

func NewConsumer added in v0.2.14

func NewConsumer(groupName string) (mqClient MqConsumer, err error)

NewConsumer 新建一个消费者实例

func RegisterKafkaMqConsumerMust added in v0.2.33

func RegisterKafkaMqConsumerMust(connOpt KafkaConfig) (client MqConsumer)

RegisterKafkaMqConsumerMust 注册消费者

func RegisterRedisMqConsumerMust added in v0.2.14

func RegisterRedisMqConsumerMust(connOpt RedisOption, poolOpt PoolOption, groupName string) (client MqConsumer)

RegisterRedisMqConsumerMust 注册消费者

func RegisterRocketConsumerMust added in v0.2.14

func RegisterRocketConsumerMust(endPoints []string, groupName string) (client MqConsumer)

RegisterRocketConsumerMust 注册消费者

type MqMsg added in v0.2.14

type MqMsg struct {
	RunType   int       `json:"run_type"`
	Topic     string    `json:"topic"`
	MsgId     string    `json:"msg_id"`
	Offset    int64     `json:"offset"`
	Partition int32     `json:"partition"`
	Timestamp time.Time `json:"timestamp"`

	Body []byte `json:"body"`
}

func (*MqMsg) BodyString added in v0.2.14

func (m *MqMsg) BodyString() string

type MqProducer added in v0.2.14

type MqProducer interface {
	SendMsg(topic string, body string) (mqMsg MqMsg, err error)
	SendByteMsg(topic string, body []byte) (mqMsg MqMsg, err error)
}

func NewProducer added in v0.2.14

func NewProducer(groupName string) (mqClient MqProducer, err error)

NewProducer 新建一个生产者实例

func RegisterKafkaProducerMust added in v0.2.33

func RegisterKafkaProducerMust(connOpt KafkaConfig) (client MqProducer)

RegisterKafkaProducerMust 注册并启动生产者接口实现

func RegisterRedisMqProducerMust added in v0.2.14

func RegisterRedisMqProducerMust(connOpt RedisOption, poolOpt PoolOption, groupName string, retry int) (client MqProducer)

func RegisterRocketProducerMust added in v0.2.14

func RegisterRocketProducerMust(endPoints []string, groupName string, retry int) (client MqProducer)

RegisterRocketProducerMust 注册并启动生产者接口实现

type PoolOption added in v0.2.14

type PoolOption struct {
	InitCap     int
	MaxCap      int
	IdleTimeout int
}

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

func NewQueue

func NewQueue() *Queue

func (*Queue) LPop

func (q *Queue) LPop() interface{}

func (*Queue) LPush

func (q *Queue) LPush(v interface{})

func (*Queue) Len

func (q *Queue) Len() int

func (*Queue) RPop

func (q *Queue) RPop() interface{}

func (*Queue) RPush

func (q *Queue) RPush(v interface{})

type RedisMq added in v0.2.14

type RedisMq struct {
	// contains filtered or unexported fields
}

func RegisterRedisMq added in v0.2.14

func RegisterRedisMq(connOpt RedisOption, poolOpt PoolOption, groupName string, retry int) (mqIns *RedisMq, err error)

RegisterRedisMq 注册redismq实例

func (*RedisMq) ListenReceiveMsgDo added in v0.2.14

func (r *RedisMq) ListenReceiveMsgDo(topic string, receiveDo func(mqMsg MqMsg)) (err error)

ListenReceiveMsgDo 消费数据

func (*RedisMq) SendByteMsg added in v0.2.14

func (r *RedisMq) SendByteMsg(topic string, body []byte) (mqMsg MqMsg, err error)

SendByteMsg 生产数据

func (*RedisMq) SendMsg added in v0.2.14

func (r *RedisMq) SendMsg(topic string, body string) (mqMsg MqMsg, err error)

SendMsg 按字符串类型生产数据

type RedisOption added in v0.2.14

type RedisOption struct {
	Addr    string
	Passwd  string
	DBnum   int
	Timeout int
}

type RocketMq added in v0.2.14

type RocketMq struct {
	// contains filtered or unexported fields
}

func RegisterRocketMqConsumer added in v0.2.14

func RegisterRocketMqConsumer(endPoints []string, groupName string) (mqIns *RocketMq, err error)

RegisterRocketMqConsumer 注册rocketmq消费者

func RegisterRocketMqProducer added in v0.2.14

func RegisterRocketMqProducer(endPoints []string, groupName string, retry int) (mqIns *RocketMq, err error)

RegisterRocketMqProducer 注册rocketmq生产者

func (*RocketMq) ListenReceiveMsgDo added in v0.2.14

func (r *RocketMq) ListenReceiveMsgDo(topic string, receiveDo func(mqMsg MqMsg)) (err error)

ListenReceiveMsgDo 消费数据

func (*RocketMq) SendByteMsg added in v0.2.14

func (r *RocketMq) SendByteMsg(topic string, body []byte) (mqMsg MqMsg, err error)

SendByteMsg 生产数据

func (*RocketMq) SendMsg added in v0.2.14

func (r *RocketMq) SendMsg(topic string, body string) (mqMsg MqMsg, err error)

SendMsg 按字符串类型生产数据

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL