Documentation ¶
Index ¶
- Constants
- Variables
- func DecodeValue(s string, v interface{}) (err error)
- func EncodeValue(v interface{}) (s string, err error)
- type CommitMessage
- type Engine
- type EngineOptions
- type EngineType
- type FailedThresholdCallbackHandler
- type Logger
- type Message
- type MessageTracker
- type Published
- type Publisher
- type Received
- type StdLogger
- func (StdLogger) Debugf(ctx context.Context, format string, args ...interface{})
- func (StdLogger) Debugln(ctx context.Context, args ...interface{})
- func (StdLogger) Errorf(ctx context.Context, format string, args ...interface{})
- func (StdLogger) Errorln(ctx context.Context, args ...interface{})
- func (StdLogger) Infof(ctx context.Context, format string, args ...interface{})
- func (StdLogger) Infoln(ctx context.Context, args ...interface{})
- func (StdLogger) Warnf(ctx context.Context, format string, args ...interface{})
- func (StdLogger) Warningf(ctx context.Context, format string, args ...interface{})
- func (StdLogger) Warningln(ctx context.Context, args ...interface{})
- func (StdLogger) Warnln(ctx context.Context, args ...interface{})
- type SubscribeHandler
- type Subscriber
- type TransactionHandler
- type Transactioner
Constants ¶
View Source
// Message header keys. Each constant is the string key under which one piece
// of message metadata is carried.
const (
	// MessageHeaderMsgIDKey is the header key for the message ID.
	MessageHeaderMsgIDKey = "nilorg.outbox.msg.id"
	// MessageHeaderMsgTopicKey is the header key for the message topic.
	MessageHeaderMsgTopicKey = "nilorg.outbox.msg.topic"
	// MessageHeaderMsgTypeKey is the header key for the message content type.
	MessageHeaderMsgTypeKey = "nilorg.outbox.msg.type"
	// MessageHeaderMsgSendTimeKey is the header key for the message send time.
	MessageHeaderMsgSendTimeKey = "nilorg.outbox.msg.sendtime"
	// MessageHeaderMsgCallbackKey is the header key for the message callback.
	MessageHeaderMsgCallbackKey = "nilorg.outbox.msg.callback"
)
View Source
// Status names. These are the string values used for a record's status.
const (
	// StatusNameFailed marks a failed message.
	StatusNameFailed = "failed"
	// StatusNameScheduled marks a message that has been scheduled.
	StatusNameScheduled = "scheduled"
	// StatusNameSucceeded marks a succeeded message.
	StatusNameSucceeded = "succeeded"
)
View Source
// Table names.
const (
	// PublishedTableName is the table name "outbox_published".
	// NOTE(review): presumably the table that stores Published records — confirm.
	PublishedTableName = "outbox_published"
	// ReceivedTableName is the table name "outbox_received".
	// NOTE(review): presumably the table that stores Received records — confirm.
	ReceivedTableName = "outbox_received"
)
View Source
// Callback type names.
const (
	// CallbackTypePublished is the callback type name "Published".
	CallbackTypePublished = "Published"
	// CallbackTypeReceived is the callback type name "Received".
	CallbackTypeReceived = "Received"
)
View Source
// MessageVersion is the message version string.
const MessageVersion = "v1"
Variables ¶
View Source
var ( // DefaultEngineOptions 默认选项 DefaultEngineOptions = EngineOptions{ FailedRetryInterval: time.Minute, FailedRetryCount: 50, DataCleanInterval: time.Hour, SucceedMessageExpiredAfter: 24 * time.Hour, SnowflakeNode: 1, Logger: &eventbus.StdLogger{}, } )
Functions ¶
Types ¶
type CommitMessage ¶
CommitMessage 提交message
type Engine ¶
type Engine interface { Publisher Subscriber Begin(ctx context.Context, opts ...*sql.TxOptions) (tx Transactioner, err error) Transaction(ctx context.Context, h TransactionHandler, args ...*CommitMessage) (err error) }
Engine ...
func New ¶
func New(typ EngineType, v interface{}, eventBus eventbus.EventBus, options ...*EngineOptions) (engine Engine, err error)
New 创建
type EngineOptions ¶
type EngineOptions struct { FailedRetryInterval time.Duration // 失败重试间隔时间 FailedRetryCount int // 最大重试次数 FailedThresholdCallback FailedThresholdCallbackHandler // 重试阈值的失败回调 SucceedMessageExpiredAfter time.Duration // 成功消息的过期时间 DataCleanInterval time.Duration // 数据清理间隔 SnowflakeNode int64 // snowflake节点数 Logger Logger // 日志接口 }
EngineOptions ...
type EngineType ¶
// EngineType is an integer-backed enumeration of engine types.
type EngineType int
EngineType engine type
const ( // EngineTypeGorm engine type for gorm EngineTypeGorm EngineType = iota + 1 )
type FailedThresholdCallbackHandler ¶
FailedThresholdCallbackHandler 重试阈值的失败回调处理
type Logger ¶
// Logger is the logging interface. Every method takes a context first; the
// *f variants take a format string plus arguments and the *ln variants take
// arguments only.
type Logger interface {
	// Debugf is the formatted debug-level method.
	Debugf(ctx context.Context, format string, args ...interface{})
	// Debugln is the debug-level method.
	Debugln(ctx context.Context, args ...interface{})
	// Infof is the formatted info-level method.
	Infof(ctx context.Context, format string, args ...interface{})
	// Infoln is the info-level method.
	Infoln(ctx context.Context, args ...interface{})
	// Warnf is the formatted warning-level method.
	Warnf(ctx context.Context, format string, args ...interface{})
	// Warnln is the warning-level method.
	Warnln(ctx context.Context, args ...interface{})
	// Warningf is the formatted warning-level method (same signature as Warnf).
	Warningf(ctx context.Context, format string, args ...interface{})
	// Warningln is the warning-level method (same signature as Warnln).
	Warningln(ctx context.Context, args ...interface{})
	// Errorf is the formatted error-level method.
	Errorf(ctx context.Context, format string, args ...interface{})
	// Errorln is the error-level method.
	Errorln(ctx context.Context, args ...interface{})
}
Logger logger
type MessageTracker ¶
// MessageTracker tracks messages by ID.
type MessageTracker interface {
	// HasProcessed takes a message ID and returns an existence flag and an
	// error. NOTE(review): presumably exist is true once MarkAsProcessed has
	// been called for msgID — confirm against the implementation.
	HasProcessed(msgID string) (exist bool, err error)
	// MarkAsProcessed takes a message ID and returns an error.
	// NOTE(review): presumably records msgID as processed — confirm.
	MarkAsProcessed(msgID string) (err error)
}
MessageTracker 消息追踪
func NewRedisMessageTracker ¶
func NewRedisMessageTracker(rd *redis.Client, hashKey string) MessageTracker
NewRedisMessageTracker 创建基于redis做的消息追踪,用于幂等操作
type Published ¶
// Published is a record with JSON and gorm column mappings given by the
// field tags. ExpiresAt is a pointer and may be nil; the status_name column
// is indexed.
// NOTE(review): presumably one row of the "outbox_published" table — confirm.
type Published struct {
	ID         int64      `json:"id" gorm:"column:id;primaryKey;type:BIGINT(20)"`
	Version    string     `json:"version" gorm:"column:version;type:VARCHAR(20)"`
	Topic      string     `json:"topic" gorm:"column:topic;type:VARCHAR(200);not null"`
	Value      string     `json:"value" gorm:"column:value;type:LONGTEXT"`
	Retries    int        `json:"retries" gorm:"column:retries;type:INT(11)"`
	CreatedAt  time.Time  `json:"created_at" gorm:"column:created_at;type:DATETIME;not null"`
	ExpiresAt  *time.Time `json:"expires_at" gorm:"column:expires_at;type:DATETIME"`
	StatusName string     `json:"status_name" gorm:"index;column:status_name;type:VARCHAR(40);not null"`
}
Published ...
type Publisher ¶
// Publisher is the publishing interface. Both methods take a topic, a value
// of any type, and optional callback strings.
type Publisher interface {
	// Publish takes a topic, a value and optional callback strings.
	Publish(ctx context.Context, topic string, v interface{}, callback ...string) (err error)
	// PublishAsync has the same signature as Publish.
	// NOTE(review): presumably the asynchronous variant — confirm against the
	// implementation.
	PublishAsync(ctx context.Context, topic string, v interface{}, callback ...string) (err error)
}
Publisher 发布接口
type Received ¶
// Received is a record with JSON and gorm column mappings given by the field
// tags. ExpiresAt is a pointer and may be nil; the status_name column is
// indexed.
// NOTE(review): presumably one row of the "outbox_received" table — confirm.
type Received struct {
	ID      int64  `json:"id" gorm:"column:id;primaryKey;type:BIGINT(20)"`
	Version string `json:"version" gorm:"column:version;type:VARCHAR(20)"`
	// Topic is mapped to the JSON key and column "name", not "topic".
	Topic      string     `json:"name" gorm:"column:name;type:VARCHAR(200);not null"`
	Group      string     `json:"group" gorm:"column:group;type:VARCHAR(200);not null"`
	Value      string     `json:"value" gorm:"column:value;type:LONGTEXT"`
	Retries    int        `json:"retries" gorm:"column:retries;type:INT(11)"`
	CreatedAt  time.Time  `json:"created_at" gorm:"column:created_at;type:DATETIME;not null"`
	ExpiresAt  *time.Time `json:"expires_at" gorm:"column:expires_at;type:DATETIME"`
	StatusName string     `json:"status_name" gorm:"index;column:status_name;type:VARCHAR(40);not null"`
}
Received ...
type StdLogger ¶
// StdLogger is an empty struct type. The package index lists Debugf, Debugln,
// Infof, Infoln, Warnf, Warnln, Warningf, Warningln, Errorf and Errorln
// methods on it.
type StdLogger struct{}
StdLogger ...
type SubscribeHandler ¶
SubscribeHandler 订阅处理
type Subscriber ¶
type Subscriber interface { Subscribe(ctx context.Context, topic string, h SubscribeHandler) (err error) SubscribeAsync(ctx context.Context, topic string, h SubscribeHandler) (err error) }
Subscriber 订阅接口
type TransactionHandler ¶
TransactionHandler ...
type Transactioner ¶
type Transactioner interface { Rollback(ctx context.Context) (err error) Commit(ctx context.Context, args ...*CommitMessage) (err error) Session() interface{} }
Transactioner 事务接口
Source Files ¶
Click to show internal directories.
Click to hide internal directories.