esxsql

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 18, 2021 License: Apache-2.0 Imports: 17 Imported by: 0

README

Esxsql

Module provides an implementation of the event store with event state tracking based on the SQL.

github.com/kucjac/cleango/database/es

Drivers

Currently, all SQL drivers are supported if the database is properly migrated.

Migration

Current implementation allows only PostgreSQL and MySQL automatic migration. PostgreSQL allows also sharding of the event table - shard by aggregate_type and event_state by handler_name.

In order to start migration use the function esxsql.Migrate with proper configuration of table names and sharding possibilities.

In order to migrate the eventstate a configuration needs to have non-nil EventState field defined.

Sharding

In case of sharding all aggregate types provided in the configuration would have its own partition table for the event. The Config needs to have PartitionEventTable field set to true. When a new aggregates is provided, it needs to have its own partition table created - in that case use esxsql.MigrateEventPartitions function, and provide all new aggregate types.

The table that is following event state could also be sharded. In order to migrate event state table with sharding enabled mark PartitionState field as true in the EventStateConfig.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Migrate

func Migrate(conn xsql.DB, config *Config) error

Migrate executes table and types migration for the event store and snapshot. The table names are taken from the config.

func MigrateEventPartitions added in v0.0.20

func MigrateEventPartitions(conn xsql.DB, cfg *Config, aggregateTypes ...string) error

MigrateEventPartitions migrates event partitions

func MigrateEventStatePartitions added in v0.0.20

func MigrateEventStatePartitions(conn xsql.DB, cfg *Config, handlerNames ...string) error

MigrateEventStatePartitions create partitions on event state by its type.

func ToSnakeCase added in v0.0.18

func ToSnakeCase(s string) string

Types

type Config

type Config struct {
	SchemaName          string // Optional
	EventTable          string
	PartitionEventTable bool
	SnapshotTable       string
	AggregateTable      string
	AggregateTypes      []string
	EventState          *EventStateConfig
	WorkersCount        int
}

Config is the configuration for the event storage.

func DefaultConfig

func DefaultConfig(aggregateTypes ...string) *Config

DefaultConfig creates a new default config.

func (*Config) Validate

func (c *Config) Validate() error

Validate checks if the config is valid to use.

func (*Config) WithEventState added in v0.0.21

func (c *Config) WithEventState(config EventStateConfig) *Config

WithEventState sets up the event state for given config.

type EventStateConfig added in v0.0.21

type EventStateConfig struct {
	EventStateTable    string
	PartitionState     bool
	Handlers           []eventstate.Handler
	HandleFailureTable string
	HandlerTable       string
}

EventStateConfig is a configuration for the event state part.

func DefaultEventStateConfig added in v0.0.21

func DefaultEventStateConfig(handlers ...eventstate.Handler) EventStateConfig

DefaultEventStateConfig is a default configuration for the event state tracking.

func (*EventStateConfig) Validate added in v0.0.21

func (c *EventStateConfig) Validate() error

type StateStorage added in v0.0.20

type StateStorage struct {
	// contains filtered or unexported fields
}

StateStorage is the implementation of the eventstate.Storage interface. It also implements es.StorageBase.

func NewStateStorage added in v0.0.20

func NewStateStorage(conn *xsql.Conn, cfg *Config) (*StateStorage, error)

NewStateStorage creates a new event storage based on provided sqlx connection.

func (*StateStorage) As added in v0.0.20

func (s *StateStorage) As(dst interface{}) error

As exposes driver specific implementation.

func (*StateStorage) BeginTx added in v0.0.20

func (s *StateStorage) BeginTx(ctx context.Context) (esstate.TxStorage, error)

BeginTx starts a new transaction.

func (*StateStorage) Config added in v0.0.20

func (s *StateStorage) Config() Config

Config gets the storage config.

func (*StateStorage) Err added in v0.0.20

func (s *StateStorage) Err(err error) error

Err handles error message with given driver.

func (*StateStorage) ErrorCode added in v0.0.20

func (s *StateStorage) ErrorCode(err error) cgerrors.ErrorCode

ErrorCode gets the error code related to given error.

func (*StateStorage) FindFailures added in v0.0.20

func (s *StateStorage) FindFailures(ctx context.Context, query eventstate.FindFailureQuery) ([]eventstate.HandleFailure, error)

FindFailures implements eventstate.StorageBase.

func (*StateStorage) FindUnhandled added in v0.0.20

func (s *StateStorage) FindUnhandled(ctx context.Context, query eventstate.FindUnhandledQuery) ([]eventstate.Unhandled, error)

FindUnhandled implements eventstate.StorageBase interface. Finds all unhandled event state matching given query.

func (*StateStorage) FinishHandling added in v0.0.20

func (s *StateStorage) FinishHandling(ctx context.Context, eventID string, handlerName string, timestamp int64) error

FinishHandling implements eventstate.StorageBase.

func (*StateStorage) GetSnapshot added in v0.0.20

func (s *StateStorage) GetSnapshot(ctx context.Context, aggId string, aggType string, aggVersion int64) (*es.Snapshot, error)

GetSnapshot gets the latest snapshot for given aggregate. Implements eventsource.Storage interface.

func (*StateStorage) HandlingFailed added in v0.0.20

func (s *StateStorage) HandlingFailed(ctx context.Context, failure *eventstate.HandleFailure) error

HandlingFailed implements eventstate.StorageBase.

func (*StateStorage) ListEvents added in v0.0.20

func (s *StateStorage) ListEvents(ctx context.Context, aggId, aggType string) ([]*es.Event, error)

ListEvents gets the event stream for provided aggregate. Implements eventsource.Storage interface.

func (*StateStorage) ListEventsAfterRevision added in v0.0.20

func (s *StateStorage) ListEventsAfterRevision(ctx context.Context, aggId string, aggType string, after int64) ([]*es.Event, error)

ListEventsAfterRevision gets the event stream for given aggregate where the revision is subsequent from provided.

func (*StateStorage) ListHandlers added in v0.0.20

func (s *StateStorage) ListHandlers(ctx context.Context) ([]eventstate.Handler, error)

ListHandlers implements eventstate.StorageBase.

func (*StateStorage) MarkUnhandled added in v0.0.20

func (s *StateStorage) MarkUnhandled(ctx context.Context, eventID, eventType string, timestamp int64) error

MarkUnhandled implements eventstate.StorageBase.

func (*StateStorage) RegisterHandlers added in v0.0.20

func (s *StateStorage) RegisterHandlers(ctx context.Context, eventHandlers ...eventstate.Handler) error

RegisterHandlers implements eventstate.StorageBase.

func (*StateStorage) SaveEvents added in v0.0.20

func (s *StateStorage) SaveEvents(ctx context.Context, es []*es.Event) error

SaveEvents stores provided events in the database. Implements eventsource.Storage interface.

func (*StateStorage) SaveSnapshot added in v0.0.20

func (s *StateStorage) SaveSnapshot(ctx context.Context, snap *es.Snapshot) error

SaveSnapshot stores the snapshot in the database. Implements eventsource.Storage interface.

func (*StateStorage) StartHandling added in v0.0.20

func (s *StateStorage) StartHandling(ctx context.Context, eventID string, handlerName string, timestamp int64) error

StartHandling implements eventstate.StorageBase.

func (*StateStorage) StreamEvents added in v0.0.20

func (s *StateStorage) StreamEvents(ctx context.Context, req *es.StreamEventsRequest) (<-chan *es.Event, error)

StreamEvents opens the channel of the events stream that matches given request. Implements eventsource.Storage.

type Storage

type Storage struct {
	// contains filtered or unexported fields
}

Storage is the implementation of the eventsource.Storage interface for the sqlx driver.

func New

func New(conn *xsql.Conn, cfg *Config) (*Storage, error)

New creates a new event storage based on provided sqlx connection.

func (*Storage) As

func (s *Storage) As(dst interface{}) error

As exposes driver specific implementation.

func (*Storage) BeginTx

func (s *Storage) BeginTx(ctx context.Context) (es.TxStorage, error)

BeginTx creates and begins a new transaction, which exposes *sqlx.Tx and allows atomic commits.

func (*Storage) Config added in v0.0.18

func (s *Storage) Config() Config

Config gets the storage configuration.

func (*Storage) Err

func (s *Storage) Err(err error) error

Err handles error message with given driver.

func (*Storage) ErrorCode

func (s *Storage) ErrorCode(err error) cgerrors.ErrorCode

ErrorCode gets the error code related to given error.

func (*Storage) FindFailures added in v0.0.20

func (s *Storage) FindFailures(ctx context.Context, query eventstate.FindFailureQuery) ([]eventstate.HandleFailure, error)

FindFailures implements eventstate.StorageBase.

func (*Storage) FindUnhandled added in v0.0.20

func (s *Storage) FindUnhandled(ctx context.Context, query eventstate.FindUnhandledQuery) ([]eventstate.Unhandled, error)

FindUnhandled implements eventstate.StorageBase interface. Finds all unhandled event state matching given query.

func (*Storage) FinishHandling added in v0.0.20

func (s *Storage) FinishHandling(ctx context.Context, eventID string, handlerName string, timestamp int64) error

FinishHandling implements eventstate.StorageBase.

func (*Storage) GetSnapshot

func (s *Storage) GetSnapshot(ctx context.Context, aggId string, aggType string, aggVersion int64) (*es.Snapshot, error)

GetSnapshot gets the latest snapshot for given aggregate. Implements eventsource.Storage interface.

func (*Storage) HandlingFailed added in v0.0.20

func (s *Storage) HandlingFailed(ctx context.Context, failure *eventstate.HandleFailure) error

HandlingFailed implements eventstate.StorageBase.

func (*Storage) ListEvents

func (s *Storage) ListEvents(ctx context.Context, aggId, aggType string) ([]*es.Event, error)

ListEvents gets the event stream for provided aggregate. Implements eventsource.Storage interface.

func (*Storage) ListEventsAfterRevision

func (s *Storage) ListEventsAfterRevision(ctx context.Context, aggId string, aggType string, after int64) ([]*es.Event, error)

ListEventsAfterRevision gets the event stream for given aggregate where the revision is subsequent from provided.

func (*Storage) ListHandlers added in v0.0.20

func (s *Storage) ListHandlers(ctx context.Context) ([]eventstate.Handler, error)

ListHandlers implements eventstate.StorageBase.

func (*Storage) MarkUnhandled added in v0.0.20

func (s *Storage) MarkUnhandled(ctx context.Context, eventID, eventType string, timestamp int64) error

MarkUnhandled implements eventstate.StorageBase.

func (*Storage) RegisterHandlers added in v0.0.20

func (s *Storage) RegisterHandlers(ctx context.Context, eventHandlers ...eventstate.Handler) error

RegisterHandlers implements eventstate.StorageBase.

func (*Storage) SaveEvents

func (s *Storage) SaveEvents(ctx context.Context, es []*es.Event) error

SaveEvents stores provided events in the database. Implements eventsource.Storage interface.

func (*Storage) SaveSnapshot

func (s *Storage) SaveSnapshot(ctx context.Context, snap *es.Snapshot) error

SaveSnapshot stores the snapshot in the database. Implements eventsource.Storage interface.

func (*Storage) StartHandling added in v0.0.20

func (s *Storage) StartHandling(ctx context.Context, eventID string, handlerName string, timestamp int64) error

StartHandling implements eventstate.StorageBase.

func (*Storage) StreamEvents

func (s *Storage) StreamEvents(ctx context.Context, req *es.StreamEventsRequest) (<-chan *es.Event, error)

StreamEvents opens the channel of the events stream that matches given request. Implements eventsource.Storage.

type Transaction

type Transaction struct {
	// contains filtered or unexported fields
}

Transaction is the implementation of the

func (*Transaction) As

func (t *Transaction) As(dst interface{}) error

As sets the destination with the *sqlx.Tx implementation.

func (*Transaction) Commit

func (t *Transaction) Commit(ctx context.Context) error

Commit commits the transaction.

func (*Transaction) Done

func (t *Transaction) Done() bool

Done checks if the transaction is already done.

func (*Transaction) Err

func (s *Transaction) Err(err error) error

Err handles error message with given driver.

func (*Transaction) ErrorCode

func (s *Transaction) ErrorCode(err error) cgerrors.ErrorCode

ErrorCode gets the error code related to given error.

func (*Transaction) FindFailures added in v0.0.20

func (s *Transaction) FindFailures(ctx context.Context, query eventstate.FindFailureQuery) ([]eventstate.HandleFailure, error)

FindFailures implements eventstate.StorageBase.

func (*Transaction) FindUnhandled added in v0.0.20

func (s *Transaction) FindUnhandled(ctx context.Context, query eventstate.FindUnhandledQuery) ([]eventstate.Unhandled, error)

FindUnhandled implements eventstate.StorageBase interface. Finds all unhandled event state matching given query.

func (*Transaction) FinishHandling added in v0.0.20

func (s *Transaction) FinishHandling(ctx context.Context, eventID string, handlerName string, timestamp int64) error

FinishHandling implements eventstate.StorageBase.

func (*Transaction) GetSnapshot

func (s *Transaction) GetSnapshot(ctx context.Context, aggId string, aggType string, aggVersion int64) (*es.Snapshot, error)

GetSnapshot gets the latest snapshot for given aggregate. Implements eventsource.Storage interface.

func (*Transaction) HandlingFailed added in v0.0.20

func (s *Transaction) HandlingFailed(ctx context.Context, failure *eventstate.HandleFailure) error

HandlingFailed implements eventstate.StorageBase.

func (*Transaction) ListEvents

func (s *Transaction) ListEvents(ctx context.Context, aggId, aggType string) ([]*es.Event, error)

ListEvents gets the event stream for provided aggregate. Implements eventsource.Storage interface.

func (*Transaction) ListEventsAfterRevision

func (s *Transaction) ListEventsAfterRevision(ctx context.Context, aggId string, aggType string, after int64) ([]*es.Event, error)

ListEventsAfterRevision gets the event stream for given aggregate where the revision is subsequent from provided.

func (*Transaction) ListHandlers added in v0.0.20

func (s *Transaction) ListHandlers(ctx context.Context) ([]eventstate.Handler, error)

ListHandlers implements eventstate.StorageBase.

func (*Transaction) MarkUnhandled added in v0.0.20

func (s *Transaction) MarkUnhandled(ctx context.Context, eventID, eventType string, timestamp int64) error

MarkUnhandled implements eventstate.StorageBase.

func (*Transaction) RegisterHandlers added in v0.0.20

func (s *Transaction) RegisterHandlers(ctx context.Context, eventHandlers ...eventstate.Handler) error

RegisterHandlers implements eventstate.StorageBase.

func (*Transaction) Rollback

func (t *Transaction) Rollback(_ context.Context) error

Rollback the transaction.

func (*Transaction) SaveEvents

func (s *Transaction) SaveEvents(ctx context.Context, es []*es.Event) error

SaveEvents stores provided events in the database. Implements eventsource.Storage interface.

func (*Transaction) SaveSnapshot

func (s *Transaction) SaveSnapshot(ctx context.Context, snap *es.Snapshot) error

SaveSnapshot stores the snapshot in the database. Implements eventsource.Storage interface.

func (*Transaction) StartHandling added in v0.0.20

func (s *Transaction) StartHandling(ctx context.Context, eventID string, handlerName string, timestamp int64) error

StartHandling implements eventstate.StorageBase.

func (*Transaction) StreamEvents

func (s *Transaction) StreamEvents(ctx context.Context, req *es.StreamEventsRequest) (<-chan *es.Event, error)

StreamEvents opens the channel of the events stream that matches given request. Implements eventsource.Storage.

Directories

Path Synopsis
esxsql_test module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL