mongodb

package
v0.38.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 19, 2023 License: MIT Imports: 26 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func OutboxInsertHandler added in v0.34.0

func OutboxInsertHandler[K eventsourcing.ID](database, collName string) store.InTxHandler[K]

Types

type ChangeEvent

type ChangeEvent struct {
	FullDocument Event `bson:"fullDocument,omitempty"`
}

type EsRepository

type EsRepository[K eventsourcing.ID, PK eventsourcing.IDPt[K]] struct {
	Repository
	// contains filtered or unexported fields
}

func NewStore

func NewStore[K eventsourcing.ID, PK eventsourcing.IDPt[K]](ctx context.Context, client *mongo.Client, database string, opts ...Option[K, PK]) (*EsRepository[K, PK], error)

NewStore creates a new instance of MongoEsRepository

func NewStoreWithURI added in v0.33.0

func NewStoreWithURI[K eventsourcing.ID, PK eventsourcing.IDPt[K]](ctx context.Context, connString, database string, opts ...Option[K, PK]) (*EsRepository[K, PK], error)

NewStoreWithURI creates a new instance of MongoEsRepository

func (*EsRepository[K, PK]) Client added in v0.33.0

func (r *EsRepository[K, PK]) Client() *mongo.Client

func (*EsRepository[K, PK]) Close

func (r *EsRepository[K, PK]) Close(ctx context.Context)

func (*EsRepository[K, PK]) Forget

func (r *EsRepository[K, PK]) Forget(ctx context.Context, request eventsourcing.ForgetRequest[K], forget func(kind eventsourcing.Kind, body []byte) ([]byte, error)) error

func (*EsRepository[K, PK]) GetAggregateEvents

func (r *EsRepository[K, PK]) GetAggregateEvents(ctx context.Context, aggregateID K, snapVersion int) ([]*eventsourcing.Event[K], error)

func (*EsRepository[K, PK]) GetEvents

func (r *EsRepository[K, PK]) GetEvents(ctx context.Context, after, until eventid.EventID, batchSize int, filter store.Filter) ([]*eventsourcing.Event[K], error)

func (*EsRepository[K, PK]) GetEventsByRawIDs added in v0.35.0

func (r *EsRepository[K, PK]) GetEventsByRawIDs(ctx context.Context, ids []string) ([]*eventsourcing.Event[K], error)

func (*EsRepository[K, PK]) GetSnapshot

func (r *EsRepository[K, PK]) GetSnapshot(ctx context.Context, aggregateID K) (eventsourcing.Snapshot[K], error)

func (*EsRepository[K, PK]) HasIdempotencyKey

func (r *EsRepository[K, PK]) HasIdempotencyKey(ctx context.Context, idempotencyKey string) (bool, error)

func (*EsRepository[K, PK]) MigrateInPlaceCopyReplace added in v0.21.0

func (r *EsRepository[K, PK]) MigrateInPlaceCopyReplace(
	ctx context.Context,
	revision int,
	snapshotThreshold uint32,
	rehydrateFunc func(eventsourcing.Aggregater[K], *eventsourcing.Event[K]) error,
	codec eventsourcing.Codec[K],
	handler eventsourcing.MigrationHandler[K],
	targetAggregateKind eventsourcing.Kind,
	aggregateKind eventsourcing.Kind,
	eventTypeCriteria ...eventsourcing.Kind,
) error

func (*EsRepository[K, PK]) SaveEvent

func (r *EsRepository[K, PK]) SaveEvent(ctx context.Context, eRec *eventsourcing.EventRecord[K]) (eventid.EventID, uint32, error)

func (*EsRepository[K, PK]) SaveSnapshot

func (r *EsRepository[K, PK]) SaveSnapshot(ctx context.Context, snapshot *eventsourcing.Snapshot[K]) error

type Event

type Event struct {
	ID               string             `bson:"_id,omitempty"`
	AggregateID      string             `bson:"aggregate_id,omitempty"`
	AggregateIDHash  int32              `bson:"aggregate_id_hash,omitempty"`
	AggregateVersion uint32             `bson:"aggregate_version,omitempty"`
	AggregateKind    eventsourcing.Kind `bson:"aggregate_kind,omitempty"`
	Kind             eventsourcing.Kind `bson:"kind,omitempty"`
	Body             []byte             `bson:"body,omitempty"`
	IdempotencyKey   string             `bson:"idempotency_key,omitempty"`
	Metadata         bson.M             `bson:"metadata,omitempty"`
	CreatedAt        time.Time          `bson:"created_at,omitempty"`
	Migration        int                `bson:"migration"`
	Migrated         bool               `bson:"migrated"`
}

Event is the event data stored in the database

type EventsRepository added in v0.34.0

type EventsRepository[K eventsourcing.ID] interface {
	GetEventsByRawIDs(context.Context, []string) ([]*eventsourcing.Event[K], error)
}

type Feed

type Feed[K eventsourcing.ID, PK eventsourcing.IDPt[K]] struct {
	// contains filtered or unexported fields
}

func NewFeed

func NewFeed[K eventsourcing.ID, PK eventsourcing.IDPt[K]](logger *slog.Logger, connString, database string, sinker sink.Sinker[K], opts ...FeedOption[K, PK]) (Feed[K, PK], error)

func (*Feed[K, PK]) Run added in v0.25.0

func (f *Feed[K, PK]) Run(ctx context.Context) error

type FeedOption

type FeedOption[K eventsourcing.ID, PK eventsourcing.IDPt[K]] func(*Feed[K, PK])

func WithFeedEventsCollection

func WithFeedEventsCollection[K eventsourcing.ID, PK eventsourcing.IDPt[K]](eventsCollection string) FeedOption[K, PK]

func WithFilter added in v0.36.0

func WithFilter[K eventsourcing.ID, PK eventsourcing.IDPt[K]](filter *store.Filter) FeedOption[K, PK]

type KVStore added in v0.34.0

type KVStore struct {
	Repository
	// contains filtered or unexported fields
}

func NewKVStore added in v0.34.0

func NewKVStore(client *mongo.Client, dbName, collection string) KVStore

func NewKVStoreWithURI added in v0.34.0

func NewKVStoreWithURI(connString, database, collection string) (KVStore, error)

func (KVStore) Get added in v0.34.0

func (m KVStore) Get(ctx context.Context, key string) (string, error)

func (KVStore) Put added in v0.34.0

func (m KVStore) Put(ctx context.Context, key string, value string) error

type Option added in v0.28.0

type Option[K eventsourcing.ID, PK eventsourcing.IDPt[K]] func(f *EsRepository[K, PK])

func WithEventsCollection

func WithEventsCollection[K eventsourcing.ID, PK eventsourcing.IDPt[K]](eventsCollection string) Option[K, PK]

func WithMetadata added in v0.36.0

func WithMetadata[K eventsourcing.ID, PK eventsourcing.IDPt[K]](metadata eventsourcing.Metadata) Option[K, PK]

WithMetadata defines the metadata to be save on every event. Data keys will be converted to lower case

func WithMetadataHook added in v0.36.0

func WithMetadataHook[K eventsourcing.ID, PK eventsourcing.IDPt[K]](fn store.MetadataHook[K]) Option[K, PK]

WithMetadataHook defines the hook that will return the metadata. This metadata will override any metadata defined at the repository level

func WithPostSchemaCreation added in v0.38.0

func WithPostSchemaCreation[K eventsourcing.ID, PK eventsourcing.IDPt[K]](post func(Schema) []bson.D) Option[K, PK]

func WithSkipSchemaCreation added in v0.38.0

func WithSkipSchemaCreation[K eventsourcing.ID, PK eventsourcing.IDPt[K]](skip bool) Option[K, PK]

func WithSnapshotsCollection

func WithSnapshotsCollection[K eventsourcing.ID, PK eventsourcing.IDPt[K]](snapshotsCollection string) Option[K, PK]

func WithTxHandler added in v0.34.0

func WithTxHandler[K eventsourcing.ID, PK eventsourcing.IDPt[K]](txHandler store.InTxHandler[K]) Option[K, PK]

type Outbox added in v0.34.0

type Outbox struct {
	ID              string             `bson:"_id,omitempty"`
	AggregateID     string             `bson:"aggregate_id,omitempty"`
	AggregateIDHash uint32             `bson:"aggregate_id_hash,omitempty"`
	AggregateKind   eventsourcing.Kind `bson:"aggregate_kind,omitempty"`
	Kind            eventsourcing.Kind `bson:"kind,omitempty"`
	Metadata        bson.M             `bson:"metadata,omitempty"`
}

type OutboxRepository added in v0.34.0

type OutboxRepository[K eventsourcing.ID] struct {
	Repository
	// contains filtered or unexported fields
}

func NewOutboxStore added in v0.34.0

func NewOutboxStore[K eventsourcing.ID](client *mongo.Client, database, collectionName string, eventsRepo EventsRepository[K]) *OutboxRepository[K]

func (*OutboxRepository[K]) AfterSink added in v0.34.0

func (r *OutboxRepository[K]) AfterSink(ctx context.Context, evtID eventid.EventID) error

func (*OutboxRepository[K]) PendingEvents added in v0.34.0

func (r *OutboxRepository[K]) PendingEvents(ctx context.Context, batchSize int, filter store.Filter) ([]*eventsourcing.Event[K], error)

type Repository added in v0.33.0

type Repository struct {
	// contains filtered or unexported fields
}

func (Repository) WithTx added in v0.33.0

func (r Repository) WithTx(ctx context.Context, callback func(context.Context) error) (err error)

type Schema added in v0.38.0

type Schema struct {
	CollectionNames []string
}

type Snapshot

type Snapshot struct {
	ID               string             `bson:"_id,omitempty"`
	AggregateID      string             `bson:"aggregate_id,omitempty"`
	AggregateVersion uint32             `bson:"aggregate_version,omitempty"`
	AggregateKind    eventsourcing.Kind `bson:"aggregate_kind,omitempty"`
	Body             []byte             `bson:"body,omitempty"`
	CreatedAt        time.Time          `bson:"created_at,omitempty"`
	Metadata         bson.M
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL