outbox

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 3, 2022 License: MIT Imports: 7 Imported by: 1

README

Outbox

outbox-image

Overview

Simple and clean implementation of outbox.

Available storages:

  • MongoDB
  • Mysql (in progress)

Available broker/producer:

  • Kafka

Usage

Create a collection on mongodb with this fields

{
  "idempotency_id": "58452f68-705b-4b2e-8685-fc929e750588",
  "message": {
    "id": "58452f68-705b-4b2e-8685-fc929e750588",
    "name": "Guilherme",
    "age": 27
  },
  "topic": "user_created",
  "produced": false
}

Initialize outbox

mongoStorage := mongostorage.NewMongoStorage(mongoConnection())
kafkaProducer := kafka.NewProducer(kafka.NewProducer(
		&kafka.ConfigMap{
			"bootstrap.servers":        "localhost:29092",
			"delivery.timeout.ms":      600000,
			"linger.ms":                10000,
			"message.send.max.retries": 10000000,
			"batch.num.messages":       1,
			"enable.idempotence":       true,
		},
	))

o := outbox.NewOutbox(mongoStorage, kafkaProducer)

go o.Listen(context.Background())

In your storage layer do you should remember to use db transaction to save to your app collection/table and outbox.

Example

Use the docker-compose inside example_mongo to up dependencies and run the example

It's recommend use the configuration batch.num.messages with value 1 to will don't have problem with duplicate messages, also validate idempotency_id on consumer

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Model

type Model struct {
	IdempotencyID string `json:"idempotency_id" bson:"idempotency_id"`
	Message       string `json:"message" bson:"message"`
	Topic         string `json:"topic" bson:"topic"`
	Delivered     bool   `json:"delivered" bson:"delivered"`
}

type Outbox

type Outbox interface {
	Listen(ctx context.Context)
}

func NewOutbox

func NewOutbox(storage Storage, producer Producer) Outbox

type Producer

type Producer interface {
	Produce(items []Model, deliveredID chan<- string) error
}

func NewKafkaProducer

func NewKafkaProducer(producer *kafka.Producer) Producer

type Storage

type Storage interface {
	ListAllItems(ctx context.Context) ([]Model, error)
	UpdateItemToCheck(ctx context.Context, id string) error
	DeleteCheckedItems(ctx context.Context) error
	SaveItem(ctx context.Context, item Model) error
}

func NewMongoStorage

func NewMongoStorage(db *mongo.Database) Storage

func NewMysqlStorage

func NewMysqlStorage(db *sqlx.DB) Storage

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL