outboxer

Version: v1.0.0
Published: Sep 18, 2019 License: MIT Imports: 8 Imported by: 0

README

Outboxer

Outboxer is a Go library that implements the outbox pattern.

Getting Started

Outboxer was designed to simplify the tough work of orchestrating message reliability. Essentially, we are trying to answer this question:

How can producers reliably send messages when the broker/consumer is unavailable?

If you have a distributed system architecture, and especially if you are dealing with an Event Driven Architecture, you might want to use outboxer.

The first thing to do is include the package in your project:

go get github.com/italolelis/outboxer

Initial Configuration

Let's set up a simple example where you are using RabbitMQ and Postgres as your outbox pattern components:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

db, err := sql.Open("postgres", os.Getenv("DS_DSN"))
if err != nil {
    fmt.Printf("could not connect to the database: %s", err)
    return
}

conn, err := amqp.Dial(os.Getenv("ES_DSN"))
if err != nil {
    fmt.Printf("could not connect to amqp: %s", err)
    return
}

// we need to create a data store instance first
ds, err := postgres.WithInstance(ctx, db)
if err != nil {
    fmt.Printf("could not setup the data store: %s", err)
    return
}
defer ds.Close()

// we create an event stream passing the amqp connection
es := amqpOut.NewAMQP(conn)

// now we create an outboxer instance passing the data store and event stream
o, err := outboxer.New(
    outboxer.WithDataStore(ds),
    outboxer.WithEventStream(es),
    outboxer.WithCheckInterval(1*time.Second),
    outboxer.WithCleanupInterval(5*time.Second),
    outboxer.WithCleanUpBefore(time.Now().AddDate(0, 0, -5)),
)
if err != nil {
    fmt.Printf("could not create an outboxer instance: %s", err)
    return
}

// here we initialize the outboxer check and cleanup goroutines
o.Start(ctx)
defer o.Stop()

// finally we are ready to send messages
if err = o.Send(ctx, &outboxer.OutboxMessage{
    Payload: []byte("test payload"),
    Options: map[string]interface{}{
        amqpOut.ExchangeNameOption: "test",
        amqpOut.ExchangeTypeOption: "topic",
        amqpOut.RoutingKeyOption:   "test.send",
    },
}); err != nil {
    fmt.Printf("could not send message: %s", err)
    return
}

// we can also listen for errors and for ok signals from messages that were sent
for {
    select {
    case err := <-o.ErrChan():
        fmt.Printf("could not send message: %s", err)
    case <-o.OkChan():
        fmt.Printf("message received")
        return
    }
}

Features

Outboxer comes with a few implementations of Data Stores (Postgres, MySQL) and Event Streams (AMQP, AWS Kinesis).

Contributing

Please read CONTRIBUTING.md for details on our code of conduct, and the process for submitting pull requests to us.

License

This project is licensed under the MIT License. See the LICENSE file for details.

Documentation

Overview

Package outboxer is an implementation of the outbox pattern. The producer of messages can durably store those messages in a local outbox before sending to a Message Endpoint. The durable local storage may be implemented in the Message Channel directly, especially when combined with Idempotent Messages.
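The idea in the overview can be sketched without any broker or database. The block below is a minimal, in-memory illustration, not the package's implementation: the `outbox` and `message` types and the `brokerDown`/`brokerUp` functions are hypothetical names invented for this example. It shows the core property of the pattern: a message is recorded first, and a failed send leaves it pending for a later retry.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a hypothetical stand-in for a stored outbox record.
type message struct {
	id         int
	payload    string
	dispatched bool
}

// outbox is a hypothetical in-memory stand-in for the durable local store.
type outbox struct {
	msgs []*message
}

// add records the message before any attempt to send it.
func (o *outbox) add(payload string) {
	o.msgs = append(o.msgs, &message{id: len(o.msgs) + 1, payload: payload})
}

// dispatch tries to send every pending message and marks the ones that
// succeed. Failed messages stay pending and are retried on the next call.
// It returns the number of messages sent in this pass.
func (o *outbox) dispatch(send func(string) error) int {
	sent := 0
	for _, m := range o.msgs {
		if m.dispatched {
			continue
		}
		if err := send(m.payload); err != nil {
			continue // broker unavailable: keep the message for later
		}
		m.dispatched = true
		sent++
	}
	return sent
}

// pending counts the messages that have not been dispatched yet.
func (o *outbox) pending() int {
	n := 0
	for _, m := range o.msgs {
		if !m.dispatched {
			n++
		}
	}
	return n
}

// brokerDown and brokerUp simulate an unavailable and an available broker.
func brokerDown(string) error { return errors.New("broker unavailable") }
func brokerUp(string) error   { return nil }

func main() {
	o := &outbox{}
	o.add("order created")

	fmt.Println(o.dispatch(brokerDown), o.pending()) // 0 1
	fmt.Println(o.dispatch(brokerUp), o.pending())   // 1 0
}
```

In a real deployment the store would be a database table written in the same transaction as the business data, which is what makes the record durable.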

Constants

This section is empty.

Variables

var (
	// ErrMissingEventStream is used when no event stream is provided
	ErrMissingEventStream = errors.New("an event stream is required for the outboxer to work")

	// ErrMissingDataStore is used when no data store is provided
	ErrMissingDataStore = errors.New("a data store is required for the outboxer to work")
)
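Because both values are sentinel errors, callers can match them with `errors.Is`, even after wrapping. The sketch below uses local copies of the two variables so that it compiles on its own; `newOutboxer` and `describe` are hypothetical helpers, and the order in which the real `New` validates its options is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the two sentinel errors, so the sketch compiles on its own.
var (
	ErrMissingEventStream = errors.New("an event stream is required for the outboxer to work")
	ErrMissingDataStore   = errors.New("a data store is required for the outboxer to work")
)

// newOutboxer is a hypothetical stand-in for outboxer.New that only models
// the validation step. The check order is an assumption.
func newOutboxer(hasDataStore, hasEventStream bool) error {
	if !hasDataStore {
		return ErrMissingDataStore
	}
	if !hasEventStream {
		return ErrMissingEventStream
	}
	return nil
}

// describe maps a construction error to a short hint for the operator.
func describe(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrMissingDataStore):
		return "configure a data store"
	case errors.Is(err, ErrMissingEventStream):
		return "configure an event stream"
	default:
		return "unexpected error"
	}
}

func main() {
	// errors.Is still matches after the error has been wrapped with %w.
	wrapped := fmt.Errorf("setup: %w", newOutboxer(true, false))
	fmt.Println(describe(wrapped)) // configure an event stream
}
```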

Functions

This section is empty.

Types

type DataStore

type DataStore interface {
	// GetEvents retrieves a batch of messages from the outbox.
	GetEvents(ctx context.Context, batchSize int32) ([]*OutboxMessage, error)
	Add(ctx context.Context, m *OutboxMessage) error
	AddWithinTx(ctx context.Context, m *OutboxMessage, fn func(ExecerContext) error) error
	SetAsDispatched(ctx context.Context, id int64) error
	Remove(ctx context.Context, since time.Time, batchSize int32) error
}

DataStore defines the data store methods
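Any type with these five methods can back the outboxer. As an illustration, here is a hypothetical in-memory store that could serve in unit tests. The types are trimmed local copies of the documented ones, `memStore` is an invented name, and the semantics chosen for `GetEvents` (only undispatched messages) and `Remove` (dispatched messages older than `since`) are assumptions, since the interface does not spell them out.

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
	"time"
)

// Trimmed local copies of the documented types.
type OutboxMessage struct {
	ID           int64
	Dispatched   bool
	DispatchedAt sql.NullTime
	Payload      []byte
}

type ExecerContext interface {
	ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
}

type DataStore interface {
	GetEvents(ctx context.Context, batchSize int32) ([]*OutboxMessage, error)
	Add(ctx context.Context, m *OutboxMessage) error
	AddWithinTx(ctx context.Context, m *OutboxMessage, fn func(ExecerContext) error) error
	SetAsDispatched(ctx context.Context, id int64) error
	Remove(ctx context.Context, since time.Time, batchSize int32) error
}

// memStore is a hypothetical in-memory DataStore. It is not safe for
// concurrent use and is meant for tests only.
type memStore struct {
	nextID int64
	msgs   []*OutboxMessage
}

var _ DataStore = (*memStore)(nil) // compile-time interface check

// Add assigns the next ID and appends the message.
func (s *memStore) Add(_ context.Context, m *OutboxMessage) error {
	s.nextID++
	m.ID = s.nextID
	s.msgs = append(s.msgs, m)
	return nil
}

// AddWithinTx is rejected: an in-memory slice has no transactions.
func (s *memStore) AddWithinTx(context.Context, *OutboxMessage, func(ExecerContext) error) error {
	return fmt.Errorf("memStore: transactions are not supported")
}

// GetEvents returns up to batchSize messages that are not dispatched yet.
func (s *memStore) GetEvents(_ context.Context, batchSize int32) ([]*OutboxMessage, error) {
	var out []*OutboxMessage
	for _, m := range s.msgs {
		if !m.Dispatched && int32(len(out)) < batchSize {
			out = append(out, m)
		}
	}
	return out, nil
}

// SetAsDispatched marks the message with the given ID and stamps the time.
func (s *memStore) SetAsDispatched(_ context.Context, id int64) error {
	for _, m := range s.msgs {
		if m.ID == id {
			m.Dispatched = true
			m.DispatchedAt = sql.NullTime{Time: time.Now(), Valid: true}
			return nil
		}
	}
	return fmt.Errorf("message %d not found", id)
}

// Remove drops up to batchSize dispatched messages older than since
// (assumed semantics).
func (s *memStore) Remove(_ context.Context, since time.Time, batchSize int32) error {
	kept := s.msgs[:0]
	var removed int32
	for _, m := range s.msgs {
		if removed < batchSize && m.Dispatched && m.DispatchedAt.Time.Before(since) {
			removed++
			continue
		}
		kept = append(kept, m)
	}
	s.msgs = kept
	return nil
}

func main() {
	ctx, s := context.Background(), &memStore{}
	_ = s.Add(ctx, &OutboxMessage{Payload: []byte("a")})
	_ = s.Add(ctx, &OutboxMessage{Payload: []byte("b")})
	_ = s.SetAsDispatched(ctx, 1)
	pending, _ := s.GetEvents(ctx, 10)
	fmt.Println(len(pending)) // 1
}
```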

type DynamicValues

type DynamicValues map[string]interface{}

DynamicValues is a map that can be serialized

func (*DynamicValues) Scan

func (p *DynamicValues) Scan(src interface{}) error

Scan scans a database JSON representation into the DynamicValues map

func (DynamicValues) Value

func (p DynamicValues) Value() (driver.Value, error)

Value returns a driver.Value representation of the DynamicValues map
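`Scan` and `Value` are the methods of the standard `sql.Scanner` and `driver.Valuer` interfaces, which is what lets a map travel through `database/sql`. The method bodies are not shown on this page, so the following is only a sketch of one plausible JSON-based implementation; `roundTrip` is a hypothetical helper added for the demonstration.

```go
package main

import (
	"database/sql/driver"
	"encoding/json"
	"fmt"
)

// DynamicValues mirrors the documented type. The method bodies below are an
// assumed implementation, not the package's source.
type DynamicValues map[string]interface{}

// Value encodes the map as JSON so that a driver can store it in a JSON or
// text column.
func (p DynamicValues) Value() (driver.Value, error) {
	b, err := json.Marshal(p)
	if err != nil {
		return nil, err
	}
	return b, nil
}

// Scan decodes a JSON column value back into the map.
func (p *DynamicValues) Scan(src interface{}) error {
	var raw []byte
	switch v := src.(type) {
	case nil:
		*p = nil
		return nil
	case []byte:
		raw = v
	case string:
		raw = []byte(v)
	default:
		return fmt.Errorf("unsupported source type %T", src)
	}
	return json.Unmarshal(raw, p)
}

// roundTrip is a hypothetical helper that encodes and decodes a map the way
// a database write followed by a read would.
func roundTrip(in DynamicValues) (DynamicValues, error) {
	stored, err := in.Value()
	if err != nil {
		return nil, err
	}
	var out DynamicValues
	if err := out.Scan(stored); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	out, err := roundTrip(DynamicValues{"routing_key": "test.send", "priority": 5})
	fmt.Println(out["routing_key"], out["priority"], err) // test.send 5 <nil>
}
```

Note that with a JSON encoding, numbers come back as `float64`, so a value stored as the integer 5 is read back as 5.0.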

type EventStream

type EventStream interface {
	Send(context.Context, *OutboxMessage) error
}

EventStream defines the event stream methods
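Since the interface has a single method, a custom event stream is small to write. The sketch below is a hypothetical `recorder` that keeps payloads in memory, for example to assert on them in tests. The types are trimmed local copies of the documented ones, and `publish` is a helper invented for the example.

```go
package main

import (
	"context"
	"fmt"
)

// Trimmed local copies of the documented types.
type OutboxMessage struct {
	Payload []byte
}

type EventStream interface {
	Send(context.Context, *OutboxMessage) error
}

// recorder is a hypothetical EventStream that stores payloads in memory.
type recorder struct {
	sent []string
}

var _ EventStream = (*recorder)(nil) // compile-time interface check

// Send records the payload unless the context is already done.
func (r *recorder) Send(ctx context.Context, m *OutboxMessage) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	r.sent = append(r.sent, string(m.Payload))
	return nil
}

// publish is a hypothetical helper that sends each payload in order and
// stops at the first error.
func publish(es EventStream, payloads ...string) error {
	for _, p := range payloads {
		if err := es.Send(context.Background(), &OutboxMessage{Payload: []byte(p)}); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	r := &recorder{}
	fmt.Println(publish(r, "a", "b"), r.sent) // <nil> [a b]

	// A cancelled context makes Send fail without recording anything.
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	err := r.Send(ctx, &OutboxMessage{Payload: []byte("c")})
	fmt.Println(err, len(r.sent)) // context canceled 2
}
```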

type ExecerContext

type ExecerContext interface {
	ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
}

ExecerContext defines the exec context method that is used within a transaction

type Option

type Option func(*Outboxer)

Option represents the outboxer options

func WithCheckInterval

func WithCheckInterval(t time.Duration) Option

WithCheckInterval sets how often outboxer checks the data store for new events

func WithCleanUpBatchSize

func WithCleanUpBatchSize(s int32) Option

WithCleanUpBatchSize sets the clean up process batch size

func WithCleanUpBefore

func WithCleanUpBefore(t time.Time) Option

WithCleanUpBefore sets the cutoff date for the clean up process: events from before this date are removed

func WithCleanupInterval

func WithCleanupInterval(t time.Duration) Option

WithCleanupInterval sets how often outboxer cleans old events from the data store

func WithDataStore

func WithDataStore(ds DataStore) Option

WithDataStore sets the data store where events will be stored before sending

func WithEventStream

func WithEventStream(es EventStream) Option

WithEventStream sets the event stream to which events will be sent

func WithMessageBatchSize

func WithMessageBatchSize(s int32) Option

WithMessageBatchSize sets how many messages will be sent at a time
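The `Option` type and the `With...` functions above follow the functional options pattern: each function returns a closure that mutates the value under construction. A minimal sketch of the mechanism is shown below, using a hypothetical `settings` struct in place of the unexported fields of `Outboxer`. The default values are invented for illustration and are not the package's defaults.

```go
package main

import (
	"fmt"
	"time"
)

// settings is a hypothetical stand-in for the unexported fields of Outboxer.
type settings struct {
	checkInterval time.Duration
	messageBatch  int32
}

// option mirrors the shape of the documented Option type.
type option func(*settings)

// withCheckInterval returns an option that overrides the polling interval.
func withCheckInterval(d time.Duration) option {
	return func(s *settings) { s.checkInterval = d }
}

// withMessageBatchSize returns an option that overrides the batch size.
func withMessageBatchSize(n int32) option {
	return func(s *settings) { s.messageBatch = n }
}

// newSettings starts from defaults (invented here) and applies each option
// in the order given, so later options win over earlier ones.
func newSettings(opts ...option) *settings {
	s := &settings{checkInterval: 10 * time.Second, messageBatch: 100}
	for _, opt := range opts {
		opt(s)
	}
	return s
}

func main() {
	s := newSettings(withCheckInterval(time.Second), withMessageBatchSize(10))
	fmt.Println(s.checkInterval, s.messageBatch) // 1s 10
}
```

The appeal of this design is that new settings can be added later without changing the signature of the constructor.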

type OutboxMessage

type OutboxMessage struct {
	ID           int64
	Dispatched   bool
	DispatchedAt sql.NullTime
	Payload      []byte
	Options      DynamicValues
	Headers      DynamicValues
}

OutboxMessage represents a message that will be sent

type Outboxer

type Outboxer struct {
	// contains filtered or unexported fields
}

Outboxer implements the outbox pattern

func New

func New(opts ...Option) (*Outboxer, error)

New creates a new instance of Outboxer

Example
package main

import (
	"context"
	"database/sql"
	"fmt"
	"os"
	"time"

	"github.com/italolelis/outboxer"
	amqpOut "github.com/italolelis/outboxer/amqp"
	"github.com/italolelis/outboxer/postgres"
	"github.com/streadway/amqp"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	db, err := sql.Open("postgres", os.Getenv("DS_DSN"))
	if err != nil {
		fmt.Printf("could not connect to the database: %s", err)
		return
	}

	conn, err := amqp.Dial(os.Getenv("ES_DSN"))
	if err != nil {
		fmt.Printf("could not connect to amqp: %s", err)
		return
	}

	// we need to create a data store instance first
	ds, err := postgres.WithInstance(ctx, db)
	if err != nil {
		fmt.Printf("could not setup the data store: %s", err)
		return
	}
	defer ds.Close()

	// we create an event stream passing the amqp connection
	es := amqpOut.NewAMQP(conn)

	// now we create an outboxer instance passing the data store and event stream
	o, err := outboxer.New(
		outboxer.WithDataStore(ds),
		outboxer.WithEventStream(es),
		outboxer.WithCheckInterval(1*time.Second),
		outboxer.WithCleanupInterval(5*time.Second),
		outboxer.WithCleanUpBefore(time.Now().AddDate(0, 0, -5)),
		outboxer.WithCleanUpBatchSize(10),
		outboxer.WithMessageBatchSize(10),
	)
	if err != nil {
		fmt.Printf("could not create an outboxer instance: %s", err)
		return
	}

	// here we initialize the outboxer check and cleanup goroutines
	o.Start(ctx)
	defer o.Stop()

	// finally we are ready to send messages
	if err = o.Send(ctx, &outboxer.OutboxMessage{
		Payload: []byte("test payload"),
		Options: map[string]interface{}{
			amqpOut.ExchangeNameOption: "test",
			amqpOut.ExchangeTypeOption: "topic",
			amqpOut.RoutingKeyOption:   "test.send",
		},
	}); err != nil {
		fmt.Printf("could not send message: %s", err)
		return
	}

	// we can also listen for errors and for ok signals from messages that were sent
	for {
		select {
		case err := <-o.ErrChan():
			fmt.Printf("could not send message: %s", err)
		case <-o.OkChan():
			fmt.Printf("message received")
			return
		}
	}
}

func (*Outboxer) ErrChan

func (o *Outboxer) ErrChan() <-chan error

ErrChan returns the error channel

func (*Outboxer) OkChan

func (o *Outboxer) OkChan() <-chan struct{}

OkChan returns the ok channel that is used when each message is successfully delivered
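The two channels are typically consumed together in a `select`. The example earlier on this page loops forever; a variant that also gives up after a deadline might look like the sketch below. The helper names `waitForDelivery` and `waitWithTimeout` are hypothetical, and plain channels stand in for `o.OkChan()` and `o.ErrChan()`.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitForDelivery blocks until one delivery is confirmed on ok, an error
// arrives on errs, or ctx ends, whichever happens first.
func waitForDelivery(ctx context.Context, ok <-chan struct{}, errs <-chan error) error {
	select {
	case <-ok:
		return nil
	case err := <-errs:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}

// waitWithTimeout is a convenience wrapper that bounds the wait by d.
func waitWithTimeout(ok <-chan struct{}, errs <-chan error, d time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()
	return waitForDelivery(ctx, ok, errs)
}

func main() {
	// Stand-ins for o.OkChan() and o.ErrChan().
	ok := make(chan struct{}, 1)
	errs := make(chan error, 1)

	ok <- struct{}{}
	fmt.Println(waitWithTimeout(ok, errs, time.Second)) // <nil>

	errs <- errors.New("broker unavailable")
	fmt.Println(waitWithTimeout(ok, errs, time.Second)) // broker unavailable

	// Nothing arrives, so the deadline fires.
	fmt.Println(waitWithTimeout(ok, errs, 10*time.Millisecond)) // context deadline exceeded
}
```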

func (*Outboxer) Send

func (o *Outboxer) Send(ctx context.Context, m *OutboxMessage) error

Send sends a message

func (*Outboxer) SendWithinTx

func (o *Outboxer) SendWithinTx(ctx context.Context, evt *OutboxMessage, fn func(ExecerContext) error) error

SendWithinTx encapsulates any database call within a transaction
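The point of the callback is atomicity: the caller's own write and the outbox record are committed together or not at all. The sketch below models that contract with fakes instead of a real database. `fakeTx`, `fakeDB`, `saveOrder`, `rejectOrder`, and the SQL strings are all hypothetical, and the order of operations inside the real method is an assumption. With a real database, a `*sql.Tx` would play the role of the `ExecerContext`, since it has a matching `ExecContext` method.

```go
package main

import (
	"context"
	"database/sql"
	"database/sql/driver"
	"errors"
	"fmt"
)

// ExecerContext is a local copy of the documented interface.
type ExecerContext interface {
	ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
}

// fakeTx is a hypothetical transaction that only stages statements.
type fakeTx struct{ staged []string }

func (t *fakeTx) ExecContext(_ context.Context, query string, _ ...interface{}) (sql.Result, error) {
	t.staged = append(t.staged, query)
	return driver.RowsAffected(1), nil
}

// fakeDB is a hypothetical database that keeps the committed statements.
type fakeDB struct{ committed []string }

// sendWithinTx runs the caller's write and the outbox insert in one
// transaction: either both are committed or neither is.
func (db *fakeDB) sendWithinTx(ctx context.Context, payload []byte, fn func(ExecerContext) error) error {
	tx := &fakeTx{}
	if err := fn(tx); err != nil {
		return err // "rollback": the staged statements are discarded
	}
	// The table and column names are made up for this sketch.
	if _, err := tx.ExecContext(ctx, "INSERT INTO outbox (payload) VALUES ($1)", payload); err != nil {
		return err
	}
	db.committed = append(db.committed, tx.staged...) // "commit"
	return nil
}

// saveOrder is a hypothetical business write that succeeds.
func saveOrder(tx ExecerContext) error {
	_, err := tx.ExecContext(context.Background(), "INSERT INTO orders (id) VALUES ($1)", 42)
	return err
}

// rejectOrder is a hypothetical business write that fails.
func rejectOrder(ExecerContext) error { return errors.New("validation failed") }

func main() {
	ctx, db := context.Background(), &fakeDB{}
	fmt.Println(db.sendWithinTx(ctx, []byte("order 42"), rejectOrder), len(db.committed)) // validation failed 0
	fmt.Println(db.sendWithinTx(ctx, []byte("order 42"), saveOrder), len(db.committed))   // <nil> 2
}
```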

func (*Outboxer) Start

func (o *Outboxer) Start(ctx context.Context)

Start launches two goroutines: the dispatcher, which reads messages from the data store and sends them to the event stream, and the cleanup process, which removes old messages from the data store.

func (*Outboxer) StartCleanup

func (o *Outboxer) StartCleanup(ctx context.Context)

StartCleanup starts the cleanup process, that makes sure old messages are removed from the data store.

func (*Outboxer) StartDispatcher

func (o *Outboxer) StartDispatcher(ctx context.Context)

StartDispatcher starts the dispatcher, which is responsible for getting the messages from the data store and sending to the event stream.
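A dispatcher of this kind is usually a ticker-driven loop that stops when its context is cancelled. The following is a simplified sketch of that shape, not the package's code: `runDispatcher` and `demo` are hypothetical names, a slice of strings stands in for the data store, and a function stands in for the event stream.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runDispatcher polls fetch on every tick and sends each pending item until
// ctx is cancelled. Results are reported on ok and errs, which the caller
// must drain or buffer, otherwise the loop blocks.
func runDispatcher(ctx context.Context, interval time.Duration,
	fetch func() []string, send func(string) error,
	ok chan<- struct{}, errs chan<- error) {

	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			for _, m := range fetch() {
				if err := send(m); err != nil {
					errs <- err
					continue
				}
				ok <- struct{}{}
			}
		}
	}
}

// demo runs the dispatcher over three queued messages and returns how many
// deliveries were confirmed before a two second safety timeout.
func demo() int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	queue := []string{"a", "b", "c"}
	ok := make(chan struct{}, len(queue))
	errs := make(chan error, len(queue))

	// fetch hands over the whole queue once. Only the dispatcher goroutine
	// touches queue after this point.
	fetch := func() []string {
		batch := queue
		queue = nil
		return batch
	}
	send := func(string) error { return nil }

	go runDispatcher(ctx, 10*time.Millisecond, fetch, send, ok, errs)

	delivered := 0
	timeout := time.After(2 * time.Second)
	for delivered < 3 {
		select {
		case <-ok:
			delivered++
		case <-timeout:
			return delivered
		}
	}
	return delivered
}

func main() {
	fmt.Println(demo()) // 3
}
```

The cleanup process can follow the same loop structure, with a removal call in place of the fetch and send steps.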

func (*Outboxer) Stop

func (o *Outboxer) Stop()

Stop closes all channels

Directories

Path      Synopsis
amqp      Package amqp is the AMQP implementation of an event stream.
kinesis   Package kinesis is the AWS Kinesis implementation of an event stream.
mysql     Package mysql is the implementation of the mysql data store.
postgres  Package postgres is the implementation of the postgres data store.
