mysql

package
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 24, 2023 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// ErrInvalidDB is a package-level error value carrying the message "invalid db".
// NOTE(review): built with fmt.Errorf but has no format verbs; errors.New is the usual constructor for such values.
var ErrInvalidDB = fmt.Errorf("invalid db")
View Source
// ErrNoTransaction is a package-level error value carrying the message "no transaction".
// NOTE(review): presumably returned by Commit/Rollback when the context holds no open transaction — confirm against the implementation.
var ErrNoTransaction = fmt.Errorf("no transaction")
View Source
// ErrServiceNotCreate is a package-level error value carrying the message "service not create".
// NOTE(review): presumably signals that the service record has not been created yet — confirm against the implementation.
var ErrServiceNotCreate = fmt.Errorf("service not create")

Functions

This section is empty.

Types

type CustomRetry

// CustomRetry configures a custom number of retries together with the
// interval used for each of them.
// NOTE(review): presumably Intervals holds one entry per retry attempt, in
// order — confirm against the Next implementation.
type CustomRetry struct {
	Intervals []time.Duration
}

CustomRetry 自定义重试次数和间隔

func (*CustomRetry) Next

func (c *CustomRetry) Next(info *RetryInfo) *RetryInfo

type EventBus

// EventBus persists domain events directly to the database and asynchronously
// queries and pushes them, as described in the NewEventBus documentation.
// Construct it with NewEventBus.
type EventBus struct {
	// contains filtered or unexported fields
}

func NewEventBus

func NewEventBus(serviceName string, db *gorm.DB, options ...Option) *EventBus

NewEventBus 提供将领域事件直接持久化到数据库、并异步查询事件进行推送的功能。需要在业务数据库中提前创建符合 EventPO、ServicePO 描述的库表,并且使用兼容 gorm Model 的 executor。参数:serviceName 为服务名,同一个服务之间只有一个消费,不同服务之间独立消费。用法:eventBus := NewEventBus("service", db); NewEngine(lock, eventBus.Options()...)

func (*EventBus) Commit

func (e *EventBus) Commit(ctx context.Context) error

Commit 提交事务消息

func (*EventBus) Dispatch

func (e *EventBus) Dispatch(ctx context.Context, events ...*dddfirework.DomainEvent) error

Dispatch ...

func (*EventBus) DispatchBegin

func (e *EventBus) DispatchBegin(ctx context.Context, evts ...*dddfirework.DomainEvent) (context.Context, error)

DispatchBegin 开启事务消息

func (*EventBus) Options

func (e *EventBus) Options() []dddfirework.Option

func (*EventBus) RegisterEventHandler

func (e *EventBus) RegisterEventHandler(cb dddfirework.DomainEventHandler)

func (*EventBus) RegisterEventTXChecker

func (e *EventBus) RegisterEventTXChecker(checker dddfirework.DomainEventTXChecker)

func (*EventBus) Rollback

func (e *EventBus) Rollback(ctx context.Context) error

Rollback 回滚事务消息

func (*EventBus) Start

func (e *EventBus) Start(ctx context.Context)

type EventPO

// EventPO is the storage model for a persisted domain event.
// The Event payload is stored through gorm's JSON serializer.
type EventPO struct {
	ID             int64                    `gorm:"primaryKey;autoIncrement"`
	EventID        string                   `gorm:"column:event_id"`
	Event          *dddfirework.DomainEvent `gorm:"serializer:json"`
	TransID        int64                    `gorm:"column:trans_id"` // transaction ID
	EventCreatedAt time.Time                `gorm:"index"`           // time the event was created
	CreatedAt      time.Time                `gorm:"index"`           // time the record was created
}

EventPO 事件存储模型

CREATE TABLE `ddd_domain_event` (

`id` bigint NOT NULL AUTO_INCREMENT,
`event_id` varchar(64) NOT NULL,
`event` text NOT NULL,
`trans_id` bigint,
`event_created_at` datetime(3) DEFAULT NULL,
`created_at` datetime(3) DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `idx_ddd_domain_event_event_id` (`event_id`),
KEY `idx_ddd_domain_event_trans_id` (`trans_id`),
KEY `idx_ddd_domain_event_created_at` (`created_at`),
KEY `idx_ddd_domain_event_event_created_at` (`event_created_at`)

) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

func (*EventPO) TableName

func (o *EventPO) TableName() string

type EventStatus

// EventStatus is an int8-based status code; its named values are declared in
// the const block that follows.
type EventStatus int8
// NOTE(review): the names suggest "waiting to be sent", "sent" and "failed"
// states of an event — confirm against the code that reads these values.
const (
	EventStatusToSend EventStatus = 1
	EventStatusSent   EventStatus = 2
	EventStatusFailed EventStatus = 3
)

type FailedInfo

// FailedInfo groups a list of string IDs with a retry number.
// NOTE(review): no other exported declaration in this package listing
// references FailedInfo (ServicePO.Failed uses []*RetryInfo); presumably IDs
// are event IDs and Retry is the attempt count — confirm or remove if unused.
type FailedInfo struct {
	IDs   []string
	Retry int
}

type IRetryStrategy

// IRetryStrategy decides whether, and with which RetryInfo, a retry should
// happen next.
type IRetryStrategy interface {
	// Next returns the information for the next retry; a nil result means no further retries.
	// RetryInfo.RetryCount == 0 denotes the initial state, in which case Next yields the first retry.
	Next(info *RetryInfo) *RetryInfo
}

type IntervalRetry

// IntervalRetry specifies a fixed retry interval together with a retry count.
// NOTE(review): presumably Interval is the delay between attempts and Limit
// the maximum number of attempts — confirm against the Next implementation.
type IntervalRetry struct {
	Interval time.Duration
	Limit    int
}

IntervalRetry 指定固定间隔和次数

func (*IntervalRetry) Next

func (c *IntervalRetry) Next(info *RetryInfo) *RetryInfo

type LimitRetry

// LimitRetry sets a maximum number of retries without specifying an interval
// between them.
type LimitRetry struct {
	Limit int
}

LimitRetry 设定最大重试次数,不指定间隔

func (*LimitRetry) Next

func (c *LimitRetry) Next(info *RetryInfo) *RetryInfo

type Option

// Option is a function that receives a pointer to Options; NewEventBus
// accepts a variadic list of them.
type Option func(opt *Options)

type Options

// Options holds the tunable settings of an EventBus.
// NOTE(review): RetryStrategy and TXCheckTimeout carry no documentation in
// the original; presumably RetryStrategy overrides the RetryLimit /
// RetryInterval / CustomRetry settings and TXCheckTimeout bounds the
// transaction check — confirm against the implementation.
type Options struct {
	// Retry policy, configurable in one of two ways:
	// 1. RetryInterval + RetryLimit: retry at a fixed interval
	// 2. CustomRetry: retry at custom intervals
	RetryLimit    int             // number of retries
	RetryInterval time.Duration   // interval between retries
	CustomRetry   []time.Duration // custom retry intervals

	DefaultOffset     *int64        // default starting offset
	RunInterval       time.Duration // default polling interval
	CleanCron         string        // default cleanup schedule
	RetentionTime     time.Duration // how long consumed events are kept in the database
	LimitPerRun       int           // maximum number of events handled per poll
	ConsumeConcurrent int           // concurrency of event consumption
	RetryStrategy     IRetryStrategy
	TXCheckTimeout    time.Duration
}

type RetryInfo

// RetryInfo describes one retry: which attempt it is and when it takes place.
// NOTE(review): ID is undocumented in the original; presumably it is the ID
// of the event being retried — confirm against the caller.
type RetryInfo struct {
	ID         int64
	RetryCount int       // this is the RetryCount-th retry; valid values start at 1, 0 means the initial state
	RetryTime  time.Time // time of the retry
}

type ServicePO

// ServicePO is the storage model for a consuming service, keyed by the
// service name. Retry and Failed are stored through gorm's JSON serializer.
type ServicePO struct {
	Name      string       `gorm:"primaryKey"`
	Retry     []*RetryInfo `gorm:"serializer:json"` // retry information
	Failed    []*RetryInfo `gorm:"serializer:json"` // failure information
	Offset    int64        `gorm:"column:offset"`   // consumption position; equals the ID of the last consumed event
	CreatedAt time.Time    `gorm:"index"`           // time the record was created
	UpdatedAt time.Time    `gorm:"index"`           // time the record was last updated
}

ServicePO 服务存储模型

CREATE TABLE `ddd_eventbus_service` (
`name` varchar(30) NOT NULL,
`failed` text,
`retry` text,
`offset` bigint(20) DEFAULT NULL,
`created_at` datetime(3) DEFAULT NULL,
`updated_at` datetime(3) DEFAULT NULL,
PRIMARY KEY (`name`),
KEY `idx_ddd_eventbus_service_created_at` (`created_at`),
KEY `idx_ddd_eventbus_service_updated_at` (`updated_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

func (*ServicePO) GetID

func (o *ServicePO) GetID() string

func (*ServicePO) TableName

func (o *ServicePO) TableName() string

type Transaction

// Transaction is the storage model for an event transaction; the accompanying
// DDL names its table ddd_event_transaction. Events is stored through gorm's
// JSON serializer.
type Transaction struct {
	ID        int64                      `gorm:"primaryKey;autoIncrement"`
	Service   string                     `gorm:"column:service"` // service name
	Events    []*dddfirework.DomainEvent `gorm:"serializer:json"`
	DueTime   time.Time                  `gorm:"column:due_time"` // time at which the transaction times out
	CreatedAt time.Time                  `gorm:"index"`           // time the record was created
}

CREATE TABLE `ddd_event_transaction` (

`id` bigint NOT NULL AUTO_INCREMENT,
`service` varchar(30) NOT NULL,
`events` text,
`due_time` datetime(3) DEFAULT NULL,
`created_at` datetime(3) DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `idx_ddd_event_transaction_created_at` (`created_at`)

) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

func (*Transaction) TableName

func (o *Transaction) TableName() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL