resource

package
v1.15.0-pre.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 29, 2023 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var AddToScheme = localSchemeBuilder.AddToScheme

Functions

This section is empty.

Types

type ErrorAction

type ErrorAction string
var (
	// ErrorActionRetry instructs to retry the processing. The key is requeued after
	// rate limiting.
	ErrorActionRetry ErrorAction = "retry"

	// ErrorActionIgnore instructs to ignore the error.
	ErrorActionIgnore ErrorAction = "ignore"

	// ErrorActionStop instructs to stop the processing for this subscriber.
	ErrorActionStop ErrorAction = "stop"
)

func AlwaysRetry

func AlwaysRetry(Key, int, error) ErrorAction

AlwaysRetry is an error handler that always retries the error.

type ErrorHandler

type ErrorHandler func(key Key, numRetries int, err error) ErrorAction

ErrorHandler is a function that takes the key of the failing object (zero key if event was sync), the number of times the key has been retried and the error that occurred. The function returns the action that should be taken.

func RetryUpTo

func RetryUpTo(n int) ErrorHandler

RetryUpTo is an error handler that retries a key up to specified number of times before stopping.

type Event

type Event[T k8sRuntime.Object] struct {
	Kind   EventKind
	Key    Key
	Object T

	// Done marks the event as processed.  If err is non-nil, the
	// key of the object is requeued and the processing retried at
	// a later time with a potentially new version of the object.
	//
	// If this method is not called after the references to the event
	// are gone, the finalizer will panic.
	Done func(err error)
}

Event emitted from resource.

type EventKind

type EventKind string
const (
	Sync   EventKind = "sync"
	Upsert EventKind = "upsert"
	Delete EventKind = "delete"
)

type EventsOpt

type EventsOpt func(*eventsOpts)

func WithErrorHandler

func WithErrorHandler(h ErrorHandler) EventsOpt

WithErrorHandler specifies the error handling strategy for failed events. By default the strategy is to always requeue the processing of a failed event.

func WithRateLimiter

func WithRateLimiter(r workqueue.RateLimiter) EventsOpt

WithRateLimiter sets the rate limiting algorithm to be used when requeueing failed events.

type Key

type Key struct {
	// Name is the name of the object
	Name string

	// Namespace is the namespace, or empty if object is not namespaced.
	Namespace string
}

Key of an K8s object, e.g. name and optional namespace.

func NewKey

func NewKey(obj any) Key

func (Key) String

func (k Key) String() string

type KeyIter

type KeyIter interface {
	// Next returns true if there is a key, false if iteration has finished.
	Next() bool
	Key() Key
}

type Resource

type Resource[T k8sRuntime.Object] interface {
	stream.Observable[Event[T]]

	// Events returns a channel of events. Each event must be marked as handled
	// with a call to Done(), otherwise no new events for this key will be emitted.
	//
	// When Done() is called with non-nil error the error handler is invoked, which
	// can ignore, requeue the event or close the channel. The default error handler
	// will requeue.
	Events(ctx context.Context, opts ...EventsOpt) <-chan Event[T]

	// Store retrieves the read-only store for the resource. Blocks until
	// the store has been synchronized or the context cancelled.
	// Returns a non-nil error if context is cancelled or the resource
	// has been stopped before store has synchronized.
	Store(context.Context) (Store[T], error)
}

Resource provides access to a Kubernetes resource through either a stream of events or a read-only store.

Observing of the events can be done from a constructor as subscriber registration is non-blocking.

Store() however should only be called from a start hook, or from a goroutine forked from the start hook as it blocks until the store has been synchronized.

The subscriber can process the events from Events() asynchronously and in parallel, but for each event the Done() function must be called to mark the event as handled. If not done no new events will be emitted for this key. If an event handling is marked as failed the configured error handler is called (WithErrorHandler). The default error handler will requeue the event (by its key) for later retried processing. The requeueing is rate limited and can be configured with WithRateLimiter option to Events().

The resource is lazy, e.g. it will not start the informer until a call has been made to Events() or Store().

func New

New creates a new Resource[T]. Use with hive.Provide:

var exampleCell = hive.Module(
	"example",
 	cell.Provide(
	 	// Provide `Resource[*slim_corev1.Pod]` to the hive:
	 	func(lc hive.Lifecycle, c k8sClient.Clientset) resource.Resource[*slim_corev1.Pod] {
			lw := utils.ListerWatcherFromTyped[*slim_corev1.PodList](
				c.Slim().CoreV1().Pods(""),
			)
	 		return resource.New(lc, lw)
	 	}
	}),
	...
)

func usePods(pods resource.Resource[*slim_corev1.Pod]) {
	go func() {
		for ev := range podEvents {
	   		onPodEvent(ev)
	   	}
	}
	return e
}
func onPodEvent(event resource.Event[*slim_core.Pod]) {
	switch event.Kind {
	case resource.Sync:
		// Pods have now been synced and the set of Upsert events
		// received thus far forms a coherent snapshot.

		// Must always call event.Done(error) to mark the event as processed.
		event.Done(nil)
	case resource.Upsert:
		event.Done(onPodUpsert(event.Object))
	case resource.Delete:
		event.Done(onPodDelete(event.Object))
	}
}

See also pkg/k8s/resource/example/main.go for a runnable example.

type ResourceOption

type ResourceOption func(o *options)

func WithIndexers

func WithIndexers(indexers cache.Indexers) ResourceOption

WithIndexers sets additional custom indexers on the resource store.

func WithLazyTransform

func WithLazyTransform(sourceObj func() k8sRuntime.Object, transform cache.TransformFunc) ResourceOption

WithLazyTransform sets the function to transform the object before storing it. Unlike "WithTransform", this defers the resolving of the source object type until the resource is needed. Use this in situations where the source object depends on api-server capabilities.

func WithMetric

func WithMetric(scope string) ResourceOption

WithMetric enables metrics collection for the resource using the provided scope.

func WithName added in v1.15.0

func WithName(name string) ResourceOption

WithName sets the name of the resource. Used for workqueue metrics.

func WithTransform

func WithTransform[From, To k8sRuntime.Object](transform func(From) (To, error)) ResourceOption

WithTransform sets the function to transform the object before storing it.

type Store

type Store[T k8sRuntime.Object] interface {
	// List returns all items currently in the store.
	List() []T

	// IterKeys returns a key iterator.
	IterKeys() KeyIter

	// Get returns the latest version by deriving the key from the given object.
	Get(obj T) (item T, exists bool, err error)

	// GetByKey returns the latest version of the object with given key.
	GetByKey(key Key) (item T, exists bool, err error)

	// IndexKeys returns the keys of the stored objects whose set of indexed values
	// for the index includes the given indexed value.
	IndexKeys(indexName, indexedValue string) ([]string, error)

	// ByIndex returns the stored objects whose set of indexed values for the index
	// includes the given indexed value.
	ByIndex(indexName, indexedValue string) ([]T, error)

	// CacheStore returns the underlying cache.Store instance. Use for temporary
	// compatibility purposes only!
	CacheStore() cache.Store
}

Store is a read-only typed wrapper for cache.Store.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL