Documentation
¶
Overview ¶
Package postgres provides a Postgres-backed es.EventStore for the synapse event sourcing toolkit, built on jackc/pgx/v5 and pgxpool.
Schema:
events(global_position BIGSERIAL PK, event_id, stream_id, version,
type, content_type, recorded_at,
causation, correlation, metadata JSONB, payload BYTEA,
UNIQUE(stream_id, version))
The schema is applied on New via CREATE TABLE IF NOT EXISTS, so repeated calls are idempotent. Pass WithoutMigrate when the schema is managed by an external tool (goose, golang-migrate, atlas).
Concurrency model:
- Append runs in parallel: no global serialization lock. Each event row records the writing transaction's xid (via DEFAULT pg_current_xact_id()); global-path subscribers filter with pg_snapshot_xmin so they only see rows whose transactions have definitely committed. Per-stream conflicts are handled by UNIQUE(stream_id, version) → *es.ConflictError. The cost is subscriber latency bounded by the oldest in-flight transaction; operators should configure idle_in_transaction_session_timeout. See ADR-0031.
- Append emits a NOTIFY on the channel "synapse_events" inside the same transaction, so the wake-up fires at COMMIT.
- A single shared goroutine per Store holds one connection running LISTEN "synapse_events" and, on each notification, wakes every live subscriber through an in-process broadcast. Woken subscribers run a cursor SELECT on a pooled connection and release it; they hold no connection while waiting. So the number of concurrent live subscribers is independent of pool size — the pool only needs to cover the one listener connection plus concurrent reads and appends. The listener starts lazily on the first live subscription and runs until Store.Close. See ADR-0025.
Because the Store owns that goroutine and its connection, callers must call Store.Close before closing the pool.
Index ¶
- Variables
- func Migrate(ctx context.Context, pool *pgxpool.Pool) error
- type Option
- type Store
- func (s *Store) Append(ctx context.Context, stream es.StreamID, expected es.Revision, ...) (es.Revision, error)
- func (s *Store) Close()
- func (s *Store) Load(ctx context.Context, stream es.StreamID, opts es.ReadOptions) iter.Seq2[es.RawEnvelope, error]
- func (s *Store) Streams(ctx context.Context) iter.Seq2[es.StreamID, error]
- func (s *Store) StreamsBySubject(ctx context.Context, subject string) iter.Seq2[es.StreamID, error]
- func (s *Store) Subscribe(ctx context.Context, opts es.SubscriptionOptions) iter.Seq2[es.RawEnvelope, error]
- func (s *Store) SubscribeStream(ctx context.Context, stream es.StreamID, opts es.SubscriptionOptions) iter.Seq2[es.RawEnvelope, error]
Constants ¶
This section is empty.
Variables ¶
var Schema string
Schema is the SQL DDL this Store requires. It is exported so users who manage migrations externally can feed it to their own tooling.
Functions ¶
Types ¶
type Option ¶
type Option func(*options)
Option configures New.
func WithReadReplica ¶ added in v0.4.0
WithReadReplica routes catch-up reads (Subscribe, SubscribeStream, Streams, StreamsBySubject, and the post-NOTIFY cursor in live tail) to pool. Append, Load, the LISTEN connection, and the head() query the Append path uses remain on the primary because they are either writes or read-your-own-writes paths. See ADR-0038.
pool is borrowed; the caller owns its lifecycle. A nil pool is a no-op so dynamic wiring code can pass either.
A separate streaming-replica Postgres instance is the intended shape, but the option does not check the replica's relationship to the primary — operators are free to wire two unrelated pools if they have a reason. The Store stays correct as long as the schema matches and the replica has eventually-consistent visibility into the events the primary appends.
func WithoutMigrate ¶
func WithoutMigrate() Option
WithoutMigrate disables the automatic schema migration that New performs by default.
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
Store is a Postgres-backed es.EventStore.
The caller owns the pool(s) and is responsible for closing them. The Store additionally owns a single shared LISTEN goroutine that is started lazily on the first live subscription and holds one connection on the primary for its lifetime; Store.Close stops it. Call Close before closing the pool. See ADR-0025.
func New ¶
New returns a Store wrapping pool. By default applies Schema; pass WithoutMigrate to skip when migrations are external.
The returned Store must be closed with Store.Close before the pool is closed.
func (*Store) Append ¶
func (s *Store) Append( ctx context.Context, stream es.StreamID, expected es.Revision, events ...es.RawEnvelope, ) (es.Revision, error)
Append implements es.EventStore.
The flow inside one transaction: SELECT the current head version for the stream, validate against expected, INSERT events with their per-stream versions (the xid column is populated by DEFAULT), NOTIFY with the new max global_position, COMMIT. Concurrent appends to different streams run in parallel; concurrent appends to the same stream race on UNIQUE(stream_id, version) and the loser receives *es.ConflictError. See ADR-0031.
func (*Store) Close ¶
func (s *Store) Close()
Close stops the shared LISTEN goroutine and releases the connection it holds. Call Close before closing the underlying pool; otherwise pgxpool's Close blocks waiting for the listener's connection. Close is idempotent.
After Close, live subscriptions no longer receive wake-ups (their one-shot catch-up read still works); cancel their contexts to stop them.
func (*Store) Load ¶
func (s *Store) Load( ctx context.Context, stream es.StreamID, opts es.ReadOptions, ) iter.Seq2[es.RawEnvelope, error]
Load implements es.EventStore.
func (*Store) Streams ¶ added in v0.3.0
Streams implements es.EventStore. It SELECTs every distinct stream id from the events table in ascending order and streams them through the iterator. The query relies on the leading column of UNIQUE(stream_id, version) for ordering; large event tables may want a dedicated streams table or a covering index in the future.
Routes to the read pool (ADR-0038) — a replica may show freshly-appended streams a few ms later than the primary would.
func (*Store) StreamsBySubject ¶ added in v0.3.0
StreamsBySubject implements es.EventStore. Uses the partial index events_subject_idx (see schema.sql); cost scales with the number of events tagged with subject, not the total log size.
Routes to the read pool (ADR-0038).
func (*Store) Subscribe ¶
func (s *Store) Subscribe(ctx context.Context, opts es.SubscriptionOptions) iter.Seq2[es.RawEnvelope, error]
Subscribe implements es.EventStore.
func (*Store) SubscribeStream ¶
func (s *Store) SubscribeStream(ctx context.Context, stream es.StreamID, opts es.SubscriptionOptions) iter.Seq2[es.RawEnvelope, error]
SubscribeStream implements es.EventStore.