# dynamic-streams-eventstore-go

A Go-based Event Store implementation for Event Sourcing with PostgreSQL, operating on Dynamic Event Streams (also known as Dynamic Consistency Boundaries).
Unlike traditional event stores with fixed streams tied to specific entities, this approach enables atomic entity-independent operations while maintaining strong consistency through PostgreSQL's ACID guarantees.
## Key Features
- Dynamic Event Streams: Query and modify events across multiple entities atomically
- Snapshot Support: Efficient projection state storage with incremental updates and sequence number tracking
- High Performance: Sub-millisecond queries (~0.36 ms), ~3.1 ms atomic appends with optimistic locking
- ACID Transactions: PostgreSQL-backed consistency without distributed transactions
- Fluent Filter API: Type-safe, expressive event filtering with compile-time validation
- JSON-First: Efficient JSONB storage with GIN index optimization
- Multiple Adapters: Support for pgx/v5, database/sql, and sqlx database connections
- Primary-Replica Support: Context-based query routing for PostgreSQL streaming replication setups
- Structured Logging: Configurable SQL query logging and operational monitoring (slog, zerolog, logrus compatible)
- OpenTelemetry-Compatible Contextual Logging: Context-aware logging with automatic trace correlation
- OpenTelemetry-Compatible Metrics: Comprehensive observability with duration, counters, error tracking, and context cancellation/timeout detection
- OpenTelemetry-Compatible Tracing: Dependency-free tracing interface for OpenTelemetry, Jaeger, and custom backends
- Ready-to-Use OpenTelemetry Adapters: Official plug-and-play adapters for immediate OpenTelemetry integration
## Quick Start
```sh
go get github.com/AntonStoeckl/dynamic-streams-eventstore-go
```
```go
// Create event store with pgx adapter (default)
eventStore, err := postgresengine.NewEventStoreFromPGXPool(pgxPool)
if err != nil {
    log.Fatal(err)
}

// Or use alternative adapters:
// eventStore, err := postgresengine.NewEventStoreFromSQLDB(sqlDB)
// eventStore, err := postgresengine.NewEventStoreFromSQLX(sqlxDB)

// Query events spanning multiple entities
filter := BuildEventFilter().
    Matching().
    AnyEventTypeOf(
        "BookCopyAddedToCirculation", "BookCopyRemovedFromCirculation",
        "ReaderRegistered", "BookCopyLentToReader", "BookCopyReturnedByReader").
    AndAnyPredicateOf(
        P("BookID", bookID),
        P("ReaderID", readerID)).
    Finalize()

// Atomic operation: Query → Business Logic → Append
events, maxSeq, _ := eventStore.Query(ctx, filter)
newEvent := applyBusinessLogic(events)
err = eventStore.Append(ctx, filter, maxSeq, newEvent)

// Snapshot support for efficient projections
snapshot, _ := eventStore.LoadSnapshot(ctx, "BooksInCirculation", filter)
// ... build projection from events since snapshot ...
newSnapshot, _ := eventstore.BuildSnapshot("BooksInCirculation", filter.Hash(), maxSeq, projectionData)
eventStore.SaveSnapshot(ctx, newSnapshot)
```
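The `applyBusinessLogic` call above stands in for your domain decision logic: it folds the queried events into whatever state the decision needs and returns the new event to append. A minimal sketch, using a placeholder `Event` type instead of the library's actual storable event API:

```go
// Event is an illustrative placeholder; real code works with the library's
// storable event type from eventstore/storable_event.go instead.
type Event struct {
    Type    string
    Payload []byte
}

// applyBusinessLogic decides whether the book copy can be lent to the reader
// by folding the relevant events into a small piece of state.
func applyBusinessLogic(events []Event) Event {
    currentlyLent := false
    for _, e := range events {
        switch e.Type {
        case "BookCopyLentToReader":
            currentlyLent = true
        case "BookCopyReturnedByReader":
            currentlyLent = false
        }
    }

    if currentlyLent {
        // How a failed business rule is signalled (error, rejection event, ...)
        // is an application-level choice; this is just one option.
        return Event{Type: "BookCopyLendingRejected"}
    }
    return Event{Type: "BookCopyLentToReader"}
}
```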
## The Dynamic Streams Advantage
Traditional Event Sourcing:

```text
BookCirculation: [BookCopyAddedToCirculation, BookCopyRemovedFromCirculation, ...]  → Separate streams
ReaderAccount:   [ReaderRegistered, ReaderContractCanceled, ...]                    → Separate streams
BookLending:     [BookCopyLentToReader, BookCopyReturnedByReader, ...]              → Separate streams
```

Dynamic Event Streams:

```text
Entity-independent: [BookCopyAddedToCirculation, BookCopyRemovedFromCirculation, ReaderRegistered,
                     ReaderContractCanceled, BookCopyLentToReader, BookCopyReturnedByReader, ...]  → Single atomic boundary
```
This eliminates complex synchronization between entities while maintaining strong consistency
(see Performance for detailed benchmarks).
See Core Concepts for a more detailed description.
## OpenTelemetry Integration
For users with existing OpenTelemetry setups, we provide ready-to-use adapters that require zero configuration. The EventStore library uses dependency-free observability interfaces (`Logger`, `ContextualLogger`, `MetricsCollector`, `TracingCollector`) to avoid forcing specific observability dependencies on users. Our OpenTelemetry adapters bridge those interfaces to OpenTelemetry, providing:
- Zero-config integration for OpenTelemetry users
- Automatic trace correlation in logs
- Production-ready implementations using OpenTelemetry best practices
- Engine-agnostic design - works with any EventStore engine (PostgreSQL, future MongoDB, etc.)
### Quick Start
import "github.com/AntonStoeckl/dynamic-streams-eventstore-go/eventstore/oteladapters"
// Zero-config OpenTelemetry integration
tracer := otel.Tracer("eventstore")
meter := otel.Meter("eventstore")
eventStore, err := postgresengine.NewEventStoreFromPGXPool(pgxPool,
postgresengine.WithTracing(oteladapters.NewTracingCollector(tracer)),
postgresengine.WithMetrics(oteladapters.NewMetricsCollector(meter)),
postgresengine.WithContextualLogger(oteladapters.NewSlogBridgeLogger("eventstore")),
)
### Available Adapters
#### 1. Contextual Logger Adapters
**SlogBridgeLogger (Recommended)**
Uses the official OpenTelemetry slog bridge for automatic trace correlation:
```go
// Option 1: Pure OpenTelemetry with automatic trace correlation
logger := oteladapters.NewSlogBridgeLogger("eventstore")

// Option 2: Use your existing slog.Handler (no trace correlation)
slogHandler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})
logger := oteladapters.NewSlogBridgeLoggerWithHandler(slogHandler)
```
Benefits:
- Automatic trace/span ID injection when using `NewSlogBridgeLogger()`
- Zero configuration - uses the global OpenTelemetry LoggerProvider
- Handler compatibility - `NewSlogBridgeLoggerWithHandler()` for existing setups
- Production-ready with minimal setup
Trace Correlation Example:
```text
// Without trace context:
{"level":"INFO","msg":"Query executed","duration":"150ms"}

// With active trace context:
{"level":"INFO","msg":"Query executed","duration":"150ms","trace_id":"abc123","span_id":"def456"}
```
**OTelLogger (Advanced)**

Integrates directly with the OpenTelemetry logging API for cases where you need full control over OpenTelemetry log records.
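If you go this route, you need a configured OpenTelemetry `LoggerProvider` for the adapter to emit records to. A minimal SDK setup sketch (the stdout exporter is illustrative; the adapter constructor itself is not shown here - check the `oteladapters` package for its exact signature):

```go
import (
    "go.opentelemetry.io/otel/exporters/stdout/stdoutlog"
    "go.opentelemetry.io/otel/log/global"
    sdklog "go.opentelemetry.io/otel/sdk/log"
)

// setupLogProvider configures the OpenTelemetry log SDK and registers it as
// the global LoggerProvider. Exporter and processor choices are illustrative.
func setupLogProvider() (*sdklog.LoggerProvider, error) {
    exporter, err := stdoutlog.New() // swap for an OTLP log exporter in production
    if err != nil {
        return nil, err
    }
    provider := sdklog.NewLoggerProvider(
        sdklog.WithProcessor(sdklog.NewBatchProcessor(exporter)),
    )
    global.SetLoggerProvider(provider)
    return provider, nil
}
```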
#### 2. Metrics Collector
Maps EventStore metrics to OpenTelemetry instruments:
```go
meter := otel.Meter("eventstore")
collector := oteladapters.NewMetricsCollector(meter)
```
Instrument Mapping:
- `RecordDuration(...)` → Histogram (for operation durations)
- `IncrementCounter(...)` → Counter (for operation counts, errors, cancellations, timeouts)
- `RecordValue(...)` → Gauge (for current values, concurrent operations)
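Where the `meter` comes from is up to your application. A minimal sketch of a MeterProvider setup with the OpenTelemetry SDK (exporter and reader choices are illustrative):

```go
import (
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
    sdkmetric "go.opentelemetry.io/otel/sdk/metric"
)

// setupMeterProvider registers a global MeterProvider so that
// otel.Meter("eventstore") returns a working meter.
func setupMeterProvider() (*sdkmetric.MeterProvider, error) {
    exporter, err := stdoutmetric.New() // swap for an OTLP metric exporter in production
    if err != nil {
        return nil, err
    }
    provider := sdkmetric.NewMeterProvider(
        sdkmetric.WithReader(sdkmetric.NewPeriodicReader(exporter)),
    )
    otel.SetMeterProvider(provider)
    return provider, nil
}
```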
Context Error Detection:
- Context Cancellation: Automatically detects `context.Canceled` errors from user/client cancellations
- Context Timeout: Automatically detects `context.DeadlineExceeded` errors from system timeouts
- Robust Error Handling: Works with database driver error wrapping (`errors.Join`, custom wrappers)
- Separate Metrics: Distinct tracking for cancellations vs. timeouts vs. regular errors
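For example, a query with a hard deadline is counted as a timeout rather than a generic error. A usage sketch, reusing `ctx`, `eventStore`, and `filter` from the Quick Start:

```go
// Give the query a deadline; if it is exceeded, the metrics adapter records a
// timeout (context.DeadlineExceeded) instead of a generic error.
queryCtx, cancel := context.WithTimeout(ctx, 200*time.Millisecond)
defer cancel()

events, maxSeq, err := eventStore.Query(queryCtx, filter)
if errors.Is(err, context.DeadlineExceeded) {
    // handle the timeout in application code
}
```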
#### 3. Tracing Collector
Creates OpenTelemetry spans for EventStore operations:
```go
tracer := otel.Tracer("eventstore")
collector := oteladapters.NewTracingCollector(tracer)
```
Features:
- Automatic span creation for Query/Append operations
- Context propagation across operations
- Error status mapping and attribute injection
- Proper span lifecycle management
### Production Configuration
For production environments, consider:
```go
// Use OTLP exporters instead of stdout
traceExporter := otlptrace.New(...)
metricExporter := otlpmetric.New(...)
logExporter := otlplog.New(...)

// Configure with environment variables
// OTEL_EXPORTER_OTLP_ENDPOINT=https://your-collector:4317
// OTEL_SERVICE_NAME=your-service
// OTEL_SERVICE_VERSION=1.0.0
// OTEL_ENVIRONMENT=production
```
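For the tracing side, a minimal OTLP setup sketch using the OpenTelemetry Go SDK (endpoint configuration is left to the standard `OTEL_*` environment variables; metrics and logs follow the same pattern with their respective SDK packages):

```go
import (
    "context"

    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
    sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

// setupTracing wires an OTLP gRPC trace exporter into a batching TracerProvider
// and registers it globally, so otel.Tracer("eventstore") produces exported spans.
func setupTracing(ctx context.Context) (*sdktrace.TracerProvider, error) {
    exporter, err := otlptracegrpc.New(ctx) // honors OTEL_EXPORTER_OTLP_ENDPOINT
    if err != nil {
        return nil, err
    }
    provider := sdktrace.NewTracerProvider(
        sdktrace.WithBatcher(exporter),
    )
    otel.SetTracerProvider(provider)
    return provider, nil
}
```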
### Complete Examples
**Complete OpenTelemetry Setup**
- Full OpenTelemetry tracing, metrics, and logging setup
- EventStore integration with all adapters
- Production configuration patterns

**Slog Integration Examples**
- Slog with OpenTelemetry trace correlation
- Slog-only integration (without full OpenTelemetry)
- Custom slog handler integration
## Documentation
### Architecture
Core Components:
- `eventstore/postgresengine/postgres.go` → PostgreSQL implementation with CTE-based optimistic locking
- `eventstore/postgresengine/internal/adapters/` → Database adapter abstraction (pgx, sql.DB, sqlx)
- `eventstore/filter.go` → Fluent filter builder for entity-independent queries
- `eventstore/storable_event.go` → Storage-agnostic event DTOs
Database Adapters:
The event store supports three PostgreSQL adapters, switchable via factory functions (a construction sketch follows the list):
- pgxpool.Pool (default): High-performance connection pooling
- database/sql: Standard library with the lib/pq driver
- sqlx: Enhanced database/sql with additional features
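A sketch of constructing the two non-default adapters (the DSN is a placeholder; the factory functions are the ones listed above):

```go
import (
    "database/sql"
    "log"

    "github.com/jmoiron/sqlx"
    _ "github.com/lib/pq" // registers the "postgres" driver

    "github.com/AntonStoeckl/dynamic-streams-eventstore-go/eventstore/postgresengine"
)

func buildEventStores() {
    dsn := "postgres://user:password@localhost:5432/eventstore?sslmode=disable" // placeholder

    // database/sql adapter
    sqlDB, err := sql.Open("postgres", dsn)
    if err != nil {
        log.Fatal(err)
    }
    storeFromSQL, err := postgresengine.NewEventStoreFromSQLDB(sqlDB)
    if err != nil {
        log.Fatal(err)
    }

    // sqlx adapter
    sqlxDB, err := sqlx.Connect("postgres", dsn)
    if err != nil {
        log.Fatal(err)
    }
    storeFromSQLX, err := postgresengine.NewEventStoreFromSQLX(sqlxDB)
    if err != nil {
        log.Fatal(err)
    }

    _ = storeFromSQL
    _ = storeFromSQLX
}
```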
Primary-Replica Support:
Optional PostgreSQL streaming replication support with context-based query routing:
⚠️ CRITICAL RULE: Always use `WithStrongConsistency()` for command handlers (read-check-write operations) and `WithEventualConsistency()` only for pure query handlers (read-only operations). Mixing these up will cause concurrency conflicts or stale data issues.
import "github.com/AntonStoeckl/dynamic-streams-eventstore-go/eventstore"
// Command handlers - require strong consistency (routes to primary)
ctx = eventstore.WithStrongConsistency(ctx)
events, maxSeq, err := eventStore.Query(ctx, filter)
err = eventStore.Append(ctx, filter, maxSeq, newEvent)
// Query handlers - can use eventual consistency (routes to replica)
ctx = eventstore.WithEventualConsistency(ctx)
events, _, err := eventStore.Query(ctx, filter)
Consistency Guarantees:
- Strong Consistency (default): All operations use primary database, ensuring read-after-write consistency
- Eventual Consistency: Read operations may use replica database, trading consistency for performance
- Safe Defaults: Strong consistency by default prevents subtle bugs in event sourcing scenarios
Performance Benefits:
- Reduced primary load for read-heavy workloads using replica for queries
- Optimal load distribution: Writes to primary, reads from replica
- Proper consistency guarantees without performance penalties
Key Pattern:

```sql
-- Same WHERE clause used in Query and Append for consistency
WHERE event_type IN ('BookCopyLentToReader', 'ReaderRegistered')
  AND (payload @> '{"BookID": "123"}' OR payload @> '{"ReaderID": "456"}')
```
With 2.5M events in PostgreSQL:
- Query: ~0.36 ms average
- Append: ~3.1 ms average
See Performance Documentation for detailed benchmarks and optimization strategies.
## Testing
See Development Guide for complete testing instructions including adapter switching and benchmarks.
## Contributing
See Development Guide for contribution guidelines, setup instructions, and architecture details.
## License
This project is licensed under the GNU GPLv3 - see LICENSE.txt for details.
## Acknowledgments
Inspired by Sara Pellegrini's work on Dynamic Consistency Boundaries and Rico Fritsche's PostgreSQL CTE implementation patterns.