rabbitmq

package module
v1.0.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 25, 2025 License: MIT Imports: 12 Imported by: 0

README

rabbitmq 延时消费队列

依赖地址

 go get -u gitee.com/golang_common/go-mq@v1.0.6

使用方式

初始化消费端

    //获取配置文件
	host := g.Cfg().MustGet(gctx.New(), "rabbitmq.host").String()
	port := g.Cfg().MustGet(gctx.New(), "rabbitmq.port").Int()
	user := g.Cfg().MustGet(gctx.New(), "rabbitmq.user").String()
	pwd := g.Cfg().MustGet(gctx.New(), "rabbitmq.password").String()
	vhost := g.Cfg().MustGet(gctx.New(), "rabbitmq.vhost").String()
        
	//初始化rabbitmq
	mq := rabbitmq.GetRabbitMQ()
	
	//连接rabbitmq
	err := mq.Conn(host, port, user, pwd, vhost)
	if err != nil {
		glog.Errorf(gctx.New(), "rabbitmq connect fail%s", err)
	}

    //创建交换机跟队列
    if err = mq.ExchangeQueueCreate(map[rabbitmq.ExchangeName]*rabbitmq.Exchange{
        "test_exchange1": { // 交换机名字,自己定义的
            BindQueues: map[rabbitmq.QueueName]*rabbitmq.Queue{
                "test_queue1": {}, //队列名字自己定义
            },
        },
    }); err != nil {
        panic(err)
    } else {
        fmt.Println("ExchangeQueueCreate success")
    }
监听消费端
	go func() {
		_ = m.RegisterConsumer("", &Consumer{
			QueueName: "test_queue1", //队列名字
			ConsumeFunc: func(msg []byte) error {
				fmt.Printf("消费端接受到数据: %s,时间为: %s\n", string(msg), time.Now().Format("2006-01-02 15:04:05"))
				return nil
			},
		})
	}()
	select {}

生产端

    //获取配置文件
	host := g.Cfg().MustGet(gctx.New(), "rabbitmq.host").String()
	port := g.Cfg().MustGet(gctx.New(), "rabbitmq.port").Int()
	user := g.Cfg().MustGet(gctx.New(), "rabbitmq.user").String()
	pwd := g.Cfg().MustGet(gctx.New(), "rabbitmq.password").String()
	vhost := g.Cfg().MustGet(gctx.New(), "rabbitmq.vhost").String()
        
	//初始化rabbitmq
	mq := rabbitmq.GetRabbitMQ()
	
	//连接rabbitmq
	err := mq.Conn(host, port, user, pwd, vhost)
	if err != nil {
		glog.Errorf(gctx.New(), "rabbitmq connect fail%s", err)
	}
发送普通消息 (注意,这种发送的消息是普通消息.是不会有延时队列效果的)
    mq := rabbitmq.GetRabbitMQ()
    err = mq.SendToQueue("test_queue1", map[string]interface{}{
        "id": i + 1,
    })
发送延时消息
	mq := rabbitmq.GetRabbitMQ()
	err := mq.SendToQueueDelay("test_queue1", 5*time.Minute, map[string]interface{}{
        "id": i + 1,
    })
	if err != nil {
		return err
	}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetRabbitMQ

func GetRabbitMQ() *rabbitMQ

Types

type ConsumeFunc

type ConsumeFunc func(msg []byte) error

type Consumer

type Consumer struct {
	QueueName   QueueName   // 队列名称
	ConsumeFunc ConsumeFunc // 消费函数
}

type Exchange

type Exchange struct {
	ExchangeType string // 交换机类型
	// 交换机绑定的队列列表,如果有延迟时间,则生成 队列名_(秒)s_transfer,
	// 如果修改延迟时间,则生成新的队列并绑定,然后解绑旧的队列,旧队列需要确认消费完毕后手动删除
	BindQueues map[QueueName]*Queue
}

交换机

type ExchangeName

type ExchangeName string

type Queue

type Queue struct {
	RoutingKey string // 路由键
}

队列

type QueueName

type QueueName string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL