troutbox

package module
v0.0.1
Published: Aug 26, 2025 License: MIT Imports: 10 Imported by: 0

README

Every trout is delivered.

Go Transactional Outbox Library

The Go Transactional Outbox Library is a robust implementation of the Outbox Pattern for reliable message delivery in distributed systems. It ensures that messages are stored in a database and sent at least once to a message broker (e.g., RabbitMQ, Kafka) in a fault-tolerant and transactional manner.

Usage

Here is a simple example of how to use the library:

import (
    "context"
    "database/sql"

    "github.com/Darkemon/troutbox"
    "github.com/Darkemon/troutbox/adapter/psql"
)

var (
    db *sql.DB             // your database connection
    sender troutbox.Sender // your message sender implementation
)

// Create the PostgreSQL implementation of the repository.
// The second argument is the lock ID for the partitioning job.
// It prevents multiple instances of the application
// from running the partitioning job at the same time.
repo, err := psql.NewPostgresMessageRepository(db, 12345)
// ...

// Create the necessary tables.
err = repo.Migrate(ctx)
// ...

// Run a job to create/remove partitions.
go func() {
    err := repo.RunPartitioningJob(ctx)
    // ...
}()

// Create a new outbox instance and run it.
outbox, err := troutbox.NewOutbox(repo, sender)
// ...

go func() {
    err := outbox.Run(ctx)
    // ...
}()

// Add a message to the outbox.
tr, err := db.BeginTx(ctx, nil)
// ...
err = outbox.AddMessage(ctx, tr, "my-key", []byte("my-value"))
// ...
tr.Commit() // or tr.Rollback()

For a more detailed example, refer to the examples directory.
See the adapter directory for repository implementations.

Features

  • Distributed Systems Support: Designed to work seamlessly in distributed environments.
  • Transactional Support: Ensures messages are added to the outbox and processed reliably within database transactions.
  • Retry Logic: Automatically retries failed messages up to a configurable limit.
  • Dead Letter Handling: Marks messages as dead if they exceed the retry limit, ensuring they are not retried indefinitely.
  • Extensibility: Easily integrates with different storage backends (e.g., PostgreSQL, MySQL) and message brokers (e.g., RabbitMQ, Kafka).
  • OpenTelemetry Integration: Provides observability with metrics and tracing.
  • Customizable: Configurable batch size, retry limits and error handlers.
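
The retry and dead-letter bullets above can be pictured as a small decision function. The following is only a sketch: nextStatus is a hypothetical helper, the status type is a local stand-in for the documented SendStatus values, and the exact boundary (a message becomes dead once its retry count has reached the limit) is an assumption rather than confirmed library behavior.

```go
package main

// SendStatus mirrors the documented status values. It is a local stand-in,
// not the library's own type.
type SendStatus uint8

const (
	StatusNone SendStatus = 0 // not tried to send
	StatusSent SendStatus = 1 // sent successfully
	StatusFail SendStatus = 2 // failed to send
	StatusDead SendStatus = 3 // retries exceeded, message is dead
)

// nextStatus is a hypothetical helper. Given the outcome of a send attempt,
// the number of retries already made, and the retry limit, it returns the
// status the message should move to.
// Assumption: a failed message is dead once retries has reached maxRetries.
func nextStatus(sendOK bool, retries, maxRetries uint) SendStatus {
	if sendOK {
		return StatusSent
	}
	if retries >= maxRetries {
		return StatusDead
	}
	return StatusFail
}
```

With the documented default of 3 retries, a message that fails with 2 retries recorded stays retryable under this assumption, while one that fails with 3 retries recorded is marked dead.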

Installation

To install the library, run:

go get github.com/Darkemon/troutbox

Observability

The library provides OpenTelemetry traces and metrics for monitoring the status of the outbox.

The following metrics are available:

  • outbox_messages_sent: (counter) the total number of messages sent to the message broker.
  • outbox_messages_failed: (counter) the total number of messages that failed to be sent.
  • outbox_messages_retried: (counter) the total number of messages that were retried.
  • outbox_messages_dead: (counter) the total number of messages that were marked as dead.

Note that the counters may not be fully accurate when there are issues with the database connection.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type InTransactionFn

type InTransactionFn func(ctx context.Context, s MessageRepository) error

type Message

type Message struct {
	ID        uint64
	Key       string
	Value     []byte
	Timestamp time.Time
	Retries   uint
	// contains filtered or unexported fields
}

func (*Message) MarkAsFailed

func (m *Message) MarkAsFailed()

func (*Message) MarkAsSent

func (m *Message) MarkAsSent()

type MessageRepository

type MessageRepository interface {
	// FetchAndLock fetches a batch of ready-to-send messages from the outbox and locks them for processing.
	// It fetches at most batchSize messages sorted by timestamp in ascending order.
	FetchAndLock(ctx context.Context, batchSize uint) ([]Message, error)
	// UpdateRetryCount updates the retry count for the messages with the given IDs.
	// It's called when the messages are retried.
	UpdateRetryCount(ctx context.Context, ids []uint64) error
	// MarkAsDead marks the messages as dead so that they won't be retried anymore.
	// It's called when a message cannot be sent after maxRetries.
	MarkAsDead(ctx context.Context, ids []uint64) error
	// MarkAsSent marks the messages as sent.
	MarkAsSent(ctx context.Context, ids []uint64) error
}

MessageRepository describes an interface for interacting with the outbox storage (e.g., a database).

type Option

type Option func(*options)

func WithBatchSize

func WithBatchSize(size uint) Option

WithBatchSize sets the batch size to fetch messages from the repository. The default batch size is 10.

func WithErrorHandler

func WithErrorHandler(handler func(error)) Option

WithErrorHandler sets the error handler for the outbox. The error handler is called when an error occurs while sending messages. The default error handler logs the error to the standard logger. Be careful: the handler blocks the main sending loop while it runs.

func WithMaxRetries

func WithMaxRetries(retries uint) Option

WithMaxRetries sets the maximum number of retries for sending messages. The default maximum number of retries is 3.

func WithOtelMeter

func WithOtelMeter(meter metric.Meter) Option

WithOtelMeter sets the OpenTelemetry meter for the outbox. By default, the default OpenTelemetry meter is used.

func WithOtelTracer

func WithOtelTracer(tracer trace.Tracer) Option

WithOtelTracer sets the OpenTelemetry tracer for the outbox. By default, the default OpenTelemetry tracer is used.

func WithPeriod

func WithPeriod(period time.Duration) Option

WithPeriod sets the period for sending messages. The default period is 5 seconds.

func WithSendTimeout

func WithSendTimeout(timeout time.Duration) Option

WithSendTimeout sets the timeout for sending messages, including communication with the database. The default timeout is 2 seconds.

type Outbox

type Outbox[T any] struct {
	// contains filtered or unexported fields
}

func NewOutbox

func NewOutbox[T any](storage TransactionalMessageRepository[T], sender Sender, opts ...Option) (*Outbox[T], error)

NewOutbox creates a new Outbox instance.

func (*Outbox[T]) AddMessage

func (o *Outbox[T]) AddMessage(ctx context.Context, tx T, key string, value []byte) error

AddMessage adds a message to the outbox. It sets the timestamp to the current UTC time.

func (*Outbox[T]) Run

func (o *Outbox[T]) Run(ctx context.Context) error

Run starts the outbox loop. It fetches a batch of messages from the DB every period (5 seconds by default) and sends them at least once. It marks the messages as sent after they are successfully sent. The loop can be stopped by cancelling the context.

type SendStatus

type SendStatus uint8
const (
	StatusNone SendStatus = 0 // not tried to send
	StatusSent SendStatus = 1 // sent successfully
	StatusFail SendStatus = 2 // failed to send
	StatusDead SendStatus = 3 // retries exceeded, message is dead
)

type Sender

type Sender interface {
	// Send sends a batch of messages to the message broker. The passed messages are
	// sorted by timestamp in ascending order.
	// It should return the same list of messages with updated statuses
	// (method MarkAsSent or MarkAsFailed of the Message). An error should be
	// returned if the sending failed completely or partially.
	Send(ctx context.Context, msg []Message) ([]Message, error)
}

type TransactionalMessageRepository

type TransactionalMessageRepository[T any] interface {
	MessageRepository

	// AddMessage adds a message to the outbox.
	AddMessage(ctx context.Context, tx T, msg Message) error
	// WithTransaction executes a function within a transaction.
	// The function should return an error if the transaction should be rolled back.
	WithTransaction(ctx context.Context, cb InTransactionFn) error
}

TransactionalMessageRepository extends MessageRepository to support transactional operations.

Directories

Path Synopsis
adapter
psql
This package provides an implementation of the TransactionalMessageRepository interface for a PostgreSQL database.
internal
