cAmqp

package module
v0.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 22, 2024 License: MIT Imports: 9 Imported by: 1

Documentation

Index

Constants

View Source
const (
	AmqpConfigName = "AmqpConf"
)

Variables

View Source
var Component = &AmqpComponent{}

Functions

func Confirm

func Confirm(ctx *gin.Context, data amqp091.Delivery, ack AckType)

func Consume

func Consume(ctx *gin.Context, option ConsumeOption) (err error)

func Produce

func Produce(ctx *gin.Context, option ProduceOption) (err error)

Types

type AckType

type AckType uint8
const (
	ACK AckType = iota
	NACK
	REQUEUE
	DROP
)

type AmqpComponent added in v0.3.3

type AmqpComponent struct{}

func (*AmqpComponent) Inject added in v0.3.3

func (i *AmqpComponent) Inject(instance any) bool

func (*AmqpComponent) InjectConf added in v0.4.0

func (i *AmqpComponent) InjectConf(config cComponents.ConfigInterface) bool

func (*AmqpComponent) Listen added in v0.4.0

func (i *AmqpComponent) Listen() []*cComponents.ConfigListener

func (*AmqpComponent) Load added in v0.3.3

func (i *AmqpComponent) Load()

type AmqpConf

type AmqpConf struct {
	Connections map[string]*AmqpConf_Connection `json:"connections"`
	Drivers     map[string]*AmqpConf_Driver     `json:"drivers"`
}

func (*AmqpConf) ConfigName

func (i *AmqpConf) ConfigName() string

type AmqpConf_Connection

type AmqpConf_Connection struct {
	Host      string `json:"host"`
	Port      int64  `json:"port"`
	Username  string `json:"username"`
	Password  string `json:"password"`
	VHost     string `json:"vhost"`
	Heartbeat int64  `json:"heartbeat"`
}

type AmqpConf_Driver

type AmqpConf_Driver struct {
	Connection   string           `json:"connection"`
	ExchangeName string           `json:"exchange_name"`
	ExchangeType ExchangeTypeName `json:"exchange_type"`
	QueueName    string           `json:"queue_name"`
	RoutingKey   string           `json:"routing_key"`
}

type AmqpContainer added in v0.4.0

type AmqpContainer struct {
	// contains filtered or unexported fields
}

type AmqpError

type AmqpError string
const (
	AmqpConfDriverNotFoundErr     AmqpError = "AmqpConf driver 配置不存在"
	AmqpConfDriverNameNotFoundErr AmqpError = "AmqpConf driver name 不存在"
	AmqpConfConnectionNotFoundErr AmqpError = "AmqpConf connection 配置不存在"
	AmqpChannelCloseErr           AmqpError = "amqp channel 已关闭"

	AckTypeErr AmqpError = "ack 类型错误"
)

func (AmqpError) Error

func (e AmqpError) Error() string

type BindHandlerFunc

type BindHandlerFunc func(ctx *gin.Context, status bool, params amqp091.Table)

type BindOption

type BindOption struct {
	NoWait    bool          `json:"no_wait"`
	Arguments amqp091.Table `json:"arguments"`
}

type ConnectionType

type ConnectionType string
const (
	ConnectionType_Producer ConnectionType = "producer"
	ConnectionType_Consumer ConnectionType = "consumer"
)

type ConsumeOption

type ConsumeOption struct {
	// UUID        string           `json:"uuid"`         // 唯一标识,用于判断是否独立链接
	DriverName  string           `json:"driver_name"`  // AmqpConf 中的 drivers 配置名
	DriverConf  *AmqpConf_Driver `json:"driver_conf"`  // 默认的 channel 配置,当次配置存在是优先使用此配置,否则取 AmqpConf 中的 channel 配置
	Exchange    ExchangeOption   `json:"exchange"`     // 定义 exchange 初始化参数
	Queue       QueueOption      `json:"queue"`        // 定义 queue 初始化参数
	Bind        BindOption       `json:"bind"`         // 定义 bind 方法初始化参数
	ConsumeType ConsumeType      `json:"consume_type"` // 是否使用逐条读取的方式进行消费
	BatchSize   int64            `json:"batch_size"`   // 批量消费数量
	WaitTime    int64            `json:"wait_time"`    // 等待消费间隔时间,队列取空后/判断为暂停时/读取异常时,开始计时
	WaitCount   int64            `json:"wait_count"`   // 允许队列取空次数(有积压时不算取空),默认 0 次,大于该值,则执行消费或销毁队列
	Tag         string           `json:"tag"`          // 消费参数
	AutoAck     bool             `json:"auto_ack"`     // 消费参数
	Exclusive   bool             `json:"exclusive"`    // 消费参数
	NoLocal     bool             `json:"no_local"`     // 消费参数
	NoWait      bool             `json:"no_wait"`      // 消费参数
	Arguments   amqp091.Table    `json:"arguments"`    // 消费参数
	Params      amqp091.Table    `json:"params"`       // 创建消费者时,自定义参数,用于各类 handler 处理

	Handler       MessageHandlerFunc `json:"-"` // 队列消息处理方法
	PauseHandler  PauseHandlerFunc   `json:"-"` // 判断队列是否暂停消费,return true 为暂停消费,return false 为继续消费
	StartHandler  ParamsHandlerFunc  `json:"-"` // 开始消费处理方法
	FinishHandler ParamsHandlerFunc  `json:"-"` // 结束消费处理方法
	BindHandler   BindHandlerFunc    `json:"-"` // bind 执行后处理方法
}

type ConsumeType

type ConsumeType uint8
const (
	ConsumeType_BasicConsume ConsumeType = iota
	ConsumeType_BasicGet
)

type ExchangeOption

type ExchangeOption struct {
	Durable     bool          `json:"durable"`
	AutoDeleted bool          `json:"auto_deleted"`
	Internal    bool          `json:"internal"`
	NoWait      bool          `json:"no_wait"`
	Arguments   amqp091.Table `json:"arguments"`
}

type ExchangeTypeName

type ExchangeTypeName string
const (
	EXCHANGE_TYPE_DIRECT  ExchangeTypeName = "direct"
	EXCHANGE_TYPE_FANOUT  ExchangeTypeName = "fanout"
	EXCHANGE_TYPE_TOPIC   ExchangeTypeName = "topic"
	EXCHANGE_TYPE_HEADERS ExchangeTypeName = "headers"
)

type MessageHandlerFunc

type MessageHandlerFunc func(ctx *gin.Context, delivery []amqp091.Delivery)

type ParamsHandlerFunc

type ParamsHandlerFunc func(ctx *gin.Context, params amqp091.Table) error

type PauseHandlerFunc

type PauseHandlerFunc func(ctx *gin.Context, params amqp091.Table) bool

type ProduceOption

type ProduceOption struct {
	// UUID       string             `json:"uuid"` // 唯一标识,用于判断是否独立链接
	DriverName string             `json:"driver_name"`
	DriverConf *AmqpConf_Driver   `json:"driver_conf"`
	Exchange   ExchangeOption     `json:"exchange"`  // 定义 exchange 初始化参数
	Queue      QueueOption        `json:"queue"`     // 定义 queue 初始化参数
	Bind       BindOption         `json:"bind"`      // 定义 bind 方法初始化参数
	OnlyPush   bool               `json:"only_push"` // 是否只推送消息,不定义queue
	Mandatory  bool               `json:"mandatory"`
	Immediate  bool               `json:"immediate"`
	Message    amqp091.Publishing `json:"message"`
}

type QueueOption

type QueueOption struct {
	Durable     bool          `json:"durable"`
	AutoDeleted bool          `json:"auto_deleted"`
	Exclusive   bool          `json:"exclusive"`
	NoWait      bool          `json:"no_wait"`
	Arguments   amqp091.Table `json:"arguments"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL