reliablequeue

package
v1.0.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 3, 2024 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
const (
	RecordStatusUnsuccess uint8 = 0
	RecordStatusSuccess   uint8 = 1
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Option

type Option struct {
	FirstDelaySecond         time.Duration // 发布时退避秒数
	RetryDelaySecondMultiple int64         // 重试退避倍数
}

type PublishParams

type PublishParams struct {
	LogID string
	Scene string
	Data  map[string]interface{}
}

PublishParams 发布消息方法参数

type ReliableMqMessage

type ReliableMqMessage struct {
	ID         uint64    `gorm:"column:id" json:"id"`                   // 主键id
	CreateUser string    `gorm:"column:create_user" json:"create_user"` // 创建方标识
	CreateTime time.Time `gorm:"column:create_time" json:"create_time"` // 创建时间
	UpdateUser string    `gorm:"column:update_user" json:"update_user"` // 更新方标识
	UpdateTime time.Time `gorm:"column:update_time" json:"update_time"` // 更新时间
	Version    uint      `gorm:"column:version" json:"version"`         // 版本号
	IsDel      uint8     `gorm:"column:is_del" json:"is_del"`           // 0-未删除,1-已删除
	Scene      string    `gorm:"column:scene" json:"scene"`             // 唯一消息scene
	SceneDesc  string    `gorm:"column:scene_desc" json:"scene_desc"`   // 描述信息
}

ReliableMqMessage 消息实体表

func (ReliableMqMessage) TableName

func (ReliableMqMessage) TableName() string

type ReliableMqMessageDistribute

type ReliableMqMessageDistribute struct {
	ID          uint64    `gorm:"column:id" json:"id"`                     // 主键id
	CreateUser  string    `gorm:"column:create_user" json:"create_user"`   // 创建方标识
	CreateTime  time.Time `gorm:"column:create_time" json:"create_time"`   // 创建时间
	UpdateUser  string    `gorm:"column:update_user" json:"update_user"`   // 更新方标识
	UpdateTime  time.Time `gorm:"column:update_time" json:"update_time"`   // 更新时间
	Version     uint      `gorm:"column:version" json:"version"`           // 版本号
	IsDel       uint8     `gorm:"column:is_del" json:"is_del"`             // 0-未删除,1-已删除
	MessageID   uint64    `gorm:"column:message_id" json:"message_id"`     // 关联message表主键id
	Scene       string    `gorm:"column:scene" json:"scene"`               // 关联message表scene
	ServiceName string    `gorm:"column:service_name" json:"service_name"` // service_name
	URL         string    `gorm:"column:uri" json:"uri"`                   // uri
	Method      string    `gorm:"column:method" json:"method"`             // http method
}

ReliableMqMessageDistribute 消息分发关联表

func (ReliableMqMessageDistribute) TableName

func (ReliableMqMessageDistribute) TableName() string

type ReliableMqMessageRecord

type ReliableMqMessageRecord struct {
	ID                  uint64    `gorm:"column:id" json:"id"`                                       // 主键id
	CreateUser          string    `gorm:"column:create_user" json:"create_user"`                     // 创建方标识
	CreateTime          time.Time `gorm:"column:create_time" json:"create_time"`                     // 创建时间
	UpdateUser          string    `gorm:"column:update_user" json:"update_user"`                     // 更新方标识
	UpdateTime          time.Time `gorm:"column:update_time" json:"update_time"`                     // 更新时间
	Version             uint      `gorm:"column:version" json:"version"`                             // 版本号
	IsDel               uint8     `gorm:"column:is_del" json:"is_del"`                               // 0-未删除,1-已删除
	MessageID           uint64    `gorm:"column:message_id" json:"message_id"`                       // 关联message表主键id
	MessageDistributeID uint64    `gorm:"column:message_distribute_id" json:"message_distribute_id"` // 关联reliable_mq_message_distribute表主键id
	LogID               string    `gorm:"column:log_id" json:"log_id"`                               // 消息产生的log_id
	UUID                string    `gorm:"column:uuid" json:"uuid"`                                   // 消息唯一id
	ServiceName         string    `gorm:"column:service_name" json:"service_name"`                   // service_name
	URL                 string    `gorm:"column:uri" json:"uri"`                                     // uri
	Method              string    `gorm:"column:method" json:"method"`                               // http method
	Body                string    `gorm:"column:body" json:"body"`                                   // 请求body
	Delay               int64     `gorm:"column:delay" json:"delay"`                                 // 重试间隔
	RetryTime           time.Time `gorm:"column:retry_time" json:"retry_time"`                       // 最后一次重试时间
	NextTime            time.Time `gorm:"column:next_time" json:"next_time"`                         // 下次重试时间
	IsSuccess           uint8     `gorm:"column:is_success" json:"is_success"`                       // 是否消费成功
}

ReliableMqMessageRecord 业务侧可靠消息表

func (ReliableMqMessageRecord) TableName

func (ReliableMqMessageRecord) TableName() string

type ReliableQueue

type ReliableQueue struct {
	// contains filtered or unexported fields
}

func NewReliableQueue

func NewReliableQueue(q queue.Queue, opts ...ReliableQueueOption) (*ReliableQueue, error)

func (*ReliableQueue) Publish

func (rq *ReliableQueue) Publish(ctx context.Context, tx *gorm.DB, msg PublishParams) (err error)

Publish 发布消息,注意此方法在本地事务最后一步调用,会自动提交事务

func (*ReliableQueue) Republish

func (rq *ReliableQueue) Republish(ctx context.Context, tx *gorm.DB) (err error)

Republish 重新发布,一般用离线任务调用

func (*ReliableQueue) Retry

func (rq *ReliableQueue) Retry(ctx context.Context, tx *gorm.DB, record ReliableMqMessageRecord) (err error)

Retry 消费失败退避重试

func (*ReliableQueue) SetSuccess

func (rq *ReliableQueue) SetSuccess(ctx context.Context, tx *gorm.DB, record ReliableMqMessageRecord) (err error)

SetSuccess 消费完成后标记成功

type ReliableQueueOption

type ReliableQueueOption func(*Option)

func WithFirstDelaySecond

func WithFirstDelaySecond(t time.Duration) ReliableQueueOption

func WithRetryDelaySecondMultiple

func WithRetryDelaySecondMultiple(i int64) ReliableQueueOption

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL