eventsource

package module
v0.0.0-...-d8c2073 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 14, 2022 License: MIT Imports: 11 Imported by: 0

README

eventsource

A minimalist and highly customizable kit for building event-sourced applications.

Documentation

Overview

Example
package main

import (
	"context"
	"fmt"
	"time"
)

const (
	Packed   = "packed"
	PickedUp = "picked-up"
	Shipped  = "shipped"
)

type ShipmentPacked struct{ EventSkeleton }
type ShipmentPickedUp struct{ EventSkeleton }
type ShipmentShipped struct{ EventSkeleton }

type Shipment struct {
	AggregateRootBase
	Status string
}

func (s *Shipment) Handle(ctx context.Context, v interface{}) error {
	var e Event
	switch cmd := v.(type) {
	case PackShipment:
		e = &ShipmentPacked{EventSkeleton{
			At: time.Now(),
			ID: cmd.AggregateID(),
		}}
	case PickupShipment:
		e = &ShipmentPickedUp{EventSkeleton{
			At: time.Now(),
			ID: cmd.AggregateID(),
		}}
	case ShipShipment:
		e = &ShipmentShipped{EventSkeleton{
			At: time.Now(),
			ID: cmd.AggregateID(),
		}}
	default:
		return fmt.Errorf("unexpected command %T", cmd)
	}

	return s.Apply(s, e, true)
}

func (s *Shipment) On(e Event) error {
	switch v := e.(type) {
	case *ShipmentPacked:
		s.ID = v.AggregateID()
		s.Status = Packed
	case *ShipmentPickedUp:
		s.ID = v.AggregateID()
		s.Status = PickedUp
	case *ShipmentShipped:
		s.ID = v.AggregateID()
		s.Status = Shipped
	default:
		return fmt.Errorf("unexpected event %T", e)
	}

	return nil
}

type PackShipment struct{ Command }
type PickupShipment struct{ Command }
type ShipShipment struct{ Command }

func (PackShipment) New() bool { return true }

func main() {
	marshaler := new(JsonEventMarshaler)
	marshaler.Bind(ShipmentPacked{}, ShipmentPickedUp{}, ShipmentShipped{})

	var (
		ctx      = context.Background()
		shipment = &Shipment{}
		repo     = NewRepository(shipment, WithMarshaler(marshaler))
		bus      = NewCommandBus(repo)
	)

	bus.Send(ctx, PackShipment{Command{ID: "abc123"}})
	aggregate, _ := repo.GetByID(ctx, "abc123")

	shipment, _ = aggregate.(*Shipment)
	fmt.Println(shipment.Status)
	fmt.Println(shipment.GetVersion())
	// Output: packed
	// Output: 0

	bus.Send(ctx, PickupShipment{Command{ID: shipment.AggregateRootID()}})
	aggregate, _ = repo.GetByID(ctx, shipment.AggregateRootID())
	shipment, _ = aggregate.(*Shipment)

	fmt.Println(shipment.Status)
	fmt.Println(shipment.GetVersion())
	// Output: picked-up
	
Output:

shipped
Output: 2

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrSnapNotFound = errors.New("snapshot not found")

Functions

func NewCommandBus

func NewCommandBus(repo AggregateRootRepository, middlewares ...command.Middleware) command.CommandSender

Types

type AggregateCommand

type AggregateCommand interface {
	AggregateID() string
}

AggregateCommand confirms that the command must have aggregate id which can be helpful to replay aggreate events from event store to build aggregate current state.

type AggregateHandler

type AggregateHandler interface {
	command.Handler
}

type AggregateRepository

type AggregateRepository struct {
	Marshaler EventMarshaler
	// contains filtered or unexported fields
}

func (*AggregateRepository) GetByID

func (r *AggregateRepository) GetByID(ctx context.Context, aggrID string) (AggregateRoot, error)

func (*AggregateRepository) LoadFromSnap

func (r *AggregateRepository) LoadFromSnap(ctx context.Context, aggrID string) (SnapshottingBehaviour, error)

func (*AggregateRepository) New

func (r *AggregateRepository) New() interface{}

func (*AggregateRepository) Save

type AggregateRoot

type AggregateRoot interface {
	AggregateRootID() string
	GetVersion() int
	StreamSize() int
	On(e Event) error
	Apply(aggr AggregateRoot, e Event, isNew bool) error
	LoadFromHistory(aggr AggregateRoot, history []Event) error
	GetUncommitedEvents() []Event
	CommitEvents()
}

type AggregateRootBase

type AggregateRootBase struct {
	ID string

	Version int
	// contains filtered or unexported fields
}

func (*AggregateRootBase) AggregateRootID

func (aggr *AggregateRootBase) AggregateRootID() string

func (*AggregateRootBase) Apply

func (aggr *AggregateRootBase) Apply(aggregate AggregateRoot, e Event, isNew bool) error

func (*AggregateRootBase) CommitEvents

func (aggr *AggregateRootBase) CommitEvents()

func (*AggregateRootBase) GetUncommitedEvents

func (aggr *AggregateRootBase) GetUncommitedEvents() []Event

func (*AggregateRootBase) GetVersion

func (aggr *AggregateRootBase) GetVersion() int

func (*AggregateRootBase) LoadFromHistory

func (aggr *AggregateRootBase) LoadFromHistory(aggregate AggregateRoot, history []Event) error

func (*AggregateRootBase) StreamSize

func (i *AggregateRootBase) StreamSize() int

type AggregateRootRepository

type AggregateRootRepository interface {
	New() interface{}
	Save(ctx context.Context, agr AggregateRoot) error
	GetByID(ctx context.Context, aggregateRootID string) (AggregateRoot, error)
}

func NewRepository

func NewRepository(aggr AggregateRoot, opts ...Option) AggregateRootRepository

type AggregateRootSnapshotRepository

type AggregateRootSnapshotRepository interface {
	Save(ctx context.Context, snap Snapshot) error
	GetByID(ctx context.Context, aggregateRootID string, version int) (Snapshot, error)
}

type Command

type Command struct {
	ID string
}

Command is an model for AggregateCommand used to avoid code boilerplate for commands implmenting AggregateCommand

func (Command) AggregateID

func (c Command) AggregateID() string

type Constructor

type Constructor interface {
	New() bool
}

Constructor command implementing this interface can be used to construct new aggregate

type EpochMillis

type EpochMillis int64

func Now

func Now() EpochMillis

func Time

func Time(t time.Time) EpochMillis

func (EpochMillis) Int64

func (e EpochMillis) Int64() int64

func (EpochMillis) String

func (e EpochMillis) String() string

func (EpochMillis) Time

func (e EpochMillis) Time() time.Time

type Event

type Event interface {
	// AggregateID returns the aggregate id of the event
	AggregateID() string

	// Version contains version number of aggregate
	EventVersion() int

	// At indicates when the event took place
	EventAt() time.Time
}

type EventMarshaler

type EventMarshaler interface {
	Bind(e ...Event) error
	Marshal(e Event) (EventModel, error)
	Unmarshal(e EventModel) (Event, error)
}

type EventModel

type EventModel struct {
	// Version is the event version the Data represents
	Version int

	// At indicates when the event happened; provided as a utility for the store
	At EpochMillis

	// Data contains the Serializer encoded version of the data
	Data []byte
}

EventModel provides the shape of the records to be saved to the db

type EventSkeleton

type EventSkeleton struct {
	// ID contains the AggregateID
	ID string

	// Version holds the event version
	Version int

	// At contains the event time
	At time.Time
}

EventSkeleton provides a default implementation of an Event that is suitable for being embedded

func (EventSkeleton) AggregateID

func (m EventSkeleton) AggregateID() string

AggregateID implements part of the Event interface

func (EventSkeleton) EventAt

func (m EventSkeleton) EventAt() time.Time

EventAt implements part of the Event interface

func (EventSkeleton) EventVersion

func (m EventSkeleton) EventVersion() int

EventVersion implements part of the Event interface

type EventStore

type EventStore interface {
	SaveEvents(ctx context.Context, aggrID string, models History, version int) error
	GetEventsForAggregate(ctx context.Context, aggrID string, version int) (History, error)
}

func NewInmemEventStore

func NewInmemEventStore() EventStore

type History

type History []EventModel

type JsonEventMarshaler

type JsonEventMarshaler struct {
	// contains filtered or unexported fields
}

func (*JsonEventMarshaler) Bind

func (m *JsonEventMarshaler) Bind(events ...Event) error

func (*JsonEventMarshaler) Marshal

func (m *JsonEventMarshaler) Marshal(e Event) (EventModel, error)

func (*JsonEventMarshaler) Unmarshal

func (m *JsonEventMarshaler) Unmarshal(model EventModel) (Event, error)

type JsonSnapshotMarshaler

type JsonSnapshotMarshaler struct {
	// contains filtered or unexported fields
}

func (*JsonSnapshotMarshaler) Bind

func (m *JsonSnapshotMarshaler) Bind(states ...interface{}) error

func (*JsonSnapshotMarshaler) Marshal

func (*JsonSnapshotMarshaler) Unmarshal

func (m *JsonSnapshotMarshaler) Unmarshal(model SnapshotModel) (Snapshot, error)

type Option

type Option func(r *AggregateRepository)

func WithDefaultSnapRepository

func WithDefaultSnapRepository(states ...interface{}) Option

func WithEventStore

func WithEventStore(s EventStore) Option

func WithMarshaler

func WithMarshaler(m EventMarshaler) Option

func WithSnapRepository

func WithSnapRepository(s SnapshotStore, m SnapshotMarshaler) Option

type Snapshot

type Snapshot interface {
	CurrentVersion() int
	GetState() interface{}
	AggregateRootID() string
}

type SnapshotMarshaler

type SnapshotMarshaler interface {
	Bind(v ...interface{}) error
	Marshal(s Snapshot) (SnapshotModel, error)
	Unmarshal(m SnapshotModel) (Snapshot, error)
}

type SnapshotModel

type SnapshotModel struct {
	ID      string
	Version int
	Data    []byte
}

type SnapshotRepository

type SnapshotRepository struct {
	// contains filtered or unexported fields
}

func (*SnapshotRepository) GetByID

func (r *SnapshotRepository) GetByID(ctx context.Context, agrID string, version int) (Snapshot, error)

func (*SnapshotRepository) Save

func (r *SnapshotRepository) Save(ctx context.Context, snap Snapshot) error

type SnapshotSkeleton

type SnapshotSkeleton struct {
	ID      string
	Version int
	State   interface{}
}

func (SnapshotSkeleton) AggregateRootID

func (s SnapshotSkeleton) AggregateRootID() string

func (SnapshotSkeleton) CurrentVersion

func (s SnapshotSkeleton) CurrentVersion() int

func (SnapshotSkeleton) GetState

func (s SnapshotSkeleton) GetState() interface{}

type SnapshotStore

type SnapshotStore interface {
	SaveSnapshot(ctx context.Context, agrID string, model SnapshotModel, version int) error
	GetSnapshotForAggregate(ctx context.Context, agrID string, version int) (SnapshotModel, error)
}

func NewInmemSnapStore

func NewInmemSnapStore() SnapshotStore

type SnapshottingBehaviour

type SnapshottingBehaviour interface {
	AggregateRoot
	SnapshotInterval() int
	GetState() interface{}
	ApplyState(s Snapshot)
	SnapshottingEnable() bool
}

Directories

Path Synopsis
_examples
cmd
bus
eventstore

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL