postgresengine

package
v1.4.1-beta Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 17, 2025 License: GPL-3.0 Imports: 15 Imported by: 0

Documentation

Overview

Package postgresengine provides a PostgreSQL implementation of the eventstore interface.

This package implements dynamic event streams using PostgreSQL as the storage backend, supporting multiple database adapters (pgx, sql.DB, sqlx) with atomic operations and concurrency control.

Key features:

  • Multiple database adapter support (PGX, SQL, SQLX)
  • Atomic event appending with concurrency conflict detection
  • Dynamic event stream filtering with JSON predicate support
  • Configurable table names and structured logging support
  • OpenTelemetry-compatible metrics and distributed tracing for comprehensive observability
  • Transaction-safe operations with proper resource cleanup

Usage examples:

// Basic usage
db, _ := pgxpool.New(context.Background(), dsn)
store, _ := postgresengine.NewEventStoreFromPGXPool(db)

// With logging, metrics, and tracing (production observability)
store, _ := postgresengine.NewEventStoreFromPGXPool(
	db,
	postgresengine.WithTableName("my_events"),
	postgresengine.WithLogger(logger),
	postgresengine.WithMetrics(metricsCollector),
	postgresengine.WithTracing(tracingCollector),
)

events, maxSeq, _ := store.Query(ctx, filter)
err := store.Append(ctx, filter, maxSeq, newEvent)

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ContextualLogger

type ContextualLogger = eventstore.ContextualLogger

ContextualLogger is an alias for eventstore.ContextualLogger for convenience when using postgresengine. It provides methods for context-aware logging with automatic trace correlation.

type EventStore

type EventStore struct {
	// contains filtered or unexported fields
}

EventStore represents a storage mechanism for appending and querying events in an event sourcing implementation. It leverages a database adapter and supports customizable logging, metricsCollector collection, tracing, and event table configuration.

func NewEventStoreFromPGXPool

func NewEventStoreFromPGXPool(db *pgxpool.Pool, options ...Option) (*EventStore, error)

NewEventStoreFromPGXPool creates a new EventStore using a pgx.Pool with optional configuration.

func NewEventStoreFromPGXPoolAndReplica

func NewEventStoreFromPGXPoolAndReplica(db *pgxpool.Pool, replica *pgxpool.Pool, options ...Option) (*EventStore, error)

NewEventStoreFromPGXPoolAndReplica creates a new EventStore using a primary pgx.Pool and a replica pgx.Pool with optional configuration.

func NewEventStoreFromSQLDB

func NewEventStoreFromSQLDB(db *sql.DB, options ...Option) (*EventStore, error)

NewEventStoreFromSQLDB creates a new EventStore using a sql.DB with optional configuration.

func NewEventStoreFromSQLDBAndReplica

func NewEventStoreFromSQLDBAndReplica(db *sql.DB, replica *sql.DB, options ...Option) (*EventStore, error)

NewEventStoreFromSQLDBAndReplica creates a new EventStore using a primary sql.DB and a replica sql.DB with optional configuration.

func NewEventStoreFromSQLX

func NewEventStoreFromSQLX(db *sqlx.DB, options ...Option) (*EventStore, error)

NewEventStoreFromSQLX creates a new EventStore using a sqlx.DB with optional configuration.

func NewEventStoreFromSQLXAndReplica

func NewEventStoreFromSQLXAndReplica(db *sqlx.DB, replica *sqlx.DB, options ...Option) (*EventStore, error)

NewEventStoreFromSQLXAndReplica creates a new EventStore using a primary sqlx.DB and a replica sqlx.DB with optional configuration.

func (*EventStore) Append

func (es *EventStore) Append(
	ctx context.Context,
	filter eventstore.Filter,
	expectedMaxSequenceNumber eventstore.MaxSequenceNumberUint,
	storableEvents ...eventstore.StorableEvent,
) error

Append attempts to append one or multiple eventstore.StorableEvent(s) onto the Postgres event store respecting concurrency constraints for this "dynamic event stream" based on the provided eventstore.Filter criteria and the expected MaxSequenceNumberUint.

The provided eventstore.Filter criteria should be the same as the ones used for the Query before making the business decisions.

The insert query to append multiple events atomically is heavier than the one built to append a single event. In event-sourced applications, one command/request should typically only produce one event. Only supply multiple events if you are sure that you need to append multiple events at once!

func (*EventStore) DeleteSnapshot

func (es *EventStore) DeleteSnapshot(ctx context.Context, projectionType string, filter eventstore.Filter) error

DeleteSnapshot removes the snapshot for the given projection type and filter. Returns nil if the snapshot doesn't exist (idempotent operation).

func (*EventStore) LoadSnapshot

func (es *EventStore) LoadSnapshot(ctx context.Context, projectionType string, filter eventstore.Filter) (*eventstore.Snapshot, error)

LoadSnapshot retrieves the snapshot for the given projection type and filter. Returns (nil, nil) if no snapshot exists.

func (*EventStore) Query

Query retrieves events from the Postgres event store based on the provided eventstore.Filter criteria and returns them as eventstore.StorableEvents as well as the MaxSequenceNumberUint for this "dynamic event stream" at the time of the query.

func (*EventStore) SaveSnapshot

func (es *EventStore) SaveSnapshot(ctx context.Context, snapshot eventstore.Snapshot) error

SaveSnapshot stores or updates a snapshot for the given projection type and filter. If a snapshot already exists, it will be updated only if the new sequence number is higher.

type Logger

type Logger = eventstore.Logger

Logger is an alias for eventstore.Logger for convenience when using postgresengine. It provides methods for SQL query logging, operational metrics, warnings, and error reporting.

type MetricsCollector

type MetricsCollector = eventstore.MetricsCollector

MetricsCollector is an alias for eventstore.MetricsCollector for convenience when using postgresengine. It provides methods for collecting EventStore performance and operational metrics.

type Option

type Option func(*EventStore) error

Option defines a functional option for configuring EventStore.

func WithContextualLogger

func WithContextualLogger(logger eventstore.ContextualLogger) Option

WithContextualLogger sets the contextual logger for the EventStore. The contextual logger will receive log messages with context information including automatic trace/span correlation when tracing is enabled, enabling unified observability.

func WithLogger

func WithLogger(logger eventstore.Logger) Option

WithLogger sets the logger for the EventStore. The logger will receive messages at different levels based on the logger's configured level:

Debug level: SQL queries with execution timing (development use) Info level: Event counts, durations, concurrency conflicts (production-safe) Warn level: Non-critical issues like cleanup failures Error level: Critical failures that cause operation failures.

func WithMetrics

func WithMetrics(collector eventstore.MetricsCollector) Option

WithMetrics sets the metricsCollector collector for the EventStore. The metricsCollector collector will receive performance and operational metricsCollector including query/append durations, event counts, concurrency conflicts, and database errors.

func WithTableName

func WithTableName(tableName string) Option

WithTableName sets the table name for the EventStore.

func WithTracing

func WithTracing(collector eventstore.TracingCollector) Option

WithTracing sets the tracing collector for the EventStore. The tracing collector will receive distributed tracing information including span creation for query/append operations, context propagation, and error tracking.

type SpanContext

type SpanContext = eventstore.SpanContext

SpanContext is an alias for eventstore.SpanContext for convenience when using postgresengine. It represents an active tracing span that can be finished and updated with attributes.

type TracingCollector

type TracingCollector = eventstore.TracingCollector

TracingCollector is an alias for eventstore.TracingCollector for convenience when using postgresengine. It provides methods for collecting distributed tracing information from EventStore operations.

Directories

Path Synopsis
internal
adapters
Package adapters provide database adapter implementations for the PostgreSQL event store.
Package adapters provide database adapter implementations for the PostgreSQL event store.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL