rabbitmq 延时消费队列
依赖地址
go get -u gitee.com/golang_common/go-mq@v1.0.6
使用方式
初始化消费端
//获取配置文件
host := g.Cfg().MustGet(gctx.New(), "rabbitmq.host").String()
port := g.Cfg().MustGet(gctx.New(), "rabbitmq.port").Int()
user := g.Cfg().MustGet(gctx.New(), "rabbitmq.user").String()
pwd := g.Cfg().MustGet(gctx.New(), "rabbitmq.password").String()
vhost := g.Cfg().MustGet(gctx.New(), "rabbitmq.vhost").String()
//初始化rabbitmq
mq := rabbitmq.GetRabbitMQ()
//连接rabbitmq
err := mq.Conn(host, port, user, pwd, vhost)
if err != nil {
glog.Errorf(gctx.New(), "rabbitmq connect fail%s", err)
}
//创建交换机跟队列
if err = mq.ExchangeQueueCreate(map[rabbitmq.ExchangeName]*rabbitmq.Exchange{
"test_exchange1": { // 交换机名字,自己定义的
BindQueues: map[rabbitmq.QueueName]*rabbitmq.Queue{
"test_queue1": {}, //队列名字自己定义
},
},
}); err != nil {
panic(err)
} else {
fmt.Println("ExchangeQueueCreate success")
}
监听消费端
go func() {
_ = m.RegisterConsumer("", &Consumer{
QueueName: "test_queue1", //队列名字
ConsumeFunc: func(msg []byte) error {
fmt.Printf("消费端接受到数据: %s,时间为: %s\n", string(msg), time.Now().Format("2006-01-02 15:04:05"))
return nil
},
})
}()
select {}
生产端
//获取配置文件
host := g.Cfg().MustGet(gctx.New(), "rabbitmq.host").String()
port := g.Cfg().MustGet(gctx.New(), "rabbitmq.port").Int()
user := g.Cfg().MustGet(gctx.New(), "rabbitmq.user").String()
pwd := g.Cfg().MustGet(gctx.New(), "rabbitmq.password").String()
vhost := g.Cfg().MustGet(gctx.New(), "rabbitmq.vhost").String()
//初始化rabbitmq
mq := rabbitmq.GetRabbitMQ()
//连接rabbitmq
err := mq.Conn(host, port, user, pwd, vhost)
if err != nil {
glog.Errorf(gctx.New(), "rabbitmq connect fail%s", err)
}
发送普通消息 (注意,这种发送的消息是普通消息.是不会有延时队列效果的)
mq := rabbitmq.GetRabbitMQ()
err = mq.SendToQueue("test_queue1", map[string]interface{}{
"id": i + 1,
})
发送延时消息
mq := rabbitmq.GetRabbitMQ()
err := mq.SendToQueueDelay("test_queue1", 5*time.Minute, map[string]interface{}{
"id": i + 1,
})
if err != nil {
return err
}