dynamodb

package
v0.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 4, 2024 License: MIT Imports: 18 Imported by: 3

Documentation

Index

Constants

View Source
const (
	VerAttribute  string = "ver"
	GVerAttribute string = "gver"
	GIDAttribute  string = "gid"
)
View Source
const (
	HashKey            string = "_pk"
	RangeKey           string = "_sk"
	TTLAttribute       string = "_ttl"
	LocalIndexRangeKey string = "_lsik"
	LocalIndex         string = "_lsi"
	TraceIDAttribute   string = "_traceID"
)

Variables

View Source
var (
	ErrTxAlreadyStarted  = errors.New("transaction already started")
	ErrTxInvalidItemType = errors.New("invalid transaction item type")
)
View Source
var (
	ErrIndexingRecordFailed = errors.New("indexing record has failed")
)

Functions

func ContextWithSession

func ContextWithSession(ctx context.Context, s *Session) context.Context

func CreateTable

func CreateTable(ctx context.Context, svc AdminAPI, table string) error

func DeleteTable

func DeleteTable(ctx context.Context, svc AdminAPI, table string) error

func IsConditionCheckFailure

func IsConditionCheckFailure(err error) bool

IsConditionCheckFailure checks if the given error is an AWS error that expresses a conditional check failure exception. It works seamlessly for both single write operations and operations within a transaction.

func IsConditionCheckFailureWithItem added in v0.0.2

func IsConditionCheckFailureWithItem(err error, hashKey, rangeKey string) (bool, bool)

func NewIndexer

func NewIndexer(api ClientAPI, table string, opts ...func(cfg *IndexerConfig)) *indexer

NewIndexer returns a DynamoDB global stream indexer. It keeps track of the global stream's current version and increments the sequence accordingly by using a DynamoDB LSI.

It has an in-memory cache that works effectively only if the instance keeps receiving records from the same global stream. Otherwise the cache is cleared and synced with the new global stream's current state.

The use of such a cache is only possible because the DynamoDB Lambda event source mapping allows it:

  • It does not concurrently distribute a partition-related record to different Lambda instances.
  • The partition key and global stream have a one-to-one relation.

The latter one-to-one relation might change in the future to overcome the 10GB size limit. This may require a draining logic to allow a safer transition to the next partition.

func UnpackRecord added in v0.0.2

func UnpackRecord(ctx context.Context, r Record, serializer event.Serializer) ([]event.Envelope, error)

UnpackRecord unmarshals the events contained in the record and sets their respective global stream sequence. It fails if the record is not indexed, and it panics if an event envelope does not implement the event.GlobalVersionSetter interface.

Types

type AdminAPI

type AdminAPI interface {
	ClientAPI

	dynamodb.DescribeTableAPIClient

	CreateTable(ctx context.Context, params *dynamodb.CreateTableInput, optFns ...func(*dynamodb.Options)) (*dynamodb.CreateTableOutput, error)
	DeleteTable(ctx context.Context, params *dynamodb.DeleteTableInput, optFns ...func(*dynamodb.Options)) (*dynamodb.DeleteTableOutput, error)
	UpdateTimeToLive(ctx context.Context, params *dynamodb.UpdateTimeToLiveInput, optFns ...func(*dynamodb.Options)) (*dynamodb.UpdateTimeToLiveOutput, error)
}

AdminAPI presents a subset of the DynamoDB client operations, especially the ones related to tables.

type ClientAPI

type ClientAPI interface {
	dynamodb.QueryAPIClient

	GetItem(ctx context.Context, params *dynamodb.GetItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.GetItemOutput, error)
	PutItem(ctx context.Context, params *dynamodb.PutItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.PutItemOutput, error)
	UpdateItem(ctx context.Context, params *dynamodb.UpdateItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.UpdateItemOutput, error)
	DeleteItem(ctx context.Context, params *dynamodb.DeleteItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.DeleteItemOutput, error)
	TransactWriteItems(ctx context.Context, params *dynamodb.TransactWriteItemsInput, optFns ...func(*dynamodb.Options)) (*dynamodb.TransactWriteItemsOutput, error)
}

ClientAPI presents a subset of the DynamoDB client operations, especially the ones related to table items.

func NewClient

func NewClient(cfg aws.Config) ClientAPI

type ContextKey

type ContextKey string

type Indexer

type Indexer interface {
	Index(ctx context.Context, rec Record) (err error)
}

type IndexerConfig

type IndexerConfig struct{}

type Item

type Item struct {
	HashKey     string `dynamodbav:"_pk"`
	RangeKey    string `dynamodbav:"_sk"`
	LSIRangeKey string `dynamodbav:"_lsik,omitempty" localIndex:"_lsi,range"`
	TTL         int64  `dynamodbav:"_ttl,omitempty"`
	TraceID     string `dynamodbav:"_traceID,omitempty"`
}

type Record

type Record struct {
	Item
	Events []byte `dynamodbav:"evts"`

	Since    int64  `dynamodbav:"since"`
	Until    int64  `dynamodbav:"until"`
	Version  string `dynamodbav:"ver,omitempty"`
	GID      string `dynamodbav:"gid"`
	GVersion string `dynamodbav:"gver,omitempty"`
}

func (Record) Keys added in v0.0.2

func (rec Record) Keys() map[string]string

type Session

type Session struct {
	// contains filtered or unexported fields
}

func NewSession

func NewSession(api ClientAPI) *Session

func SessionFrom

func SessionFrom(ctx context.Context) (*Session, bool)

func (*Session) AddToTx added in v0.0.2

func (s *Session) AddToTx(ctx context.Context, ops []any) error

func (*Session) Check

func (s *Session) Check(ctx context.Context, c *types.ConditionCheck) error

func (*Session) CloseTx

func (s *Session) CloseTx() error

func (*Session) CommitTx

func (s *Session) CommitTx(ctx context.Context) (err error)

func (*Session) ConsumedCapacity added in v0.0.2

func (s *Session) ConsumedCapacity() *consumedCapacity

ConsumedCapacity implements Session.

func (*Session) Delete

func (s *Session) Delete(ctx context.Context, d *dynamodb.DeleteItemInput) error

func (*Session) HasTx

func (s *Session) HasTx() bool

func (*Session) Put

func (*Session) StartTx

func (s *Session) StartTx() error

func (*Session) Update

func (s *Session) Update(ctx context.Context, u *dynamodb.UpdateItemInput) error

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store implements event.Store, sourcing.Store, and event.Streamer interfaces

func NewEventStore

func NewEventStore(api ClientAPI, table string, opts ...func(*StoreConfig)) *Store

NewEventStore returns an implementation of event store interfaces. It panics if ClientAPI or table are empty.

func (Store) Append

func (s Store) Append(ctx context.Context, id event.StreamID, events []event.Envelope, opts ...func(*event.AppendConfig)) (err error)

Append implements event.Store interface

func (Store) AppendToStream

func (s Store) AppendToStream(ctx context.Context, stm sourcing.Stream, optFns ...func(opt *event.AppendConfig)) (err error)

AppendToStream implements sourcing.Store interface

func (Store) Load

func (s Store) Load(ctx context.Context, id event.StreamID, trange ...time.Time) (events []event.Envelope, err error)

Load implements event.Store interface

func (Store) LoadStream

func (s Store) LoadStream(ctx context.Context, id event.StreamID, vrange ...event.Version) (stm *sourcing.Stream, err error)

LoadStream implements sourcing.Store interface

func (Store) Replay

func (s Store) Replay(ctx context.Context, id event.StreamID, q event.StreamerQuery, h event.StreamerHandler) (err error)

Replay replays events from the global stream according to the query filters.

Note that the global stream is eventually consistent.

type StoreConfig

type StoreConfig struct {
	// Serializer presents the event serializer. By default, the store uses the JSON event serializer.
	Serializer event.Serializer
	// AppendEventOptions presents the default pre-append options applied on every Append call.
	AppendEventOptions []func(*event.AppendConfig)
	// RecordSizeLimit describes the size limit, in bytes, of the events appended to the stream.
	// It's ignored if zero.
	RecordSizeLimit int
}

StoreConfig presents the config of dynamodb-based event store implementation.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL