outbox

package module
v0.0.0-...-e2a6ffb Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 11, 2022 License: MIT Imports: 15 Imported by: 0

README

outbox 发件箱模式(实验性支持)

基于EventBus,使用Go语言实现最终一致性的微服务分布式事务解决方案

Documentation

Index

Constants

View Source
// Header key names for the metadata attached to an outbox message.
const (
	// MessageHeaderMsgIDKey is the header key for the message ID.
	MessageHeaderMsgIDKey = "nilorg.outbox.msg.id"
	// MessageHeaderMsgTopicKey is the header key for the message topic.
	MessageHeaderMsgTopicKey = "nilorg.outbox.msg.topic"
	// MessageHeaderMsgTypeKey is the header key for the message content type.
	MessageHeaderMsgTypeKey = "nilorg.outbox.msg.type"
	// MessageHeaderMsgSendTimeKey is the header key for the message send time.
	MessageHeaderMsgSendTimeKey = "nilorg.outbox.msg.sendtime"
	// MessageHeaderMsgCallbackKey is the header key for the message callback.
	MessageHeaderMsgCallbackKey = "nilorg.outbox.msg.callback"
)
View Source
// Status names for messages.
// NOTE(review): presumably the values stored in the StatusName field of
// Published and Received records — confirm against the engine implementation.
const (
	// StatusNameFailed marks a message as failed.
	StatusNameFailed = "failed"
	// StatusNameScheduled marks a message as scheduled.
	StatusNameScheduled = "scheduled"
	// StatusNameSucceeded marks a message as succeeded.
	StatusNameSucceeded = "succeeded"
)
View Source
// Table name constants.
const (
	// PublishedTableName is the table name "outbox_published".
	// NOTE(review): presumably what Published.TableName returns — confirm.
	PublishedTableName = "outbox_published"
	// ReceivedTableName is the table name "outbox_received".
	// NOTE(review): presumably what Received.TableName returns — confirm.
	ReceivedTableName = "outbox_received"
)
View Source
// Callback type names.
// NOTE(review): presumably the values passed as the typ argument of
// FailedThresholdCallbackHandler — confirm against the engine implementation.
const (
	// CallbackTypePublished is the callback type name "Published".
	CallbackTypePublished = "Published"
	// CallbackTypeReceived is the callback type name "Received".
	CallbackTypeReceived = "Received"
)
View Source
const (
	// MessageVersion is the message version string.
	// NOTE(review): presumably written to the Version field of Published and
	// Received records — confirm.
	MessageVersion = "v1"
)

Variables

View Source
var (
	// DefaultEngineOptions holds the default engine options: a one-minute
	// failed-retry interval, a retry count of 50, an hourly data-clean
	// interval, a 24-hour expiry for succeeded messages, and snowflake node 1.
	// FailedThresholdCallback is left at its zero value (nil).
	//
	// NOTE(review): the default Logger is eventbus.StdLogger, not a StdLogger
	// from this package — confirm this is intended.
	DefaultEngineOptions = EngineOptions{
		FailedRetryInterval:        time.Minute,
		FailedRetryCount:           50,
		DataCleanInterval:          time.Hour,
		SucceedMessageExpiredAfter: 24 * time.Hour,
		SnowflakeNode:              1,
		Logger:                     &eventbus.StdLogger{},
	}
)

Functions

func DecodeValue

func DecodeValue(s string, v interface{}) (err error)

DecodeValue 对值进行解码

func EncodeValue

func EncodeValue(v interface{}) (s string, err error)

EncodeValue 对值进行编码

Types

type CommitMessage

// CommitMessage is a message to submit; values of this type are accepted by
// Engine.Transaction and Transactioner.Commit.
type CommitMessage struct {
	// Topic is the topic of the message.
	Topic string
	// Value is the message payload; any type is accepted.
	Value interface{}
	// CallbackTopic names a callback topic.
	// NOTE(review): presumably optional (empty means no callback) — confirm.
	CallbackTopic string
}

CommitMessage 提交message

type Engine

// Engine combines the Publisher and Subscriber interfaces with two
// transaction entry points.
type Engine interface {
	Publisher
	Subscriber
	// Begin starts a transaction and returns a Transactioner for it. opts
	// optionally carries database/sql transaction options.
	Begin(ctx context.Context, opts ...*sql.TxOptions) (tx Transactioner, err error)
	// Transaction takes a handler h and zero or more messages.
	// NOTE(review): presumably runs h inside a transaction and submits args
	// when it commits — confirm against the implementation.
	Transaction(ctx context.Context, h TransactionHandler, args ...*CommitMessage) (err error)
}

Engine ...

func New

func New(typ EngineType, v interface{}, eventBus eventbus.EventBus, options ...*EngineOptions) (engine Engine, err error)

New 创建

type EngineOptions

// EngineOptions configures an engine created with New.
type EngineOptions struct {
	FailedRetryInterval        time.Duration                  // interval between retries of failed messages
	FailedRetryCount           int                            // maximum number of retries
	FailedThresholdCallback    FailedThresholdCallbackHandler // failure callback for the retry threshold
	SucceedMessageExpiredAfter time.Duration                  // expiry time of succeeded messages
	DataCleanInterval          time.Duration                  // data cleanup interval
	SnowflakeNode              int64                          // snowflake node number (presumably the node ID used for ID generation — confirm)
	Logger                     Logger                         // logging interface
}

EngineOptions ...

type EngineType

// EngineType identifies the kind of engine; it is the first argument of New.
type EngineType int

EngineType engine type

const (
	// EngineTypeGorm is the engine type for gorm. Its value is 1 (iota + 1),
	// so the zero value of EngineType does not equal any declared engine type.
	EngineTypeGorm EngineType = iota + 1
)

type FailedThresholdCallbackHandler

// FailedThresholdCallbackHandler is the failure callback for the retry
// threshold (see EngineOptions.FailedThresholdCallback).
// NOTE(review): typ is presumably CallbackTypePublished or
// CallbackTypeReceived, and v the affected record — confirm.
type FailedThresholdCallbackHandler func(ctx context.Context, typ string, v interface{})

FailedThresholdCallbackHandler 重试阈值的失败回调处理

type Logger

// Logger is the logging interface used by the engine (see
// EngineOptions.Logger). Every method takes a context as its first argument;
// the *f methods take a format string, the *ln methods take only arguments.
type Logger interface {
	// Debugf logs a formatted debug message.
	Debugf(ctx context.Context, format string, args ...interface{})
	// Debugln logs a debug message.
	Debugln(ctx context.Context, args ...interface{})
	// Infof logs a formatted informational message.
	Infof(ctx context.Context, format string, args ...interface{})
	// Infoln logs an informational message.
	Infoln(ctx context.Context, args ...interface{})
	// Warnf logs a formatted warning.
	Warnf(ctx context.Context, format string, args ...interface{})
	// Warnln logs a warning.
	Warnln(ctx context.Context, args ...interface{})
	// Warningf logs a formatted warning.
	Warningf(ctx context.Context, format string, args ...interface{})
	// Warningln logs a warning.
	Warningln(ctx context.Context, args ...interface{})
	// Errorf logs a formatted error.
	Errorf(ctx context.Context, format string, args ...interface{})
	// Errorln logs an error.
	Errorln(ctx context.Context, args ...interface{})
}

Logger logger

type Message

// Message is a message; it is a defined type whose underlying type is
// eventbus.Message.
type Message eventbus.Message

Message 消息

func (*Message) Callback

func (m *Message) Callback() string

Callback 回调地址

func (*Message) ID

func (m *Message) ID() string

ID msg id

func (*Message) IsCallback

func (m *Message) IsCallback() bool

IsCallback 存在回调

func (*Message) IsID

func (m *Message) IsID() bool

IsID 是否存在ID

func (*Message) IsTimeout

func (m *Message) IsTimeout(timeout time.Duration) bool

IsTimeout 是否超时

type MessageTracker

// MessageTracker tracks messages by ID.
type MessageTracker interface {
	// HasProcessed reports, via exist, whether msgID has been processed.
	HasProcessed(msgID string) (exist bool, err error)
	// MarkAsProcessed marks msgID as processed.
	MarkAsProcessed(msgID string) (err error)
}

MessageTracker 消息追踪

func NewRedisMessageTracker

func NewRedisMessageTracker(rd *redis.Client, hashKey string) MessageTracker

NewRedisMessageTracker 创建基于redis做的消息追踪,用于幂等操作

type Published

// Published is a record type carrying JSON and GORM column mappings.
// NOTE(review): presumably a row of the table named by PublishedTableName —
// confirm against Published.TableName.
type Published struct {
	// ID is the primary key.
	ID         int64      `json:"id" gorm:"column:id;primaryKey;type:BIGINT(20)"`
	Version    string     `json:"version" gorm:"column:version;type:VARCHAR(20)"`
	Topic      string     `json:"topic" gorm:"column:topic;type:VARCHAR(200);not null"`
	Value      string     `json:"value" gorm:"column:value;type:LONGTEXT"`
	Retries    int        `json:"retries" gorm:"column:retries;type:INT(11)"`
	CreatedAt  time.Time  `json:"created_at" gorm:"column:created_at;type:DATETIME;not null"`
	// ExpiresAt is a pointer and its column carries no "not null" tag, so it
	// may be unset.
	ExpiresAt  *time.Time `json:"expires_at" gorm:"column:expires_at;type:DATETIME"`
	// StatusName is the only column tagged with an index.
	StatusName string     `json:"status_name" gorm:"index;column:status_name;type:VARCHAR(40);not null"`
}

Published ...

func (Published) TableName

func (Published) TableName() string

TableName ...

type Publisher

// Publisher is the publishing interface.
type Publisher interface {
	// Publish publishes v to topic. callback is optional.
	// NOTE(review): callback presumably names a callback topic — confirm.
	Publish(ctx context.Context, topic string, v interface{}, callback ...string) (err error)
	// PublishAsync has the same parameters as Publish.
	// NOTE(review): presumably the asynchronous variant — confirm how it
	// differs from Publish in the implementation.
	PublishAsync(ctx context.Context, topic string, v interface{}, callback ...string) (err error)
}

Publisher 发布接口

type Received

// Received is a record type carrying JSON and GORM column mappings.
// NOTE(review): presumably a row of the table named by ReceivedTableName —
// confirm against Received.TableName.
type Received struct {
	// ID is the primary key.
	ID         int64      `json:"id" gorm:"column:id;primaryKey;type:BIGINT(20)"`
	Version    string     `json:"version" gorm:"column:version;type:VARCHAR(20)"`
	// Topic is mapped to the column and JSON key "name", not "topic".
	Topic      string     `json:"name" gorm:"column:name;type:VARCHAR(200);not null"`
	Group      string     `json:"group" gorm:"column:group;type:VARCHAR(200);not null"`
	Value      string     `json:"value" gorm:"column:value;type:LONGTEXT"`
	Retries    int        `json:"retries" gorm:"column:retries;type:INT(11)"`
	CreatedAt  time.Time  `json:"created_at" gorm:"column:created_at;type:DATETIME;not null"`
	// ExpiresAt is a pointer and its column carries no "not null" tag, so it
	// may be unset.
	ExpiresAt  *time.Time `json:"expires_at" gorm:"column:expires_at;type:DATETIME"`
	// StatusName is the only column tagged with an index.
	StatusName string     `json:"status_name" gorm:"index;column:status_name;type:VARCHAR(40);not null"`
}

Received ...

func (Received) TableName

func (Received) TableName() string

TableName ...

type StdLogger

// StdLogger is a logger type with no fields. The methods documented for it
// (Debugf, Debugln, Infof, Infoln, Warnf, Warnln, Warningf, Warningln, Errorf,
// Errorln) have the same signatures as the Logger interface's methods.
type StdLogger struct {
}

StdLogger ...

func (StdLogger) Debugf

func (StdLogger) Debugf(ctx context.Context, format string, args ...interface{})

Debugf 调试

func (StdLogger) Debugln

func (StdLogger) Debugln(ctx context.Context, args ...interface{})

Debugln 调试

func (StdLogger) Errorf

func (StdLogger) Errorf(ctx context.Context, format string, args ...interface{})

Errorf 错误

func (StdLogger) Errorln

func (StdLogger) Errorln(ctx context.Context, args ...interface{})

Errorln 错误

func (StdLogger) Infof

func (StdLogger) Infof(ctx context.Context, format string, args ...interface{})

Infof 信息

func (StdLogger) Infoln

func (StdLogger) Infoln(ctx context.Context, args ...interface{})

Infoln 信息

func (StdLogger) Warnf

func (StdLogger) Warnf(ctx context.Context, format string, args ...interface{})

Warnf 警告

func (StdLogger) Warningf

func (StdLogger) Warningf(ctx context.Context, format string, args ...interface{})

Warningf 警告

func (StdLogger) Warningln

func (StdLogger) Warningln(ctx context.Context, args ...interface{})

Warningln 警告

func (StdLogger) Warnln

func (StdLogger) Warnln(ctx context.Context, args ...interface{})

Warnln 警告

type SubscribeHandler

// SubscribeHandler handles a subscribed message; it is the handler type
// accepted by Subscriber.Subscribe and Subscriber.SubscribeAsync.
type SubscribeHandler func(ctx context.Context, msg *Message) error

SubscribeHandler 订阅处理

type Subscriber

// Subscriber is the subscription interface.
type Subscriber interface {
	// Subscribe registers h as the handler for topic.
	Subscribe(ctx context.Context, topic string, h SubscribeHandler) (err error)
	// SubscribeAsync has the same parameters as Subscribe.
	// NOTE(review): presumably the asynchronous variant — confirm how it
	// differs from Subscribe in the implementation.
	SubscribeAsync(ctx context.Context, topic string, h SubscribeHandler) (err error)
}

Subscriber 订阅接口

type TransactionHandler

// TransactionHandler is the handler type accepted by Engine.Transaction.
// NOTE(review): db is untyped; presumably the engine-specific session (the
// same kind of value Transactioner.Session returns) — confirm.
type TransactionHandler func(ctx context.Context, db interface{}) error

TransactionHandler ...

type Transactioner

// Transactioner is the transaction interface; Engine.Begin returns one.
type Transactioner interface {
	// Rollback rolls the transaction back.
	Rollback(ctx context.Context) (err error)
	// Commit commits the transaction; args are the messages to submit with it.
	Commit(ctx context.Context, args ...*CommitMessage) (err error)
	// Session returns the underlying session as an untyped value.
	// NOTE(review): presumably the engine-specific handle (e.g. a gorm
	// session for EngineTypeGorm) — confirm.
	Session() interface{}
}

Transactioner 事务接口

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL