Documentation ¶
Overview ¶
Package outboxer is an implementation of the outbox pattern. The producer of messages can durably store those messages in a local outbox before sending to a Message Endpoint. The durable local storage may be implemented in the Message Channel directly, especially when combined with Idempotent Messages.
Index ¶
- Variables
- type DataStore
- type DynamicValues
- type EventStream
- type ExecerContext
- type Option
- func WithCheckInterval(t time.Duration) Option
- func WithCleanUpBatchSize(s int32) Option
- func WithCleanUpBefore(t time.Time) Option
- func WithCleanupInterval(t time.Duration) Option
- func WithDataStore(ds DataStore) Option
- func WithEventStream(es EventStream) Option
- func WithMessageBatchSize(s int32) Option
- type OutboxMessage
- type Outboxer
- func (o *Outboxer) ErrChan() <-chan error
- func (o *Outboxer) OkChan() <-chan struct{}
- func (o *Outboxer) Send(ctx context.Context, m *OutboxMessage) error
- func (o *Outboxer) SendWithinTx(ctx context.Context, evt *OutboxMessage, fn func(ExecerContext) error) error
- func (o *Outboxer) Start(ctx context.Context)
- func (o *Outboxer) StartCleanup(ctx context.Context)
- func (o *Outboxer) StartDispatcher(ctx context.Context)
- func (o *Outboxer) Stop()
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ( // ErrMissingEventStream is used when no event stream is provided ErrMissingEventStream = errors.New("an event stream is required for the outboxer to work") // ErrMissingDataStore is used when no data store is provided ErrMissingDataStore = errors.New("a data store is required for the outboxer to work") )
Functions ¶
This section is empty.
Types ¶
type DataStore ¶
type DataStore interface { // Tries to find the given message in the outbox. GetEvents(ctx context.Context, batchSize int32) ([]*OutboxMessage, error) Add(ctx context.Context, m *OutboxMessage) error AddWithinTx(ctx context.Context, m *OutboxMessage, fn func(ExecerContext) error) error SetAsDispatched(ctx context.Context, id int64) error Remove(ctx context.Context, since time.Time, batchSize int32) error }
DataStore defines the data store methods
type DynamicValues ¶
type DynamicValues map[string]interface{}
DynamicValues is a map that can be serialized
func (*DynamicValues) Scan ¶
func (p *DynamicValues) Scan(src interface{}) error
Scan scans a database json representation into a []Item
type EventStream ¶
type EventStream interface {
Send(context.Context, *OutboxMessage) error
}
EventStream defines the event stream methods
type ExecerContext ¶
type ExecerContext interface {
ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
}
ExecerContext defines the exec context method that is used within a transaction
type Option ¶
type Option func(*Outboxer)
Option represents the outboxer options
func WithCheckInterval ¶
WithCheckInterval sets the frequency that outboxer will check for new events
func WithCleanUpBatchSize ¶
WithCleanUpBatchSize sets the clean up process batch size
func WithCleanUpBefore ¶
WithCleanUpBefore sets the date that the clean up process should start removing from
func WithCleanupInterval ¶
WithCleanupInterval sets the frequency that outboxer will clean old events from the data store
func WithDataStore ¶
WithDataStore sets the data store where events will be stored before sending
func WithEventStream ¶
func WithEventStream(es EventStream) Option
WithEventStream sets the event stream to where events will be sent
func WithMessageBatchSize ¶
WithMessageBatchSize sets how many messages will be sent at a time
type OutboxMessage ¶
type OutboxMessage struct { ID int64 Dispatched bool DispatchedAt sql.NullTime Payload []byte Options DynamicValues Headers DynamicValues }
OutboxMessage represents a message that will be sent
type Outboxer ¶
type Outboxer struct {
// contains filtered or unexported fields
}
Outboxer implements the outbox pattern
func New ¶
New creates a new instance of Outboxer
Example ¶
package main import ( "context" "database/sql" "fmt" "os" "time" "github.com/italolelis/outboxer" amqpOut "github.com/italolelis/outboxer/amqp" "github.com/italolelis/outboxer/postgres" "github.com/streadway/amqp" ) func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() db, err := sql.Open("postgres", os.Getenv("DS_DSN")) if err != nil { fmt.Printf("could not connect to amqp: %s", err) return } conn, err := amqp.Dial(os.Getenv("ES_DSN")) if err != nil { fmt.Printf("could not connect to amqp: %s", err) return } // we need to create a data store instance first ds, err := postgres.WithInstance(ctx, db) if err != nil { fmt.Printf("could not setup the data store: %s", err) return } defer ds.Close() // we create an event stream passing the amqp connection es := amqpOut.NewAMQP(conn) // now we create an outboxer instance passing the data store and event stream o, err := outboxer.New( outboxer.WithDataStore(ds), outboxer.WithEventStream(es), outboxer.WithCheckInterval(1*time.Second), outboxer.WithCleanupInterval(5*time.Second), outboxer.WithCleanUpBefore(time.Now().AddDate(0, 0, -5)), outboxer.WithCleanUpBatchSize(10), outboxer.WithMessageBatchSize(10), ) if err != nil { fmt.Printf("could not create an outboxer instance: %s", err) return } // here we initialize the outboxer checks and cleanup go rotines o.Start(ctx) defer o.Stop() // finally we are ready to send messages if err = o.Send(ctx, &outboxer.OutboxMessage{ Payload: []byte("test payload"), Options: map[string]interface{}{ amqpOut.ExchangeNameOption: "test", amqpOut.ExchangeTypeOption: "topic", amqpOut.RoutingKeyOption: "test.send", }, }); err != nil { fmt.Printf("could not send message: %s", err) return } // we can also listen for errors and ok messages that were send for { select { case err := <-o.ErrChan(): fmt.Printf("could not send message: %s", err) case <-o.OkChan(): fmt.Printf("message received") return } } }
Output:
func (*Outboxer) OkChan ¶
func (o *Outboxer) OkChan() <-chan struct{}
OkChan returns the ok channel that is used when each message is successfully delivered
func (*Outboxer) Send ¶
func (o *Outboxer) Send(ctx context.Context, m *OutboxMessage) error
Send sends a message
func (*Outboxer) SendWithinTx ¶
func (o *Outboxer) SendWithinTx(ctx context.Context, evt *OutboxMessage, fn func(ExecerContext) error) error
SendWithinTx encapsulate any database call within a transaction
func (*Outboxer) Start ¶
Start encapsulates two go routines. Starts the dispatcher, which is responsible for getting the messages from the data store and sending to the event stream. Starts the cleanup process, that makes sure old messages are removed from the data store.
func (*Outboxer) StartCleanup ¶
StartCleanup starts the cleanup process, that makes sure old messages are removed from the data store.
func (*Outboxer) StartDispatcher ¶
StartDispatcher starts the dispatcher, which is responsible for getting the messages from the data store and sending to the event stream.
Directories ¶
Path | Synopsis |
---|---|
Package amqp is the AMQP implementation of an event stream.
|
Package amqp is the AMQP implementation of an event stream. |
Package kinesis is the AWS Kinesis implementation of an event stream.
|
Package kinesis is the AWS Kinesis implementation of an event stream. |
Package mysql is the implementation of the mysql data store.
|
Package mysql is the implementation of the mysql data store. |
Package postgres is the implementation of the postgres data store.
|
Package postgres is the implementation of the postgres data store. |