eventstore

module
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 7, 2025 License: MIT

README ΒΆ

Event Sourcing Framework for Go

An alpha version Event Sourcing and CQRS framework for Go with Protocol Buffers code generation, built-in observability, and flexible storage backends.

Go Version License

⚠️ Security Notice

This project is in Beta. Significant security features have been implemented, but production deployments require careful configuration.

βœ… Security Improvements
  • βœ… Secure credentials management - pkg/security/credentials with encryption support (AWS, GCP, Azure, Vault)
  • βœ… SQL injection protection - Comprehensive input validation and parameterized queries
  • βœ… Error Handling - pkg/errorx provides safe error propagation and sanitization patterns
  • βœ… Input validation - Defense-in-depth validation across event store operations
  • βœ… Encrypted Storage - Support for encrypted key stores and multi-tenant data
πŸ›‘οΈ Production Checklist
  • ⚠️ TLS Configuration - Ensure NATS and Database connections use TLS (requires explicit setup)
  • ⚠️ Authorization - Implement ABAC/RBAC using your preferred policy engine (e.g., OPA)
  • ⚠️ Rate Limiting - Configure rate limits in pkg/config and ensure enforcement at the gateway level
  • ⚠️ Secret Management - Use a production secret manager (AWS Secrets Manager, Vault) instead of local files

πŸ“š See Security Credentials Guide for configuration details.


Overview

This framework provides everything you need to build event-sourced systems in Go:

  • Type-safe code generation from Protocol Buffers definitions
  • Clean CQRS patterns with automatic command/query routing
  • Flexible projections with built-in checkpoint management
  • Multiple storage backends (SQLite/LibSQL: local, Turso cloud, embedded replica; PostgreSQL planned)
  • Event streaming via NATS JetStream
  • Built-in observability with OpenTelemetry integration
  • Service lifecycle management for production deployments
  • Event analytics for debugging and insights (automatic tracking, persisted in snapshots)
  • Snapshots for 20-100x performance improvements
  • Event seeding for migrations and bootstrapping (idempotent, deterministic)
  • Secure credentials with AWS/GCP/Azure/Vault integration

Quick Start

Prerequisites
  • Go 1.25+
  • buf (for Protobuf generation)
  • protoc-gen-go, protoc-gen-connect-go
  • NATS server (for messaging)
Installation
# Clone the repository
git clone https://github.com/plaenen/eventstore.git
cd eventstore

# Install dependencies
go mod download

# Generate code
task generate

# Run tests
task test
Building Your First Application

This guide walks through building a scalable, multi-tenant application using Event Sourcing and CQRS.

1. Define Service & Domain (Proto)

Define your service, commands, events, and aggregate state in .proto files.

proto/account/domain/v1/account.proto (Domain Model):

syntax = "proto3";
package account.domain.v1;

import "eventsourcing/options.proto";

// Aggregate State
message Account {
  option (eventsourcing.aggregate_root) = { id_field: "account_id" };
  string account_id = 1;
  string balance = 2;
  string status = 3;
}

// Events
message AccountOpenedEvent {
  option (eventsourcing.event) = { aggregate_name: "Account" };
  string account_id = 1;
  string owner_name = 2;
}

message MoneyDepositedEvent {
  option (eventsourcing.event) = { aggregate_name: "Account" };
  string account_id = 1;
  string amount = 2;
  string new_balance = 3;
}

proto/account/service/v1/account.proto (API Definition):

syntax = "proto3";
package account.service.v1;

import "eventsourcing/options.proto";
import "cqrs/options.proto";
import "account/domain/v1/account.proto";

service AccountCommandService {
  option (cqrs.service) = {
    service_type: SERVICE_TYPE_COMMAND
    generate_client: true
  };

  rpc OpenAccount(OpenAccountCommand) returns (OpenAccountResponse);
  rpc Deposit(DepositCommand) returns (DepositResponse);
}

message OpenAccountCommand { ... }
message OpenAccountResponse { ... }
message DepositCommand { ... }
message DepositResponse { ... }
2. Generate Code

Run buf generate to create type-safe Go code, including:

  • AccountAggregate: Domain object with Apply* methods.
  • AccountRepository: For loading/saving aggregates.
  • AccountEventApplier: Interface for domain logic.
  • AccountCommandServiceHandler: Interface for service implementation.
3. Implement Domain Logic (Appliers)

Implement AccountEventApplier to define how events mutate state. This is pure domain logic.

type AccountAppliers struct{}

func (a *AccountAppliers) ApplyAccountOpenedEvent(agg *accountdomainv1.Account, e *accountdomainv1.AccountOpenedEvent) error {
    agg.AccountId = e.AccountId
    agg.Status = accountdomainv1.AccountStatus_ACCOUNT_STATUS_OPEN
    agg.Balance = "0"
    return nil
}

func (a *AccountAppliers) ApplyMoneyDepositedEvent(agg *accountdomainv1.Account, e *accountdomainv1.MoneyDepositedEvent) error {
    agg.Balance = e.NewBalance
    return nil
}
4. Implement Command Handler

Implement AccountCommandServiceHandler to coordinate loading, validating, and saving.

type AccountHandler struct {
    repo *accountservicev1.AccountRepository
}

func (h *AccountHandler) Deposit(ctx context.Context, cmd *accountservicev1.DepositCommand) (*accountservicev1.DepositResponse, error) {
    // 1. Validate Command
    if cmd.Amount <= 0 { return nil, fmt.Errorf("invalid amount") }

    // 2. Load & Mutate (with optimistic locking retry)
    err := h.repo.RetryOnConflict(cmd.AccountId, 3, func(agg *accountdomainv1.Account) error {
        // Business Rule Check
        if agg.Status != accountdomainv1.AccountStatus_ACCOUNT_STATUS_OPEN { return fmt.Errorf("account closed") }

        // Calculate new state & Emit Event
        event := &accountdomainv1.MoneyDepositedEvent{
            AccountId: cmd.AccountId,
            Amount: cmd.Amount,
            NewBalance: newBalance,
        }
        
        // Apply Event (updates in-memory state)
        return agg.ApplyMoneyDepositedEvent(event)
    })

    return &accountservicev1.DepositResponse{...}, err
}
5. Create Projections (Read Models)

Projections listen to the event stream and update a read-optimized database.

func (p *AccountProjection) HandleEvent(ctx context.Context, event *eventsourcing.Event) error {
    switch e := event.Payload.(type) {
    case *accountv1.AccountOpenedEvent:
        _, err := p.db.Exec("INSERT INTO accounts (id, balance) VALUES (?, ?)", e.AccountId, 0)
        return err
    case *accountv1.MoneyDepositedEvent:
        _, err := p.db.Exec("UPDATE accounts SET balance = ? WHERE id = ?", e.NewBalance, e.AccountId)
        return err
    }
    return nil
}
6. Wiring It All Together
func main() {
    // 1. Initialize Infrastructure
    nc, _ := nats.Connect("nats://localhost:4222")
    eventStore, _ := sqlite.NewEventStore(sqlite.WithDSN("events.db"))
    
    // 2. Initialize Components
    repo := accountservicev1.NewAccountRepository(eventStore, domain.NewAccountAppliers())
    handler := handlers.NewAccountHandler(repo)
    
    // 3. Start NATS Server
    server, _ := nats.NewServer(&nats.ServerConfig{Connection: nc})
    server.RegisterHandler("commands.account.deposit", handler.Deposit)
    server.Start(context.Background())
}
Secure Credential Management

Use the credentials provider for secure authentication across cloud providers:

import "github.com/plaenen/eventstore/pkg/security/credentials"

// Production: AWS Secrets Manager
provider, err := credentials.NewSecretProvider(ctx,
    "awskms://arn:aws:secretsmanager:us-east-1:123456789:secret:nats-creds")

// Get credentials with automatic caching
creds, err := provider.GetCredentials(ctx)

// Use with NATS
nc, err := nats.Connect(
    natsURL,
    nats.UserInfo(creds.Username, creds.Password),
)
defer provider.Close()

Supported Backends:

  • AWS Secrets Manager
  • GCP Secret Manager
  • Azure Key Vault
  • HashiCorp Vault
  • Local files (development)
  • Environment variables (simple cases)

πŸ“š See Security Credentials Guide for complete examples

Core Concepts

Architecture

The framework follows clean architecture principles with clear separation of concerns:

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                    Application Layer                     β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚
β”‚  β”‚   Commands  β”‚  β”‚   Queries    β”‚  β”‚  Projections  β”‚  β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                         β”‚
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                    Domain Layer                          β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚
β”‚  β”‚ Aggregates  β”‚  β”‚    Events    β”‚  β”‚   Commands    β”‚  β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                         β”‚
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                Infrastructure Layer                      β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚
β”‚  β”‚ Event Store  β”‚  β”‚   Messaging  β”‚  β”‚     CQRS     β”‚  β”‚
β”‚  β”‚  (SQLite)    β”‚  β”‚    (NATS)    β”‚  β”‚    (NATS)    β”‚  β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
Package Structure
pkg/
β”œβ”€β”€ domain/           # Pure domain types (Event, Command, Aggregate)
β”œβ”€β”€ store/            # Event persistence (EventStore, Repository, Snapshots)
β”‚   └── sqlite/      # SQLite implementation
β”œβ”€β”€ cqrs/            # Command/Query handling (request/reply)
β”‚   └── nats/        # NATS implementation
β”œβ”€β”€ messaging/       # Event publishing/subscription (pub/sub)
β”‚   └── nats/        # NATS JetStream implementation
β”œβ”€β”€ infrastructure/  # Pure infrastructure utilities
β”‚   └── nats/        # Embedded NATS server
β”œβ”€β”€ observability/   # OpenTelemetry integration
β”œβ”€β”€ runtime/         # Service lifecycle management
└── multitenancy/    # Multi-tenant support

Key Features

1. Code Generation

Generate type-safe, idiomatic Go code from Protocol Buffers:

# Generate everything
buf generate

# Generated files include:
# - Aggregate implementations with event sourcing
# - Command/query handlers
# - Client SDKs
# - Event appliers
# - NATS service integrations
2. Projections

Build read models with automatic transaction and checkpoint management:

projection, err := sqlite.NewSQLiteProjectionBuilder(
    "account-balance",
    db,
    checkpointStore,
    eventStore,
).
    WithSchema(func(ctx context.Context, db *sql.DB) error {
        _, err := db.Exec(`CREATE TABLE IF NOT EXISTS account_balance (...)`)
        return err
    }).
    On(accountv1.OnAccountOpened(func(ctx context.Context, event *accountv1.AccountOpenedEvent, envelope *domain.EventEnvelope) error {
        tx, _ := sqlite.TxFromContext(ctx)
        _, err := tx.Exec("INSERT INTO account_balance ...")
        return err
    })).
    Build()
3. Event Streaming

Real-time event processing with NATS JetStream:

// Publish events
bus, _ := natseventbus.NewEventBus(config)
bus.Publish(events)

// Subscribe to events
filter := eventsourcing.EventFilter{
    AggregateTypes: []string{"Account"},
}
bus.Subscribe(filter, func(event *eventsourcing.EventEnvelope) error {
    // Handle event
    return nil
})
4. Observability

Built-in OpenTelemetry support for traces and metrics:

tel, _ := observability.Init(ctx, observability.Config{
    ServiceName:     "account-service",
    ServiceVersion:  "1.0.0",
    TraceExporter:   exporter,
    TraceSampleRate: 1.0,
})
defer tel.Shutdown(ctx)

// Automatic tracing for commands, queries, and events
5. Service Management

Production-ready service lifecycle management:

runner := runner.New(
    []runner.Service{
        eventBusService,
        commandService,
        projectionService,
    },
    runner.WithLogger(logger),
    runner.WithShutdownTimeout(30 * time.Second),
)

// Handles SIGTERM/SIGINT gracefully
runner.Run(ctx)
6. Database Options

Multiple deployment modes for different use cases:

SQLite (Local Development)
eventStore, err := sqlite.NewEventStore(
    sqlite.WithFilename("events.db"),
)
LibSQL Remote (Turso Cloud)
eventStore, err := sqlite.NewEventStore(
    sqlite.WithLibSQLRemote(
        "libsql://your-db.turso.io",
        os.Getenv("TURSO_AUTH_TOKEN"),
    ),
)
LibSQL Embedded Replica (Local-First + Cloud Sync)
eventStore, err := sqlite.NewEventStore(
    sqlite.WithLibSQLEmbeddedReplica(
        "./local.db",
        "libsql://your-db.turso.io",
        os.Getenv("TURSO_AUTH_TOKEN"),
    ),
)

πŸ“š See LibSQL Usage Guide for complete configuration options

7. Event Analytics

Automatic event tracking for debugging and insights:

// Load aggregate
order, _ := repo.Load("order-123")

// Get analytics (automatically tracked)
analytics := order.Analytics()
fmt.Printf("Total events: %d\n", analytics.TotalEvents)
fmt.Printf("OrderPlaced: %d times\n", analytics.GetCount("OrderPlaced"))

// Detailed stats with timestamps
stats := analytics.GetStats("OrderPlaced")
fmt.Printf("First: %s, Last: %s\n", stats.FirstApplied, stats.LastApplied)

// Event distribution analysis
distribution := analytics.GetDistribution()
for eventType, pct := range distribution {
    fmt.Printf("%s: %.1f%%\n", eventType, pct)
}

Features:

  • Automatic tracking during event replay
  • Persisted in snapshots
  • No performance overhead
  • Useful for debugging and optimization

πŸ“š See Event Analytics Guide

8. Snapshots for Performance

Optimize aggregate loading with automatic snapshots:

// Enable snapshots
snapshotStore := sqlite.NewSnapshotStore(eventStore.DB())
repo := store.NewRepository(...).WithSnapshotStore(snapshotStore)

// Normal loading (uses snapshots automatically)
order, _ := repo.Load("order-123") // 20-100x faster!

// Save snapshots periodically
if order.Version() % 100 == 0 {
    repo.SaveSnapshot(order)
}

Performance Gains:

  • 10,000 events: 500ms β†’ 25ms (20x faster)
  • 100,000 events: 5,000ms β†’ 50ms (100x faster)
  • Analytics automatically preserved in snapshots

πŸ“š See Snapshot Guide

9. Event Seeding for Migrations

Deterministic, idempotent event seeding for migrations and bootstrapping:

// Bootstrap admin user
admin := NewUser("admin-001")
admin.Create("admin@example.com", "Admin")
admin.AssignRole("super_admin")

// Seed with default options (idempotent)
opts := domain.DefaultSeedOptions()
opts.CustomTags = map[string]string{
    "migration": "v1.0.0",
    "source":    "bootstrap",
}

result, err := repo.SeedAggregate(admin, 0, opts)
fmt.Printf("Saved: %d, Skipped: %d\n", result.Saved, result.Skipped)

Features:

  • Idempotent (safe to run multiple times)
  • Deterministic ID generation
  • Constraint ownership checking
  • Custom metadata for data lineage

Use Cases:

  • Database migrations (historical data import)
  • Bootstrap data (admin users, system configs)
  • Test fixtures (deterministic test data)

πŸ“š See Event Seeding Guide

Examples

Complete Examples

See the examples/ directory for complete, runnable examples:

Run any example:

go run ./examples/cmd/bankaccount-observability

Documentation

Getting Started
Guides
  • Projection Patterns - Building read models (Generic, SQLite, NATS)

  • Event Upcasting - Schema evolution and backward compatibility

  • SDK Generation - Generating unified SDKs

  • Domain Layer - Core interfaces for Aggregates, Events, and Commands. Defines the AggregateRoot and EventEnvelope types.

  • Event Store - Persistence layer for storing events. Includes:

    • pkg/store/sqlite: SQLite/LibSQL implementation with support for local files, Turso, and embedded replicas.
  • CQRS - Command Query Responsibility Segregation framework.

    • pkg/cqrs/nats: NATS-based transport for command routing and query handling.
  • Messaging - Event publishing and subscription infrastructure.

    • pkg/messaging/nats: JetStream implementation for reliable event streaming.
  • Identity - Identity and Access Management (IAM) service.

    • pkg/identity/store/sqlite: Secure credential storage using SQLite.
  • Runtime Services - Service lifecycle management, graceful shutdown, and dependency injection.

  • Observability - OpenTelemetry integration for distributed tracing and metrics.

  • Security - Security utilities including encryption and credential management.

All Documentation

See the Documentation Index for a complete guide to all documentation, organized by topic and learning path.

Contributing

We welcome contributions! Please see our Contributing Guide for details on:

  • Setting up your development environment
  • Code style and conventions
  • Testing requirements
  • Pull request process

Community

  • GitHub Issues - Bug reports and feature requests
  • GitHub Discussions - Questions and community support

License

MIT License - see LICENSE file for details

Acknowledgments

Built with:


Ready to build event-sourced systems? Explore the examples to get started!

Directories ΒΆ

Path Synopsis
cmd
protoc-gen-cqrs command
protoc-gen-cqrs generates CQRS infrastructure from proto files.
protoc-gen-cqrs generates CQRS infrastructure from proto files.
protoc-gen-eventsourcing command
protoc-gen-eventsourcing generates event sourcing domain model from proto files.
protoc-gen-eventsourcing generates event sourcing domain model from proto files.
examples
bankaccount/domain
Package domain contains business logic and domain implementations.
Package domain contains business logic and domain implementations.
github.com
pkg
config
Package config provides dynamic configuration management using Go Cloud Development Kit.
Package config provides dynamic configuration management using Go Cloud Development Kit.
datastarx
Common types for DataStar This is for dynamic feedback from the client for errors and validation messages
Common types for DataStar This is for dynamic feedback from the client for errors and validation messages
messaging
Package messaging provides backward compatibility aliases for types that have been moved to the eventsourcing package.
Package messaging provides backward compatibility aliases for types that have been moved to the eventsourcing package.
observability
Package observability provides OpenTelemetry-based tracing and metrics with backend-agnostic configuration for production observability.
Package observability provides OpenTelemetry-based tracing and metrics with backend-agnostic configuration for production observability.
runtime/embeddednats
Package embeddednats provides a runner.Service adapter for embedded NATS server.
Package embeddednats provides a runner.Service adapter for embedded NATS server.
runtime/eventbus
Package eventbus provides a runner.Service adapter for NATS EventBus.
Package eventbus provides a runner.Service adapter for NATS EventBus.
security/credentials
Package credentials provides secure credential management using Go Cloud Development Kit.
Package credentials provides secure credential management using Go Cloud Development Kit.
security/encryption
Package encryption provides data encryption at rest for the EventSourcing framework.
Package encryption provides data encryption at rest for the EventSourcing framework.
security/tls
Package tls provides TLS configuration and certificate management for secure transport.
Package tls provides TLS configuration and certificate management for secure transport.
validation
Package validation provides both simple and rich validation functions for input validation across different architectural layers.
Package validation provides both simple and rich validation functions for input validation across different architectural layers.
proto
repro

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL