Documentation
¶
Overview ¶
Package postgresengine provides a PostgreSQL implementation of the eventstore interface.
This package implements dynamic event streams using PostgreSQL as the storage backend, supporting multiple database adapters (pgx, sql.DB, sqlx) with atomic operations and concurrency control.
Key features:
- Multiple database adapter support (PGX, SQL, SQLX)
- Atomic event appending with concurrency conflict detection
- Dynamic event stream filtering with JSON predicate support
- Configurable table names and structured logging support
- OpenTelemetry-compatible metrics and distributed tracing for comprehensive observability
- Transaction-safe operations with proper resource cleanup
Usage examples:
// Basic usage db, _ := pgxpool.New(context.Background(), dsn) store, _ := postgresengine.NewEventStoreFromPGXPool(db) // With logging, metrics, and tracing (production observability) store, _ := postgresengine.NewEventStoreFromPGXPool( db, postgresengine.WithTableName("my_events"), postgresengine.WithLogger(logger), postgresengine.WithMetrics(metricsCollector), postgresengine.WithTracing(tracingCollector), ) events, maxSeq, _ := store.Query(ctx, filter) err := store.Append(ctx, filter, maxSeq, newEvent)
Index ¶
- type ContextualLogger
- type EventStore
- func NewEventStoreFromPGXPool(db *pgxpool.Pool, options ...Option) (*EventStore, error)
- func NewEventStoreFromPGXPoolAndReplica(db *pgxpool.Pool, replica *pgxpool.Pool, options ...Option) (*EventStore, error)
- func NewEventStoreFromSQLDB(db *sql.DB, options ...Option) (*EventStore, error)
- func NewEventStoreFromSQLDBAndReplica(db *sql.DB, replica *sql.DB, options ...Option) (*EventStore, error)
- func NewEventStoreFromSQLX(db *sqlx.DB, options ...Option) (*EventStore, error)
- func NewEventStoreFromSQLXAndReplica(db *sqlx.DB, replica *sqlx.DB, options ...Option) (*EventStore, error)
- func (es *EventStore) Append(ctx context.Context, filter eventstore.Filter, ...) error
- func (es *EventStore) DeleteSnapshot(ctx context.Context, projectionType string, filter eventstore.Filter) error
- func (es *EventStore) LoadSnapshot(ctx context.Context, projectionType string, filter eventstore.Filter) (*eventstore.Snapshot, error)
- func (es *EventStore) Query(ctx context.Context, filter eventstore.Filter) (eventstore.StorableEvents, eventstore.MaxSequenceNumberUint, error)
- func (es *EventStore) SaveSnapshot(ctx context.Context, snapshot eventstore.Snapshot) error
- type Logger
- type MetricsCollector
- type Option
- type SpanContext
- type TracingCollector
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ContextualLogger ¶
type ContextualLogger = eventstore.ContextualLogger
ContextualLogger is an alias for eventstore.ContextualLogger for convenience when using postgresengine. It provides methods for context-aware logging with automatic trace correlation.
type EventStore ¶
type EventStore struct {
// contains filtered or unexported fields
}
EventStore represents a storage mechanism for appending and querying events in an event sourcing implementation. It leverages a database adapter and supports customizable logging, metricsCollector collection, tracing, and event table configuration.
func NewEventStoreFromPGXPool ¶
func NewEventStoreFromPGXPool(db *pgxpool.Pool, options ...Option) (*EventStore, error)
NewEventStoreFromPGXPool creates a new EventStore using a pgx.Pool with optional configuration.
func NewEventStoreFromPGXPoolAndReplica ¶
func NewEventStoreFromPGXPoolAndReplica(db *pgxpool.Pool, replica *pgxpool.Pool, options ...Option) (*EventStore, error)
NewEventStoreFromPGXPoolAndReplica creates a new EventStore using a primary pgx.Pool and a replica pgx.Pool with optional configuration.
func NewEventStoreFromSQLDB ¶
func NewEventStoreFromSQLDB(db *sql.DB, options ...Option) (*EventStore, error)
NewEventStoreFromSQLDB creates a new EventStore using a sql.DB with optional configuration.
func NewEventStoreFromSQLDBAndReplica ¶
func NewEventStoreFromSQLDBAndReplica(db *sql.DB, replica *sql.DB, options ...Option) (*EventStore, error)
NewEventStoreFromSQLDBAndReplica creates a new EventStore using a primary sql.DB and a replica sql.DB with optional configuration.
func NewEventStoreFromSQLX ¶
func NewEventStoreFromSQLX(db *sqlx.DB, options ...Option) (*EventStore, error)
NewEventStoreFromSQLX creates a new EventStore using a sqlx.DB with optional configuration.
func NewEventStoreFromSQLXAndReplica ¶
func NewEventStoreFromSQLXAndReplica(db *sqlx.DB, replica *sqlx.DB, options ...Option) (*EventStore, error)
NewEventStoreFromSQLXAndReplica creates a new EventStore using a primary sqlx.DB and a replica sqlx.DB with optional configuration.
func (*EventStore) Append ¶
func (es *EventStore) Append( ctx context.Context, filter eventstore.Filter, expectedMaxSequenceNumber eventstore.MaxSequenceNumberUint, storableEvents ...eventstore.StorableEvent, ) error
Append attempts to append one or multiple eventstore.StorableEvent(s) onto the Postgres event store respecting concurrency constraints for this "dynamic event stream" based on the provided eventstore.Filter criteria and the expected MaxSequenceNumberUint.
The provided eventstore.Filter criteria should be the same as the ones used for the Query before making the business decisions.
The insert query to append multiple events atomically is heavier than the one built to append a single event. In event-sourced applications, one command/request should typically only produce one event. Only supply multiple events if you are sure that you need to append multiple events at once!
func (*EventStore) DeleteSnapshot ¶
func (es *EventStore) DeleteSnapshot(ctx context.Context, projectionType string, filter eventstore.Filter) error
DeleteSnapshot removes the snapshot for the given projection type and filter. Returns nil if the snapshot doesn't exist (idempotent operation).
func (*EventStore) LoadSnapshot ¶
func (es *EventStore) LoadSnapshot(ctx context.Context, projectionType string, filter eventstore.Filter) (*eventstore.Snapshot, error)
LoadSnapshot retrieves the snapshot for the given projection type and filter. Returns (nil, nil) if no snapshot exists.
func (*EventStore) Query ¶
func (es *EventStore) Query(ctx context.Context, filter eventstore.Filter) ( eventstore.StorableEvents, eventstore.MaxSequenceNumberUint, error, )
Query retrieves events from the Postgres event store based on the provided eventstore.Filter criteria and returns them as eventstore.StorableEvents as well as the MaxSequenceNumberUint for this "dynamic event stream" at the time of the query.
func (*EventStore) SaveSnapshot ¶
func (es *EventStore) SaveSnapshot(ctx context.Context, snapshot eventstore.Snapshot) error
SaveSnapshot stores or updates a snapshot for the given projection type and filter. If a snapshot already exists, it will be updated only if the new sequence number is higher.
type Logger ¶
type Logger = eventstore.Logger
Logger is an alias for eventstore.Logger for convenience when using postgresengine. It provides methods for SQL query logging, operational metrics, warnings, and error reporting.
type MetricsCollector ¶
type MetricsCollector = eventstore.MetricsCollector
MetricsCollector is an alias for eventstore.MetricsCollector for convenience when using postgresengine. It provides methods for collecting EventStore performance and operational metrics.
type Option ¶
type Option func(*EventStore) error
Option defines a functional option for configuring EventStore.
func WithContextualLogger ¶
func WithContextualLogger(logger eventstore.ContextualLogger) Option
WithContextualLogger sets the contextual logger for the EventStore. The contextual logger will receive log messages with context information including automatic trace/span correlation when tracing is enabled, enabling unified observability.
func WithLogger ¶
func WithLogger(logger eventstore.Logger) Option
WithLogger sets the logger for the EventStore. The logger will receive messages at different levels based on the logger's configured level:
Debug level: SQL queries with execution timing (development use) Info level: Event counts, durations, concurrency conflicts (production-safe) Warn level: Non-critical issues like cleanup failures Error level: Critical failures that cause operation failures.
func WithMetrics ¶
func WithMetrics(collector eventstore.MetricsCollector) Option
WithMetrics sets the metricsCollector collector for the EventStore. The metricsCollector collector will receive performance and operational metricsCollector including query/append durations, event counts, concurrency conflicts, and database errors.
func WithTableName ¶
WithTableName sets the table name for the EventStore.
func WithTracing ¶
func WithTracing(collector eventstore.TracingCollector) Option
WithTracing sets the tracing collector for the EventStore. The tracing collector will receive distributed tracing information including span creation for query/append operations, context propagation, and error tracking.
type SpanContext ¶
type SpanContext = eventstore.SpanContext
SpanContext is an alias for eventstore.SpanContext for convenience when using postgresengine. It represents an active tracing span that can be finished and updated with attributes.
type TracingCollector ¶
type TracingCollector = eventstore.TracingCollector
TracingCollector is an alias for eventstore.TracingCollector for convenience when using postgresengine. It provides methods for collecting distributed tracing information from EventStore operations.