rabbitmq

package module
v1.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 4, 2023 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// StateInit 初始化状态
	StateInit int8 = iota + 1

	// StateActive 活跃状态
	StateActive

	// StateRetry 重试状态
	StateRetry

	// StateClosed 关闭状态
	StateClosed
)

Variables

View Source
var (
	// ErrClientClosed 客户端已关闭
	ErrClientClosed = errors.New("client is closed")

	// ErrClientNotClosed 客户端不处于关闭状态
	ErrClientNotClosed = errors.New("client isn't closed")

	// ErrConnectionCreateRequired 需要创建连接
	ErrConnectionCreateRequired = errors.New("connection is required to create")
)
View Source
var (
	// ErrConsumerNotInit 消费者不处于初始状态
	ErrConsumerNotInit = errors.New("consumer isn't in init stratus")

	// ErrConsumerClosed 消费者已关闭
	ErrConsumerClosed = errors.New("consumer is closed")

	// ErrChannelCreateRequired 需要创建 CHANNEL
	ErrChannelCreateRequired = errors.New("channel is required to create")
)

Functions

This section is empty.

Types

type Binding

type Binding struct {
	Queue    *Queue
	Exchange *Exchange
	Key      string
	NoWait   bool
	Args     amqp.Table
}

Binding 绑定

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client 客户端

func NewClient

func NewClient(dsn string, options ...ClientOption) *Client

NewClient @desc 创建客户端实例 @param dsn string @param options []ClientOption @return *Client

func (*Client) Channel

func (c *Client) Channel() (*amqp.Channel, error)

Channel @desc 获取 CHANNEL @receiver c *Client @return *amqp.Channel @return error

func (*Client) Close

func (c *Client) Close()

Close @desc 关闭客户端 @receiver c *Client

func (*Client) Connection

func (c *Client) Connection() (*amqp.Connection, error)

Connection @desc 获取连接 @receiver c *Client @return *amqp.Connection @return error

func (*Client) Declare

func (c *Client) Declare(declaration Declaration) error

Declare @desc 声明 @receiver c *Client @param declaration Declaration @return error

func (*Client) Open

func (c *Client) Open() error

Open @desc 开启 @receiver c *Client @return error

func (*Client) State

func (c *Client) State() int8

State @desc 获取客户端状态 @receiver c *Client @return int8

func (*Client) Store

func (c *Client) Store() error

Store @desc 恢复 @receiver c *Client @return error

type ClientOption

type ClientOption func(*Client)

ClientOption 客户端选项

func WithClientChannelMax

func WithClientChannelMax(channelMax int) ClientOption

WithClientChannelMax @desc 配置最大 CHANNEL 数量 @param channelMax int @return ClientOption

func WithClientHeartbeat

func WithClientHeartbeat(heartbeat time.Duration) ClientOption

WithClientHeartbeat @desc 配置心跳间隔 @param heartbeat time.Duration @return ClientOption

func WithClientRetryInterval

func WithClientRetryInterval(retryInterval time.Duration) ClientOption

WithClientRetryInterval @desc 配置重连间隔 @param retryInterval time.Duration @return ClientOption

func WithClientVhost

func WithClientVhost(vhost string) ClientOption

WithClientVhost @desc 配置虚拟主机 @param vhost string @return ClientOption

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer 消费者

func NewConsumer

func NewConsumer(client *Client, queues []string, options ...ConsumerOption) *Consumer

NewConsumer @desc 创建消费者 @param client *Client @param queues []string @param options []ConsumerOption @return *Consumer

func (*Consumer) Close

func (c *Consumer) Close()

Close @desc 关闭 @receiver c *Consumer

func (*Consumer) DataChan

func (c *Consumer) DataChan() <-chan amqp.Delivery

DataChan @desc 获取数据传输通道 @receiver c *Consumer @return <-chan

func (*Consumer) Serve

func (c *Consumer) Serve() error

Serve @desc 运行 @receiver c *Consumer @return error

func (*Consumer) State

func (c *Consumer) State() int8

State @desc 获取状态 @receiver c *Consumer @return int8

type ConsumerOption

type ConsumerOption func(*Consumer)

ConsumerOption 消费者选项

func WithConsumerArgs

func WithConsumerArgs(args amqp.Table) ConsumerOption

WithConsumerArgs @desc 配置额外选项 @param args amqp.Table @return ConsumerOption

func WithConsumerAutoACK

func WithConsumerAutoACK() ConsumerOption

WithConsumerAutoACK @desc 配置自动 ACK @return ConsumerOption

func WithConsumerAutoTag

func WithConsumerAutoTag() ConsumerOption

WithConsumerAutoTag @desc 自动配置消费者标签 @return ConsumerOption

func WithConsumerExclusive

func WithConsumerExclusive() ConsumerOption

WithConsumerExclusive @desc 配置独占使用 @return ConsumerOption

func WithConsumerNoLocal

func WithConsumerNoLocal() ConsumerOption

WithConsumerNoLocal @desc 配置 NO-LOCAL @return ConsumerOption

func WithConsumerNoWait

func WithConsumerNoWait() ConsumerOption

WithConsumerNoWait @desc 配置 NO-WAIT @return ConsumerOption

func WithConsumerQOS

func WithConsumerQOS(qos int) ConsumerOption

WithConsumerQOS @desc 配置 QOS(PRE-FETCH) @param qos int @return ConsumerOption

func WithConsumerTag

func WithConsumerTag(tags ...string) ConsumerOption

WithConsumerTag @desc 配置消费者标签 @param tags string @return ConsumerOption

type Declaration

type Declaration func(Declarer) error

Declaration 声明器

func DeclareBinding

func DeclareBinding(b *Binding) Declaration

DeclareBinding @desc 声明绑定 @param b *Binding @return Declaration

func DeclareExchange

func DeclareExchange(e *Exchange) Declaration

DeclareExchange @desc 声明交换机 @param e *Exchange @return Declaration

func DeclareQueue

func DeclareQueue(q *Queue) Declaration

DeclareQueue @desc 声明队列 @param q *Queue @return Declaration

type Declarer

type Declarer interface {
	// ExchangeDeclare 声明交换机
	ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error

	// QueueDeclare 声明队列
	QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)

	// QueueBind 绑定队列
	QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error
}

Declarer 声明接口

type Exchange

type Exchange struct {
	Name       string
	Type       string
	Durable    bool
	AutoDelete bool
	NoWait     bool
	Args       amqp.Table
}

Exchange 交换机

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer 生产者

func NewProducer

func NewProducer(client *Client, exchange, routingKey string, options ...ProducerOption) *Producer

NewProducer @desc 创建生产者 @param client *Client @param exchange string @param routingKey string @param options []ProducerOption @return *Producer

func (*Producer) Publish

func (p *Producer) Publish(body []byte, routingKeys ...string) error

Publish @desc 发布 @receiver p *Producer @param body []byte @param routingKeys []string @return error

func (*Producer) PublishFullData

func (p *Producer) PublishFullData(data amqp.Publishing, routingKeys ...string) error

PublishFullData @desc 发布独立数据 @receiver p *Producer @param data amqp.Publishing @param routingKeys []string @return error

func (*Producer) PublishFullDataMulti added in v1.1.0

func (p *Producer) PublishFullDataMulti(data []amqp.Publishing, routingKeys ...string) error

PublishFullDataMulti @desc 批量发布独立数据 @receiver p *Producer @param data []amqp.Publishing @param routingKeys []string @return error

func (*Producer) PublishMulti added in v1.1.0

func (p *Producer) PublishMulti(bodies [][]byte, routingKeys ...string) error

PublishMulti @desc 批量发布 @receiver p *Producer @param bodies [][]byte @param routingKeys []string @return error

type ProducerOption

type ProducerOption func(*Producer)

ProducerOption 生产者选项

func WithProducerImmediate

func WithProducerImmediate(immediate bool) ProducerOption

WithProducerImmediate @desc 配置 IMMEDIATE @param immediate bool @return ProducerOption

func WithProducerMandatory

func WithProducerMandatory(mandatory bool) ProducerOption

WithProducerMandatory @desc 配置 MANDATORY @param mandatory bool @return ProducerOption

func WithProducerTemplate

func WithProducerTemplate(template amqp.Publishing) ProducerOption

WithProducerTemplate @desc 配置消息模板 @param template amqp.Publishing @return ProducerOption

type Queue

type Queue struct {
	Name       string
	Durable    bool
	AutoDelete bool
	Exclusive  bool
	NoWait     bool
	Args       amqp.Table
}

Queue 队列

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL