storage

package
v0.12.2-0...-962f32f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 7, 2025 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrEventChannelClosed = errors.New("event channel closed")
	ErrObjectDeleted      = errors.New("object deleted")
)
View Source
var (
	ErrNotFound      = status.Error(codes.NotFound, "not found")
	ErrAlreadyExists = status.Error(codes.AlreadyExists, "already exists")
	ErrConflict      = lo.Must(status.New(codes.Aborted, "conflict").WithDetails(ErrDetailsConflict)).Err()
)
View Source
var (
	ErrDetailsConflict      = &errdetails.ErrorInfo{Reason: "CONFLICT"}
	ErrDetailsDiscontinuity = &errdetails.ErrorInfo{Reason: "DISCONTINUITY"}
)

Functions

func GetStoreBuilder

func GetStoreBuilder[T ~string](name T) func(...any) (any, error)

func IsAlreadyExists

func IsAlreadyExists(err error) bool

Use this instead of errors.Is(err, ErrAlreadyExists). The implementation of Is() for grpc status errors compares the error message, which can result in false negatives.

func IsConflict

func IsConflict(err error) bool

Use this instead of errors.Is(err, ErrConflict). The status code is too generic to identify conflict errors, so there are additional details added to conflict errors to disambiguate them.

func IsDiscontinuity

func IsDiscontinuity(err error) bool

func IsNotFound

func IsNotFound(err error) bool

Use this instead of errors.Is(err, ErrNotFound). The implementation of Is() for grpc status errors compares the error message, which can result in false negatives.

func NewWatchContext

func NewWatchContext[T any](
	base context.Context,
	eventC <-chan WatchEvent[T],
) context.Context

Returns a context that listens on a watch event channel and closes its Done channel when the object is deleted. This context should have exclusive read access to the event channel to avoid missing events.

func RegisterStoreBuilder

func RegisterStoreBuilder[T ~string](name T, builder func(...any) (any, error))

Types

type AlertFilterOptions

type AlertFilterOptions struct {
	Labels map[string]string
	Range  *corev1.TimeRange
}

type Backend

type Backend interface {
	TokenStore
	ClusterStore
	RoleBindingStore
	KeyringStoreBroker
	KeyValueStoreBroker

	// Close shuts down any connections and releases any resources held
	// by the storage backend client.
	Close()
}

type ClusterMutator

type ClusterMutator = MutatorFunc[*corev1.Cluster]

type ClusterStore

type ClusterStore interface {
	CreateCluster(ctx context.Context, cluster *corev1.Cluster) error
	DeleteCluster(ctx context.Context, ref *corev1.Reference) error
	GetCluster(ctx context.Context, ref *corev1.Reference) (*corev1.Cluster, error)
	UpdateCluster(ctx context.Context, ref *corev1.Reference, mutator ClusterMutator) (*corev1.Cluster, error)
	WatchCluster(ctx context.Context, cluster *corev1.Cluster) (<-chan WatchEvent[*corev1.Cluster], error)
	WatchClusters(ctx context.Context, known []*corev1.Cluster) (<-chan WatchEvent[*corev1.Cluster], error)
	ListClusters(ctx context.Context, matchLabels *corev1.LabelSelector, matchOptions corev1.MatchOptions) (*corev1.ClusterList, error)
}

type CompositeBackend

func (CompositeBackend) Close

func (cb CompositeBackend) Close()

Close implements Backend.

func (*CompositeBackend) IsValid

func (cb *CompositeBackend) IsValid() bool

func (*CompositeBackend) Use

func (cb *CompositeBackend) Use(store any)

type DeleteOpt

type DeleteOpt interface{ ApplyDeleteOption(*DeleteOptions) }

type DeleteOptions

type DeleteOptions struct {
	// Delete only if the latest Revision matches
	Revision *int64
}

func (*DeleteOptions) Apply

func (o *DeleteOptions) Apply(opts ...DeleteOpt)

type GetOpt

type GetOpt interface{ ApplyGetOption(*GetOptions) }

type GetOptions

type GetOptions struct {
	// If set, will return the value at the specified revision instead of
	// the current value.
	Revision *int64

	// If non-nil, will be set to the current revision of the key after the Get
	// operation completes successfully. If an error occurs, no changes
	// will be made to the value.
	RevisionOut *int64
}

func (*GetOptions) Apply

func (o *GetOptions) Apply(opts ...GetOpt)

type GrpcTtlCache

type GrpcTtlCache[T any] interface {
	// getter for default cache's configuration
	MaxAge() time.Duration

	Get(key string) (resp T, ok bool)
	// If 0 is passed as ttl, the cache's default configuration will be used
	Set(key string, resp T, ttl time.Duration)
	Delete(key string)
}

type HistoryOpt

type HistoryOpt interface{ ApplyHistoryOption(*HistoryOptions) }

type HistoryOptions

type HistoryOptions struct {
	// Specifies the latest modification revision to include in the returned
	// history. The history will contain all revisions of the key, starting at
	// the most recent creation revision, and ending at either the specified
	// revision, or the most recent modification revision of the key. If the
	// specified revision is before the latest creation revision, and the
	// key has multiple creation revisions (due to a delete and re-create),
	// then the history will instead start at the most recent creation
	// revision that is <= the specified revision.
	Revision *int64
	// Include the values in the response, not just the metadata. This could
	// have performance implications, so use with caution.
	IncludeValues bool
}

func (*HistoryOptions) Apply

func (o *HistoryOptions) Apply(opts ...HistoryOpt)

type HttpTtlCache

type HttpTtlCache[T any] interface {
	// getter for default cache's configuration
	MaxAge() time.Duration

	Get(key string) (resp T, ok bool)
	// Unlike GrpcTtlCache.Set, no per-entry ttl can be passed here; presumably
	// the cache's default configuration (see MaxAge) always applies.
	Set(key string, resp T)
	Delete(key string)
}

type IncludeValuesOpt

type IncludeValuesOpt bool

func IncludeValues

func IncludeValues(include bool) IncludeValuesOpt

IncludeValues can be used for HistoryOptions.

func (IncludeValuesOpt) ApplyHistoryOption

func (i IncludeValuesOpt) ApplyHistoryOption(opts *HistoryOptions)

type KeyRevision

type KeyRevision[T any] interface {
	Key() string
	SetKey(string)

	// If values were requested, returns the value at this revision. Otherwise,
	// returns the zero value for T.
	// Note that if the value has a revision field, it will *not*
	// be populated, and should be set manually if needed using the Revision()
	// method.
	Value() T
	// Returns the revision of this key. Larger values are newer, but the
	// revision number should otherwise be treated as an opaque value.
	Revision() int64
	// Returns the timestamp of this revision. This may or may not always be
	// available, depending on if the underlying store supports it.
	Timestamp() time.Time
}

type KeyRevisionImpl

type KeyRevisionImpl[T any] struct {
	K    string
	V    T
	Rev  int64
	Time time.Time
}

func (*KeyRevisionImpl[T]) Key

func (k *KeyRevisionImpl[T]) Key() string

func (*KeyRevisionImpl[T]) Revision

func (k *KeyRevisionImpl[T]) Revision() int64

func (*KeyRevisionImpl[T]) SetKey

func (k *KeyRevisionImpl[T]) SetKey(key string)

func (*KeyRevisionImpl[T]) Timestamp

func (k *KeyRevisionImpl[T]) Timestamp() time.Time

func (*KeyRevisionImpl[T]) Value

func (k *KeyRevisionImpl[T]) Value() T

type KeyValueStore

type KeyValueStore = KeyValueStoreT[[]byte]

type KeyValueStoreBroker

type KeyValueStoreBroker interface {
	KeyValueStore(namespace string) KeyValueStore
}

type KeyValueStoreT

type KeyValueStoreT[T any] interface {
	Put(ctx context.Context, key string, value T, opts ...PutOpt) error
	Get(ctx context.Context, key string, opts ...GetOpt) (T, error)

	// Starts a watch on the specified key. The returned channel will receive
	// events for the key until the context is canceled, after which the
	// channel will be closed. This function does not block. An error will only
	// be returned if the key is invalid or the watch fails to start.
	//
	// When the watch is started, the current value of the key will be sent
	// if and only if both of the following conditions are met:
	// 1. A revision is explicitly set in the watch options. If no revision is
	//    specified, only future events will be sent. Revision 0 is equivalent
	//    to the oldest revision among all keys matching the prefix, not
	//    including deleted keys.
	// 2. The key exists; or in prefix mode, there is at least one key matching
	//    the prefix.
	//
	// In most cases a starting revision should be specified, as this will
	// ensure no events are missed.
	//
	// This function can be called multiple times for the same key, prefix, or
	// overlapping prefixes. Each call will initiate a separate watch, and events
	// are always replicated to all active watches.
	//
	// The channels are buffered to hold at least 64 events. Ensure that events
	// are read from the channel in a timely manner if a large volume of events
	// is expected; otherwise sends to the channel will block and events may be
	// delayed, or be dropped by the backend.
	Watch(ctx context.Context, key string, opts ...WatchOpt) (<-chan WatchEvent[KeyRevision[T]], error)
	Delete(ctx context.Context, key string, opts ...DeleteOpt) error
	ListKeys(ctx context.Context, prefix string, opts ...ListOpt) ([]string, error)
	History(ctx context.Context, key string, opts ...HistoryOpt) ([]KeyRevision[T], error)
}

type KeyValueStoreTBroker

type KeyValueStoreTBroker[T any] interface {
	KeyValueStore(namespace string) KeyValueStoreT[T]
}

type KeyringStore

type KeyringStore interface {
	Put(ctx context.Context, keyring keyring.Keyring) error
	Get(ctx context.Context) (keyring.Keyring, error)
	Delete(ctx context.Context) error
}

type KeyringStoreBroker

type KeyringStoreBroker interface {
	KeyringStore(namespace string, ref *corev1.Reference) KeyringStore
}

type LimitOpt

type LimitOpt int64

func WithLimit

func WithLimit(limit int64) LimitOpt

WithLimit can be used for ListKeysOptions.

func (LimitOpt) ApplyListOption

func (l LimitOpt) ApplyListOption(opts *ListKeysOptions)

type ListKeysOptions

type ListKeysOptions struct {
	// Maximum number of keys to return
	Limit *int64
}

func (*ListKeysOptions) Apply

func (o *ListKeysOptions) Apply(opts ...ListOpt)

type ListOpt

type ListOpt interface{ ApplyListOption(*ListKeysOptions) }

type Lock

type Lock interface {
	// Lock acquires a lock on the key. If the lock is already held, it will block until the lock is acquired or
	// the context fails.
	// Lock returns an error if the context expires or an unrecoverable error occurs when trying to acquire the lock.
	Lock(ctx context.Context) (expired <-chan struct{}, err error)

	// TryLock tries to acquire the lock on the key and reports whether it succeeded.
	// It blocks until at least one attempt was made to acquire the lock, and returns acquired=false and no error
	// if the lock is known to be held by someone else.
	TryLock(ctx context.Context) (acquired bool, expired <-chan struct{}, err error)

	// Unlock releases the lock on the key in a non-blocking fashion.
	// It spawns a goroutine that will perform the unlock mechanism until it succeeds or the lock is
	// expired by the server.
	// It immediately signals to the lock's original expired channel that the lock is released.
	Unlock() error
}

Lock is a distributed lock that can be used to coordinate access to a resource or interest in such a resource. Locks provide the following liveness and atomicity guarantees to prevent distributed deadlocks and guarantee atomicity in the critical section:

  - Liveness A: A lock is always eventually released when the process holding it crashes or exits unexpectedly.
  - Liveness B: A lock is always eventually released when its backend store is unavailable.
  - Atomicity A: No two processes or threads can hold the same lock at the same time.
  - Atomicity B: Any call to unlock will always eventually release the lock.

type LockManager

type LockManager interface {
	// Instantiates a new Lock instance for the given key, with the given options.
	//
	// Defaults to lock.DefaultOptions if no options are provided.
	NewLock(key string, opts ...lock.LockOption) Lock
}

LockManager replaces sync.Mutex when a distributed locking mechanism is required.

Usage

## Transient lock (transactions, tasks, ...)

```

func distributedTransaction(ctx context.Context, lm storage.LockManager, key string) error {
	keyMu := lm.NewLock(key, lock.WithKeepalive(false), lock.WithExpireDuration(1*time.Second))
	if _, err := keyMu.Lock(ctx); err != nil {
		return err
	}
	defer keyMu.Unlock()
	// do some work
	return nil
}

```

## Persistent lock (leader election)

```

func serve(ctx context.Context, lm storage.LockManager, key string, done chan struct{}) error {
	keyMu := lm.NewLock(key, lock.WithKeepalive(true))
	if _, err := keyMu.Lock(ctx); err != nil {
		return err
	}
	go func() {
		<-done
		keyMu.Unlock()
	}()
	// serve a service...
	return nil
}

```

type LockManagerBroker

type LockManagerBroker interface {
	LockManager(namespace string) LockManager
}

type MutatorFunc

type MutatorFunc[T any] func(T)

func NewAddCapabilityMutator

func NewAddCapabilityMutator[O corev1.MetadataAccessor[T], T corev1.Capability[T]](capability T) MutatorFunc[O]

func NewCompositeMutator

func NewCompositeMutator[T any](mutators ...MutatorFunc[T]) MutatorFunc[T]

func NewIncrementUsageCountMutator

func NewIncrementUsageCountMutator() MutatorFunc[*corev1.BootstrapToken]

func NewRemoveCapabilityMutator

func NewRemoveCapabilityMutator[O corev1.MetadataAccessor[T], T corev1.Capability[T]](capability T) MutatorFunc[O]

type PrefixOpt

type PrefixOpt bool

func (PrefixOpt) ApplyWatchOption

func (p PrefixOpt) ApplyWatchOption(opts *WatchOptions)

type PutOpt

type PutOpt interface{ ApplyPutOption(*PutOptions) }

type PutOptions

type PutOptions struct {
	// Put only if the latest Revision matches
	Revision *int64

	// If non-nil, will be set to the updated revision of the key after the Put
	// operation completes successfully. If an error occurs, no changes
	// will be made to the value.
	RevisionOut *int64
}

func (*PutOptions) Apply

func (o *PutOptions) Apply(opts ...PutOpt)

type RevisionOpt

type RevisionOpt int64

func WithRevision

func WithRevision(rev int64) RevisionOpt

WithRevision can be used for GetOptions, PutOptions, WatchOptions, DeleteOptions, or HistoryOptions.

func (RevisionOpt) ApplyDeleteOption

func (r RevisionOpt) ApplyDeleteOption(opts *DeleteOptions)

func (RevisionOpt) ApplyGetOption

func (r RevisionOpt) ApplyGetOption(opts *GetOptions)

func (RevisionOpt) ApplyHistoryOption

func (r RevisionOpt) ApplyHistoryOption(opts *HistoryOptions)

func (RevisionOpt) ApplyPutOption

func (r RevisionOpt) ApplyPutOption(opts *PutOptions)

func (RevisionOpt) ApplyWatchOption

func (r RevisionOpt) ApplyWatchOption(opts *WatchOptions)

type RevisionOutOpt

type RevisionOutOpt struct {
	// contains filtered or unexported fields
}

func WithRevisionOut

func WithRevisionOut(out *int64) RevisionOutOpt

WithRevisionOut can be used for GetOptions or PutOptions.

func (RevisionOutOpt) ApplyGetOption

func (r RevisionOutOpt) ApplyGetOption(opts *GetOptions)

func (RevisionOutOpt) ApplyPutOption

func (r RevisionOutOpt) ApplyPutOption(opts *PutOptions)

type RoleBindingMutator

type RoleBindingMutator = MutatorFunc[*corev1.RoleBinding]

type RoleBindingStore

type RoleBindingStore interface {
	CreateRoleBinding(context.Context, *corev1.RoleBinding) error
	UpdateRoleBinding(ctx context.Context, ref *corev1.Reference, mutator RoleBindingMutator) (*corev1.RoleBinding, error)
	DeleteRoleBinding(context.Context, *corev1.Reference) error
	GetRoleBinding(context.Context, *corev1.Reference) (*corev1.RoleBinding, error)
	ListRoleBindings(context.Context) (*corev1.RoleBindingList, error)
}

type RoleMutator

type RoleMutator = MutatorFunc[*corev1.Role]

type RoleStore

type RoleStore = KeyValueStoreT[*corev1.Role]

type SelectorPredicate

type SelectorPredicate[T corev1.IdLabelReader] func(T) bool

type TokenCreateOption

type TokenCreateOption func(*TokenCreateOptions)

func WithCapabilities

func WithCapabilities(capabilities []*corev1.TokenCapability) TokenCreateOption

func WithLabels

func WithLabels(labels map[string]string) TokenCreateOption

func WithMaxUsages

func WithMaxUsages(usages int64) TokenCreateOption

type TokenCreateOptions

type TokenCreateOptions struct {
	Labels       map[string]string
	Capabilities []*corev1.TokenCapability
	MaxUsages    int64
}

func NewTokenCreateOptions

func NewTokenCreateOptions() TokenCreateOptions

func (*TokenCreateOptions) Apply

func (o *TokenCreateOptions) Apply(opts ...TokenCreateOption)

type TokenMutator

type TokenMutator = MutatorFunc[*corev1.BootstrapToken]

type TokenStore

type TokenStore interface {
	CreateToken(ctx context.Context, ttl time.Duration, opts ...TokenCreateOption) (*corev1.BootstrapToken, error)
	DeleteToken(ctx context.Context, ref *corev1.Reference) error
	GetToken(ctx context.Context, ref *corev1.Reference) (*corev1.BootstrapToken, error)
	UpdateToken(ctx context.Context, ref *corev1.Reference, mutator TokenMutator) (*corev1.BootstrapToken, error)
	ListTokens(ctx context.Context) ([]*corev1.BootstrapToken, error)
}

type ValueStoreT

type ValueStoreT[T any] interface {
	Put(ctx context.Context, value T, opts ...PutOpt) error
	Get(ctx context.Context, opts ...GetOpt) (T, error)
	Watch(ctx context.Context, opts ...WatchOpt) (<-chan WatchEvent[KeyRevision[T]], error)
	Delete(ctx context.Context, opts ...DeleteOpt) error
	History(ctx context.Context, opts ...HistoryOpt) ([]KeyRevision[T], error)
}

type WatchEvent

type WatchEvent[T any] struct {
	EventType WatchEventType
	Current   T
	Previous  T
}

type WatchEventType

type WatchEventType string
const (
	// An operation that creates a new key OR modifies an existing key.
	//
	// NB: The Watch API does not distinguish between create and modify events.
	// It is not practical (nor desired, in most cases) to provide this info
	// to the caller, because it cannot be guaranteed to be accurate in all cases.
	// Because of the inability to make this guarantee, any client code that
	// relies on this distinction would be highly likely to end up in an invalid
	// state after a sufficient amount of time, or after issuing a watch request
	// on a key that has a complex and/or truncated history. However, in certain
	// cases, clients may be able to correlate events with out-of-band information
	// to reliably disambiguate Put events. This is necessarily an implementation
	// detail and may not always be possible.
	WatchEventPut WatchEventType = "Put"

	// An operation that removes an existing key.
	//
	// Delete events make few guarantees, as different backends handle deletes
	// differently. Backends are not required to discard revision history, or
	// to stop sending events for a key after it has been deleted. Keys may
	// be recreated after a delete event, in which case a Put event will follow.
	// Such events may or may not contain a previous revision value, depending
	// on implementation details of the backend (they will always contain a
	// current revision value, though).
	WatchEventDelete WatchEventType = "Delete"
)

type WatchOpt

type WatchOpt interface{ ApplyWatchOption(*WatchOptions) }

func WithPrefix

func WithPrefix() WatchOpt

type WatchOptions

type WatchOptions struct {
	// Starting revision for the watch. If not specified, will start at the
	// latest revision.
	Revision *int64

	// If true, all keys under the same prefix will be watched.
	// When prefix mode is disabled (default), events will only be sent for
	// the single key specified in the request.
	// If used in combination with the revision option, will effectively
	// "replay" the history of all keys under the prefix starting at the
	// specified revision.
	// Care should be taken when using this option, especially in combination
	// with a past revision, as it could cause performance issues.
	Prefix bool
}

func (*WatchOptions) Apply

func (o *WatchOptions) Apply(opts ...WatchOpt)

Directories

Path Synopsis
Package etcd implements data storage using etcd.
Package etcd implements data storage using etcd.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL