Documentation ¶
Overview ¶
Package postgres contains an Event Store implementation using PostgreSQL as backend data store.
Index ¶
- Constants
- Variables
- type EventStore
- func (st *EventStore) Append(ctx context.Context, id eventstore.StreamID, expected eventstore.VersionCheck, ...) (v int64, err error)
- func (st *EventStore) Close() error
- func (st *EventStore) Read(ctx context.Context, subscriptionName string) (int64, error)
- func (st *EventStore) Register(ctx context.Context, events ...eventually.Payload) error
- func (st *EventStore) Stream(ctx context.Context, es eventstore.EventStream, id eventstore.StreamID, ...) error
- func (st *EventStore) StreamAll(ctx context.Context, es eventstore.EventStream, selectt eventstore.Select) error
- func (st *EventStore) StreamByType(ctx context.Context, es eventstore.EventStream, typ string, ...) error
- func (st *EventStore) SubscribeToAll(ctx context.Context, es eventstore.EventStream) error
- func (st *EventStore) SubscribeToType(ctx context.Context, es eventstore.EventStream, typ string) error
- func (st *EventStore) Write(ctx context.Context, subscriptionName string, sequenceNumber int64) error
Constants ¶
const ( // DefaultNotifyChannelTimeout is the default refresh timeout for each // notifications received through LISTEN. DefaultNotifyChannelTimeout = 10 * time.Second // DefaultReconnectionTimeout is the minimum timeout value the database driver // uses before re-establishing a connection with the database when // the previous one had been closed. DefaultReconnectionTimeout = 10 * time.Second )
Variables ¶
var ErrEmptyEventsMap = fmt.Errorf("postgres.EventStore: empty events map provided for type")
ErrEmptyEventsMap occurs during a call to Register where a nil or empty Events map is provided, which would mean no events would be registered for the desired type.
Functions ¶
This section is empty.
Types ¶
type EventStore ¶
type EventStore struct {
// contains filtered or unexported fields
}
EventStore is an eventstore.Store implementation which uses PostgreSQL as backend datastore.
func OpenEventStore ¶
func OpenEventStore(dsn string) (*EventStore, error)
OpenEventStore opens a connection with the PostgreSQL identified by the provided DSN and run migrations for the Event Store functionalities.
func (*EventStore) Append ¶
func (st *EventStore) Append( ctx context.Context, id eventstore.StreamID, expected eventstore.VersionCheck, events ...eventually.Event, ) (v int64, err error)
Append inserts the specified Domain Events into the Event Stream specified by the current instance, returning the new version of the Event Stream.
A version can be specified to enable an Optimistic Concurrency check on append, by using the expected version of the Event Stream prior to appending the new Events.
Alternatively, VersionCheckAny can be used if no Optimistic Concurrency check should be carried out.
NOTE: this implementation is not returning yet eventstore.ErrConflict in case of conflicting expectations with the provided VersionCheck value.
func (*EventStore) Close ¶
func (st *EventStore) Close() error
Close closes the Event Store database connection.
func (*EventStore) Read ¶
Read reads the latest checkpointed sequence number of the subscription specified.
func (*EventStore) Register ¶
func (st *EventStore) Register(ctx context.Context, events ...eventually.Payload) error
Register registers Domain Events used by the application in order to decode events stored in the database by their name returned by the eventually.Payload trait.
func (*EventStore) Stream ¶
func (st *EventStore) Stream( ctx context.Context, es eventstore.EventStream, id eventstore.StreamID, selectt eventstore.Select, ) error
Stream opens the specific Event Stream identified by the provided id.
func (*EventStore) StreamAll ¶
func (st *EventStore) StreamAll(ctx context.Context, es eventstore.EventStream, selectt eventstore.Select) error
StreamAll opens an Event Stream and sinks all the events in the Event Store in the provided channel, skipping those events with a sequence number lower than the provided bound.
func (*EventStore) StreamByType ¶
func (st *EventStore) StreamByType( ctx context.Context, es eventstore.EventStream, typ string, selectt eventstore.Select, ) error
StreamByType opens a stream of all Event Streams grouped by the same Type, as specified in input.
The stream will be ordered based on their Global Sequence Number.
func (*EventStore) SubscribeToAll ¶
func (st *EventStore) SubscribeToAll(ctx context.Context, es eventstore.EventStream) error
SubscribeToAll subscribes to all the new Events committed to the Event Store and sinks them in the provided channel.
func (*EventStore) SubscribeToType ¶
func (st *EventStore) SubscribeToType(ctx context.Context, es eventstore.EventStream, typ string) error
SubscribeToType subscribes to all the new Events of the specified Stream Type committed to the Event Store and sinks them in the provided channel.
Directories ¶
Path | Synopsis |
---|---|
Code generated for package migrations by go-bindata DO NOT EDIT.
|
Code generated for package migrations by go-bindata DO NOT EDIT. |