postgres

package module
v0.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 2, 2026 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Overview

Package postgres provides a Postgres-backed es.EventStore for the synapse event sourcing toolkit, built on jackc/pgx/v5 and pgxpool.

Schema:

events(global_position BIGSERIAL PK, event_id, stream_id, version,
       type, content_type, recorded_at,
       causation, correlation, metadata JSONB, payload BYTEA,
       UNIQUE(stream_id, version))

The schema is applied on New via CREATE TABLE IF NOT EXISTS, so repeated calls are idempotent. Pass WithoutMigrate when the schema is managed by an external tool (goose, golang-migrate, atlas).

Concurrency model:

  • Append runs in parallel: no global serialization lock. Each event row records the writing transaction's xid (via DEFAULT pg_current_xact_id()); global-path subscribers filter with pg_snapshot_xmin so they only see rows whose transactions have definitely committed. Per-stream conflicts are handled by UNIQUE(stream_id, version) → *es.ConflictError. The cost is subscriber latency bounded by the oldest in-flight transaction; operators should configure idle_in_transaction_session_timeout. See ADR-0031.
  • Append emits a NOTIFY on the channel "synapse_events" inside the same transaction, so the wake-up fires at COMMIT.
  • A single shared goroutine per Store holds one connection running LISTEN "synapse_events" and, on each notification, wakes every live subscriber through an in-process broadcast. Woken subscribers run a cursor SELECT on a pooled connection and release it; they hold no connection while waiting. So the number of concurrent live subscribers is independent of pool size — the pool only needs to cover the one listener connection plus concurrent reads and appends. The listener starts lazily on the first live subscription and runs until Store.Close. See ADR-0025.

Because the Store owns that goroutine and its connection, callers must call Store.Close before closing the pool.

Index

Constants

This section is empty.

Variables

View Source
var Schema string

Schema is the SQL DDL this Store requires. It is exported so users who manage migrations externally can feed it to their own tooling.

Functions

func Migrate

func Migrate(ctx context.Context, pool *pgxpool.Pool) error

Migrate applies Schema to the pool. Idempotent.

Types

type Option

type Option func(*options)

Option configures New.

func WithReadReplica added in v0.4.0

func WithReadReplica(pool *pgxpool.Pool) Option

WithReadReplica routes catch-up reads (Subscribe, SubscribeStream, Streams, StreamsBySubject, and the post-NOTIFY cursor in live tail) to pool. Append, Load, the LISTEN connection, and the head() query the Append path uses remain on the primary because they are either writes or read-your-own-writes paths. See ADR-0038.

pool is borrowed; the caller owns its lifecycle. A nil pool is a no-op so dynamic wiring code can pass either.

A separate streaming-replica Postgres instance is the intended shape, but the option does not check the replica's relationship to the primary — operators are free to wire two unrelated pools if they have a reason. The Store stays correct as long as the schema matches and the replica has eventually-consistent visibility into the events the primary appends.

func WithoutMigrate

func WithoutMigrate() Option

WithoutMigrate disables the automatic schema migration that New performs by default.

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store is a Postgres-backed es.EventStore.

The caller owns the pool(s) and is responsible for closing them. The Store additionally owns a single shared LISTEN goroutine that is started lazily on the first live subscription and holds one connection on the primary for its lifetime; Store.Close stops it. Call Close before closing the pool. See ADR-0025.

func New

func New(ctx context.Context, pool *pgxpool.Pool, opts ...Option) (*Store, error)

New returns a Store wrapping pool. By default applies Schema; pass WithoutMigrate to skip when migrations are external.

The returned Store must be closed with Store.Close before the pool is closed.

func (*Store) Append

func (s *Store) Append(
	ctx context.Context,
	stream es.StreamID,
	expected es.Revision,
	events ...es.RawEnvelope,
) (es.Revision, error)

Append implements es.EventStore.

The flow inside one transaction: SELECT the current head version for the stream, validate against expected, INSERT events with their per-stream versions (the xid column is populated by DEFAULT), NOTIFY with the new max global_position, COMMIT. Concurrent appends to different streams run in parallel; concurrent appends to the same stream race on UNIQUE(stream_id, version) and the loser receives *es.ConflictError. See ADR-0031.

func (*Store) Close

func (s *Store) Close()

Close stops the shared LISTEN goroutine and releases the connection it holds. Call Close before closing the underlying pool; otherwise pgxpool's Close blocks waiting for the listener's connection. Close is idempotent.

After Close, live subscriptions no longer receive wake-ups (their one-shot catch-up read still works); cancel their contexts to stop them.

func (*Store) Load

func (s *Store) Load(
	ctx context.Context,
	stream es.StreamID,
	opts es.ReadOptions,
) iter.Seq2[es.RawEnvelope, error]

Load implements es.EventStore.

func (*Store) Streams added in v0.3.0

func (s *Store) Streams(ctx context.Context) iter.Seq2[es.StreamID, error]

Streams implements es.EventStore. It SELECTs every distinct stream id from the events table in ascending order and streams them through the iterator. The query relies on the leading column of UNIQUE(stream_id, version) for ordering; large event tables may want a dedicated streams table or a covering index in the future.

Routes to the read pool (ADR-0038) — a replica may show freshly-appended streams a few ms later than the primary would.

func (*Store) StreamsBySubject added in v0.3.0

func (s *Store) StreamsBySubject(ctx context.Context, subject string) iter.Seq2[es.StreamID, error]

StreamsBySubject implements es.EventStore. Uses the partial index events_subject_idx (see schema.sql); cost scales with the number of events tagged with subject, not the total log size.

Routes to the read pool (ADR-0038).

func (*Store) Subscribe

func (s *Store) Subscribe(ctx context.Context, opts es.SubscriptionOptions) iter.Seq2[es.RawEnvelope, error]

Subscribe implements es.EventStore.

func (*Store) SubscribeStream

func (s *Store) SubscribeStream(ctx context.Context, stream es.StreamID, opts es.SubscriptionOptions) iter.Seq2[es.RawEnvelope, error]

SubscribeStream implements es.EventStore.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL