dynamic-streams-eventstore-go

module
v1.4.1-beta Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 17, 2025 License: GPL-3.0

README ΒΆ

dynamic-streams-eventstore-go

Go Report Card codecov GoDoc Go Version Release

A Go-based Event Store implementation for Event Sourcing with PostgreSQL, operating on Dynamic Event Streams (also known as Dynamic Consistency Boundaries).

Unlike traditional event stores with fixed streams tied to specific entities, this approach enables atomic entity-independent operations while maintaining strong consistency through PostgreSQL's ACID guarantees.

✨ Key Features

  • πŸ”„ Dynamic Event Streams: Query and modify events across multiple entities atomically
  • πŸ“Έ Snapshot Support: Efficient projection state storage with incremental updates and sequence number tracking
  • ⚑ High Performance: Sub-millisecond queries (~0.36ms), ~3.1ms atomic appends with optimistic locking
  • πŸ›‘οΈ ACID Transactions: PostgreSQL-backed consistency without distributed transactions
  • 🎯 Fluent Filter API: Type-safe, expressive event filtering with compile-time validation
  • πŸ“Š JSON-First: Efficient JSONB storage with GIN index optimization
  • πŸ”— Multiple Adapters: Support for pgx/v5, database/sql, and sqlx database connections
  • 🏒 Primary-Replica Support: Context-based query routing for PostgreSQL streaming replication setups
  • πŸ“ Structured Logging: Configurable SQL query logging and operational monitoring (slog, zerolog, logrus compatible)
  • πŸ“ OpenTelemetry Compatible Contextual Logging: Context-aware logging with automatic trace correlation
  • πŸ“ˆ OpenTelemetry Compatible Metrics: Comprehensive observability with duration, counters, error tracking, and context cancellation/timeout detection
  • πŸ” OpenTelemetry Compatible Tracing: Dependency-free tracing interface for OpenTelemetry, Jaeger, and custom backends
  • πŸ”Œ OpenTelemetry Ready-to-Use Adapters: Official plug-and-play adapters for immediate OpenTelemetry integration

πŸš€ Quick Start

go get github.com/AntonStoeckl/dynamic-streams-eventstore-go
// Create event store with pgx adapter (default)
eventStore, err := postgresengine.NewEventStoreFromPGXPool(pgxPool)
if err != nil {
    log.Fatal(err)
}

// Or use alternative adapters:
// eventStore, err := postgresengine.NewEventStoreFromSQLDB(sqlDB)
// eventStore, err := postgresengine.NewEventStoreFromSQLX(sqlxDB)

// Query events spanning multiple entities
filter := BuildEventFilter().
    Matching().
    AnyEventTypeOf(
        "BookCopyAddedToCirculation", "BookCopyRemovedFromCirculation",
        "ReaderRegistered", "BookCopyLentToReader", "BookCopyReturnedByReader").
    AndAnyPredicateOf(
        P("BookID", bookID),
        P("ReaderID", readerID)).
    Finalize()

// Atomic operation: Query β†’ Business Logic β†’ Append
events, maxSeq, _ := eventStore.Query(ctx, filter)
newEvent := applyBusinessLogic(events)
err := eventStore.Append(ctx, filter, maxSeq, newEvent)

// Snapshot support for efficient projections
snapshot, _ := eventStore.LoadSnapshot(ctx, "BooksInCirculation", filter)
// ... build projection from events since snapshot ...
newSnapshot, _ := eventstore.BuildSnapshot("BooksInCirculation", filter.Hash(), maxSeq, projectionData)
eventStore.SaveSnapshot(ctx, newSnapshot)

πŸ’‘ The Dynamic Streams Advantage

Traditional Event Sourcing:

BookCirculation: [BookCopyAddedToCirculation, BookCopyRemovedFromCirculation, ...]  ← Separate streams
ReaderAccount:   [ReaderRegistered, ReaderContractCanceled, ...]                    ← Separate streams  
BookLending:     [BookCopyLentToReader, BookCopyReturnedByReader, ...]              ← Separate streams

Dynamic Event Streams:

Entity-independent: [BookCopyAddedToCirculation, BookCopyRemovedFromCirculation, ReaderRegistered, 
                     ReaderContractCanceled, BookCopyLentToReader, BookCopyReturnedByReader, ...]  ← Single atomic boundary

This eliminates complex synchronization between entities while maintaining strong consistency (see Performance for detailed benchmarks).
See Core Concepts for a more detailed description.

πŸ”Œ OpenTelemetry Integration

For users with existing OpenTelemetry setups, we provide ready-to-use adapters that require zero configuration. The EventStore library uses dependency-free observability interfaces (Logger, ContextualLogger, MetricsCollector, TracingCollector) to avoid forcing specific observability dependencies on users. Our OpenTelemetry adapters bridge those interfaces to OpenTelemetry, providing:

  • Zero-config integration for OpenTelemetry users
  • Automatic trace correlation in logs
  • Production-ready implementations using OpenTelemetry best practices
  • Engine-agnostic design - works with any EventStore engine (PostgreSQL, future MongoDB, etc.)
Quick Start
import "github.com/AntonStoeckl/dynamic-streams-eventstore-go/eventstore/oteladapters"

// Zero-config OpenTelemetry integration
tracer := otel.Tracer("eventstore")
meter := otel.Meter("eventstore")

eventStore, err := postgresengine.NewEventStoreFromPGXPool(pgxPool,
    postgresengine.WithTracing(oteladapters.NewTracingCollector(tracer)),
    postgresengine.WithMetrics(oteladapters.NewMetricsCollector(meter)),
    postgresengine.WithContextualLogger(oteladapters.NewSlogBridgeLogger("eventstore")),
)
Available Adapters
1. Contextual Logger Adapters

SlogBridgeLogger (Recommended) Uses the official OpenTelemetry slog bridge for automatic trace correlation:

// Option 1: Pure OpenTelemetry with automatic trace correlation
logger := oteladapters.NewSlogBridgeLogger("eventstore")

// Option 2: Use your existing slog.Handler (no trace correlation)
slogHandler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})
logger := oteladapters.NewSlogBridgeLoggerWithHandler(slogHandler)

Benefits:

  • βœ… Automatic trace/span ID injection when using NewSlogBridgeLogger()
  • βœ… Zero configuration - uses global OpenTelemetry LoggerProvider
  • βœ… Handler compatibility - NewSlogBridgeLoggerWithHandler() for existing setups
  • βœ… Production-ready with minimal setup

Trace Correlation Example:

// Without trace context:
{"level":"INFO","msg":"Query executed","duration":"150ms"}

// With active trace context:
{"level":"INFO","msg":"Query executed","duration":"150ms","trace_id":"abc123","span_id":"def456"}

OTelLogger (Advanced) Direct OpenTelemetry logging API integration for when you need direct control over OpenTelemetry log records.

2. Metrics Collector

Maps EventStore metrics to OpenTelemetry instruments:

meter := otel.Meter("eventstore")
collector := oteladapters.NewMetricsCollector(meter)

Instrument Mapping:

  • RecordDuration(...) β†’ Histogram (for operation durations)
  • IncrementCounter(...) β†’ Counter (for operation counts, errors, cancellations, timeouts)
  • RecordValue(...) β†’ Gauge (for current values, concurrent operations)

Context Error Detection:

  • Context Cancellation: Automatically detects context.Canceled errors from user/client cancellations
  • Context Timeout: Automatically detects context.DeadlineExceeded errors from system timeouts
  • Robust Error Handling: Works with database driver error wrapping (errors.Join, custom wrappers)
  • Separate Metrics: Distinct tracking for cancellations vs timeouts vs regular errors
3. Tracing Collector

Creates OpenTelemetry spans for EventStore operations:

tracer := otel.Tracer("eventstore")
collector := oteladapters.NewTracingCollector(tracer)

Features:

  • Automatic span creation for Query/Append operations
  • Context propagation across operations
  • Error status mapping and attribute injection
  • Proper span lifecycle management
Production Configuration

For production environments, consider:

// Use OTLP exporters instead of stdout
traceExporter := otlptrace.New(...)
metricExporter := otlpmetric.New(...)
logExporter := otlplog.New(...)

// Configure with environment variables
// OTEL_EXPORTER_OTLP_ENDPOINT=https://your-collector:4317
// OTEL_SERVICE_NAME=your-service
// OTEL_SERVICE_VERSION=1.0.0
// OTEL_ENVIRONMENT=production
Complete Examples

πŸ“– Complete OpenTelemetry Setup β†’

  • Full OpenTelemetry tracing, metrics, and logging setup
  • EventStore integration with all adapters
  • Production configuration patterns

πŸ“– Slog Integration Examples β†’

  • Slog with OpenTelemetry trace correlation
  • Slog-only integration (without full OpenTelemetry)
  • Custom slog handler integration

πŸ“š Documentation

πŸ—οΈ Architecture

Core Components:

  • eventstore/postgresengine/postgres.go β€” PostgreSQL implementation with CTE-based optimistic locking
  • eventstore/postgresengine/internal/adapters/ β€” Database adapter abstraction (pgx, sql.DB, sqlx)
  • eventstore/filter.go β€” Fluent filter builder for entity-independent queries
  • eventstore/storable_event.go β€” Storage-agnostic event DTOs

Database Adapters: The event store supports three PostgreSQL adapters, switchable via factory functions:

  • pgx.Pool (default): High-performance connection pooling
  • database/sql: Standard library with lib/pq driver
  • sqlx: Enhanced database/sql with additional features

Primary-Replica Support: Optional PostgreSQL streaming replication support with context-based query routing:

⚠️ CRITICAL RULE: Always use WithStrongConsistency() for command handlers (read-check-write operations) and WithEventualConsistency() only for pure query handlers (read-only operations). Mixing these up will cause concurrency conflicts or stale data issues.

import "github.com/AntonStoeckl/dynamic-streams-eventstore-go/eventstore"

// Command handlers - require strong consistency (routes to primary)
ctx = eventstore.WithStrongConsistency(ctx)
events, maxSeq, err := eventStore.Query(ctx, filter)
err = eventStore.Append(ctx, filter, maxSeq, newEvent)

// Query handlers - can use eventual consistency (routes to replica)
ctx = eventstore.WithEventualConsistency(ctx)
events, _, err := eventStore.Query(ctx, filter)

Consistency Guarantees:

  • Strong Consistency (default): All operations use primary database, ensuring read-after-write consistency
  • Eventual Consistency: Read operations may use replica database, trading consistency for performance
  • Safe Defaults: Strong consistency by default prevents subtle bugs in event sourcing scenarios

Performance Benefits:

  • Reduced primary load for read-heavy workloads using replica for queries
  • Optimal load distribution: Writes to primary, reads from replica
  • Proper consistency guarantees without performance penalties

Key Pattern:

-- Same WHERE clause used in Query and Append for consistency
WHERE event_type IN ('BookCopyLentToReader', 'ReaderRegistered') 
  AND (payload @> '{"BookID": "123"}' OR payload @> '{"ReaderID": "456"}')

⚑ Performance

With 2.5M events in PostgreSQL:

  • Query: ~0.36 ms average
  • Append: ~3.1 ms average

See Performance Documentation for detailed benchmarks and optimization strategies.

πŸ§ͺ Testing

See Development Guide for complete testing instructions including adapter switching and benchmarks.

🀝 Contributing

See Development Guide for contribution guidelines, setup instructions, and architecture details.

πŸ“„ License

This project is licensed under the GNU GPLv3 β€” see LICENSE.txt for details.

πŸ™ Acknowledgments

Inspired by Sara Pellegrini's work on Dynamic Consistency Boundaries and Rico Fritsche's PostgreSQL CTE implementation patterns.

Directories ΒΆ

Path Synopsis
Package eventstore provides core abstractions and types for event sourcing with Dynamic Event Streams.
Package eventstore provides core abstractions and types for event sourcing with Dynamic Event Streams.
oteladapters
Package oteladapters provides OpenTelemetry adapters for the eventstore observability interfaces.
Package oteladapters provides OpenTelemetry adapters for the eventstore observability interfaces.
postgresengine
Package postgresengine provides a PostgreSQL implementation of the eventstore interface.
Package postgresengine provides a PostgreSQL implementation of the eventstore interface.
postgresengine/internal/adapters
Package adapters provide database adapter implementations for the PostgreSQL event store.
Package adapters provide database adapter implementations for the PostgreSQL event store.
testutil
eventstore/estesthelpers
Package estesthelpers provides EventStore-agnostic test utilities and fixtures.
Package estesthelpers provides EventStore-agnostic test utilities and fixtures.
eventstore/fixtures
Package fixtures contains minimal test events for EventStore testing.
Package fixtures contains minimal test events for EventStore testing.
eventstore/shared
Package shared provides EventStore interfaces and common types for testing.
Package shared provides EventStore interfaces and common types for testing.
observability/config
Package config provides observability configuration for EventStore testing.
Package config provides observability configuration for EventStore testing.
observability/testdoubles
Package testdoubles provides test doubles (spies) for observability interfaces.
Package testdoubles provides test doubles (spies) for observability interfaces.
postgresengine/config
Package config provides PostgreSQL database configuration for EventStore testing.
Package config provides PostgreSQL database configuration for EventStore testing.
postgresengine/pgtesthelpers
Package pgtesthelpers provides test utilities for PostgreSQL EventStore testing with multi-adapter support.
Package pgtesthelpers provides test utilities for PostgreSQL EventStore testing with multi-adapter support.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL