postgres

package
v0.3.0-prerelease.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 20, 2024 License: MIT Imports: 19 Imported by: 0

Documentation

Overview

Package postgres contains implementations of go-eventually interfaces specific to PostgreSQL, such as Aggregate Repository, Event Store, etc.

Index

Constants

View Source
const (
	// DefaultAggregateTableName is the default Aggregate table name an AggregateRepository points to.
	DefaultAggregateTableName = "aggregates"
	// DefaultEventsTableName is the default Domain Events table name an AggregateRepository points to.
	DefaultEventsTableName = "events"
	// DefaultStreamsTableName is the default Event Streams table name an AggregateRepository points to.
	DefaultStreamsTableName = "event_streams"
)

Variables

This section is empty.

Functions

func RunMigrations

func RunMigrations(db *sql.DB) error

RunMigrations runs the latest migrations for the postgres integration.

Make sure to run these in the entrypoint of your application, ideally before building a postgres interface implementation.

Types

type AggregateRepository

type AggregateRepository[ID aggregate.ID, T aggregate.Root[ID]] struct {
	// contains filtered or unexported fields
}

AggregateRepository implements the aggregate.Repository interface for PostgreSQL databases.

This implementation uses the "aggregates" table (by default) in the database as its main operational table.

At the same time, it also writes to both "events" and "event_streams" to append the Domain events recorded by Aggregate Roots. These updates are performed within the same transaction.

Note: the tables the Repository points to can be changed using the available functional options.

func NewAggregateRepository

func NewAggregateRepository[ID aggregate.ID, T aggregate.Root[ID]](
	conn *pgxpool.Pool,
	aggregateType aggregate.Type[ID, T],
	aggregateSerde serde.Bytes[T],
	messageSerde serde.Bytes[message.Message],
	options ...Option[*AggregateRepository[ID, T]],
) AggregateRepository[ID, T]

NewAggregateRepository returns a new AggregateRepository instance.

func (AggregateRepository[ID, T]) Get

func (repo AggregateRepository[ID, T]) Get(ctx context.Context, id ID) (T, error)

Get returns the aggregate.Root instance specified by the provided id. Returns aggregate.ErrRootNotFound if the Aggregate Root doesn't exist.

func (AggregateRepository[ID, T]) Save

func (repo AggregateRepository[ID, T]) Save(ctx context.Context, root T) (err error)

Save saves the new state of the provided aggregate.Root instance.

type EventStore

type EventStore struct {
	// contains filtered or unexported fields
}

EventStore is an event.Store implementation targeted to PostgreSQL databases.

The implementation uses "event_streams" and "events" as their operational tables. Updates to these tables are transactional.

func NewEventStore

func NewEventStore(conn *pgxpool.Pool, messageSerde serde.Bytes[message.Message]) EventStore

NewEventStore returns a new EventStore instance.

func (EventStore) Append

func (es EventStore) Append(
	ctx context.Context,
	id event.StreamID,
	expected version.Check,
	events ...event.Envelope,
) (version.Version, error)

Append implements event.Store.

func (EventStore) Stream

func (es EventStore) Stream(
	ctx context.Context,
	stream event.StreamWrite,
	id event.StreamID,
	selector version.Selector,
) error

Stream implements the event.Streamer interface.

type Option

type Option[T any] interface {
	// contains filtered or unexported methods
}

Option can be used to change the configuration of an object.

func WithAggregateTableName

func WithAggregateTableName[ID aggregate.ID, T aggregate.Root[ID]](
	tableName string,
) Option[*AggregateRepository[ID, T]]

WithAggregateTableName allows you to specify a different Aggregate table name that an AggregateRepository should manage.

func WithEventsTableName

func WithEventsTableName[ID aggregate.ID, T aggregate.Root[ID]](tableName string) Option[*AggregateRepository[ID, T]]

WithEventsTableName allows you to specify a different Events table name that an AggregateRepository should manage.

func WithStreamsTableName

func WithStreamsTableName[ID aggregate.ID, T aggregate.Root[ID]](tableName string) Option[*AggregateRepository[ID, T]]

WithStreamsTableName allows you to specify a different Event Streams table name that an AggregateRepository should manage.

Directories

Path Synopsis
Package internal contains utilities and helper functions useful in the scope of eventually's postgres implementation.
Package internal contains utilities and helper functions useful in the scope of eventually's postgres implementation.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL