eventuallypostgres

package module
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 22, 2023 License: MIT Imports: 18 Imported by: 0

Documentation

Overview

Package eventuallypostgres contains implementations of go-eventually interfaces specific to PostgreSQL, such as Aggregate Repository, Event Store, etc.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func RunMigrations

func RunMigrations(db *sql.DB) error

RunMigrations runs the latest migrations for the postgres integration.

Make sure to run these in the entrypoint of your application, ideally before building a postgres interface implementation.

Types

type AggregateRepository

type AggregateRepository[ID aggregate.ID, T aggregate.Root[ID]] struct {
	Conn           *pgxpool.Pool
	AggregateType  aggregate.Type[ID, T]
	AggregateSerde serde.Bytes[T]
	MessageSerde   serde.Bytes[message.Message]
}

AggregateRepository implements the aggregate.Repository interface for PostgreSQL databases.

This implementation uses the "aggregates" table in the database as its main operational table. At the same time, it also writes to both "events" and "event_streams" to append the Domain events recorded by Aggregate Roots. These updates are performed within the same transaction.

func (AggregateRepository[ID, T]) Get

func (repo AggregateRepository[ID, T]) Get(ctx context.Context, id ID) (T, error)

Get returns the aggregate.Root instance specified by the provided id. Returns aggregate.ErrRootNotFound if the Aggregate Root doesn't exist.

func (AggregateRepository[ID, T]) Save

func (repo AggregateRepository[ID, T]) Save(ctx context.Context, root T) error

Save saves the new state of the provided aggregate.Root instance.

type EventStore

type EventStore struct {
	Conn  *pgxpool.Pool
	Serde serde.Bytes[message.Message]
}

EventStore is an event.Store implementation targeted to PostgreSQL databases.

The implementation uses "event_streams" and "events" as their operational tables. Updates to these tables are transactional.

func (EventStore) Append

func (es EventStore) Append(
	ctx context.Context,
	id event.StreamID,
	expected version.Check,
	events ...event.Envelope,
) (version.Version, error)

Append implements event.Store.

func (EventStore) Stream

func (es EventStore) Stream(
	ctx context.Context,
	stream event.StreamWrite,
	id event.StreamID,
	selector version.Selector,
) error

Stream implements the event.Streamer interface.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL