outbox

package
v0.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 5, 2022 License: MIT Imports: 9 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	DefaultProcessInterval = 10 * time.Second
	DefaultClaimDuration   = 2 * time.Second
	DefaultBatchSize       = 20
)

Functions

func NamespaceFromContext added in v0.4.0

func NamespaceFromContext(ctx context.Context) string

NamespaceFromContext identifies what namespace to record published messages to in the outbox

func WithNamespace added in v0.4.0

func WithNamespace(ctx context.Context, namespace string) context.Context

WithNamespace creates a context which configures published messages to be recorded to the outbox with the specified namespace

Types

type ClaimedEntry

type ClaimedEntry struct {
	// Namespace is an identifier used to group outbox entries, e.g. for choosing what topics to route entries to
	Namespace string
	// ID is a unique identifier for any given Outbox ClaimedEntry, typically a database primary key
	ID string
	// Key to be included in the published Message
	Key []byte
	// Payload to be included in the published Message
	Payload []byte
}

ClaimedEntry is an entry in the Outbox

type Clock

type Clock interface {
	Now() time.Time
	After(c time.Duration) <-chan time.Time
}

Clock abstracts interactions with the time package to facilitate testing

type Config

type Config struct {
	// Clock abstracts interactions with the time package, defaults to a real clock implementation
	Clock Clock
	// Storage allows the processing task to claim, retrieve and delete ClaimedEntry objects
	Storage ProcessorStorage
	// Publisher is used to publish Message objects, made from ClaimedEntry objects, pulled from ProcessorStorage
	Publisher Publisher
	// ProcessInterval specifies how long the processor should spend idle without checking for work, this
	// is reset if Outbox.WakeProcessor is called
	ProcessInterval time.Duration
	// ClaimDuration specifies how long the processor will claim ClaimedEntry objects in ProcessorStorage
	ClaimDuration time.Duration
	// ProcessorID is a unique identifier for any instance of the outbox, so a horizontally scaled app
	// can run many Outbox instances, each claiming ClaimedEntry objects and publishing them
	ProcessorID string
	// BatchSize indicates how many ClaimedEntry objects to attempt to retrieve & publish in one go
	BatchSize int
	// Logger can be provided to receive logging output
	Logger logr.Logger
}

Config configures the behaviour of the Outbox

func (*Config) DefaultAndValidate

func (c *Config) DefaultAndValidate() error

DefaultAndValidate ensures the configuration is valid and, where possible, provides reasonable default values where no value is provided

type ContextSettings added in v0.4.0

type ContextSettings struct {
	Namespace string
}

ContextSettings are settings that can configure outbox behaviour through context

func (ContextSettings) Clone added in v0.4.0

func (c ContextSettings) Clone() *ContextSettings

Clone clones context settings

type Message

type Message struct {
	// Key is an optional value primarily used in streaming systems that partition
	// published messages by keys to facilitate in-order delivery and load balancing
	Key []byte
	// Payload is the actual message contents that should be published
	Payload []byte
}

Message is what will be published over some pubsub/streaming system

type Outbox

type Outbox struct {
	// contains filtered or unexported fields
}

Outbox is the primary object in the package that implements the transactional outbox pattern.

func New

func New(cfg Config) (*Outbox, error)

New attempts to construct an Outbox from the provided Config, if the Config is valid

func (*Outbox) Publish added in v0.2.0

func (o *Outbox) Publish(ctx context.Context, txn interface{}, messages ...Message) error

Publish publishes the provided messages to the outbox, and will be forwarded to the configured Publisher during one of the subsequent PumpOutbox calls

func (*Outbox) PumpOutbox

func (o *Outbox) PumpOutbox(ctx context.Context) (err error)

PumpOutbox causes the Outbox to process entries immediately. This is typically not called directly, instead called from StartProcessing. However, this is exposed partially for ease of testing, but also to facilitate customising the processing logic if the provided StartProcessing function isn't suitable for your application.

func (*Outbox) StartProcessing

func (o *Outbox) StartProcessing(ctx context.Context) error

StartProcessing blocks, processing the outbox until its context is cancelled. It wakes up to process regularly based on the Config.ProcessInterval and can be woken manually using WakeProcessor.

func (*Outbox) WakeProcessor

func (o *Outbox) WakeProcessor()

WakeProcessor is used to notify the outbox processor that new data has been written to the outbox and it should wake up and process them, rather than wait for the Config.ProcessInterval. For batch write operations, try to only call this once so the processor is likely to wake up fewer times and process them as a batch. This function does not block.

type ProcessorStorage

type ProcessorStorage interface {
	// ClaimEntries attempts to update all claimable entries as belonging to the calling processor
	ClaimEntries(ctx context.Context, processorID string, claimDeadline time.Time) error
	// GetClaimedEntries returns a batch of entries currently belonging to the calling processor
	GetClaimedEntries(ctx context.Context, processorID string, batchSize int) ([]ClaimedEntry, error)
	// DeleteEntries deletes the entries as specified by their ClaimedEntry.ID
	DeleteEntries(ctx context.Context, entryIDs ...string) error
	// Publish creates new outbox entries containing the provided messages, to be published as soon as possible
	// Note: implementations should consult the context for additional ContextSettings, e.g. namespace
	Publish(ctx context.Context, txn interface{}, messages ...Message) error
}

ProcessorStorage is the Outbox's interaction with persistence, typically a database

type PublishError

type PublishError struct {
	// Errors correlates one-to-one with the Message values passed to Publisher.Publish - if a message
	// was sent successfully it will have a nil entry, otherwise it will be an error value
	Errors []error
}

PublishError allows callers to understand which Message objects, if any, were sent successfully

func (*PublishError) Error

func (p *PublishError) Error() string

Error provides a brief string summary to implement the Error interface

func (*PublishError) ErrorCount

func (p *PublishError) ErrorCount() (count int)

ErrorCount counts how many messages failed to publish

type Publisher

type Publisher interface {
	// Publish attempts to write the given messages to a destination. It may return a PublishError
	// to indicate which messages were published successfully.
	// Note: implementations should consult the context for additional ContextSettings, e.g. namespace
	Publish(ctx context.Context, messages ...Message) error
}

Publisher is something that can take a batch of Message objects and attempt to publish them. Note that this interface is useful both as:

  • The destination that the Outbox will write Message objects to, e.g. some external pubsub/stream
  • A promise from your application's persistence layer which - as part of some ongoing transaction - will write the given Message objects to the underlying ProcessorStorage for later publishing

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL