state

package
v0.0.0-...-0a06464 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 19, 2023 License: Apache-2.0 Imports: 8 Imported by: 0

README

State Stores

State Stores provide a common way to interact with different data store implementations, and allow users to opt in to advanced capabilities using defined metadata.

Implementing a new State Store

A compliant state store needs to implement one or more interfaces: Store and TransactionalStore, defined in the store.go file.

See the documentation site for examples.
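As a rough illustration of the shape of a state store, here is a minimal in-memory sketch covering only Get, Set and Delete. The request and response types are trimmed local stand-ins rather than the package's own, and memStore, newMemStore and roundTrip are hypothetical names. A compliant component would also need Init, Features, GetComponentMetadata and the bulk methods.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Local stand-ins for a subset of the package's request and response types,
// trimmed so the sketch compiles without importing the module.
type GetRequest struct{ Key string }
type GetResponse struct{ Data []byte }
type SetRequest struct {
	Key   string
	Value []byte
}
type DeleteRequest struct{ Key string }

// memStore is a hypothetical in-memory store guarded by a read/write mutex.
type memStore struct {
	mu   sync.RWMutex
	data map[string][]byte
}

func newMemStore() *memStore {
	return &memStore{data: make(map[string][]byte)}
}

// Get returns the stored bytes. A missing key yields an empty response
// rather than an error (an assumption of this sketch).
func (s *memStore) Get(_ context.Context, req *GetRequest) (*GetResponse, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return &GetResponse{Data: s.data[req.Key]}, nil
}

// Set creates or overwrites the value stored under the key.
func (s *memStore) Set(_ context.Context, req *SetRequest) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[req.Key] = req.Value
	return nil
}

// Delete removes the key; deleting a missing key is a no-op.
func (s *memStore) Delete(_ context.Context, req *DeleteRequest) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.data, req.Key)
	return nil
}

// roundTrip sets, reads and deletes one key. It returns the value that was
// read and whether the key is gone after the delete.
func roundTrip() (string, bool) {
	ctx := context.Background()
	s := newMemStore()
	_ = s.Set(ctx, &SetRequest{Key: "k1", Value: []byte("v1")})
	got, _ := s.Get(ctx, &GetRequest{Key: "k1"})
	_ = s.Delete(ctx, &DeleteRequest{Key: "k1"})
	after, _ := s.Get(ctx, &GetRequest{Key: "k1"})
	return string(got.Data), after.Data == nil
}

func main() {
	value, gone := roundTrip()
	fmt.Println(value, gone)
}
```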

Implementing State Query API

State Store has an optional API for querying the state.

Please refer to the documentation site for API description and definition.

// Querier is an interface to execute queries.
type Querier interface {
        Query(ctx context.Context, req *QueryRequest) (*QueryResponse, error)
}

Below are the definitions of structures (including nested) for QueryRequest and QueryResponse.

// QueryRequest is the request object for querying the state.
type QueryRequest struct {
        Query    query.Query       `json:"query"`
        Metadata map[string]string `json:"metadata,omitempty"`
}

type Query struct {
        Filters map[string]interface{} `json:"filter"`
        Sort    []Sorting              `json:"sort"`
        Page    Pagination             `json:"page"`

        // derived from Filters
        Filter Filter
}

type Sorting struct {
        Key   string `json:"key"`
        Order string `json:"order,omitempty"`
}

type Pagination struct {
        Limit int    `json:"limit"`
        Token string `json:"token,omitempty"`
}

// QueryResponse is the response object on querying state.
type QueryResponse struct {
        Results  []QueryItem       `json:"results"`
        Token    string            `json:"token,omitempty"`
        Metadata map[string]string `json:"metadata,omitempty"`
}

// QueryItem is an object representing a single entry in query results.
type QueryItem struct {
        Key   string  `json:"key"`
        Data  []byte  `json:"data"`
        ETag  *string `json:"etag,omitempty"`
        Error string  `json:"error,omitempty"`
}
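To make the JSON tags above concrete, the following sketch decodes a request body into local copies of Query, Sorting and Pagination. The payload itself, including the "EQ" filter shape, is an assumption chosen for illustration; decodeQuery and sampleBody are hypothetical names, and the derived Filter field is omitted.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the structures shown above, minus the derived Filter field.
type Sorting struct {
	Key   string `json:"key"`
	Order string `json:"order,omitempty"`
}

type Pagination struct {
	Limit int    `json:"limit"`
	Token string `json:"token,omitempty"`
}

type Query struct {
	Filters map[string]interface{} `json:"filter"`
	Sort    []Sorting              `json:"sort"`
	Page    Pagination             `json:"page"`
}

// sampleBody is an assumed example payload, not taken from the package.
const sampleBody = `{
  "filter": {"EQ": {"state": "CA"}},
  "sort": [{"key": "name", "order": "DESC"}],
  "page": {"limit": 10}
}`

// decodeQuery unmarshals a raw JSON body into a Query value.
func decodeQuery(raw string) (Query, error) {
	var q Query
	err := json.Unmarshal([]byte(raw), &q)
	return q, err
}

func main() {
	q, err := decodeQuery(sampleBody)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(q.Page.Limit, q.Sort[0].Key, q.Sort[0].Order)
}
```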

Upon receiving a query request, Dapr validates it and transforms it into a Query object, which is then passed to the state store component.

The Query object has a Filter member that implements the parsing interface shown below. Each component translates the parsed filters into its own native query.

type Filter interface {
	Parse(interface{}) error
}

type FilterEQ struct {
	Key string
	Val interface{}
}

type FilterIN struct {
	Key  string
	Vals []interface{}
}

type FilterAND struct {
	Filters []Filter
}

type FilterOR struct {
	Filters []Filter
}

To simplify query translation, the package uses the visitor design pattern. A state store component developer implements the Visit methods, and the runtime uses them to construct the native query statement.

type Visitor interface {
	// returns "equal" expression
	VisitEQ(*FilterEQ) (string, error)
	// returns "in" expression
	VisitIN(*FilterIN) (string, error)
	// returns "and" expression
	VisitAND(*FilterAND) (string, error)
	// returns "or" expression
	VisitOR(*FilterOR) (string, error)
	// receives concatenated filters and finalizes the native query
	Finalize(string, *MidQuery) error
}
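A visitor for a SQL-like backend might look roughly like the sketch below. The filter types are local stand-ins (the real Filter interface also has a Parse method), sqlVisitor and its visit and join helpers are hypothetical, and Finalize is left out. Values are interpolated into the string only to keep the example short; a real component should emit bound parameters instead.

```go
package main

import (
	"fmt"
	"strings"
)

// Filter is a stand-in for the package's Filter interface.
type Filter interface{}

type FilterEQ struct {
	Key string
	Val interface{}
}

type FilterIN struct {
	Key  string
	Vals []interface{}
}

type FilterAND struct{ Filters []Filter }

type FilterOR struct{ Filters []Filter }

// sqlVisitor renders each filter as a SQL-like boolean expression.
type sqlVisitor struct{}

func (v sqlVisitor) VisitEQ(f *FilterEQ) (string, error) {
	return fmt.Sprintf("%s = '%v'", f.Key, f.Val), nil
}

func (v sqlVisitor) VisitIN(f *FilterIN) (string, error) {
	if len(f.Vals) == 0 {
		return "", fmt.Errorf("IN filter for %q has no values", f.Key)
	}
	parts := make([]string, len(f.Vals))
	for i, val := range f.Vals {
		parts[i] = fmt.Sprintf("'%v'", val)
	}
	return fmt.Sprintf("%s IN (%s)", f.Key, strings.Join(parts, ", ")), nil
}

func (v sqlVisitor) VisitAND(f *FilterAND) (string, error) {
	return v.join(f.Filters, " AND ")
}

func (v sqlVisitor) VisitOR(f *FilterOR) (string, error) {
	return v.join(f.Filters, " OR ")
}

// visit dispatches on the concrete filter type. In this sketch it plays the
// role that the query builder plays in the runtime.
func (v sqlVisitor) visit(f Filter) (string, error) {
	switch t := f.(type) {
	case *FilterEQ:
		return v.VisitEQ(t)
	case *FilterIN:
		return v.VisitIN(t)
	case *FilterAND:
		return v.VisitAND(t)
	case *FilterOR:
		return v.VisitOR(t)
	default:
		return "", fmt.Errorf("unsupported filter type %T", f)
	}
}

// join renders every nested filter and combines them with the given operator.
func (v sqlVisitor) join(filters []Filter, op string) (string, error) {
	parts := make([]string, 0, len(filters))
	for _, f := range filters {
		s, err := v.visit(f)
		if err != nil {
			return "", err
		}
		parts = append(parts, s)
	}
	return "(" + strings.Join(parts, op) + ")", nil
}

func main() {
	filter := &FilterAND{Filters: []Filter{
		&FilterEQ{Key: "state", Val: "CA"},
		&FilterIN{Key: "dept", Vals: []interface{}{"a", "b"}},
	}}
	expr, err := sqlVisitor{}.visit(filter)
	fmt.Println(expr, err)
}
```

Because AND and OR recurse through the same dispatch function, arbitrarily nested filters reduce to a single expression string.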

The Dapr runtime implements a QueryBuilder object that takes a Visitor and constructs the native query.

type QueryBuilder struct {
	visitor Visitor
}

func (h *QueryBuilder) BuildQuery(mq *MidQuery) error {...}

The last part is to implement the Querier interface in the component:

type Querier interface {
	Query(ctx context.Context, req *QueryRequest) (*QueryResponse, error)
}

A sample implementation might look like this:

func (m *MyComponent) Query(ctx context.Context, req *state.QueryRequest) (*state.QueryResponse, error) {
	// Bound the query with a component-defined timeout derived from the caller's context.
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	query := &Query{} // Query implements the Visitor interface
	qbuilder := state.NewQueryBuilder(query)
	// Translate the request into the component's native query.
	if err := qbuilder.BuildQuery(&req.Query); err != nil {
		return &state.QueryResponse{}, err
	}
	// Run the native query and collect the results and pagination token.
	data, token, err := query.execute(ctx)
	if err != nil {
		return &state.QueryResponse{}, err
	}
	return &state.QueryResponse{
		Results: data,
		Token:   token,
	}, nil
}

Examples of State Query API implementations include the MongoDB and CosmosDB state store components.

Documentation

Index

Constants

const (
	FirstWrite = "first-write"
	LastWrite  = "last-write"
	Strong     = "strong"
	Eventual   = "eventual"
)
const (
	// GetRespMetaKeyTTLExpireTime is the key for the metadata value of the TTL
	// expire time. Value is a RFC3339 formatted string.
	GetRespMetaKeyTTLExpireTime string = "ttlExpireTime"
)

Variables

This section is empty.

Functions

func CheckRequestOptions

func CheckRequestOptions(options interface{}) error

CheckRequestOptions checks if request options use supported keywords.
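The kind of keyword check described here can be sketched as follows. This is not the package's implementation: validateOptions is a hypothetical helper that takes the two option strings directly, and treating an empty string as "not set" is an assumption.

```go
package main

import "fmt"

// The keywords mirror the constants declared by the package.
const (
	FirstWrite = "first-write"
	LastWrite  = "last-write"
	Strong     = "strong"
	Eventual   = "eventual"
)

// validateOptions rejects concurrency and consistency values that are not
// among the supported keywords. Empty strings are accepted as "not set".
func validateOptions(concurrency, consistency string) error {
	switch concurrency {
	case "", FirstWrite, LastWrite:
	default:
		return fmt.Errorf("unrecognized concurrency model %q", concurrency)
	}
	switch consistency {
	case "", Strong, Eventual:
	default:
		return fmt.Errorf("unrecognized consistency model %q", consistency)
	}
	return nil
}

func main() {
	fmt.Println(validateOptions(FirstWrite, Strong))
	fmt.Println(validateOptions("sometimes", Strong))
}
```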

func DoBulkSetDelete

func DoBulkSetDelete[T stateRequestConstraint](ctx context.Context, req []T, method func(ctx context.Context, req *T) error, opts BulkStoreOpts) error

DoBulkSetDelete performs BulkSet and BulkDelete.

func Ping

func Ping(ctx context.Context, store Store) error

Types

type BaseStore

type BaseStore interface {
	Init(ctx context.Context, metadata Metadata) error
	Features() []Feature
	Delete(ctx context.Context, req *DeleteRequest) error
	Get(ctx context.Context, req *GetRequest) (*GetResponse, error)
	Set(ctx context.Context, req *SetRequest) error
	GetComponentMetadata() map[string]string
}

BaseStore is an interface that contains the base methods for each state store.

type BulkDeleteRowMismatchError

type BulkDeleteRowMismatchError struct {
	// contains filtered or unexported fields
}

BulkDeleteRowMismatchError represents mismatch in rowcount while deleting rows.

func NewBulkDeleteRowMismatchError

func NewBulkDeleteRowMismatchError(expected, affected uint64) *BulkDeleteRowMismatchError

NewBulkDeleteRowMismatchError returns a BulkDeleteRowMismatchError.

func (*BulkDeleteRowMismatchError) Error

type BulkGetOpts

type BulkGetOpts struct {
	// Number of requests made in parallel while retrieving values in bulk.
	// When set to <= 0 (the default value), will fetch all requested values in bulk without limit.
	// Note that if the component implements a native BulkGet method, this value may be ignored.
	Parallelism int
}

BulkGetOpts contains options for the BulkGet method.
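One common way to honor a Parallelism setting like this is a buffered channel used as a counting semaphore. The sketch below is illustrative only and does not reproduce DoBulkGet; bulkGet, getFn and demoBulkGet are hypothetical names, and keys and values are plain strings to keep it short.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// getFn is a stand-in for a store's single-key Get method.
type getFn func(ctx context.Context, key string) (string, error)

// bulkGet calls fn once per key, running at most parallelism calls at a time.
// A parallelism <= 0 means no limit, mirroring the BulkGetOpts comment.
func bulkGet(ctx context.Context, keys []string, parallelism int, fn getFn) ([]string, []error) {
	values := make([]string, len(keys))
	errs := make([]error, len(keys))
	limit := parallelism
	if limit <= 0 {
		limit = len(keys)
	}
	sem := make(chan struct{}, limit) // counting semaphore
	var wg sync.WaitGroup
	for i, key := range keys {
		sem <- struct{}{} // blocks while limit calls are already in flight
		wg.Add(1)
		go func(i int, key string) {
			defer wg.Done()
			defer func() { <-sem }()
			// Each goroutine writes only to its own slot, so no lock is needed.
			values[i], errs[i] = fn(ctx, key)
		}(i, key)
	}
	wg.Wait()
	return values, errs
}

// demoBulkGet fetches three keys from a fake getter with the given limit.
func demoBulkGet(parallelism int) []string {
	keys := []string{"a", "b", "c"}
	values, _ := bulkGet(context.Background(), keys, parallelism,
		func(_ context.Context, key string) (string, error) {
			return "value-of-" + key, nil
		})
	return values
}

func main() {
	fmt.Println(demoBulkGet(2))
	fmt.Println(demoBulkGet(0))
}
```

Results are written by index, so the output order matches the request order regardless of which call finishes first.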

type BulkGetResponse

type BulkGetResponse struct {
	Key         string            `json:"key"`
	Data        []byte            `json:"data"`
	ETag        *string           `json:"etag,omitempty"`
	Metadata    map[string]string `json:"metadata"`
	Error       string            `json:"error,omitempty"`
	ContentType *string           `json:"contentType,omitempty"`
}

BulkGetResponse is the response object for a bulk get operation.

func DoBulkGet

func DoBulkGet(ctx context.Context, req []GetRequest, opts BulkGetOpts, getFn func(ctx context.Context, req *GetRequest) (*GetResponse, error)) ([]BulkGetResponse, error)

DoBulkGet performs BulkGet.

type BulkStore

type BulkStore interface {
	BulkGet(ctx context.Context, req []GetRequest, opts BulkGetOpts) ([]BulkGetResponse, error)
	BulkSet(ctx context.Context, req []SetRequest, opts BulkStoreOpts) error
	BulkDelete(ctx context.Context, req []DeleteRequest, opts BulkStoreOpts) error
}

BulkStore is an interface to perform bulk operations on store.

func NewDefaultBulkStore

func NewDefaultBulkStore(base BaseStore) BulkStore

NewDefaultBulkStore builds a default bulk store.

type BulkStoreError

type BulkStoreError struct {
	// contains filtered or unexported fields
}

BulkStoreError is an error object that contains details on the operations that failed.

func NewBulkStoreError

func NewBulkStoreError(key string, err error) BulkStoreError

func (BulkStoreError) ETagError

func (e BulkStoreError) ETagError() *ETagError

ETagError returns an *ETagError if the wrapped error is of that kind; otherwise, it returns nil.

func (BulkStoreError) Error

func (e BulkStoreError) Error() string

Error returns the error message. It implements the error interface.

func (BulkStoreError) Key

func (e BulkStoreError) Key() string

Key returns the key of the operation that failed.

func (BulkStoreError) Unwrap

func (e BulkStoreError) Unwrap() error

Unwrap returns the wrapped error. It implements the error wrapping interface.
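Because BulkStoreError implements Unwrap, callers can inspect it with the standard errors package. The sketch below uses simplified local stand-ins for BulkStoreError and ETagError with assumed fields and messages; classify is a hypothetical helper.

```go
package main

import (
	"errors"
	"fmt"
)

// ETagError is a local stand-in with assumed unexported fields.
type ETagError struct {
	kind string
	err  error
}

func (e *ETagError) Error() string { return "etag error: " + e.kind }
func (e *ETagError) Unwrap() error { return e.err }

// BulkStoreError is a local stand-in that pairs a key with its failure.
type BulkStoreError struct {
	key string
	err error
}

func (e BulkStoreError) Error() string {
	return fmt.Sprintf("operation on key %q failed: %v", e.key, e.err)
}
func (e BulkStoreError) Key() string   { return e.key }
func (e BulkStoreError) Unwrap() error { return e.err }

// ETagError returns the wrapped *ETagError, or nil if there is none.
func (e BulkStoreError) ETagError() *ETagError {
	var etagErr *ETagError
	if errors.As(e.err, &etagErr) {
		return etagErr
	}
	return nil
}

// classify walks the error chain looking for a BulkStoreError and reports
// whether the failure was ETag-related.
func classify(err error) string {
	var bse BulkStoreError
	if !errors.As(err, &bse) {
		return "other"
	}
	if bse.ETagError() != nil {
		return "etag:" + bse.Key()
	}
	return "failed:" + bse.Key()
}

func main() {
	inner := BulkStoreError{key: "k1", err: &ETagError{kind: "mismatch"}}
	wrapped := fmt.Errorf("bulk set: %w", inner)
	fmt.Println(classify(wrapped))
	fmt.Println(classify(errors.New("unrelated")))
}
```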

type BulkStoreOpts

type BulkStoreOpts struct {
	// Number of requests made in parallel while storing/deleting values in bulk.
	// When set to <= 0 (the default value), will perform all operations in parallel without limit.
	Parallelism int
}

BulkStoreOpts contains options for the BulkSet and BulkDelete methods.

type DefaultBulkStore

type DefaultBulkStore struct {
	// contains filtered or unexported fields
}

DefaultBulkStore is a default implementation of BulkStore.

func (*DefaultBulkStore) BulkDelete

func (b *DefaultBulkStore) BulkDelete(ctx context.Context, req []DeleteRequest, opts BulkStoreOpts) error

BulkDelete performs a bulk delete operation.

func (*DefaultBulkStore) BulkGet

func (b *DefaultBulkStore) BulkGet(ctx context.Context, req []GetRequest, opts BulkGetOpts) ([]BulkGetResponse, error)

BulkGet performs a Get operation in bulk.

func (*DefaultBulkStore) BulkSet

func (b *DefaultBulkStore) BulkSet(ctx context.Context, req []SetRequest, opts BulkStoreOpts) error

BulkSet performs a bulk save operation.

type DeleteRequest

type DeleteRequest struct {
	Key      string            `json:"key"`
	ETag     *string           `json:"etag,omitempty"`
	Metadata map[string]string `json:"metadata"`
	Options  DeleteStateOption `json:"options,omitempty"`
}

DeleteRequest is the object describing a delete state request.

func (DeleteRequest) GetKey

func (r DeleteRequest) GetKey() string

GetKey gets the Key on a DeleteRequest.

func (DeleteRequest) GetMetadata

func (r DeleteRequest) GetMetadata() map[string]string

GetMetadata gets the Metadata on a DeleteRequest.

func (DeleteRequest) HasETag

func (r DeleteRequest) HasETag() bool

HasETag returns true if the request has a non-empty ETag.

func (DeleteRequest) Operation

func (r DeleteRequest) Operation() OperationType

Operation returns the operation type for DeleteRequest, implementing TransactionalStateOperationRequest.

type DeleteStateOption

type DeleteStateOption struct {
	Concurrency string `json:"concurrency,omitempty"` // "first-write, last-write"
	Consistency string `json:"consistency"`           // "eventual, strong"
}

DeleteStateOption controls how a state store reacts to a delete request.

type ETagError

type ETagError struct {
	// contains filtered or unexported fields
}

ETagError is a custom error type for etag exceptions.

func NewETagError

func NewETagError(kind ETagErrorKind, err error) *ETagError

NewETagError returns an ETagError wrapping an existing context error.

func (*ETagError) Error

func (e *ETagError) Error() string

func (*ETagError) Kind

func (e *ETagError) Kind() ETagErrorKind

func (*ETagError) Unwrap

func (e *ETagError) Unwrap() error

type ETagErrorKind

type ETagErrorKind string

const (
	ETagInvalid  ETagErrorKind = "invalid"
	ETagMismatch ETagErrorKind = "mismatch"
)
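The two kinds map naturally onto an optimistic-concurrency check on writes. The sketch below is a hypothetical in-memory store that uses a numeric version as the ETag; the etagError type, the store type and the numeric ETag format are all assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

type ETagErrorKind string

const (
	ETagInvalid  ETagErrorKind = "invalid"
	ETagMismatch ETagErrorKind = "mismatch"
)

// etagError is a simplified stand-in for the package's ETagError.
type etagError struct{ kind ETagErrorKind }

func (e *etagError) Error() string { return "etag " + string(e.kind) }

type entry struct {
	value   string
	version uint64
}

// store is a hypothetical in-memory store keyed by string.
type store map[string]entry

// set writes value under key. With a nil etag the write always succeeds.
// With an etag, the write succeeds only if it parses as a number and matches
// the current version. The new ETag is returned on success.
func (s store) set(key, value string, etag *string) (string, error) {
	cur, exists := s[key]
	if etag != nil {
		want, err := strconv.ParseUint(*etag, 10, 64)
		if err != nil {
			return "", &etagError{kind: ETagInvalid}
		}
		if !exists || want != cur.version {
			return "", &etagError{kind: ETagMismatch}
		}
	}
	next := entry{value: value, version: cur.version + 1}
	s[key] = next
	return strconv.FormatUint(next.version, 10), nil
}

// kindOf extracts the ETag error kind from an error chain, if any.
func kindOf(err error) (ETagErrorKind, bool) {
	var e *etagError
	if errors.As(err, &e) {
		return e.kind, true
	}
	return "", false
}

func main() {
	s := store{}
	first, _ := s.set("k", "a", nil)
	_, _ = s.set("k", "b", &first)  // succeeds and bumps the version
	_, err := s.set("k", "c", &first) // stale ETag
	fmt.Println(kindOf(err))
}
```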

type Feature

type Feature string

Feature names a feature that can be implemented by state store components.

const (
	// FeatureETag is the feature to etag metadata in state store.
	FeatureETag Feature = "ETAG"
	// FeatureTransactional is the feature that performs transactional operations.
	FeatureTransactional Feature = "TRANSACTIONAL"
	// FeatureQueryAPI is the feature that performs query operations.
	FeatureQueryAPI Feature = "QUERY_API"
)

func (Feature) IsPresent

func (f Feature) IsPresent(features []Feature) bool

IsPresent checks if a given feature is present in the list.
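A caller would typically use IsPresent to gate optional behavior on what a store reports from Features. The implementation below is one plausible version written for this sketch with an exact string comparison; the package's actual comparison rules may differ.

```go
package main

import "fmt"

type Feature string

const (
	FeatureETag          Feature = "ETAG"
	FeatureTransactional Feature = "TRANSACTIONAL"
	FeatureQueryAPI      Feature = "QUERY_API"
)

// IsPresent reports whether f appears in features (exact match in this sketch).
func (f Feature) IsPresent(features []Feature) bool {
	for _, feature := range features {
		if feature == f {
			return true
		}
	}
	return false
}

func main() {
	// supported stands in for the slice a store returns from Features().
	supported := []Feature{FeatureETag, FeatureTransactional}
	if FeatureTransactional.IsPresent(supported) {
		fmt.Println("transactions are available")
	}
	if !FeatureQueryAPI.IsPresent(supported) {
		fmt.Println("query API is not available")
	}
}
```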

type GetRequest

type GetRequest struct {
	Key      string            `json:"key"`
	Metadata map[string]string `json:"metadata"`
	Options  GetStateOption    `json:"options,omitempty"`
}

GetRequest is the object describing a state fetch request.

func (GetRequest) GetKey

func (r GetRequest) GetKey() string

GetKey gets the Key on a GetRequest.

func (GetRequest) GetMetadata

func (r GetRequest) GetMetadata() map[string]string

GetMetadata gets the Metadata on a GetRequest.

type GetResponse

type GetResponse struct {
	Data        []byte            `json:"data"`
	ETag        *string           `json:"etag,omitempty"`
	Metadata    map[string]string `json:"metadata"`
	ContentType *string           `json:"contentType,omitempty"`
}

GetResponse is the response object for getting state.

type GetStateOption

type GetStateOption struct {
	Consistency string `json:"consistency"` // "eventual, strong"
}

GetStateOption controls how a state store reacts to a get request.

type Metadata

type Metadata struct {
	metadata.Base `json:",inline"`
}

Metadata contains a state store specific set of metadata properties.

type OperationType

type OperationType string

OperationType describes a CRUD operation performed against a state store.

const (
	// OperationUpsert is an update or create transactional operation.
	OperationUpsert OperationType = "upsert"
	// OperationDelete is a delete transactional operation.
	OperationDelete OperationType = "delete"
)

type Querier

type Querier interface {
	Query(ctx context.Context, req *QueryRequest) (*QueryResponse, error)
}

Querier is an interface to execute queries.

type QueryItem

type QueryItem struct {
	Key         string  `json:"key"`
	Data        []byte  `json:"data"`
	ETag        *string `json:"etag,omitempty"`
	Error       string  `json:"error,omitempty"`
	ContentType *string `json:"contentType,omitempty"`
}

QueryItem is an object representing a single entry in query results.

type QueryRequest

type QueryRequest struct {
	Query    query.Query       `json:"query"`
	Metadata map[string]string `json:"metadata,omitempty"`
}

type QueryResponse

type QueryResponse struct {
	Results  []QueryItem       `json:"results"`
	Token    string            `json:"token,omitempty"`
	Metadata map[string]string `json:"metadata,omitempty"`
}

QueryResponse is the response object for querying state.

type SetRequest

type SetRequest struct {
	Key         string            `json:"key"`
	Value       any               `json:"value"`
	ETag        *string           `json:"etag,omitempty"`
	Metadata    map[string]string `json:"metadata,omitempty"`
	Options     SetStateOption    `json:"options,omitempty"`
	ContentType *string           `json:"contentType,omitempty"`
}

SetRequest is the object describing an upsert request.

func (SetRequest) GetKey

func (r SetRequest) GetKey() string

GetKey gets the Key on a SetRequest.

func (SetRequest) GetMetadata

func (r SetRequest) GetMetadata() map[string]string

GetMetadata gets the Metadata on a SetRequest.

func (SetRequest) HasETag

func (r SetRequest) HasETag() bool

HasETag returns true if the request has a non-empty ETag.

func (SetRequest) Operation

func (r SetRequest) Operation() OperationType

Operation returns the operation type for SetRequest, implementing TransactionalStateOperationRequest.

type SetStateOption

type SetStateOption struct {
	Concurrency string // first-write, last-write
	Consistency string // "eventual, strong"
}

SetStateOption controls how a state store reacts to a set request.

type StateRequest

type StateRequest interface {
	GetKey() string
	GetMetadata() map[string]string
}

StateRequest is an interface that allows getting the Key and Metadata of a request.

type Store

type Store interface {
	BaseStore
	BulkStore
}

Store is an interface to perform operations on store.

type TransactionalStateOperation

type TransactionalStateOperation interface {
	StateRequest

	Operation() OperationType
}

TransactionalStateOperation is an interface for all requests that can be part of a transaction.

type TransactionalStateRequest

type TransactionalStateRequest struct {
	Operations []TransactionalStateOperation
	Metadata   map[string]string
}

TransactionalStateRequest describes a transactional operation against a state store that comprises multiple types of operations. Each entry in Operations is either a DeleteRequest or a SetRequest.

type TransactionalStore

type TransactionalStore interface {
	Multi(ctx context.Context, request *TransactionalStateRequest) error
}

TransactionalStore is an interface for state stores that support multiple operations in a single transactional request.
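A Multi implementation usually switches on the concrete operation type and applies everything atomically. The sketch below does this for a hypothetical in-memory store by staging changes on a copy of the data. The request types are trimmed local stand-ins with string values, and Multi here takes the operation slice directly instead of a TransactionalStateRequest.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

type OperationType string

const (
	OperationUpsert OperationType = "upsert"
	OperationDelete OperationType = "delete"
)

// TransactionalStateOperation is a trimmed stand-in for the package interface.
type TransactionalStateOperation interface {
	GetKey() string
	Operation() OperationType
}

type SetRequest struct{ Key, Value string }

func (r SetRequest) GetKey() string           { return r.Key }
func (r SetRequest) Operation() OperationType { return OperationUpsert }

type DeleteRequest struct{ Key string }

func (r DeleteRequest) GetKey() string           { return r.Key }
func (r DeleteRequest) Operation() OperationType { return OperationDelete }

// memTxStore is a hypothetical in-memory transactional store.
type memTxStore struct {
	mu   sync.Mutex
	data map[string]string
}

// Multi applies all operations or none: changes are staged on a copy and
// swapped in only after every operation has been accepted.
func (s *memTxStore) Multi(_ context.Context, ops []TransactionalStateOperation) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	staged := make(map[string]string, len(s.data))
	for k, v := range s.data {
		staged[k] = v
	}
	for _, op := range ops {
		switch req := op.(type) {
		case SetRequest:
			staged[req.Key] = req.Value
		case DeleteRequest:
			delete(staged, req.Key)
		default:
			return fmt.Errorf("unsupported operation %q on key %q", op.Operation(), op.GetKey())
		}
	}
	s.data = staged
	return nil
}

// demoMulti runs two upserts and one delete as a single transaction.
func demoMulti() (map[string]string, error) {
	s := &memTxStore{data: map[string]string{"old": "x"}}
	err := s.Multi(context.Background(), []TransactionalStateOperation{
		SetRequest{Key: "a", Value: "1"},
		SetRequest{Key: "b", Value: "2"},
		DeleteRequest{Key: "old"},
	})
	return s.data, err
}

func main() {
	data, err := demoMulti()
	fmt.Println(data, err)
}
```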

Directories

Path Synopsis
alicloud
aws
azure
cloudflare
gcp
hashicorp
Package mongodb is an implementation of StateStore interface to perform operations on store
oci
Package zookeeper is a generated GoMock package.
