kelleyRabbimqPool

package module
v1.0.14 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 1, 2023 License: GPL-2.0 Imports: 13 Imported by: 1

README

rabbitmq 连接池channel复用

开发语言 golang 依赖库

go get -u gitee.com/tym_hmm/rabbitmq-pool-go

go get -u gitee.com/tym_hmm/rabbitmq-pool-go

已在线上生产环镜运行, 5200W请求 qbs 3000 时, 连接池显示无压力
rabbitmq部署为线上集群

接下来的功能,预计在1.0.15版本

1增加批次消息处理,用以提高生产及消费的吞吐量

功能说明

  1. 自定义连接池大小及最大处理channel数
  2. 消费者底层断线自动重连
  3. 生产者底层断线自动重连 v1.0.12
  4. 底层使用轮循方式复用tcp
  5. 生产者每个tcp对应一个channel,防止channel写入阻塞造成内存使用过量
  6. 支持rabbitmq exchangeType
  7. 默认交换机、队列、消息都会持久化磁盘
  8. 默认值
名称 说明
tcp最大连接数 5
生产者消费发送失败最大重试次数 5
消费者最大channel信道数(每个连接自动平分) 100(每个tcp10个)

使用

  1. 初始化
var oncePool sync.Once
var instanceRPool *kelleyRabbimqPool.RabbitPool
func initrabbitmq() *kelleyRabbimqPool.RabbitPool {
	oncePool.Do(func() {
        //初始化生产者
		instanceRPool = kelleyRabbimqPool.NewProductPool()
        //初始化消费者
	    instanceConsumePool = kelleyRabbimqPool.NewConsumePool()
        //使用默认虚拟host "/"
		err := instanceRPool.Connect("192.168.1.202", 5672, "guest", "guest")
        //使用自定义虚
        //err:=instanceConsumePool.ConnectVirtualHost("192.168.1.202", 5672, "guest", "guest", "/testHost")
		if err != nil {
			fmt.Println(err)
		}
	})
	return instanceRPool
}
  1. 生产者
var wg sync.WaitGroup
	for i:=0;i<100000; i++ {
		wg.Add(1)
		go func(num int) {
			defer wg.Done()
			data:=kelleyRabbimqPool.GetRabbitMqDataFormat("testChange5", kelleyRabbimqPool.EXCHANGE_TYPE_TOPIC, "textQueue5", "/", fmt.Sprintf("这里是数据%d", num))
			_=instanceRPool.Push(data)
		}(i)
	}
	wg.Wait()
  1. 消费者

可定义多个消息者事件, 不通交换机, 队列, 路由

每个事件独立

nomrl := &rabbitmq.ConsumeReceive{
        #定义消费者事件
        ExchangeName: "testChange31",//队列名称
        ExchangeType: kelleyRabbimqPool.EXCHANGE_TYPE_DIRECT,
        Route:        "",
        QueueName:    "testQueue31",
        IsTry:true,//是否重试
        IsAutoAck: false, //是否自动确认消息
        MaxReTry: 5,//最大重试次数
        EventFail: func(code int, e error, data []byte) {
        	fmt.Printf("error:%s", e)
        },
        /***
         * 参数说明
         * @param data []byte 接收的rabbitmq数据
         * @param header map[string]interface{} 原rabbitmq header
         * @param retryClient RabbitmqPool.RetryClientInterface 自定义重试数据接口,重试需return true 防止数据重复提交
         ***/
        EventSuccess: func(data []byte, header map[string]interface{},retryClient kelleyRabbimqPool.RetryClientInterface)bool {//如果返回true 则无需重试
            _ = retryClient.Ack()//确认消息    	
            fmt.Printf("data:%s\n", string(data))
        	return true
        },
	}
	instanceConsumePool.RegisterConsumeReceive(nomrl)

	err := instanceConsumePool.RunConsume()
	if err != nil {
		fmt.Println(err)
	}
  • 参数说明
名称 类型 说明
ExchangeName string 交换机名称
ExchangeType string 交换机类型:
EXCHANGE_TYPE_FANOUT
EXCHANGE_TYPE_DIRECT
EXCHANGE_TYPE_TOPIC
Route string 路由键
QueueName string 队列名称
IsTry bool 是否重试
如果开启重试后, 在成功回调用返回true会对消息进行重试, 重试时间为 5000~15000 MS
IsAutoAck bool 是否自动确认消息, true: 组件底层会自动对消息进行确认
false: 手动进行消息确认,在成功会调中需进行手动确认 _ = retryClient.Ack()
MaxReTry int 重试最大次数s, 需isTry为true
EventFail func 失败回调
EventSuccess func 成功回调
  1. 错误码说明

错误码为

  1. 生产者push时返回的 *RabbitMqError
  2. 消费者事件监听回返的 code
错误码 说明
501 生产者发送超过最大重试次数
502 获取信道失败, 一般为认道队列数用尽
503 交换机/队列/绑定失败
504 连接失败
506 信道创建失败
507 超过最大重试次数

Documentation

Index

Constants

View Source
const (
	DEFAULT_MAX_CONNECTION      = 5  //rabbitmq tcp 最大连接数
	DEFAULT_MAX_CONSUME_CHANNEL = 25 //最大消费channel数(一般指消费者)
	DEFAULT_MAX_CONSUME_RETRY   = 5  //消费者断线重连最大次数
	DEFAULT_PUSH_MAX_TIME       = 5  //最大重发次数

	DEFAULT_MAX_PRODUCT_RETRY = 5 //生产者断线重连最大次数

	//轮循-连接池负载算法
	LOAD_BALANCE_ROUND = 1
)
View Source
const (
	RABBITMQ_TYPE_PUBLISH = 1 //生产者
	RABBITMQ_TYPE_CONSUME = 2 //消费者

	DEFAULT_RETRY_MIN_RANDOM_TIME = 5000 //最小重试时间机数

	DEFAULT_RETRY_MAX_RADNOM_TIME = 15000 //最大重试时间机数

)
View Source
const (
	EXCHANGE_TYPE_FANOUT = "fanout" //  Fanout:广播,将消息交给所有绑定到交换机的队列
	EXCHANGE_TYPE_DIRECT = "direct" //Direct:定向,把消息交给符合指定routing key 的队列
	EXCHANGE_TYPE_TOPIC  = "topic"  //Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
)
View Source
const (
	RCODE_PUSH_MAX_ERROR                    = 501 //发送超过最大重试次数
	RCODE_GET_CHANNEL_ERROR                 = 502 //获取信道失败
	RCODE_CHANNEL_QUEUE_EXCHANGE_BIND_ERROR = 503 //交换机/队列/绑定失败
	RCODE_CONNECTION_ERROR                  = 504 //连接失败
	RCODE_PUSH_ERROR                        = 505 //消息推送失败
	RCODE_CHANNEL_CREATE_ERROR              = 506 //信道创建失败
	RCODE_RETRY_MAX_ERROR                   = 507 //超过最大重试次数

)

* 错误码

Variables

View Source
var (
	ACK_DATA_NIL = errors.New("ack data nil")
)
View Source
var UTFALL_SECOND = "2006-01-02 15:04:05"

Functions

func RandomAround

func RandomAround(min, max int64) (int64, error)

func RandomNum

func RandomNum(length int) string

* 随机数 @param int length 生成长度

Types

type ChannelQueue

type ChannelQueue struct {
	// contains filtered or unexported fields
}

func NewChannelQueue

func NewChannelQueue() *ChannelQueue

func (*ChannelQueue) Add

func (q *ChannelQueue) Add(data *rChannel)

func (*ChannelQueue) Count

func (q *ChannelQueue) Count() int32

func (*ChannelQueue) Pop

func (q *ChannelQueue) Pop() (*rChannel, bool)

type ConsumeReceive

type ConsumeReceive struct {
	ExchangeName string                                                                                  //交换机
	ExchangeType string                                                                                  //交换机类型
	Route        string                                                                                  //路由
	QueueName    string                                                                                  //队列名称
	EventSuccess func(data []byte, header map[string]interface{}, retryClient RetryClientInterface) bool //成功事件回调
	EventFail    func(int, error, []byte)                                                                //失败回调

	IsTry     bool  //是否重试
	MaxReTry  int32 //最大重式次数
	IsAutoAck bool  //是否自动确认
}

* 消费者注册接收数据

type Node

type Node struct {
	// contains filtered or unexported fields
}

* rabbitMq 通用队列

type RabbitLoadBalance

type RabbitLoadBalance struct {
}

* 连接负载处理

func NewRabbitLoadBalance

func NewRabbitLoadBalance() *RabbitLoadBalance

func (*RabbitLoadBalance) RoundRobin

func (r *RabbitLoadBalance) RoundRobin(cIndex, max int32) int32

* 负载均衡 轮循

type RabbitMqData

type RabbitMqData struct {
	ExchangeName string //交换机名称
	ExchangeType string //交换机类型 见RabbitmqPool.go 常量
	QueueName    string //队列名称
	Route        string //路由
	Data         string //发送数据
}

* 发送数据 消息发送

func GetRabbitMqDataFormat

func GetRabbitMqDataFormat(exChangeName string, exChangeType string, queueName string, route string, data string) *RabbitMqData

* 获取发送数据模板 @param exChangeName 交换机名称 @param exChangeType 交换机类型 @param queueName string 队列名称 @param route string 路由 @param data string 发送的数据

func GetRabbitMqDataFormatExpire

func GetRabbitMqDataFormatExpire(exChangeName string, exChangeType string, queueName string, route string, data string) *RabbitMqData

* 获取发送数据模板 过期设置(死信队列) @param exChangeName 交换机名称 @param exChangeType 交换机类型 @param queueName string 队列名称 @param route string 路由 @param data string 发送的数据

type RabbitMqError

type RabbitMqError struct {
	Code    int
	Message string
	Detail  string
}

* 错误返回

func NewRabbitMqError

func NewRabbitMqError(code int, message string, detail string) *RabbitMqError

func (RabbitMqError) Error

func (e RabbitMqError) Error() string

type RabbitPool

type RabbitPool struct {
	// contains filtered or unexported fields
}

func NewConsumePool

func NewConsumePool() *RabbitPool

* 初始化消费者

func NewProductPool

func NewProductPool() *RabbitPool

* 初始化生产者

func (*RabbitPool) Connect

func (r *RabbitPool) Connect(host string, port int, user string, password string) error

* 连接rabbitmq @param host string 服务器地址 @param port int 服务端口 @param user string 用户名 @param password 密码

func (*RabbitPool) ConnectVirtualHost added in v1.0.10

func (r *RabbitPool) ConnectVirtualHost(host string, port int, user string, password string, virtualHost string) error

* 自定义虚拟机连接 @param host string 服务器地址 @param port int 服务端口 @param user string 用户名 @param password 密码 @param virtualHost虚拟机路径

func (*RabbitPool) GetHost

func (r *RabbitPool) GetHost() string

func (*RabbitPool) GetPort

func (r *RabbitPool) GetPort() int

func (*RabbitPool) Push

func (r *RabbitPool) Push(data *RabbitMqData) *RabbitMqError

* 发送消息

func (*RabbitPool) RegisterConsumeReceive

func (r *RabbitPool) RegisterConsumeReceive(consumeReceive *ConsumeReceive)

* 注册消费接收

func (*RabbitPool) RunConsume

func (r *RabbitPool) RunConsume() error

* 消费者

func (*RabbitPool) SetConnectionBalance

func (r *RabbitPool) SetConnectionBalance(balance int)

* 设置连接池负载算法 默认轮循

func (*RabbitPool) SetMaxConnection

func (r *RabbitPool) SetMaxConnection(maxConnection int32)

* 设置最大连接数

func (*RabbitPool) SetMaxConsumeChannel

func (r *RabbitPool) SetMaxConsumeChannel(maxConsume int32)

* 设置消费者最大信道数

func (*RabbitPool) SetRandomRetryTime

func (r *RabbitPool) SetRandomRetryTime(min, max int64)

* 设置随时重试时间 避免同一时刻一次重试过多

type RetryClientInterface added in v1.0.6

type RetryClientInterface interface {
	Push(pushData []byte) *RabbitMqError
	Ack() error
}

type RetryTool added in v1.0.6

type RetryTool struct {
	// contains filtered or unexported fields
}

type RetryToolInterface added in v1.0.6

type RetryToolInterface interface {
	// contains filtered or unexported methods
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL