es

Version: v0.3.0
Published: Jul 17, 2025 License: MIT Imports: 5 Imported by: 0

README

es — Event Sourcing in Go

Event Sourcing library for Go, designed for building scalable, event-driven applications with CQRS and DDD principles.

✨ Features

  • Lightweight Event Sourcing primitives for building event-driven systems.
  • Supports generics for flexible aggregate IDs of nearly any Go type.
  • Flexible Event Store abstractions for custom persistence layers.
  • Built-in snapshot support to optimize aggregate state recovery.
  • Pluggable aggregate root implementations for domain modeling.
  • Support for multiple storage backends, including SQL and NoSQL.
  • Extensible pre- and post-hook system for custom event processing.

🚀 Getting Started

Installation
go get github.com/stackus/es

📚 Usage

There are a few basic concepts to understand when using this library:

  • Aggregate Root: The primary entity in an event-sourced system.
  • Aggregate ID: The unique identifier for an aggregate root.
  • Event: A change that has occurred to an aggregate root.
  • Aggregate Store: The persistence layer for storing and retrieving events and snapshots for an aggregate root.
1. Define an Aggregate
// type AggregateRoot[K comparable] interface {
// 	Aggregate[K]
// 	AggregateType() string
// 	ApplyChange(event es.EventPayload) error
// }

type Order struct {
	es.AggregateBase[uuid.UUID] // embed the AggregateBase
	Total int
}

// implement the AggregateRoot[K] interface, starting with the AggregateType method
func (o *Order) AggregateType() string { return "Order" }

// implement the ApplyChange method
func (o *Order) ApplyChange(event es.EventPayload) error {
	switch e := event.(type) {
	case *OrderCreated:
		o.Total = e.Total
	}
	return nil
}
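
To illustrate why ApplyChange matters, here is a standalone sketch that does not import es. It rebuilds an aggregate's state by applying events in order and reports events it does not recognize. The names eventPayload, order, orderDiscounted, orderShipped, errUnknownEvent, and replay are hypothetical and exist only for this illustration; the library's own loading logic may differ.

```go
package main

import (
	"errors"
	"fmt"
)

// eventPayload mirrors the shape of es.EventPayload for this sketch only.
type eventPayload interface{ Kind() string }

type orderCreated struct{ Total int }

func (*orderCreated) Kind() string { return "OrderCreated" }

type orderDiscounted struct{ Amount int }

func (*orderDiscounted) Kind() string { return "OrderDiscounted" }

// orderShipped is deliberately not handled by ApplyChange below.
type orderShipped struct{}

func (*orderShipped) Kind() string { return "OrderShipped" }

var errUnknownEvent = errors.New("unknown event")

type order struct{ Total int }

// ApplyChange mutates state for known events and reports unknown ones
// instead of silently ignoring them.
func (o *order) ApplyChange(event eventPayload) error {
	switch e := event.(type) {
	case *orderCreated:
		o.Total = e.Total
	case *orderDiscounted:
		o.Total -= e.Amount
	default:
		return fmt.Errorf("%w: %s", errUnknownEvent, event.Kind())
	}
	return nil
}

// replay rebuilds an order from its event history, oldest event first.
func replay(events ...eventPayload) (*order, error) {
	o := &order{}
	for _, e := range events {
		if err := o.ApplyChange(e); err != nil {
			return nil, err
		}
	}
	return o, nil
}

func main() {
	o, err := replay(&orderCreated{Total: 100}, &orderDiscounted{Amount: 15})
	fmt.Println(o.Total, err) // 85 <nil>

	_, err = replay(&orderShipped{})
	fmt.Println(err) // unknown event: OrderShipped
}
```

Returning an error from the default case is a design choice: it surfaces events that were stored but never handled, at the cost of failing loads until the aggregate is updated.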
2. Create an Aggregate ID
// type AggregateID[K comparable] interface {
//	Get() K
//	New() K
//	Set(id K)
//	IsSet() bool
// }

type RootID uuid.UUID

// implement the AggregateID interface for the RootID type
func (r *RootID) Get() uuid.UUID    { return uuid.UUID(*r) }
func (r *RootID) New() uuid.UUID    { return uuid.New() }
func (r *RootID) Set(id uuid.UUID)  { *r = RootID(id) }
func (r *RootID) IsSet() bool       { return *r != RootID(uuid.Nil) }

You are free to use whatever kind of ID you want, as long as you implement the es.AggregateID[K] interface. There are tests and examples in this repository that show the usage of string and int IDs as well.
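
As a rough illustration of a non-UUID ID, the sketch below implements the same four methods for a string-backed type. It is standalone: aggregateID is a local copy of the method set shown above, and StringID and its random hex scheme are assumptions made for this example, not types from the repository.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// aggregateID is a local copy of the es.AggregateID[K] method set.
type aggregateID[K comparable] interface {
	Get() K
	New() K
	Set(id K)
	IsSet() bool
}

// StringID is a hypothetical string-backed ID; the empty string means "not set".
type StringID string

func (s *StringID) Get() string { return string(*s) }

// New returns 16 random bytes encoded as 32 hex characters.
func (s *StringID) New() string {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		panic(err) // no sensible recovery if the system RNG fails
	}
	return hex.EncodeToString(b)
}

func (s *StringID) Set(id string) { *s = StringID(id) }
func (s *StringID) IsSet() bool   { return *s != "" }

// Compile-time check that *StringID satisfies the interface.
var _ aggregateID[string] = (*StringID)(nil)

func main() {
	var id StringID
	fmt.Println(id.IsSet()) // false
	id.Set(id.New())
	fmt.Println(id.IsSet(), len(id.Get())) // true 32
}
```

Like the RootID example, Set here overwrites unconditionally; guard it with IsSet if an ID must never change once assigned.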

TODO: Move docs for the ID before the Aggregate? It seems like it would be more logical to explain the ID before the Aggregate.

3. Define Events

A simple Go struct with exported fields and a Kind() string method will do just fine:

type OrderCreated struct {
	Total int
}

func (o *OrderCreated) Kind() string { return "OrderCreated" }
4. Create a Constructor or Factory Function
// example of simple constructor
func NewOrder(id *RootID) *Order {
	return &Order{
		AggregateBase: es.NewAggregateBase(id),
	}
}

// example of factory function
func CreateOrder(id *RootID, total int) (*Order, error) {
	order := NewOrder(id)

	// record a change to the new aggregate
	if err := order.TrackChange(order, &OrderCreated{
		Total: total,
	}); err != nil {
		return nil, err
	}

	return order, nil
}

The TrackChange(aggregate es.AggregateRoot[K], event es.EventPayload) error method records a change on an aggregate root. This method is provided by the type embedded in the aggregate struct, which is why the aggregate is passed to its own method.

The change is applied to the aggregate with the ApplyChange(event es.EventPayload) error method implemented on Order earlier.
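
The following standalone sketch shows one way an embedded type can apply and record changes. It is an assumption about the general pattern, not the library's implementation: base, applier, and the apply-then-record order are hypothetical.

```go
package main

import "fmt"

// eventPayload mirrors the shape of es.EventPayload for this sketch only.
type eventPayload interface{ Kind() string }

// applier is the only part of an aggregate root this sketch needs.
type applier interface {
	ApplyChange(event eventPayload) error
}

// base is a hypothetical stand-in for the type embedded in an aggregate.
type base struct {
	changes []eventPayload
}

// TrackChange applies the event to root first; only an event that was
// applied successfully is recorded as an uncommitted change.
func (b *base) TrackChange(root applier, event eventPayload) error {
	if err := root.ApplyChange(event); err != nil {
		return err
	}
	b.changes = append(b.changes, event)
	return nil
}

// Changes returns the events recorded since the aggregate was created.
func (b *base) Changes() []eventPayload { return b.changes }

type orderCreated struct{ Total int }

func (*orderCreated) Kind() string { return "OrderCreated" }

type order struct {
	base
	Total int
}

func (o *order) ApplyChange(event eventPayload) error {
	if e, ok := event.(*orderCreated); ok {
		o.Total = e.Total
		return nil
	}
	return fmt.Errorf("unknown event %q", event.Kind())
}

func main() {
	o := &order{}
	err := o.TrackChange(o, &orderCreated{Total: 42})
	fmt.Println(o.Total, len(o.Changes()), err) // 42 1 <nil>
}
```

The outer aggregate has to be passed in because an embedded struct has no reference to the struct that embeds it, so it cannot reach ApplyChange on its own.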

5. Create an Event Store
a. Create a repository that implements the es.EventRepository interface:
repository := memory.NewEventRepository[uuid.UUID]()
b. Create an instance of the event store:
eventStore := es.NewEventStore(repository)
c. All events that you want to store must be registered with the store:
es.RegisterEvent(eventStore, &OrderCreated{})
// register more events ...
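
To see why a store would need events registered up front, consider this standalone sketch. A stored event carries only a type name and raw bytes, so decoding needs a lookup from name to concrete Go type. The registry type, the encode and decode helpers, and the choice of JSON are all assumptions for illustration; the library's encoding may be different.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// eventPayload mirrors the shape of es.EventPayload for this sketch only.
type eventPayload interface{ Kind() string }

type orderCreated struct{ Total int }

func (*orderCreated) Kind() string { return "OrderCreated" }

// registry maps an event kind to a factory that returns an empty payload.
type registry map[string]func() eventPayload

// register stores the factory under the kind reported by its payload.
func (r registry) register(factory func() eventPayload) {
	r[factory().Kind()] = factory
}

// encode returns the kind and the JSON-encoded payload.
func encode(p eventPayload) (string, []byte, error) {
	data, err := json.Marshal(p)
	return p.Kind(), data, err
}

// decode looks up the kind and unmarshals data into a fresh payload.
func (r registry) decode(kind string, data []byte) (eventPayload, error) {
	factory, ok := r[kind]
	if !ok {
		return nil, fmt.Errorf("unregistered event %q", kind)
	}
	p := factory()
	if err := json.Unmarshal(data, p); err != nil {
		return nil, err
	}
	return p, nil
}

func main() {
	reg := registry{}
	reg.register(func() eventPayload { return &orderCreated{} })

	kind, data, err := encode(&orderCreated{Total: 7})
	if err != nil {
		panic(err)
	}
	p, err := reg.decode(kind, data)
	fmt.Println(kind, string(data), p.(*orderCreated).Total, err)

	_, err = reg.decode("OrderShipped", nil)
	fmt.Println(err)
}
```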
6. Loading and Saving Events

To load all changes for an aggregate, you will do something similar to this:

id := RootID(someOrderID)
order := NewOrder(&id)

err := eventStore.Load(ctx, order)
if err != nil {
	return err
}

To save uncommitted changes made to an aggregate, you will do something similar to this:

err := eventStore.Save(ctx, order)
if err != nil {
	return err
}

Both of these methods will use the hooks you provide to process the events before and after they are saved or loaded.

Hooks are an optional third variadic parameter to the Load and Save methods. The types of hooks available include pre-hooks and post-hooks, for example EventsPreSave and EventsPostLoad.


var hooks []es.Hook[uuid.UUID]
// add a pre-save hook
hooks = append(hooks, es.EventsPreSaveHook(func(ctx context.Context, aggregate es.AggregateRoot[uuid.UUID], events []es.Event[uuid.UUID]) error {
	// do something before saving
	return nil
}))
// add a post-save hook
hooks = append(hooks, es.EventsPostSaveHook(func(ctx context.Context, aggregate es.AggregateRoot[uuid.UUID], events []es.Event[uuid.UUID]) error {
	// do something after saving
	return nil
}))

err := eventStore.Save(ctx, order, hooks...)
if err != nil {
	return err
}

Use these hooks to add custom behavior to the saving and loading of events. Logging, "domain events", and other behaviors can be added here.
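
The general pre/post hook pattern can be sketched without the library. In the standalone example below, a validating pre-hook rejects an empty save and a post-hook builds an audit trail. The store, saveHook, event, and demo names are hypothetical, and the rule that a failing pre-hook aborts the save is an assumption of this sketch rather than documented library behavior.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type event struct{ Type string }

// saveHook is a hypothetical hook signature used only in this sketch.
type saveHook func(ctx context.Context, events []event) error

type store struct {
	pre, post []saveHook
	saved     []event
}

// Save runs pre-hooks, persists the events, then runs post-hooks.
func (s *store) Save(ctx context.Context, events []event) error {
	for _, h := range s.pre {
		if err := h(ctx, events); err != nil {
			return err // assumption: a failing pre-hook aborts the save
		}
	}
	s.saved = append(s.saved, events...)
	for _, h := range s.post {
		if err := h(ctx, events); err != nil {
			return err
		}
	}
	return nil
}

// demo wires one pre-hook and one post-hook, then saves twice:
// first with no events (rejected), then with one event (audited).
func demo() (rejected bool, audit []string) {
	s := &store{
		pre: []saveHook{func(_ context.Context, evs []event) error {
			if len(evs) == 0 {
				return errors.New("nothing to save")
			}
			return nil
		}},
		post: []saveHook{func(_ context.Context, evs []event) error {
			for _, e := range evs {
				audit = append(audit, "saved "+e.Type)
			}
			return nil
		}},
	}
	ctx := context.Background()
	rejected = s.Save(ctx, nil) != nil
	if err := s.Save(ctx, []event{{Type: "OrderCreated"}}); err != nil {
		audit = append(audit, "error: "+err.Error())
	}
	return rejected, audit
}

func main() {
	rejected, audit := demo()
	fmt.Println(rejected, audit) // true [saved OrderCreated]
}
```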

Snapshots

Snapshots are a way to optimize the loading of an aggregate by storing the state of the aggregate at a certain point in time.

Using Snapshots is entirely optional, but can be invaluable when you have aggregates with a large number of events.
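
The saving can be illustrated with a standalone sketch: restore state from a snapshot taken at some version, then apply only the events recorded after it. The event, snapshot, order, and load names are hypothetical, and the example filters an in-memory slice, whereas a real store would more likely read only the newer events from storage.

```go
package main

import "fmt"

type event struct {
	Version int
	Delta   int
}

type snapshot struct {
	Version int
	Total   int
}

type order struct {
	Version int
	Total   int
}

// load restores from snap (if any) and replays only newer events.
// It also returns how many events it had to apply.
func load(snap *snapshot, history []event) (order, int) {
	var o order
	if snap != nil {
		o.Version, o.Total = snap.Version, snap.Total
	}
	applied := 0
	for _, e := range history {
		if e.Version <= o.Version {
			continue // already reflected in the snapshot
		}
		o.Total += e.Delta
		o.Version = e.Version
		applied++
	}
	return o, applied
}

func main() {
	var history []event
	for v := 1; v <= 25; v++ {
		history = append(history, event{Version: v, Delta: 2})
	}

	full, n1 := load(nil, history)
	fast, n2 := load(&snapshot{Version: 20, Total: 40}, history)
	fmt.Println(full.Total, n1, fast.Total, n2) // 50 25 50 5
}
```

Both loads reach the same state, but the snapshot-based load applies 5 events instead of 25.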

1. Define a Snapshot

Like the events, a snapshot is a simple Go struct with exported fields and a Kind() string method.

type OrderSnapshot struct {
	Total int
}
func (o *OrderSnapshot) Kind() string { return "OrderSnapshot" }
2. Add the required methods to your Aggregate
// type SnapshotAggregate[K comparable] interface {
// 	CreateSnapshot() es.SnapshotPayload
//	ApplySnapshot(snapshot es.SnapshotPayload) error
// }

func (o *Order) CreateSnapshot() es.SnapshotPayload {
	return &OrderSnapshot{
		Total: o.Total,
	}
}

func (o *Order) ApplySnapshot(snapshot es.SnapshotPayload) error {
	switch s := snapshot.(type) {
	case *OrderSnapshot:
		o.Total = s.Total
	}
	
	return nil
}
3. Create a Snapshot Store
a. Create a repository that implements the es.SnapshotRepository interface:
snapshotRepository := memory.NewSnapshotRepository[uuid.UUID]()
b. Create an instance of the snapshot store:
snapshotStore := es.NewSnapshotStore(
	eventStore, // we will use the event store we created earlier to save events
	snapshotRepository, 
	es.NewFrequencySnapshotStrategy[uuid.UUID](10), // create a new snapshot every 10 events
)

There are other strategies available, such as es.NewParticularChangesSnapshotStrategy[K](changes), which takes a []any of changes and creates a new snapshot when one of those changes is applied. You can also create your own strategy by implementing the es.SnapshotStrategy[K] interface.
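
A custom strategy only has to answer one question: should a snapshot be taken for this aggregate now? The standalone sketch below shows the shape of such a type. It uses a simplified, non-generic local interface instead of es.SnapshotStrategy[K], and largeBatchStrategy, aggregateRoot, and fakeOrder are hypothetical names for this illustration.

```go
package main

import "fmt"

// eventPayload mirrors the shape of es.EventPayload for this sketch only.
type eventPayload interface{ Kind() string }

// aggregateRoot lists only the method this sketch needs; the real
// es.AggregateRoot[K] interface is larger and generic over the ID type.
type aggregateRoot interface {
	Changes() []eventPayload
}

// snapshotStrategy is a simplified local version of es.SnapshotStrategy[K].
type snapshotStrategy interface {
	ShouldSnapshot(aggregate aggregateRoot) bool
}

// largeBatchStrategy asks for a snapshot whenever a single save carries
// at least threshold uncommitted changes.
type largeBatchStrategy struct{ threshold int }

func (s largeBatchStrategy) ShouldSnapshot(aggregate aggregateRoot) bool {
	return len(aggregate.Changes()) >= s.threshold
}

// Compile-time check that the strategy satisfies the local interface.
var _ snapshotStrategy = largeBatchStrategy{}

type orderCreated struct{}

func (orderCreated) Kind() string { return "OrderCreated" }

// fakeOrder is a test double that exposes a fixed list of pending changes.
type fakeOrder struct{ pending []eventPayload }

func (f fakeOrder) Changes() []eventPayload { return f.pending }

func main() {
	strategy := largeBatchStrategy{threshold: 3}
	small := fakeOrder{pending: []eventPayload{orderCreated{}}}
	large := fakeOrder{pending: []eventPayload{orderCreated{}, orderCreated{}, orderCreated{}}}
	fmt.Println(strategy.ShouldSnapshot(small), strategy.ShouldSnapshot(large)) // false true
}
```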

c. Register the snapshots with the snapshot store:
es.RegisterSnapshot(snapshotStore, &OrderSnapshot{})
4. Loading and Saving Snapshots

The SnapshotStore has the same Load and Save methods as the EventStore. They both implement the AggregateStore[K] interface.

This means that we can pass hooks to the SnapshotStore in the same way as with the EventStore. The only difference is that snapshot hooks are applied to snapshots instead of events, so they are of type SnapshotPre* and SnapshotPost*.

The snapshot store uses these hooks to hook into the saving and loading of snapshots.

📜 License

This project is licensed under the MIT License—see the LICENSE file for details.

🤝 Contributing

Contributions, issues, and feature requests are welcome! Feel free to check the issues page.

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func RegisterEvent added in v0.2.0

func RegisterEvent[K comparable, T EventPayload](eventStore *EventStore[K], event T)

func RegisterSnapshot added in v0.2.0

func RegisterSnapshot[K comparable, T SnapshotPayload](snapshotStore *SnapshotStore[K], snapshot T)

Types

type Aggregate

type Aggregate[K comparable] interface {
	AggregateID() K
	AggregateVersion() int
	TrackChange(aggregate AggregateRoot[K], event EventPayload) error
	TrackChanges(aggregate AggregateRoot[K], events ...EventPayload) error
	Changes() []EventPayload
	SetID(K)
	SetVersion(int)
	// contains filtered or unexported methods
}

func NewAggregate

func NewAggregate[K comparable](id AggregateID[K]) Aggregate[K]

type AggregateID

type AggregateID[K comparable] interface {
	Get() K      // Get the current ID; this can return a zero value and IsSet should be used to check for zero values
	New() K      // Generate new ID based on type-specific rules
	Set(K)       // Set the ID if it is not already set
	IsSet() bool // Check against type-specific zero value rules
}

type AggregateRoot

type AggregateRoot[K comparable] interface {
	Aggregate[K]
	AggregateType() string
	ApplyChange(event EventPayload) error
}

type AggregateStore

type AggregateStore[K comparable] interface {
	Load(ctx context.Context, aggregate AggregateRoot[K], hooks ...Hook[K]) error
	Save(ctx context.Context, aggregate AggregateRoot[K], hooks ...Hook[K]) error
}

type ErrAggregateVersionConflict

type ErrAggregateVersionConflict struct {
	AggregateID      any
	AggregateType    string
	AggregateVersion int
}

func (ErrAggregateVersionConflict) Error
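
An error type like this usually signals an optimistic concurrency failure: two writers loaded the same version and both tried to append. The standalone sketch below shows that check and how a caller might detect the error with errors.As. The versionConflict type copies the field names above, but memoryLog, append, isConflict, the error message, and the decision to report the expected version are assumptions of this sketch, not the library's behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// versionConflict copies the fields of ErrAggregateVersionConflict.
type versionConflict struct {
	AggregateID      any
	AggregateType    string
	AggregateVersion int
}

func (e versionConflict) Error() string {
	return fmt.Sprintf("version conflict: %s %v at version %d",
		e.AggregateType, e.AggregateID, e.AggregateVersion)
}

// memoryLog is a hypothetical event log that tracks only the current
// version of each aggregate.
type memoryLog struct{ versions map[string]int }

// append records n new events if the caller's expected version is current.
func (l *memoryLog) append(id string, expected, n int) error {
	if cur := l.versions[id]; cur != expected {
		return versionConflict{AggregateID: id, AggregateType: "Order", AggregateVersion: expected}
	}
	l.versions[id] += n
	return nil
}

// isConflict reports whether err is, or wraps, a versionConflict.
func isConflict(err error) bool {
	var vc versionConflict
	return errors.As(err, &vc)
}

func main() {
	l := &memoryLog{versions: map[string]int{}}
	fmt.Println(l.append("o-1", 0, 2)) // first writer succeeds: <nil>

	err := l.append("o-1", 0, 1) // second writer still expects version 0
	fmt.Println(isConflict(err), err)
}
```

A common way to handle such a conflict is to reload the aggregate, reapply the command, and save again.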

type ErrUnknownAggregateChangeType

type ErrUnknownAggregateChangeType struct {
	AggregateType string
	Change        EventPayload
}

func (ErrUnknownAggregateChangeType) Error

type ErrUnknownAggregateSnapshotType

type ErrUnknownAggregateSnapshotType struct {
	AggregateType string
	Snapshot      SnapshotPayload
}

func (ErrUnknownAggregateSnapshotType) Error

type ErrUnregisteredEvent added in v0.2.0

type ErrUnregisteredEvent string

func (ErrUnregisteredEvent) Error added in v0.2.0

func (e ErrUnregisteredEvent) Error() string

type ErrUnregisteredSnapshot added in v0.2.0

type ErrUnregisteredSnapshot string

func (ErrUnregisteredSnapshot) Error added in v0.2.0

func (e ErrUnregisteredSnapshot) Error() string

type Event

type Event[K comparable] struct {
	AggregateID      K
	AggregateType    string
	AggregateVersion int
	EventType        string
	EventData        []byte
	OccurredAt       time.Time
}

type EventLoadHooks

type EventLoadHooks[K comparable] interface {
	EventsPreLoad(ctx context.Context, aggregate AggregateRoot[K]) error
	EventsPostLoad(ctx context.Context, aggregate AggregateRoot[K], events []Event[K]) error
}

type EventPayload added in v0.2.0

type EventPayload interface {
	Kind() string
}

type EventRepository

type EventRepository[K comparable] interface {
	Load(ctx context.Context, aggregate AggregateRoot[K], hooks EventLoadHooks[K]) ([]Event[K], error)
	Save(ctx context.Context, aggregate AggregateRoot[K], events []Event[K], hooks EventSaveHooks[K]) error
}

type EventSaveHooks

type EventSaveHooks[K comparable] interface {
	EventsPreSave(ctx context.Context, aggregate AggregateRoot[K], events []Event[K]) error
	EventsPostSave(ctx context.Context, aggregate AggregateRoot[K], events []Event[K]) error
}

type EventStore added in v0.2.0

type EventStore[K comparable] struct {
	// contains filtered or unexported fields
}

func NewEventStore

func NewEventStore[K comparable](
	repository EventRepository[K],
	hooks ...Hook[K],
) *EventStore[K]

func (*EventStore[K]) Load added in v0.2.0

func (s *EventStore[K]) Load(ctx context.Context, aggregate AggregateRoot[K], hooks ...Hook[K]) error

func (*EventStore[K]) Save added in v0.2.0

func (s *EventStore[K]) Save(ctx context.Context, aggregate AggregateRoot[K], hooks ...Hook[K]) error

func (*EventStore[K]) WithRepository added in v0.2.1

func (s *EventStore[K]) WithRepository(repository EventRepository[K]) *EventStore[K]

WithRepository returns a new EventStore with the provided repository.

type EventsPostLoad

type EventsPostLoad[K comparable] func(ctx context.Context, aggregate AggregateRoot[K], events []Event[K]) error

type EventsPostSave

type EventsPostSave[K comparable] func(ctx context.Context, aggregate AggregateRoot[K], events []Event[K]) error

type EventsPreLoad

type EventsPreLoad[K comparable] func(ctx context.Context, aggregate AggregateRoot[K]) error

type EventsPreSave

type EventsPreSave[K comparable] func(ctx context.Context, aggregate AggregateRoot[K], events []Event[K]) error

type Hook

func EventsPostLoadHook

func EventsPostLoadHook[K comparable](fn EventsPostLoad[K]) Hook[K]

func EventsPostSaveHook

func EventsPostSaveHook[K comparable](fn EventsPostSave[K]) Hook[K]

func EventsPreLoadHook

func EventsPreLoadHook[K comparable](fn EventsPreLoad[K]) Hook[K]

func EventsPreSaveHook

func EventsPreSaveHook[K comparable](fn EventsPreSave[K]) Hook[K]

func SnapshotPostLoadHook

func SnapshotPostLoadHook[K comparable](fn SnapshotPostLoad[K]) Hook[K]

func SnapshotPostSaveHook

func SnapshotPostSaveHook[K comparable](fn SnapshotPostSave[K]) Hook[K]

func SnapshotPreLoadHook

func SnapshotPreLoadHook[K comparable](fn SnapshotPreLoad[K]) Hook[K]

func SnapshotPreSaveHook

func SnapshotPreSaveHook[K comparable](fn SnapshotPreSave[K]) Hook[K]

type Hooks

type Hooks[K comparable] []Hook[K]

func (Hooks[K]) EventsPostLoad

func (h Hooks[K]) EventsPostLoad(ctx context.Context, aggregate AggregateRoot[K], events []Event[K]) error

func (Hooks[K]) EventsPostSave

func (h Hooks[K]) EventsPostSave(ctx context.Context, aggregate AggregateRoot[K], events []Event[K]) error

func (Hooks[K]) EventsPreLoad

func (h Hooks[K]) EventsPreLoad(ctx context.Context, aggregate AggregateRoot[K]) error

func (Hooks[K]) EventsPreSave

func (h Hooks[K]) EventsPreSave(ctx context.Context, aggregate AggregateRoot[K], events []Event[K]) error

func (Hooks[K]) SnapshotPostLoad

func (h Hooks[K]) SnapshotPostLoad(ctx context.Context, aggregate AggregateRoot[K], snapshot *Snapshot[K]) error

func (Hooks[K]) SnapshotPostSave

func (h Hooks[K]) SnapshotPostSave(ctx context.Context, aggregate AggregateRoot[K], snapshot Snapshot[K]) error

func (Hooks[K]) SnapshotPreLoad

func (h Hooks[K]) SnapshotPreLoad(ctx context.Context, aggregate AggregateRoot[K]) error

func (Hooks[K]) SnapshotPreSave

func (h Hooks[K]) SnapshotPreSave(ctx context.Context, aggregate AggregateRoot[K], snapshot Snapshot[K]) error

type Snapshot

type Snapshot[K comparable] struct {
	AggregateID      K
	AggregateType    string
	AggregateVersion int
	SnapshotType     string
	SnapshotData     []byte
	CreatedAt        time.Time
}

type SnapshotAggregate

type SnapshotAggregate[K comparable] interface {
	CreateSnapshot() SnapshotPayload
	ApplySnapshot(snapshot SnapshotPayload) error
	Aggregate[K]
}

type SnapshotLoadHooks

type SnapshotLoadHooks[K comparable] interface {
	SnapshotPreLoad(ctx context.Context, aggregate AggregateRoot[K]) error
	SnapshotPostLoad(ctx context.Context, aggregate AggregateRoot[K], snapshot *Snapshot[K]) error
}

type SnapshotPayload added in v0.2.0

type SnapshotPayload interface {
	Kind() string
}

type SnapshotPostLoad

type SnapshotPostLoad[K comparable] func(ctx context.Context, aggregate AggregateRoot[K], snapshot *Snapshot[K]) error

type SnapshotPostSave

type SnapshotPostSave[K comparable] func(ctx context.Context, aggregate AggregateRoot[K], snapshot Snapshot[K]) error

type SnapshotPreLoad

type SnapshotPreLoad[K comparable] func(ctx context.Context, aggregate AggregateRoot[K]) error

type SnapshotPreSave

type SnapshotPreSave[K comparable] func(ctx context.Context, aggregate AggregateRoot[K], snapshot Snapshot[K]) error

type SnapshotRepository

type SnapshotRepository[K comparable] interface {
	Load(ctx context.Context, aggregate AggregateRoot[K], hooks SnapshotLoadHooks[K]) (*Snapshot[K], error)
	Save(ctx context.Context, aggregate AggregateRoot[K], snapshot Snapshot[K], hooks SnapshotSaveHooks[K]) error
}

type SnapshotSaveHooks

type SnapshotSaveHooks[K comparable] interface {
	SnapshotPreSave(ctx context.Context, aggregate AggregateRoot[K], snapshot Snapshot[K]) error
	SnapshotPostSave(ctx context.Context, aggregate AggregateRoot[K], snapshot Snapshot[K]) error
}

type SnapshotStore added in v0.2.0

type SnapshotStore[K comparable] struct {
	// contains filtered or unexported fields
}

func NewSnapshotStore

func NewSnapshotStore[K comparable](
	eventStore *EventStore[K],
	repository SnapshotRepository[K],
	strategy SnapshotStrategy[K],
	hooks ...Hook[K],
) *SnapshotStore[K]

func (*SnapshotStore[K]) Load added in v0.2.0

func (s *SnapshotStore[K]) Load(ctx context.Context, aggregate AggregateRoot[K], hooks ...Hook[K]) error

func (*SnapshotStore[K]) Save added in v0.2.0

func (s *SnapshotStore[K]) Save(ctx context.Context, aggregate AggregateRoot[K], hooks ...Hook[K]) error

func (*SnapshotStore[K]) WithEventStore added in v0.2.1

func (s *SnapshotStore[K]) WithEventStore(eventStore *EventStore[K]) *SnapshotStore[K]

WithEventStore returns a new SnapshotStore with the provided EventStore.

func (*SnapshotStore[K]) WithRepository added in v0.2.1

func (s *SnapshotStore[K]) WithRepository(repository SnapshotRepository[K]) *SnapshotStore[K]

WithRepository returns a new SnapshotStore with the provided SnapshotRepository.

type SnapshotStrategy

type SnapshotStrategy[K comparable] interface {
	ShouldSnapshot(aggregate AggregateRoot[K]) bool
}

func NewFrequencySnapshotStrategy

func NewFrequencySnapshotStrategy[K comparable](frequency int) SnapshotStrategy[K]

NewFrequencySnapshotStrategy creates a new SnapshotStrategy that takes in a frequency value.

This strategy is useful when you want to snapshot the aggregate every N changes.

func NewParticularChangesSnapshotStrategy

func NewParticularChangesSnapshotStrategy[K comparable](changes []any) SnapshotStrategy[K]

NewParticularChangesSnapshotStrategy creates a new SnapshotStrategy that takes in a list of changes.

This strategy is useful when you want to snapshot the aggregate when a particular change is applied.

Directories

Path Synopsis
repositories
sql
