state

package
v0.1.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 21, 2025 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrNoSuchKey = fmt.Errorf("key not found")

ErrNoSuchKey is an error indicating that the key was not found in any state handlers.

Functions

func CompareAndDeleteEffect added in v0.0.2

func CompareAndDeleteEffect[K comparable, V ComparableEquatable](
	ctx context.Context,
	key K,
	old V,
) (deleted bool, err error)

CompareAndDeleteEffect compares the current value of the specified key with the provided old value. If they match, it deletes the key from the state store and returns true. If they do not match, it returns false and an error if the operation fails. It delegates to the upper handler if the delegation flag is set on the state handler.

func CompareAndSwapEffect added in v0.0.2

func CompareAndSwapEffect[K comparable, V ComparableEquatable](
	ctx context.Context,
	key K,
	old, new V,
) (swapped bool, err error)

CompareAndSwapEffect compares the current value of the specified key with the provided old value. If they match, it updates the key with the new value and returns true. If they do not match, it returns false and an error if the operation fails. It delegates to the upper handler if the delegation flag is set on the state handler.

func Equals

func Equals(i, j ComparableEquatable) bool

func EventSourcingEffect added in v0.0.2

func EventSourcingEffect(
	ctx context.Context,
	sink chan<- TimeBoundedPayload,
	dropped chan<- TimeBoundedPayload,
)

EventSourcingEffect subscribes to the effect source and sends the received payloads to the provided sink and dropped channels. It logs an error if the effect source is not found. WARNING: Stream effect handler must be created before calling this function.

func InsertIfAbsentEffect added in v0.0.2

func InsertIfAbsentEffect[K comparable, V ComparableEquatable](
	ctx context.Context,
	key K,
	new V,
) (inserted bool, err error)

InsertIfAbsentEffect inserts a new value into the state store if the key is not already present. It returns true if the value was inserted, false if it already exists, and an error if the operation fails. It delegates to the upper handler if the delegation flag is set on the state handler.

func InsertIfAbsentWithTTLEffect added in v0.0.2

func InsertIfAbsentWithTTLEffect[K comparable, V ComparableEquatable](
	ctx context.Context,
	key K,
	new V,
	ttl time.Duration,
) (inserted bool, err error)

InsertIfAbsentWithTTLEffect inserts a new value into the state store with a TTL if the key is not already present. It returns true if the value was inserted, false if it already exists, and an error if the operation fails. It sets a timer to delete the key after the specified TTL duration. It delegates to the upper handler if the delegation flag is set on the state handler without setting the timer.

func LoadEffect added in v0.0.2

func LoadEffect[K comparable, V ComparableEquatable](ctx context.Context, key K) (val V, err error)

LoadEffect loads a value from the state store using the provided key. It only checks the local state store and does not delegate to the upper handler. It returns an error if the key is not found.

func LoadEffects added in v0.0.2

func LoadEffects[V ComparableEquatable](ctx context.Context, prefix string) (val map[string]V, err error)

LoadEffects loads multiple values from the state store using the provided prefix. It returns a map of key-value pairs found in the state store. It does not delegate to the upper handler and returns an error if the prefix is not found. Note: This function is not implemented yet.

func LoadOverlaidEffect added in v0.0.2

func LoadOverlaidEffect[K comparable, V ComparableEquatable](ctx context.Context, key K) (val V, err error)

LoadOverlaidEffect loads a value from the state store using the provided key. It first checks the local state store and then delegates to the upper handler if not found. If the value is found in the upper handler, it is inserted into the local state store. It returns an error if the key is not found in both stores.

func MustHaveEffectHandler added in v0.0.2

func MustHaveEffectHandler(ctx context.Context) string

MustHaveEffectHandler ensures that the state effect handler is installed. It panics if the effect handler is not installed.

func ResetTTLEffect added in v0.0.2

func ResetTTLEffect[K comparable](
	ctx context.Context,
	key K,
	ttl time.Duration,
) (reset bool, err error)

ResetTTLEffect resets the TTL of the specified key in the state store. It returns true if the TTL was reset, false if the key was not found, and an error if the operation fails. It sets a new timer to delete the key after the specified TTL duration.

func WithEffectHandler

func WithEffectHandler[K comparable, V ComparableEquatable](
	ctx context.Context,
	bufferSize, numWorkers int,
	delegation bool,
	stateStore StateStore,
	initMap map[K]V,
) (context.Context, func() context.Context)

WithEffectHandler registers a resumable, partitionable effect handler for managing key-value state. It stores the internal state in a memory-safe sync.Map and supports sharded processing. The handler is resumable and partitionable, meaning it can be resumed after a failure and can handle multiple partitions concurrently. The handler is registered in the context and can be used to perform state operations. The handler is closed when the context is canceled or when the teardown function is called. The teardown function should be called when the effect handler is no longer needed. If the teardown function is called early, the effect handler will be closed. The context returned by the teardown function should be used for further operations.

Types

type CasStore

type CasStore[K comparable] interface {
	Load(key K) (value any, ok bool, err error)
	InsertIfAbsent(key K, value any) (inserted bool, err error)
	CompareAndSwap(key K, old, new any) (swapped bool, err error)
	CompareAndDelete(key K, old any) (deleted bool, err error)
}

type ComparableEquatable

type ComparableEquatable interface{}

type CompareAndDelete

type CompareAndDelete[K comparable, V ComparableEquatable] struct {
	Key K // should be comparable
	Old V // should be comparable
}

CompareAndDelete is the payload type for deleting a key from the state.

func (CompareAndDelete[K, V]) PartitionKey

func (p CompareAndDelete[K, V]) PartitionKey() string

type Equatable

type Equatable interface {
	Equals(j any) bool
}

type InsertIfAbsent

type InsertIfAbsent[K comparable, V ComparableEquatable] struct {
	Key K // should be comparable
	New V // should be comparable
}

InsertIfAbsent is the payload type for deleting a key from the state.

func (InsertIfAbsent[K, V]) PartitionKey

func (p InsertIfAbsent[K, V]) PartitionKey() string

type Payload

type Payload interface {
	PartitionKey() string
	// contains filtered or unexported methods
}

Payload is a sealed interface for state operations. Only predefined payload types (Set, Get, Delete) can implement this interface.

type SetStore

type SetStore[K comparable] interface {
	Get(key K) (value any, ok bool, err error)
	Set(key K, value any) error
	Delete(key K) error
}

type Source

type Source struct{}

Source is a special payload type for the state effect handler.

func (Source) PartitionKey

func (Source) PartitionKey() string

type StateStore

type StateStore interface {
	// contains filtered or unexported methods
}

func NewCasStore

func NewCasStore[K comparable](store CasStore[K]) StateStore

func NewInMemoryStore

func NewInMemoryStore[K comparable]() StateStore

NewInMemoryStore creates a new in-memory state store. It uses a sync.Map to provide concurrent access to the store.

func NewSetStore

func NewSetStore[K comparable](store SetStore[K]) StateStore

type TimeBoundedPayload

type TimeBoundedPayload struct {
	Payload
	shared.TimeSpan
}

TimeBoundedPayload is a wrapper for StatePayload with a time span.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL