rabbitgo

package module
v0.0.0-...-e2f473d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 29, 2022 License: Apache-2.0 Imports: 9 Imported by: 0

README

rabbitgo

基于golang的rabbitmq连接池

安装rabbitgo

go get github.com/q752115847/rabbitgo

如何使用rabbitgo?
初始化全局连接池对象
var rabbit *rabbitgo.RabbitPool

func init(){
    rabbit = rabbitgo.New(fmt.Sprintf("amqp://%s:%s@%s:%d/%s", "guest", "guest", "127.0.0.1", 5672, ""),
    rabbitgo.Config{
        ConnectionMax: 5,
        ChannelMax:    10,
        ChannelActive: 20,
        ChannelIdle:   10,
    })
    //设置日志打印级别,默认rabbitgo.LOG_DEBUG
    rabbit.SetLevel(rabbitgo.LOG_ERROR)
}

Sender
// 获取Channel对象
ch, err := rabbit.Get()
if err != nil {
    log.Printf("Get channel error, %s", err.Error())
    return err
}
// 重入channel池复用
defer rabbit.Push(ch)

queue, err := ch.Ch.QueueDeclare("test_queue", true, false, false, false, nil)
if err != nil {
    log.Printf("Queue declare error, %s", err.Error())
    return err
}

data := fmt.Sprintf("{\"code\":200,\"message\":\"success\",\"data\":\"%s\"}", time.Now().String())
err = ch.Ch.Publish(
        "",
        queue.Name,
        false,
        false,
        amqp.Publishing{
        DeliveryMode: amqp.Persistent,
        ContentType:  "application/json",
        Body:         []byte(data),
    })
if err != nil {
    log.Printf("Send message error, %s", err.Error())
    return err
}
Sender Confirm
// 获取Channel对象
ch, err := rabbit.Get()
if err != nil {
    log.Printf("Get channel error, %s", err.Error())
    return err
}
// 重入channel池复用
defer rabbit.Push(ch)

ch.Confirm(false)
defer func() {
    if confirmed := <-ch.NotifyConfirm; confirmed.Ack {
        // code to run when the message is confirmed (acked) by the broker
        log.Printf("Confirmed tag %d", confirmed.DeliveryTag)
    } else {
        // code to run when the message is nacked by the broker
        log.Printf("Nacked tag %d", confirmed.DeliveryTag)
    }
}()

queue, err := ch.Ch.QueueDeclare("test_queue", true, false, false, false, nil)
if err != nil {
    log.Printf("Queue declare error, %s", err.Error())
    return err
}

data := fmt.Sprintf("{\"code\":200,\"message\":\"success\",\"data\":\"%s\"}", time.Now().String())
err = ch.Ch.Publish(
        "",
        queue.Name,
        false,
        false,
        amqp.Publishing{
        DeliveryMode: amqp.Persistent,
        ContentType:  "application/json",
        Body:         []byte(data),
    })
if err != nil {
    log.Printf("Send message error, %s", err.Error())
    return err
}

Documentation

Index

Constants

View Source
const (
	LOG_DEBUG int = iota
	LOG_INFO
	LOG_WARN
	LOG_ERROR
)
View Source
const (
	LOG_PREFIX = "[rabbitgo]"
	LOG_FORMAT = log.LstdFlags
	LOG_LEVEL  = LOG_DEBUG
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Channel

type Channel struct {
	ChId          int64
	T             time.Time
	Ch            *amqp.Channel
	NotifyConfirm chan amqp.Confirmation
	// contains filtered or unexported fields
}

func (*Channel) Confirm

func (c *Channel) Confirm(noWait bool) error

type Config

type Config struct {
	ConnectionMax int
	ChannelMax    int
	ChannelActive int
	ChannelIdle   int
	Health        time.Duration
	Timeout       time.Duration
	Heartbeat     time.Duration
}

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

type Logger

type Logger struct {
	LogI *log.Logger
	LogW *log.Logger
	LogD *log.Logger
	LogE *log.Logger
	// contains filtered or unexported fields
}

func (*Logger) Debug

func (l *Logger) Debug(v ...interface{})

func (*Logger) Debugf

func (l *Logger) Debugf(f string, v ...interface{})

func (*Logger) Error

func (l *Logger) Error(v ...interface{})

func (*Logger) Errorf

func (l *Logger) Errorf(f string, v ...interface{})

func (*Logger) Flag

func (l *Logger) Flag() int

func (*Logger) Info

func (l *Logger) Info(v ...interface{})

func (*Logger) Infof

func (l *Logger) Infof(f string, v ...interface{})

func (*Logger) Init

func (l *Logger) Init()

func (*Logger) Level

func (l *Logger) Level() int

func (*Logger) Prefix

func (l *Logger) Prefix() string

func (*Logger) SetFlag

func (l *Logger) SetFlag(flag int)

func (*Logger) SetLevel

func (l *Logger) SetLevel(level int)

func (*Logger) SetPrefix

func (l *Logger) SetPrefix(prefix string)

func (*Logger) Warn

func (l *Logger) Warn(v ...interface{})

func (*Logger) Warnf

func (l *Logger) Warnf(f string, v ...interface{})

type RabbitPool

type RabbitPool struct {
	// contains filtered or unexported fields
}

Rabbit connection pool

func New

func New(url string, config Config) *RabbitPool

func (*RabbitPool) Get

func (r *RabbitPool) Get() (*Channel, error)

Randomly return an available channel

func (*RabbitPool) Push

func (r *RabbitPool) Push(ch *Channel)

Put back into the connection pool

func (*RabbitPool) SetLevel

func (r *RabbitPool) SetLevel(level int)

Set log level

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL