Documentation ¶
Index ¶
- func HydrateState[T Handler](t *testing.T, state T, contents ...Content) T
- func VerifyEvents(t *testing.T, events []Content, expected ...any) bool
- type Config
- type Content
- type Event
- type EventBus
- type EventUpgrade
- type EventUpgradeFunc
- type Handler
- type HandlerFunc
- type InMemoryEventBus
- func (bus *InMemoryEventBus) Close() error
- func (bus *InMemoryEventBus) GetSubscriberIDs(ctx context.Context, entityType string) ([]string, error)
- func (bus *InMemoryEventBus) Subscribe(ctx context.Context, entityType, subscriberID string, handler Handler) error
- func (bus *InMemoryEventBus) Write(ctx context.Context, entityType string, events iter.Seq2[Event, error]) error
- func (bus *InMemoryEventBus) WriteTo(ctx context.Context, entityType string, events iter.Seq2[Event, error], ...) error
- type Logger
- type Option
- func WithDefaultSlog() Option
- func WithEventBus(bus EventBus) Option
- func WithEventUpgrades(entityType string, upgrades ...EventUpgrade) Option
- func WithEvents(entityType string, contentTypes []Content) Option
- func WithLogger(logger Logger) Option
- func WithNoopLogger() Option
- func WithSlog(log *slog.Logger) Option
- type ReadWriter
- type Reader
- type Storage
- type Store
- func (s *Store) Close() error
- func (s *Store) GetSubscriberIDs(ctx context.Context, entityType string) ([]string, error)
- func (s *Store) Open(ctx context.Context, entityType string, entityID string) Stream
- func (s *Store) OpenFrom(ctx context.Context, entityType string, entityID string, eventNumber int64) Stream
- func (s *Store) Project(ctx context.Context, entityType, entityID string, handler Handler) (err error)
- func (s *Store) Start(ctx context.Context) error
- func (s *Store) Subscribe(ctx context.Context, entityType string, subscriberID string, handler Handler) error
- type Stream
- type Writer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func HydrateState ¶
HydrateState is a test helper meant to make it easy to hydrate a state using event data.
Types ¶
type Content ¶
type Content interface {
Name() string
}
Content is the application specific data model used in an Event.
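As a minimal sketch of what an application-defined Content could look like: the `OrderPlaced` type and its fields below are hypothetical, and the local `Content` interface only mirrors the one documented above so that the example compiles on its own.

```go
package main

import "fmt"

// Content mirrors the interface documented above.
type Content interface {
	Name() string
}

// OrderPlaced is a hypothetical application event; its fields are illustrative.
type OrderPlaced struct {
	OrderID string
	Total   int64
}

// Name returns the name that identifies this kind of content.
func (OrderPlaced) Name() string { return "OrderPlaced" }

// Compile-time check that OrderPlaced satisfies Content.
var _ Content = OrderPlaced{}

func main() {
	var c Content = OrderPlaced{OrderID: "o-1", Total: 4200}
	fmt.Println(c.Name())
}
```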
type Event ¶
type Event struct {
	// EntityID is the ID of the stream the event belongs to.
	EntityID string
	// EntityType is the type of the stream the event belongs to.
	EntityType string
	// EventNumber is the number of the event in the stream.
	EventNumber int64
	// EventTime is the time the event was recorded in the Store
	EventTime time.Time
	// Content is the actual content of the event. Expected to be a struct defined
	// by the application.
	Content Content
	// StoreEventID is the ID of the event assigned by the Store
	// The StoreEventID is a UUIDv7 with the underlying time matching the EventTime
	StoreEventID string
	// StoreEntityID is the ID of the stream assigned by the Store
	// The StoreEntityID is a UUIDv7 with the underlying time matching the EventTime
	// of the first event in the stream.
	StoreEntityID string
}
Event is a combination of the metadata and content of a business event in the system. It is part of a Stream that makes up the current state of a business entity.
type EventBus ¶
type EventBus interface {
	// Write should write the events to all subscriptions.
	Write(ctx context.Context, entityType string, events iter.Seq2[Event, error]) error
	// Subscribe registers a Handler under its subscriberID.
	Subscribe(ctx context.Context, entityType string, subscriberID string, handler Handler) error
	// GetSubscriberIDs returns a list of all subscriber IDs for the entityType.
	GetSubscriberIDs(ctx context.Context, entityType string) ([]string, error)
	// WriteTo calls the Handlers registered under the given subscriberIDs with the events.
	WriteTo(ctx context.Context, entityType string, events iter.Seq2[Event, error], subscriberIDs ...string) error
	Close() error
}
EventBus is responsible for distributing Events to all subscribing Handlers after the Events are written to the Storage.
type EventUpgrade ¶
type EventUpgrade interface {
Upgrade(ctx context.Context, events iter.Seq2[Event, error]) iter.Seq2[Event, error]
}
EventUpgrade upgrades events into a new version. The events will all be from the same Stream and arrive ordered by EventNumber.
The EventUpgrade is used when reading a Stream and when an Event is published to subscribing Handlers. As a result, only the events that are in flight go through the EventUpgrade. For example, if you open a Stream from EventNumber 3, Events 1 and 2 will not be in the events sequence.
type EventUpgradeFunc ¶
type HandlerFunc ¶
HandlerFunc is a convenience type that allows an inline func to act as a Handler.
type InMemoryEventBus ¶
type InMemoryEventBus struct {
// contains filtered or unexported fields
}
func NewInMemoryEventBus ¶
func NewInMemoryEventBus() *InMemoryEventBus
func (*InMemoryEventBus) Close ¶
func (bus *InMemoryEventBus) Close() error
func (*InMemoryEventBus) GetSubscriberIDs ¶
type Option ¶
type Option func(*Config)
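Option follows the functional-options pattern. The sketch below illustrates the pattern only: the fields of `Config` are not shown in this documentation, so the `Config` struct here is a stand-in, and `withLoggerName`, `withEventNames`, and `newConfig` are hypothetical helpers rather than part of the package.

```go
package main

import "fmt"

// Config is a stand-in; the real Config's fields are not documented here.
type Config struct {
	loggerName string
	events     map[string][]string
}

// Option has the same shape as the documented type.
type Option func(*Config)

// withLoggerName is a hypothetical option that sets a single field.
func withLoggerName(name string) Option {
	return func(c *Config) { c.loggerName = name }
}

// withEventNames is a hypothetical option that accumulates values per entity type.
func withEventNames(entityType string, names ...string) Option {
	return func(c *Config) {
		c.events[entityType] = append(c.events[entityType], names...)
	}
}

// newConfig starts from defaults and applies the options in order,
// so a later option overrides an earlier one that sets the same field.
func newConfig(opts ...Option) *Config {
	c := &Config{loggerName: "noop", events: map[string][]string{}}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	c := newConfig(
		withLoggerName("slog"),
		withEventNames("order", "OrderPlaced", "OrderShipped"),
	)
	fmt.Println(c.loggerName, c.events["order"])
}
```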
func WithDefaultSlog ¶
func WithDefaultSlog() Option
func WithEventBus ¶
func WithEventUpgrades ¶
func WithEventUpgrades(entityType string, upgrades ...EventUpgrade) Option
func WithEvents ¶
func WithLogger ¶
func WithNoopLogger ¶
func WithNoopLogger() Option
type ReadWriter ¶
type Reader ¶
type Reader interface {
Read(ctx context.Context, entityType string, entityID string, eventNumber int64) iter.Seq2[Event, error]
}
Reader allows getting a sequence of Events for an EntityType and EntityID.
type Storage ¶
type Storage interface {
	// Read the events of an entityType with the entityID from eventNumber.
	Read(ctx context.Context, entityType string, entityID string, eventNumber int64) iter.Seq2[Event, error]
	// Write writes the events to the store.
	// All of the events must be written by sequence.
	// They should all be written or fully fail.
	Write(ctx context.Context, entityType string, events iter.Seq2[Event, error]) error
	// StartPublish should begin the process where newly written events are published to the Writer.
	// The publishing must be cancelled with the context.
	StartPublish(ctx context.Context, w Writer) error
	// Register allows the Storage to Unmarshal multiple shapes of Content for an entityType.
	// It is considered an error if a Storage contains a shape of Content that has not been registered.
	Register(entityType string, types ...Content) error
	// GetEntityIDs returns a list of EntityIDs for the given entityType.
	// The returned list is ordered by the storeEntityID and limited in size by the limit.
	// The second return value is the next storeEntityID and works as a pagination token.
	GetEntityIDs(ctx context.Context, entityType string, storeEntityID string, limit int64) ([]string, string, error)
}
Storage abstracts the persistence of a Store.
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
func (*Store) GetSubscriberIDs ¶
GetSubscriberIDs returns a list of all subscriber IDs registered for a given entityType.
func (*Store) Open ¶
Open opens a Stream by the type and ID of the entity. The Stream is opened at the start and must be closed.
func (*Store) OpenFrom ¶
func (s *Store) OpenFrom(ctx context.Context, entityType string, entityID string, eventNumber int64) Stream
OpenFrom opens a Stream so the first event read will be eventNumber + 1. The Stream must be closed.
func (*Store) Project ¶
func (s *Store) Project(ctx context.Context, entityType, entityID string, handler Handler) (err error)
Project projects all Events of the entity identified by entityType and entityID onto a Handler.
type Stream ¶
type Stream interface {
	// Project iterates over all events in the stream and calls the handler for each event.
	// The Stream will stop projecting if the handler returns an error.
	Project(handler Handler) error
	// All returns an iter.Seq2 of all events in the stream.
	// The returned iter.Seq2 will stop and return an error if there was an error
	// reading the events.
	// Calling this method twice will return the same iter.Seq2.
	All() iter.Seq2[Event, error]
	// Write writes the given events to the stream.
	// The Events will be written in the order they are given and starting
	// at the most recent event number + 1.
	Write(events ...Content) error
	// Position returns the current position of the stream.
	Position() int64
	Close() error
}
Stream is a sequence of Events that in combination represent the state of an entity. The Stream can be written to and read from, which enables applications to alter and get the state.
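The numbering rule for Write ("starting at the most recent event number + 1") can be illustrated with a small in-memory sketch. `memStream` and `Deposited` are hypothetical, only Write and Position are modelled, and reading Position as "the number of the most recent event, or 0 for an empty stream" is an assumption.

```go
package main

import "fmt"

// Content mirrors the documented interface.
type Content interface{ Name() string }

// Event mirrors only the documented Event fields this sketch needs.
type Event struct {
	EventNumber int64
	Content     Content
}

// Deposited is a hypothetical application event.
type Deposited struct{ Amount int64 }

func (Deposited) Name() string { return "Deposited" }

// memStream is a hypothetical in-memory stand-in for the Write and
// Position methods of Stream.
type memStream struct{ events []Event }

// Position returns the number of the most recent event.
// Assumption: an empty stream is at position 0.
func (s *memStream) Position() int64 {
	if len(s.events) == 0 {
		return 0
	}
	return s.events[len(s.events)-1].EventNumber
}

// Write appends the contents in the given order, numbering them from
// the most recent event number + 1.
func (s *memStream) Write(contents ...Content) error {
	next := s.Position() + 1
	for i, c := range contents {
		s.events = append(s.events, Event{EventNumber: next + int64(i), Content: c})
	}
	return nil
}

func main() {
	s := &memStream{}
	if err := s.Write(Deposited{Amount: 10}, Deposited{Amount: 20}); err != nil {
		fmt.Println("write failed:", err)
		return
	}
	if err := s.Write(Deposited{Amount: 30}); err != nil {
		fmt.Println("write failed:", err)
		return
	}
	for _, ev := range s.events {
		fmt.Println(ev.EventNumber, ev.Content.Name())
	}
	fmt.Println("position:", s.Position())
}
```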