Documentation ¶
Index ¶
- Constants
- func NewProjectionManager(es *EventStore) projection.Manager
- type ErrorStreamIterator
- type EventStore
- func (s *EventStore) AppendTo(ctx context.Context, streamName string, events []*messages.Event) error
- func (s *EventStore) Create(ctx context.Context, stream *eventstore.Stream) error
- func (s *EventStore) DB() *sql.DB
- func (s *EventStore) Delete(ctx context.Context, streamName string) error
- func (s *EventStore) FetchStreamMetadata(ctx context.Context, streamName string) (eventstore.StreamMetadata, error)
- func (s *EventStore) FetchStreamNames(ctx context.Context, filter string, matcher eventstore.MetadataMatcher, ...) ([]string, error)
- func (s *EventStore) FetchStreamNamesRegex(ctx context.Context, filter string, matcher eventstore.MetadataMatcher, ...) ([]string, error)
- func (s *EventStore) GetProjectionManager() projection.Manager
- func (s *EventStore) Load(ctx context.Context, streamName string, from, count uint64, ...) eventstore.StreamIterator
- func (s *EventStore) LoadReverse(ctx context.Context, streamName string, from, count uint64, ...) eventstore.StreamIterator
- func (s *EventStore) Ping(ctx context.Context) error
- func (s *EventStore) UpdateStreamMetadata(ctx context.Context, streamName string, newMetadata eventstore.StreamMetadata) error
- type ProjectionManager
- func (m *ProjectionManager) Create(ctx context.Context, name string, opts []projection.ProjectorOpt) (projection.Projector, error)
- func (m *ProjectionManager) Delete(ctx context.Context, projectionName string) error
- func (m *ProjectionManager) FetchPojectionStatus(ctx context.Context, projectionName string) (projection.Status, error)
- func (m *ProjectionManager) FetchPojectionStreamPositions(ctx context.Context, projectionName string) (projection.StreamPositions, error)
- func (m *ProjectionManager) FetchProjectionNames(ctx context.Context, filter string, start, limit uint64) ([]string, error)
- func (m *ProjectionManager) Reset(ctx context.Context, projectionName string) error
- func (m *ProjectionManager) Stop(ctx context.Context, projectionName string) error
- type StreamIterator
- type StreamProjection
- func (p *StreamProjection) FromStream(streamName string) projection.Projector
- func (p *StreamProjection) FromStreams(streamNames []string) projection.Projector
- func (p *StreamProjection) Run(ctx context.Context) error
- func (p *StreamProjection) Stop(ctx context.Context) error
- func (p *StreamProjection) When(eventName string, cb projection.Handler) projection.Projector
- func (p *StreamProjection) WhenAny(cb projection.Handler) projection.Projector
Constants ¶
const (
	// DefaultBatchSize ...
	DefaultBatchSize uint64 = 1000
)
Variables ¶
This section is empty.
Functions ¶
func NewProjectionManager ¶
func NewProjectionManager(es *EventStore) projection.Manager
NewProjectionManager will get a projection manager that uses the MySQL backend to store projection states.
Types ¶
type ErrorStreamIterator ¶
type ErrorStreamIterator struct {
// contains filtered or unexported fields
}
ErrorStreamIterator is returned when an error occurred while getting stream data; for example, the stream may not exist.
func (*ErrorStreamIterator) Current ¶
func (it *ErrorStreamIterator) Current() *messages.Event
Current always returns nil.
func (*ErrorStreamIterator) Error ¶
func (it *ErrorStreamIterator) Error() string
Error will return the inner error's Error method result.
type EventStore ¶
type EventStore struct {
// contains filtered or unexported fields
}
EventStore will use a MySQL database to manage streams.
func New ¶
func New(ctx context.Context, dsn string, batchSize uint64, payloadBuilder messages.PayloadBuilder) (*EventStore, error)
New returns a new MySQL event store. It is best to pass a context with a deadline so that the call does not hang.
func (*EventStore) AppendTo ¶
func (s *EventStore) AppendTo(ctx context.Context, streamName string, events []*messages.Event) error
AppendTo will append events to the stream.
func (*EventStore) Create ¶
func (s *EventStore) Create(ctx context.Context, stream *eventstore.Stream) error
Create will create the stream with the name and metadata provided.
Note: as far as we know, MySQL does not support transactions that combine schema changes with row inserts, so a failure partway through could leave the database in an inconsistent state.
func (*EventStore) Delete ¶
func (s *EventStore) Delete(ctx context.Context, streamName string) error
Delete will remove the stream.
func (*EventStore) FetchStreamMetadata ¶
func (s *EventStore) FetchStreamMetadata(ctx context.Context, streamName string) (eventstore.StreamMetadata, error)
FetchStreamMetadata gets the metadata about a stream.
func (*EventStore) FetchStreamNames ¶
func (s *EventStore) FetchStreamNames(ctx context.Context, filter string, matcher eventstore.MetadataMatcher, limit, offset uint64) ([]string, error)
FetchStreamNames gets stream names that match the filter.
func (*EventStore) FetchStreamNamesRegex ¶
func (s *EventStore) FetchStreamNamesRegex(ctx context.Context, filter string, matcher eventstore.MetadataMatcher, limit, offset uint64) ([]string, error)
FetchStreamNamesRegex gets stream names that match the regex filter.
func (*EventStore) GetProjectionManager ¶
func (s *EventStore) GetProjectionManager() projection.Manager
GetProjectionManager ...
func (*EventStore) Load ¶
func (s *EventStore) Load(ctx context.Context, streamName string, from, count uint64, matcher eventstore.MetadataMatcher) eventstore.StreamIterator
Load events from the given stream name.
func (*EventStore) LoadReverse ¶
func (s *EventStore) LoadReverse(ctx context.Context, streamName string, from, count uint64, matcher eventstore.MetadataMatcher) eventstore.StreamIterator
LoadReverse loads events from the given stream name in reverse.
func (*EventStore) Ping ¶
func (s *EventStore) Ping(ctx context.Context) error
Ping tests that the connection to the database is still OK.
func (*EventStore) UpdateStreamMetadata ¶
func (s *EventStore) UpdateStreamMetadata(ctx context.Context, streamName string, newMetadata eventstore.StreamMetadata) error
UpdateStreamMetadata sets the metadata for the given stream name.
type ProjectionManager ¶
type ProjectionManager struct {
// contains filtered or unexported fields
}
ProjectionManager ...
func (*ProjectionManager) Create ¶
func (m *ProjectionManager) Create(ctx context.Context, name string, opts []projection.ProjectorOpt) (projection.Projector, error)
Create ...
func (*ProjectionManager) Delete ¶
func (m *ProjectionManager) Delete(ctx context.Context, projectionName string) error
Delete ...
func (*ProjectionManager) FetchPojectionStatus ¶
func (m *ProjectionManager) FetchPojectionStatus(ctx context.Context, projectionName string) (projection.Status, error)
FetchPojectionStatus ...
func (*ProjectionManager) FetchPojectionStreamPositions ¶
func (m *ProjectionManager) FetchPojectionStreamPositions(ctx context.Context, projectionName string) (projection.StreamPositions, error)
FetchPojectionStreamPositions ...
func (*ProjectionManager) FetchProjectionNames ¶
func (m *ProjectionManager) FetchProjectionNames(ctx context.Context, filter string, start, limit uint64) ([]string, error)
FetchProjectionNames ...
type StreamIterator ¶
type StreamIterator struct {
// contains filtered or unexported fields
}
StreamIterator iterates over events from a MySQL database.
func (*StreamIterator) Close ¶
func (it *StreamIterator) Close()
Close will clean up resources; do not attempt to use the stream after closing.
func (*StreamIterator) Current ¶
func (it *StreamIterator) Current() *messages.Event
Current will return the item we currently have.
func (*StreamIterator) Next ¶
func (it *StreamIterator) Next(ctx context.Context) error
Next will get the next result and, if there is an error, return it. Once Next has been called without returning an error, you can grab the result from Current().
func (*StreamIterator) Rewind ¶
func (it *StreamIterator) Rewind()
Rewind will set the position of the stream back to the default position and allow you to iterate over the stream again.
type StreamProjection ¶
type StreamProjection struct {
// contains filtered or unexported fields
}
StreamProjection ...
func (*StreamProjection) FromStream ¶
func (p *StreamProjection) FromStream(streamName string) projection.Projector
FromStream will limit the Projector to events from 1 stream.
func (*StreamProjection) FromStreams ¶
func (p *StreamProjection) FromStreams(streamNames []string) projection.Projector
FromStreams will limit the Projector to events from many streams.
func (*StreamProjection) Run ¶
func (p *StreamProjection) Run(ctx context.Context) error
Run will start the processing of the events.
func (*StreamProjection) Stop ¶
func (p *StreamProjection) Stop(ctx context.Context) error
Stop will stop the processing of the events.
func (*StreamProjection) When ¶
func (p *StreamProjection) When(eventName string, cb projection.Handler) projection.Projector
When the event with the event name is given the callback will be called.
func (*StreamProjection) WhenAny ¶
func (p *StreamProjection) WhenAny(cb projection.Handler) projection.Projector
WhenAny event is given the callback will be called.