es

v0.0.0-...-e8f8035 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 22, 2025 | License: MIT | Imports: 16 | Imported by: 1

README

Event Sourcing in Go

A library for using Event Sourcing as a persistence layer in Go.

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func HydrateState

func HydrateState[T Handler](t *testing.T, state T, contents ...Content) T

HydrateState is a test helper meant to make it easy to hydrate a state using event data.

func VerifyEvents

func VerifyEvents(t *testing.T, events []Content, expected ...any) bool

Types

type Config

type Config struct {
	// contains filtered or unexported fields
}

type Content

type Content interface {
	Name() string
}

Content is the application specific data model used in an Event.
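
As a minimal sketch of what application-defined Content might look like: the interface is redeclared locally so that the snippet compiles on its own, and `AccountOpened`, `MoneyDeposited`, and the `names` helper are hypothetical names used for illustration, not part of the package.

```go
package main

import "fmt"

// Content mirrors the interface documented above. It is redeclared here
// only so that the sketch is self-contained.
type Content interface {
	Name() string
}

// AccountOpened is a hypothetical application event.
type AccountOpened struct {
	Owner string
}

// Name identifies this shape of Content.
func (AccountOpened) Name() string { return "AccountOpened" }

// MoneyDeposited is a second hypothetical application event.
type MoneyDeposited struct {
	Amount int64
}

// Name identifies this shape of Content.
func (MoneyDeposited) Name() string { return "MoneyDeposited" }

// names collects the Name of each Content in the order given.
func names(contents ...Content) []string {
	out := make([]string, 0, len(contents))
	for _, c := range contents {
		out = append(out, c.Name())
	}
	return out
}

func main() {
	fmt.Println(names(AccountOpened{Owner: "ada"}, MoneyDeposited{Amount: 100}))
}
```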

type Event

type Event struct {
	// EntityID is the ID of the stream the event belongs to.
	EntityID string
	// EntityType is the type of the stream the event belongs to.
	EntityType string
	// EventNumber is the number of the event in the stream.
	EventNumber int64
	// EventTime is the time the event was recorded in the Store.
	EventTime time.Time
	// Content is the actual content of the event. Expected to be a struct defined
	// by the application.
	Content Content
	// StoreEventID is the ID of the event assigned by the Store.
	// The StoreEventID is a UUIDv7 with the underlying time matching the EventTime.
	StoreEventID string
	// StoreEntityID is the ID of the stream assigned by the Store
	// The StoreEntityID is a UUIDv7 with the underlying time matching the EventTime
	// of the first event in the stream.
	StoreEntityID string
}

Event is a combination of the metadata and content of a business event in the system. It is part of a Stream that makes up the current state of a business entity.

type EventBus

type EventBus interface {
	// Write should write the events to all subscriptions
	Write(ctx context.Context, entityType string, events iter.Seq2[Event, error]) error
	// Subscribe registers a Handler under its subscriberID
	Subscribe(ctx context.Context, entityType string, subscriberID string, handler Handler) error
	// GetSubscriberIDs returns a list of all subscriber IDs for the entityType
	GetSubscriberIDs(ctx context.Context, entityType string) ([]string, error)
	// WriteTo calls the Handlers registered under the given subscriberIDs with the events
	WriteTo(ctx context.Context, entityType string, events iter.Seq2[Event, error], subscriberIDs ...string) error
	Close() error
}

EventBus is responsible for distributing Events to all subscribing Handlers after they are written to the Storage.

type EventUpgrade

type EventUpgrade interface {
	Upgrade(ctx context.Context, events iter.Seq2[Event, error]) iter.Seq2[Event, error]
}

EventUpgrade upgrades events into a new version. The events will all be from the same Stream and arrive in order of EventNumber.

The EventUpgrade is used when reading a Stream and when an Event is published to subscribing Handlers. As a result, only in-flight events pass through the EventUpgrade. For example, if you open a Stream from EventNumber 3, events 1 and 2 will not be in the events sequence.

type EventUpgradeFunc

type EventUpgradeFunc func(ctx context.Context, i iter.Seq2[Event, error]) iter.Seq2[Event, error]

func (EventUpgradeFunc) Upgrade

type Handler

type Handler interface {
	Handle(ctx context.Context, event Event) error
}

Handler is code that handles an Event when it happens.

type HandlerFunc

type HandlerFunc func(ctx context.Context, event Event) error

HandlerFunc is a convenience type that allows an inline func to act as a Handler.

func (HandlerFunc) Handle

func (fn HandlerFunc) Handle(ctx context.Context, event Event) error
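
The adapter idea can be sketched as follows. The types are redeclared locally so the snippet compiles on its own, and the body of `Handle` is an assumption modelled on the standard library's `http.HandlerFunc`, which simply calls the underlying function; `countEvents` is a hypothetical helper.

```go
package main

import (
	"context"
	"fmt"
)

// Event, Handler and HandlerFunc are local stand-ins for the documented types.
type Event struct {
	EntityID    string
	EventNumber int64
}

type Handler interface {
	Handle(ctx context.Context, event Event) error
}

type HandlerFunc func(ctx context.Context, event Event) error

// Handle calls fn. This body is assumed, not taken from the package source.
func (fn HandlerFunc) Handle(ctx context.Context, event Event) error {
	return fn(ctx, event)
}

// countEvents wraps an inline closure as a Handler and feeds it the events.
func countEvents(events []Event) (int, error) {
	count := 0
	var h Handler = HandlerFunc(func(ctx context.Context, event Event) error {
		count++
		return nil
	})
	for _, ev := range events {
		if err := h.Handle(context.Background(), ev); err != nil {
			return count, err
		}
	}
	return count, nil
}

func main() {
	n, err := countEvents([]Event{
		{EntityID: "a", EventNumber: 1},
		{EntityID: "a", EventNumber: 2},
	})
	fmt.Println(n, err)
}
```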

type InMemoryEventBus

type InMemoryEventBus struct {
	// contains filtered or unexported fields
}

func NewInMemoryEventBus

func NewInMemoryEventBus() *InMemoryEventBus

func (*InMemoryEventBus) Close

func (bus *InMemoryEventBus) Close() error

func (*InMemoryEventBus) GetSubscriberIDs

func (bus *InMemoryEventBus) GetSubscriberIDs(ctx context.Context, entityType string) ([]string, error)

func (*InMemoryEventBus) Subscribe

func (bus *InMemoryEventBus) Subscribe(ctx context.Context, entityType, subscriberID string, handler Handler) error

func (*InMemoryEventBus) Write

func (bus *InMemoryEventBus) Write(ctx context.Context, entityType string, events iter.Seq2[Event, error]) error

func (*InMemoryEventBus) WriteTo

func (bus *InMemoryEventBus) WriteTo(ctx context.Context, entityType string, events iter.Seq2[Event, error], subscribers ...string) error

type Logger

type Logger interface {
	InfofCtx(ctx context.Context, template string, args ...any)
	ErrorfCtx(ctx context.Context, template string, args ...any)
}

type Option

type Option func(*Config)

func WithDefaultSlog

func WithDefaultSlog() Option

func WithEventBus

func WithEventBus(bus EventBus) Option

func WithEventUpgrades

func WithEventUpgrades(entityType string, upgrades ...EventUpgrade) Option

func WithEvents

func WithEvents(entityType string, contentTypes []Content) Option

func WithLogger

func WithLogger(logger Logger) Option

func WithNoopLogger

func WithNoopLogger() Option

func WithSlog

func WithSlog(log *slog.Logger) Option
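
The `Option` functions above have the shape of the functional options pattern. Since the fields of `Config` are unexported and not documented here, the sketch below uses an entirely hypothetical `config` with made-up fields (`name`, `retries`) and made-up options (`withName`, `withRetries`) to show how such options are typically applied over defaults.

```go
package main

import "fmt"

// config is a hypothetical stand-in; the real Config fields are not documented.
type config struct {
	name    string
	retries int
}

// option mutates a config, mirroring the documented `func(*Config)` shape.
type option func(*config)

// withName is a hypothetical option that sets the name.
func withName(name string) option {
	return func(c *config) { c.name = name }
}

// withRetries is a hypothetical option that sets the retry count.
func withRetries(n int) option {
	return func(c *config) { c.retries = n }
}

// newConfig starts from defaults and applies the options in order,
// so a later option overrides an earlier one.
func newConfig(opts ...option) *config {
	cfg := &config{name: "default", retries: 3}
	for _, opt := range opts {
		opt(cfg)
	}
	return cfg
}

func main() {
	cfg := newConfig(withName("orders"), withRetries(5))
	fmt.Println(cfg.name, cfg.retries)
}
```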

type ReadWriter

type ReadWriter interface {
	Reader
	Writer
}

type Reader

type Reader interface {
	Read(ctx context.Context, entityType string, entityID string, eventNumber int64) iter.Seq2[Event, error]
}

Reader allows getting a sequence of Events for an EntityType and EntityID.

type Storage

type Storage interface {
	// Read the events of an entityType with the entityID from eventNumber
	Read(ctx context.Context, entityType string, entityID string, eventNumber int64) iter.Seq2[Event, error]
	// Write writes the events to the store.
	// All of the events must be written in sequence.
	// They should all be written or fully fail.
	Write(ctx context.Context, entityType string, events iter.Seq2[Event, error]) error
	// StartPublish should begin the process where newly written events are published to the Writer.
	// The publishing must be cancelled with the context
	StartPublish(ctx context.Context, w Writer) error
	// Register allows the Storage to Unmarshal multiple shapes of Content for an entityType.
	// It is considered an error if a Storage contains a shape of Content that has not been registered.
	Register(entityType string, types ...Content) error
	// GetEntityIDs returns a list of EntityIDs for the given entityType.
	// The returned list is ordered by the storeEntityID and limited in size by the limit.
	// The second return value is the next storeEntityID and works as a pagination token
	GetEntityIDs(ctx context.Context, entityType string, storeEntityID string, limit int64) ([]string, string, error)
}

Storage abstracts the persistence of a Store.
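
The pagination contract of `GetEntityIDs` could be consumed with a loop like the one below. The documentation does not say how the last page is signalled or whether the token is inclusive, so this sketch assumes an empty token marks the end and that the token is an exclusive cursor. `entityLister`, `fakeStorage`, `allEntityIDs`, and `listAllDemo` are hypothetical names, and the fake treats entity IDs and store entity IDs as the same strings for simplicity.

```go
package main

import (
	"context"
	"fmt"
)

// entityLister is a hypothetical narrow view of Storage holding only GetEntityIDs.
type entityLister interface {
	GetEntityIDs(ctx context.Context, entityType string, storeEntityID string, limit int64) ([]string, string, error)
}

// fakeStorage is an in-memory stand-in whose ids must be sorted.
type fakeStorage struct{ ids []string }

// GetEntityIDs returns up to limit ids that sort after the cursor.
// Assumption: an empty next token means there are no further pages.
func (f fakeStorage) GetEntityIDs(_ context.Context, _ string, cursor string, limit int64) ([]string, string, error) {
	if limit <= 0 {
		return nil, "", fmt.Errorf("limit must be positive, got %d", limit)
	}
	var page []string
	for _, id := range f.ids {
		if id <= cursor {
			continue
		}
		if int64(len(page)) == limit {
			return page, page[len(page)-1], nil // more ids remain
		}
		page = append(page, id)
	}
	return page, "", nil
}

// allEntityIDs follows the pagination token until it comes back empty.
func allEntityIDs(ctx context.Context, s entityLister, entityType string, pageSize int64) ([]string, error) {
	var all []string
	token := ""
	for {
		page, next, err := s.GetEntityIDs(ctx, entityType, token, pageSize)
		if err != nil {
			return nil, err
		}
		all = append(all, page...)
		if next == "" {
			return all, nil
		}
		token = next
	}
}

// listAllDemo pages through five ids with the given page size.
func listAllDemo(pageSize int64) ([]string, error) {
	store := fakeStorage{ids: []string{"a", "b", "c", "d", "e"}}
	return allEntityIDs(context.Background(), store, "account", pageSize)
}

func main() {
	ids, err := listAllDemo(2)
	fmt.Println(ids, err)
}
```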

type Store

type Store struct {
	// contains filtered or unexported fields
}

func NewStore

func NewStore(storage Storage, opts ...Option) *Store

func (*Store) Close

func (s *Store) Close() error

func (*Store) GetSubscriberIDs

func (s *Store) GetSubscriberIDs(ctx context.Context, entityType string) ([]string, error)

GetSubscriberIDs returns a list of all subscriber IDs registered for a given entityType.

func (*Store) Open

func (s *Store) Open(ctx context.Context, entityType string, entityID string) Stream

Open a stream by the type and id of the entity. The Stream will be opened at the start and must be closed.

func (*Store) OpenFrom

func (s *Store) OpenFrom(ctx context.Context, entityType string, entityID string, eventNumber int64) Stream

OpenFrom opens a Stream so the first event read will be eventNumber + 1. The Stream must be closed.

func (*Store) Project

func (s *Store) Project(ctx context.Context, entityType, entityID string, handler Handler) (err error)

Project replays all Events of the entity with the given type and ID onto a Handler.

func (*Store) Start

func (s *Store) Start(ctx context.Context) error

Start starts the store by starting the storage and the event bus. The method blocks and returns when the store is closed. It returns an error if the storage fails to publish events.

func (*Store) Subscribe

func (s *Store) Subscribe(ctx context.Context, entityType string, subscriberID string, handler Handler) error

Subscribe to all events on an entityType to be passed to the Handler.

type Stream

type Stream interface {
	// Project iterates over all events in the stream and calls the handler for each event.
	// The Stream will stop projecting if the handler returns an error.
	Project(handler Handler) error
	// All returns an iter.Seq2 of all events in the stream.
	// The returned iter.Seq2 will stop and return an error if there was an error
	// reading the events.
	// Calling this method twice will return the same iter.Seq2
	All() iter.Seq2[Event, error]
	// Write writes the given events to the stream.
	// The Events will be written in the order they are given and starting
	// at the most recent event number + 1.
	Write(events ...Content) error
	// Position returns the current position of the stream.
	Position() int64
	Close() error
}

Stream is a sequence of Events that in combination represent the state of an entity. The Stream can be written to and read from, which enables applications to alter and get the state.
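
The write-then-project cycle can be sketched with a toy in-memory stream. Everything here is a local stand-in: `memStream`, `Deposited`, `Withdrawn`, and `Balance` are hypothetical names, and numbering events from 1 is an assumption. The state is rebuilt by folding each Event into a Handler.

```go
package main

import (
	"context"
	"fmt"
)

// Content, Event and Handler are local stand-ins for the documented types.
type Content interface{ Name() string }

type Event struct {
	EntityID    string
	EventNumber int64
	Content     Content
}

type Handler interface {
	Handle(ctx context.Context, event Event) error
}

// Deposited and Withdrawn are hypothetical application events.
type Deposited struct{ Amount int64 }

func (Deposited) Name() string { return "Deposited" }

type Withdrawn struct{ Amount int64 }

func (Withdrawn) Name() string { return "Withdrawn" }

// memStream is a toy stream that keeps its events in a slice.
type memStream struct {
	entityID string
	events   []Event
}

// Write appends the contents in order, numbering each one as the most
// recent event number + 1 (assumed to start at 1).
func (s *memStream) Write(contents ...Content) error {
	for _, c := range contents {
		s.events = append(s.events, Event{
			EntityID:    s.entityID,
			EventNumber: int64(len(s.events)) + 1,
			Content:     c,
		})
	}
	return nil
}

// Project calls the handler for each event and stops at the first error.
func (s *memStream) Project(h Handler) error {
	for _, ev := range s.events {
		if err := h.Handle(context.Background(), ev); err != nil {
			return err
		}
	}
	return nil
}

// Balance is the state rebuilt from the events.
type Balance struct{ Amount int64 }

// Handle folds one event into the balance.
func (b *Balance) Handle(_ context.Context, ev Event) error {
	switch c := ev.Content.(type) {
	case Deposited:
		b.Amount += c.Amount
	case Withdrawn:
		b.Amount -= c.Amount
	default:
		return fmt.Errorf("unknown content %q", ev.Content.Name())
	}
	return nil
}

func main() {
	s := &memStream{entityID: "acc-1"}
	if err := s.Write(Deposited{Amount: 100}, Withdrawn{Amount: 30}); err != nil {
		fmt.Println("write failed:", err)
		return
	}
	b := &Balance{}
	if err := s.Project(b); err != nil {
		fmt.Println("project failed:", err)
		return
	}
	fmt.Println(b.Amount)
}
```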

type Writer

type Writer interface {
	Write(ctx context.Context, entityType string, events iter.Seq2[Event, error]) error
}

Writer allows writing a sequence of Events to an entityType.

Directories

Path Synopsis
internal
storage
