ramqp

Version: v0.0.0-...-71dca7a
Published: Mar 28, 2020 License: MIT Imports: 7 Imported by: 0

README

Ramqp

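Ramqp is a thin wrapper around the Go amqp library. It provides a concise, straightforward calling interface and supports automatic reconnection after a dropped connection. In theory it works with any message broker that implements the AMQP protocol, such as the well-known RabbitMQ.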

How to install

go get -u -v github.com/ijkbytes/ramqp

How to use

The following example shows how to use Ramqp:

...

msgCh := make(chan string)

mq := ramqp.New("amqp://test:test@127.0.0.1:5672/test")
receiver := &ramqp.Receiver{
    ExchangeName: "exchange_name",
    ExchangeType: "topic",
    QueueName:    "test_queue",
    RouteKey:     "test_queue",
    // the delivery is named d so that it does not shadow the msgCh channel
    OnReceive: func(d *amqp.Delivery) bool {
        msgCh <- string(d.Body)
        return true
    },
}
publisher := &ramqp.Publisher{}

// register receiver and enable auto ack
if err := mq.RegisterReceiver(receiver, ramqp.WithConsumeAutoAck()); err != nil {
    panic(err)
}
// register publisher
if err := mq.RegisterPublisher(publisher); err != nil {
    panic(err)
}
if err := mq.Start(); err != nil {
    panic(err)
}

// publish one message to the exchange and queue the receiver is bound to
if err := publisher.Publish("exchange_name", "test_queue", amqp.Publishing{
    Body: []byte("this is a test."),
}); err != nil {
    panic(err)
}

fmt.Println("receive msg: ", <-msgCh)

...

Features

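  • Automatic reconnection when the TCP connection drops
  • Multiple channels share a single TCP connection
  • Channels are re-established after a channel error
  • Queues are re-created automatically if they are deleted
  • Consumer implementation
  • Producer implementation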

Documentation

Types

type Opt

type Opt func(*Receiver)
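
Opt follows the functional-options pattern: each With... function returns a closure that mutates the Receiver it is given. The sketch below illustrates the pattern with local stand-in types, because the real Receiver keeps its option state in unexported fields. The names `receiver`, `opt`, `withAutoAck`, `withPrefetchCount`, and `register` are hypothetical, and how Ramqp applies options internally is an assumption.

```go
package main

import "fmt"

// receiver is a hypothetical stand-in for ramqp.Receiver.
type receiver struct {
	autoAck       bool
	prefetchCount int
}

// opt mirrors the documented shape of ramqp.Opt: a function that mutates a receiver.
type opt func(*receiver)

// withAutoAck is an illustrative analogue of WithConsumeAutoAck.
func withAutoAck() opt {
	return func(r *receiver) { r.autoAck = true }
}

// withPrefetchCount is an illustrative analogue of WithPrefetchCount.
func withPrefetchCount(c int) opt {
	return func(r *receiver) { r.prefetchCount = c }
}

// register applies each option in order, which is how a variadic
// "options ...Opt" parameter is typically consumed.
func register(r *receiver, options ...opt) {
	for _, o := range options {
		o(r)
	}
}

func main() {
	r := &receiver{}
	register(r, withAutoAck(), withPrefetchCount(10))
	fmt.Printf("autoAck=%v prefetchCount=%d\n", r.autoAck, r.prefetchCount)
}
```

With this pattern, a caller only names the settings it wants to change, and new options can be added without altering the registration signature.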

func WithBindArgs

func WithBindArgs(args amqp.Table) Opt

func WithBindNoWait

func WithBindNoWait() Opt

func WithConsumeArgs

func WithConsumeArgs(args amqp.Table) Opt

func WithConsumeAutoAck

func WithConsumeAutoAck() Opt

func WithConsumeExclusive

func WithConsumeExclusive() Opt

func WithConsumeNoLocal

func WithConsumeNoLocal() Opt

func WithConsumeNoWait

func WithConsumeNoWait() Opt

func WithExchangeArgs

func WithExchangeArgs(args amqp.Table) Opt

func WithExchangeAutoDel

func WithExchangeAutoDel() Opt

func WithExchangeDurable

func WithExchangeDurable() Opt

func WithExchangeInternal

func WithExchangeInternal() Opt

func WithExchangeNoWait

func WithExchangeNoWait() Opt

func WithPrefetchCount

func WithPrefetchCount(c int) Opt

func WithPrefetchSize

func WithPrefetchSize(s int) Opt

func WithQueueArgs

func WithQueueArgs(args amqp.Table) Opt

func WithQueueAutoDel

func WithQueueAutoDel() Opt

func WithQueueDurable

func WithQueueDurable() Opt

func WithQueueExclusive

func WithQueueExclusive() Opt

func WithQueueNoWait

func WithQueueNoWait() Opt

type POpt

type POpt func(p *Publisher)

func WithPExchangeArgs

func WithPExchangeArgs(args amqp.Table) POpt

func WithPExchangeAutoDel

func WithPExchangeAutoDel() POpt

func WithPExchangeDurable

func WithPExchangeDurable() POpt

func WithPExchangeInternal

func WithPExchangeInternal() POpt

func WithPExchangeNoWait

func WithPExchangeNoWait() POpt

func WithPMandatory

func WithPMandatory() POpt

type Publisher

type Publisher struct {
	ExchangeType string
	ExchangeName string
	NotifyReturn func(msg amqp.Return)
	// contains filtered or unexported fields
}

func (*Publisher) Publish

func (p *Publisher) Publish(exchange, key string, msg amqp.Publishing) (err error)

type Ramqp

type Ramqp struct {
	// contains filtered or unexported fields
}

func New

func New(urls string) *Ramqp

New supports multiple URLs, separated by commas. For example: "amqp://guest:guest@10.0.1.21:5672, amqp://guest:guest@10.0.1.22:5672"
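
As an illustration of the comma-separated format, the sketch below splits such a string into individual URLs. `splitURLs` is a hypothetical helper, not part of the package, and trimming the whitespace around each entry is an assumption suggested by the space after the comma in the example above.

```go
package main

import (
	"fmt"
	"strings"
)

// splitURLs is a hypothetical helper that breaks a comma-separated
// URL list into its entries, trimming whitespace and skipping empties.
func splitURLs(urls string) []string {
	var out []string
	for _, u := range strings.Split(urls, ",") {
		if u = strings.TrimSpace(u); u != "" {
			out = append(out, u)
		}
	}
	return out
}

func main() {
	list := splitURLs("amqp://guest:guest@10.0.1.21:5672, amqp://guest:guest@10.0.1.22:5672")
	for i, u := range list {
		fmt.Println(i, u)
	}
}
```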

func (*Ramqp) RegisterPublisher

func (mq *Ramqp) RegisterPublisher(pub *Publisher, options ...POpt) error

func (*Ramqp) RegisterReceiver

func (mq *Ramqp) RegisterReceiver(recv *Receiver, options ...Opt) error

func (*Ramqp) Start

func (mq *Ramqp) Start() error

func (*Ramqp) Stop

func (mq *Ramqp) Stop()

type Receiver

type Receiver struct {
	ExchangeType string
	ExchangeName string
	// The QueueName may be empty, in which case the server will
	// generate a unique name
	QueueName string
	RouteKey  string
	// OnReceive must not be nil; it is invoked when a message is received
	OnReceive func(*amqp.Delivery) bool
	// contains filtered or unexported fields
}
