final

package module
v0.0.0-...-5891a38 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 22, 2022 License: Apache-2.0 Imports: 20 Imported by: 0

README

Final

Go Report Card codecov Build status LICENSE status

简介

Final 使用本地消息表实现最终一致性

使用

初始化

db, err := sql.Open("mysql", mysqlConnStr)
if err != nil {
  panic(err)
}

mqProvider := amqp.NewProvider(amqpConnStr)

bus := final.New("send_svc", db, mqProvider, final.DefaultOptions())
bus.Start()

更多的选项配置在 options.go

订阅

bus.Subscribe("topic1").Middleware(common.Middleware1, common.Middleware2).Handler(common.EchoHandler)

common.Middleware1,common.Middleware2,common.EchoHandler 的代码在 common.go

发布

msg := common.GeneralMessage{Type: "simple message", Count: 100}
msgBytes, _ := msgpack.Marshal(msg)
err := bus.Publish("topic1", msgBytes, message.WithConfirm(true))
if err != nil {
  panic(err)
}

更多消息发布策略在 message_policy.go

关联本地事务发布

database/sql
tx, _ := _example.NewDB().Begin()

/* returning an error rolls back the transaction; returning nil commits it */
err := bus.Transaction(tx, func(txBus *final.TxBus) error {
  // 本地业务
  _, err := tx.Exec("INSERT INTO local_business (remark) VALUE (?)", "sql local business")
  if err != nil {
    return err
  }
  
  // 发布消息
  msg := common.GeneralMessage{Type: "sql transaction message", Count: 100}
	msgBytes, _ := msgpack.Marshal(msg)
  
  err = txBus.Publish("topic1", msgBytes)
  if err != nil {
    return err
  }
  return nil
})

if err != nil {
  panic(err)
}
gorm
tx := _example.NewGormDB().Begin()

/* returning an error rolls back the transaction; returning nil commits it */
err = bus.Transaction(tx.Statement.ConnPool.(*sql.Tx), func(txBus *final.TxBus) error {
   // 本地业务
  result := tx.Create(&_example.LocalBusiness{
    Remark: "gorm local business",
  })
  if result.Error != nil {
    return result.Error
  }

   // 发送消息
  msg := common.GeneralMessage{Type: "gorm transaction message", Count: 100}
  msgBytes, _ := msgpack.Marshal(msg)
  err = txBus.Publish("topic1", msgBytes)
  if err != nil {
    return err
  }
  return nil
})

if err != nil {
  panic(err)
}

Documentation

Index

Constants

View Source
const (
	OutBoxRecordStatusPending uint8 = iota // 客户端发送消息,消息表中的默认状态, 等待 mq 的 confirm ack
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Bus

type Bus struct {
	// contains filtered or unexported fields
}

func New

func New(svcName string, db *sql.DB, mqProvider mq.IProvider, opt Options) *Bus

New 初始化 Bus。参数 db(sql.DB)是本地消息表使用到的 db;参数 mqProvider(mq.IProvider)是 mq 驱动实现,用于与消息队列交互,amqp 的实现为 amqp_provider.Provider。

func (*Bus) Publish

func (bus *Bus) Publish(topic string, payload []byte, opts ...message.PolicyOption) error

func (*Bus) Shutdown

func (bus *Bus) Shutdown() error

func (*Bus) Start

func (bus *Bus) Start() error

func (*Bus) Subscribe

func (bus *Bus) Subscribe(topic string) *routerTopic

func (*Bus) Transaction

func (bus *Bus) Transaction(tx *sql.Tx, fc func(txBus *TxBus) error) error

func (*Bus) WithTx

func (bus *Bus) WithTx(tx *sql.Tx) *TxBus

type Context

type Context struct {
	Topic   string
	Key     string
	Message *message.Message
	// contains filtered or unexported fields
}

func (*Context) Next

func (c *Context) Next() error

func (*Context) Reset

func (c *Context) Reset(m *message.Message, handlers []HandlerFunc)

type HandlerFunc

type HandlerFunc func(*Context) error

type HandlerMiddleware

type HandlerMiddleware func(h HandlerFunc) HandlerFunc

type Options

type Options struct {
	PurgeOnStartup bool // 启动Bus时是否清除遗留的消息,包含(mq遗留的消息,和本地消息表遗留的消息)

	// RetryCount retry count of message processing failed
	// Does not include the first attempt
	RetryCount uint
	// RetryInterval retry interval of message processing failed
	RetryInterval time.Duration

	// outbox opt
	OutboxScanInterval time.Duration // 扫描outbox没有收到ack的消息间隔
	OutboxScanOffset   int64         // 扫描outbox没有收到ack的消息偏移量
	OutboxScanAgoTime  time.Duration // 扫描多久之前的消息

	NumSubscriber int // subscriber number
	NumAcker      int // acker number
}

func DefaultOptions

func DefaultOptions() Options

DefaultOptions bus 默认配置

func (Options) WithNumAcker

func (opt Options) WithNumAcker(val int) Options

WithNumAcker sets the number of ackers. Each acker runs in an independent goroutine. The default value of NumAcker is 5.

func (Options) WithNumSubscriber

func (opt Options) WithNumSubscriber(val int) Options

WithNumSubscriber sets the number of subscribers. Each subscriber runs in an independent goroutine. The default value of NumSubscriber is 5.

func (Options) WithOutboxScanAgoTime

func (opt Options) WithOutboxScanAgoTime(val int64) Options

WithOutboxScanAgoTime 设置扫描多久之前的消息。The default value of OutboxScanAgoTime is 1 minute.

func (Options) WithOutboxScanInterval

func (opt Options) WithOutboxScanInterval(val time.Duration) Options

WithOutboxScanInterval 设置扫描 outbox 中没有收到 ack 的消息的间隔。The default value of OutboxScanInterval is 1 minute.

func (Options) WithOutboxScanOffset

func (opt Options) WithOutboxScanOffset(val int64) Options

WithOutboxScanOffset 设置扫描 outbox 中没有收到 ack 的消息的偏移量。The default value of OutboxScanOffset is 500.

func (Options) WithPurgeOnStartup

func (opt Options) WithPurgeOnStartup(val bool) Options

WithPurgeOnStartup 设置启动 Bus 时是否清除遗留的消息(包含 mq 遗留的消息和本地消息表遗留的消息)。The default value of PurgeOnStartup is false.

func (Options) WithRetryCount

func (opt Options) WithRetryCount(val uint) Options

WithRetryCount sets the number of retries after message processing fails. It does not include the first attempt. The default value of RetryCount is 3.

func (Options) WithRetryInterval

func (opt Options) WithRetryInterval(val time.Duration) Options

WithRetryInterval sets the interval between retries after message processing fails. The default value of RetryInterval is 10 milliseconds.

type TxBus

type TxBus struct {
	// contains filtered or unexported fields
}

func (*TxBus) Commit

func (txBus *TxBus) Commit() error

func (*TxBus) Publish

func (txBus *TxBus) Publish(topic string, payload []byte, opts ...message.PolicyOption) error

func (*TxBus) RollBack

func (txBus *TxBus) RollBack() error

Directories

Path Synopsis
tx
mq

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL