Documentation
¶
Index ¶
- Variables
- func CompareAndDeleteEffect[K comparable, V ComparableEquatable](ctx context.Context, key K, old V) (deleted bool, err error)
- func CompareAndSwapEffect[K comparable, V ComparableEquatable](ctx context.Context, key K, old, new V) (swapped bool, err error)
- func Equals(i, j ComparableEquatable) bool
- func EventSourcingEffect(ctx context.Context, sink chan<- TimeBoundedPayload, ...)
- func InsertIfAbsentEffect[K comparable, V ComparableEquatable](ctx context.Context, key K, new V) (inserted bool, err error)
- func InsertIfAbsentWithTTLEffect[K comparable, V ComparableEquatable](ctx context.Context, key K, new V, ttl time.Duration) (inserted bool, err error)
- func LoadEffect[K comparable, V ComparableEquatable](ctx context.Context, key K) (val V, err error)
- func LoadEffects[V ComparableEquatable](ctx context.Context, prefix string) (val map[string]V, err error)
- func LoadOverlaidEffect[K comparable, V ComparableEquatable](ctx context.Context, key K) (val V, err error)
- func MustHaveEffectHandler(ctx context.Context) string
- func ResetTTLEffect[K comparable](ctx context.Context, key K, ttl time.Duration) (reset bool, err error)
- func WithEffectHandler[K comparable, V ComparableEquatable](ctx context.Context, bufferSize, numWorkers int, delegation bool, ...) (context.Context, func() context.Context)
- type CasStore
- type ComparableEquatable
- type CompareAndDelete
- type Equatable
- type InsertIfAbsent
- type Payload
- type SetStore
- type Source
- type StateStore
- type TimeBoundedPayload
Constants ¶
This section is empty.
Variables ¶
var ErrNoSuchKey = fmt.Errorf("key not found")
ErrNoSuchKey is an error indicating that the key was not found in any state handlers.
Functions ¶
func CompareAndDeleteEffect ¶ added in v0.0.2
func CompareAndDeleteEffect[K comparable, V ComparableEquatable]( ctx context.Context, key K, old V, ) (deleted bool, err error)
CompareAndDeleteEffect compares the current value of the specified key with the provided old value. If they match, it deletes the key from the state store and returns true. If they do not match, it returns false and an error if the operation fails. It delegates to the upper handler if the delegation flag is set on the state handler.
func CompareAndSwapEffect ¶ added in v0.0.2
func CompareAndSwapEffect[K comparable, V ComparableEquatable]( ctx context.Context, key K, old, new V, ) (swapped bool, err error)
CompareAndSwapEffect compares the current value of the specified key with the provided old value. If they match, it updates the key with the new value and returns true. If they do not match, it returns false and an error if the operation fails. It delegates to the upper handler if the delegation flag is set on the state handler.
func Equals ¶
func Equals(i, j ComparableEquatable) bool
func EventSourcingEffect ¶ added in v0.0.2
func EventSourcingEffect( ctx context.Context, sink chan<- TimeBoundedPayload, dropped chan<- TimeBoundedPayload, )
EventSourcingEffect subscribes to the effect source and sends the received payloads to the provided sink and dropped channels. It logs an error if the effect source is not found. WARNING: Stream effect handler must be created before calling this function.
func InsertIfAbsentEffect ¶ added in v0.0.2
func InsertIfAbsentEffect[K comparable, V ComparableEquatable]( ctx context.Context, key K, new V, ) (inserted bool, err error)
InsertIfAbsentEffect inserts a new value into the state store if the key is not already present. It returns true if the value was inserted, false if it already exists, and an error if the operation fails. It delegates to the upper handler if the delegation flag is set on the state handler.
func InsertIfAbsentWithTTLEffect ¶ added in v0.0.2
func InsertIfAbsentWithTTLEffect[K comparable, V ComparableEquatable]( ctx context.Context, key K, new V, ttl time.Duration, ) (inserted bool, err error)
InsertIfAbsentWithTTLEffect inserts a new value into the state store with a TTL if the key is not already present. It returns true if the value was inserted, false if it already exists, and an error if the operation fails. It sets a timer to delete the key after the specified TTL duration. It delegates to the upper handler if the delegation flag is set on the state handler without setting the timer.
func LoadEffect ¶ added in v0.0.2
func LoadEffect[K comparable, V ComparableEquatable](ctx context.Context, key K) (val V, err error)
LoadEffect loads a value from the state store using the provided key. It only checks the local state store and does not delegate to the upper handler. It returns an error if the key is not found.
func LoadEffects ¶ added in v0.0.2
func LoadEffects[V ComparableEquatable](ctx context.Context, prefix string) (val map[string]V, err error)
LoadEffects loads multiple values from the state store using the provided prefix. It returns a map of key-value pairs found in the state store. It does not delegate to the upper handler and returns an error if the prefix is not found. Note: This function is not implemented yet.
func LoadOverlaidEffect ¶ added in v0.0.2
func LoadOverlaidEffect[K comparable, V ComparableEquatable](ctx context.Context, key K) (val V, err error)
LoadOverlaidEffect loads a value from the state store using the provided key. It first checks the local state store and then delegates to the upper handler if not found. If the value is found in the upper handler, it is inserted into the local state store. It returns an error if the key is not found in both stores.
func MustHaveEffectHandler ¶ added in v0.0.2
MustHaveEffectHandler ensures that the state effect handler is installed. It panics if the effect handler is not installed.
func ResetTTLEffect ¶ added in v0.0.2
func ResetTTLEffect[K comparable]( ctx context.Context, key K, ttl time.Duration, ) (reset bool, err error)
ResetTTLEffect resets the TTL of the specified key in the state store. It returns true if the TTL was reset, false if the key was not found, and an error if the operation fails. It sets a new timer to delete the key after the specified TTL duration.
func WithEffectHandler ¶
func WithEffectHandler[K comparable, V ComparableEquatable]( ctx context.Context, bufferSize, numWorkers int, delegation bool, stateStore StateStore, initMap map[K]V, ) (context.Context, func() context.Context)
WithEffectHandler registers a resumable, partitionable effect handler for managing key-value state. It stores the internal state in a memory-safe sync.Map and supports sharded processing. The handler is resumable and partitionable, meaning it can be resumed after a failure and can handle multiple partitions concurrently. The handler is registered in the context and can be used to perform state operations. The handler is closed when the context is canceled or when the teardown function is called. The teardown function should be called when the effect handler is no longer needed. If the teardown function is called early, the effect handler will be closed. The context returned by the teardown function should be used for further operations.
Types ¶
type ComparableEquatable ¶
type ComparableEquatable interface{}
type CompareAndDelete ¶
type CompareAndDelete[K comparable, V ComparableEquatable] struct { Key K // should be comparable Old V // should be comparable }
CompareAndDelete is the payload type for deleting a key from the state.
func (CompareAndDelete[K, V]) PartitionKey ¶
func (p CompareAndDelete[K, V]) PartitionKey() string
type InsertIfAbsent ¶
type InsertIfAbsent[K comparable, V ComparableEquatable] struct { Key K // should be comparable New V // should be comparable }
InsertIfAbsent is the payload type for deleting a key from the state.
func (InsertIfAbsent[K, V]) PartitionKey ¶
func (p InsertIfAbsent[K, V]) PartitionKey() string
type Payload ¶
type Payload interface {
PartitionKey() string
// contains filtered or unexported methods
}
Payload is a sealed interface for state operations. Only predefined payload types (Set, Get, Delete) can implement this interface.
type Source ¶
type Source struct{}
Source is a special payload type for the state effect handler.
func (Source) PartitionKey ¶
type StateStore ¶
type StateStore interface {
// contains filtered or unexported methods
}
func NewCasStore ¶
func NewCasStore[K comparable](store CasStore[K]) StateStore
func NewInMemoryStore ¶
func NewInMemoryStore[K comparable]() StateStore
NewInMemoryStore creates a new in-memory state store. It uses a sync.Map to provide concurrent access to the store.
func NewSetStore ¶
func NewSetStore[K comparable](store SetStore[K]) StateStore
type TimeBoundedPayload ¶
TimeBoundedPayload is a wrapper for StatePayload with a time span.