Version: v0.1.0 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: May 2, 2023 License: MPL-2.0 Imports: 11 Imported by: 0



Package outbox implements the transactional outbox pattern for Encore's pubsub package.

It works by binding a pubsub topic to a transaction, translating all calls to Publish into inserting a database row in an outbox table. If/when the transaction later commits, the messages are picked up by a relay that polls the outbox table and publishes the messages to the actual Pub/Sub topic.

Both the topic binding and the relay support pluggable storage backends, enabling using the outbox pattern with any transactional storage backend. The outbox package provides an implementation using Encore's built-in SQL database support.

Note that the topics to process have to be registered with the relay using RegisterTopic.

Package outbox implements the transactional outbox pattern for ensuring data consistency between a transactional data store and a Pub/Sub topic.

Package outbox implements the transactional outbox pattern for ensuring data consistency between a transactional data store and a Pub/Sub topic.



This section is empty.


This section is empty.


func Bind

func Bind[T any](topic pubsub.Publisher[T], persist PersistFunc) pubsub.Publisher[T]

Bind binds a topic reference with a persist function, Outbox topics can be used to publish messages with the same interface as a *pubsub.Topic, but where calls to Publish are recorded in the outbox instead of being published to the actual topic.

func RegisterTopic

func RegisterTopic[T any](r *Relay, topic pubsub.Publisher[T])

RegisterTopic registers a topic with a relay, enabling the relay to process messages for that topic.


type MessageBatch

type MessageBatch interface {
	// Messages returns the messages in the batch.
	// The messages must be returned in the order they were inserted into the outbox
	// within each topic (ordering is only defined within a topic, not between topics).
	// Messages must be held exclusively by the batch; other processes
	// are not allowed to check out the same messages while the batch is open.
	Messages() []PersistedMessage

	// MarkPublished marks a persisted message as being successfully published.
	// The store may choose what to do with the message, such as deleting it or
	// marking the message as having been published, as long as it is no longer
	// returned by subsequent calls to CheckoutBatch (assuming Close successfully commits
	// the batch's transaction, if applicable).
	// If the batch is implemented in a transactional way it is recommended
	// the deletion to take effect on the subsequent call to Close.
	MarkPublished(ctx context.Context, msg PersistedMessage, publishID string) error

	// Close closes the batch, committing any MarkPublished changes (if the store is transactional)
	// and releasing the lock on the remaining persisted messages in the batch (if any).
	Close() error

MessageBatch is a batch of messages that have been checked out for publishing via a Relay.

type PersistFunc

type PersistFunc func(ctx context.Context, topicName string, msg any) (id string, err error)

func PgxTxPersister

func PgxTxPersister(tx pgx.Tx) PersistFunc

PgxTxPersister is like TxPersister but for a pgx.Tx.

func StdlibTxPersister

func StdlibTxPersister(tx *sql.Tx) PersistFunc

StdlibTxPersister is like TxPersister but for a *sql.Tx.

func TxPersister

func TxPersister(tx *sqldb.Tx) PersistFunc

TxPersister returns a PersistFunc that inserts published messages within the given transaction.

type PersistedMessage

type PersistedMessage struct {
	// MessageID is the unique id of the persisted message.
	MessageID any

	// TopicName is the name of the topic the message is for.
	TopicName string

	// Data is the serialized message data.
	Data []byte

A PersistedMessage represents a checked-out persisted message in a Store.

type Relay

type Relay struct {
	// contains filtered or unexported fields

A Relay polls an outbox for new messages and publishes them.

Topics must be registered with the relay to be processed using RegisterTopic.

func NewRelay

func NewRelay(store Store) *Relay

NewRelay creates a new relay using the given store.

To start polling for messages, call PollForMessages. At any point you may also call ProcessMessages to immediately process any messages that have been persisted.

func (*Relay) PollForMessages

func (r *Relay) PollForMessages(ctx context.Context, batchSize int)

PollForMessages polls for new messages to publish, processing up to batchSize messages each time. If batchSize is <= 0 it defaults to 100.

It blocks (does not return) until ctx is cancelled.

func (*Relay) ProcessMessages

func (r *Relay) ProcessMessages(ctx context.Context, limit int) (numPublishSuccesses, numPublishErrs int, storeErr error)

ProcessMessages processes new messages, publishing them to the appropriate Pub/Sub topics. It processes up to limit new messages. If limit is <= 0 it defaults to 100.

It returns the number of topic publishing successes and errors, as well as any Store-related error, which can be used to adaptively change poll frequency.

type Store

type Store interface {
	// CheckoutBatch checks out a batch of messages in the outbox.
	// The messages returned must only belong to the given topics.
	// Messages in a batch must be held exclusively by the batch; other processes
	// are not allowed to check out the same messages while the batch is open.
	// The provided limit is a hint for how many total messages should be returned maximum.
	// CheckoutBatch may return (nil, nil) to indicate there were no matching messages in the outbox.
	CheckoutBatch(ctx context.Context, topicNames []string, limit int) (MessageBatch, error)

The Store interface implements the persistence part of the outbox.

func SQLDBStore

func SQLDBStore(db *sqldb.Database) Store

SQLDBStore returns a Store implementation backed by a *sqldb.Database. The provided database must have a single table with the following structure:

CREATE INDEX outbox_topic_idx ON outbox (topic, id);

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL