informer

package
v0.8.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 8, 2024 License: Apache-2.0 Imports: 28 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DefaultWatchErrorHandler added in v0.3.0

func DefaultWatchErrorHandler(r *Reflector, err error)

DefaultWatchErrorHandler is the default implementation of WatchErrorHandler.

func NewInformer added in v0.6.4

func NewInformer(
	lw cache.ListerWatcher,
	objType runtime.Object,
	store cache.Store,
	resyncPeriod time.Duration,
	h cache.ResourceEventHandler,
	transfor cache.TransformFunc,
) cache.Controller

func NewNamedController

func NewNamedController(name string, config *Config) cache.Controller

Types

type Config added in v0.3.0

type Config struct {
	// The queue for your objects - has to be a DeltaFIFO due to
	// assumptions in the implementation. Your Process() function
	// should accept the output of this Queue's Pop() method.
	cache.Queue

	// Something that can list and watch your objects.
	cache.ListerWatcher

	// Something that can process a popped Deltas.
	Process cache.ProcessFunc

	// ObjectType is an example object of the type this controller is
	// expected to handle.  Only the type needs to be right, except
	// that when that is `unstructured.Unstructured` the object's
	// `"apiVersion"` and `"kind"` must also be right.
	ObjectType runtime.Object

	// FullResyncPeriod is the period at which ShouldResync is considered.
	FullResyncPeriod time.Duration

	// ShouldResync is periodically used by the reflector to determine
	// whether to Resync the Queue. If ShouldResync is `nil` or
	// returns true, it means the reflector should proceed with the
	// resync.
	ShouldResync cache.ShouldResyncFunc

	// If true, when Process() returns an error, re-enqueue the object.
	// TODO: add interface to let you inject a delay/backoff or drop
	//       the object completely if desired. Pass the object in
	//       question to this interface as a parameter.  This is probably moot
	//       now that this functionality appears at a higher level.
	RetryOnError bool

	// Called whenever the ListAndWatch drops the connection with an error.
	WatchErrorHandler WatchErrorHandler

	// WatchListPageSize is the requested chunk size of initial and relist watch lists.
	WatchListPageSize int64

	// StreamHandle of paginated list, resources within a pager will be processed
	// as soon as possible instead of waiting until all resources are pulled before calling the ResourceHandler.
	StreamHandleForPaginatedList bool

	// Force paging, Reflector will sometimes use APIServer's cache,
	// even if paging is specified APIServer will return all resources for performance,
	// then it will skip Reflector's streaming memory optimization.
	ForcePaginatedList bool
}

type DynamicListerWatcherFactory

type DynamicListerWatcherFactory interface {
	ForResource(namespace string, gvr schema.GroupVersionResource) cache.ListerWatcher
	ForResourceWithOptions(namespace string, gvr schema.GroupVersionResource, optionsFunc TweakListOptionsFunc) cache.ListerWatcher
}

func NewDynamicListerWatcherFactory added in v0.3.0

func NewDynamicListerWatcherFactory(config *rest.Config) (DynamicListerWatcherFactory, error)

type ExtraStore added in v0.8.0

type ExtraStore interface {
	// Add adds the given object to the accumulator associated with the given object's key
	Add(obj interface{}) error

	// Update updates the given object in the accumulator associated with the given object's key
	Update(obj interface{}) error

	// Delete deletes the given object from the accumulator associated with the given object's key
	Delete(obj interface{}) error

	// Replace will delete the contents of the store, using instead the
	// given list. Store takes ownership of the list, you should not reference
	// it after calling this function.
	Replace([]interface{}, string) error
}

type FilteringResourceEventHandler

type FilteringResourceEventHandler struct {
	FilterFunc func(obj interface{}) bool
	Handler    ResourceEventHandler
}

func (FilteringResourceEventHandler) OnAdd

func (r FilteringResourceEventHandler) OnAdd(obj interface{}, isInInitialList bool)

func (FilteringResourceEventHandler) OnDelete

func (r FilteringResourceEventHandler) OnDelete(obj interface{})

func (FilteringResourceEventHandler) OnSync

func (r FilteringResourceEventHandler) OnSync(obj interface{})

func (FilteringResourceEventHandler) OnUpdate

func (r FilteringResourceEventHandler) OnUpdate(oldObj, newObj interface{}, isInInitialList bool)

type InformerConfig added in v0.8.0

type InformerConfig struct {
	cache.ListerWatcher
	Storage *ResourceVersionStorage

	ExampleObject runtime.Object
	Handler       ResourceEventHandler
	ErrorHandler  WatchErrorHandler
	ExtraStore    ExtraStore

	WatchListPageSize            int64
	ForcePaginatedList           bool
	StreamHandleForPaginatedList bool
}

type Reflector added in v0.3.0

type Reflector struct {

	// MaxInternalErrorRetryDuration defines how long we should retry internal errors returned by watch.
	MaxInternalErrorRetryDuration time.Duration

	// ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked
	ShouldResync func() bool

	// WatchListPageSize is the requested chunk size of initial and resync watch lists.
	// If unset, for consistent reads (RV="") or reads that opt-into arbitrarily old data
	// (RV="0") it will default to pager.PageSize, for the rest (RV != "" && RV != "0")
	// it will turn off pagination to allow serving them from watch cache.
	// NOTE: It should be used carefully as paginated lists are always served directly from
	// etcd, which is significantly less efficient and may lead to serious performance and
	// scalability problems.
	WatchListPageSize int64

	// StreamHandle of paginated list, resources within a pager will be processed
	// as soon as possible instead of waiting until all resources are pulled before calling the ResourceHandler.
	StreamHandleForPaginatedList bool

	// Force paging, Reflector will sometimes use APIServer's cache,
	// even if paging is specified APIServer will return all resources for performance,
	// then it will skip Reflector's streaming memory optimization.
	ForcePaginatedList bool
	// contains filtered or unexported fields
}

Reflector watches a specified resource and causes all changes to be reflected in the given store.

func NewNamedReflector added in v0.3.0

func NewNamedReflector(name string, lw cache.ListerWatcher, expectedType interface{}, store cache.Store, resyncPeriod time.Duration) *Reflector

NewNamedReflector is the same as NewReflector, but with a specified name for logging.

func NewNamespaceKeyedIndexerAndReflector added in v0.3.0

func NewNamespaceKeyedIndexerAndReflector(lw cache.ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer cache.Indexer, reflector *Reflector)

NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector. The indexer is configured to key on namespace.

func NewReflector added in v0.3.0

func NewReflector(lw cache.ListerWatcher, expectedType interface{}, store cache.Store, resyncPeriod time.Duration) *Reflector

NewReflector creates a new Reflector object which will keep the given store up to date with the server's contents for the given resource. Reflector promises to only put things in the store that have the type of expectedType, unless expectedType is nil. If resyncPeriod is non-zero, then the reflector will periodically consult its ShouldResync function to determine whether to invoke the Store's Resync operation; `ShouldResync==nil` means always "yes". This enables you to use reflectors to periodically process everything as well as incrementally processing the things that change.

func (*Reflector) HasInitializedSynced added in v0.8.0

func (r *Reflector) HasInitializedSynced() bool

func (*Reflector) LastSyncResourceVersion added in v0.3.0

func (r *Reflector) LastSyncResourceVersion() string

LastSyncResourceVersion is the resource version observed when last synced with the underlying store. The value returned is not synchronized with access to the underlying store and is not thread-safe.

func (*Reflector) ListAndWatch added in v0.3.0

func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error

ListAndWatch first lists all items and gets the resource version at the moment of the call, and then uses that resource version to watch. It returns an error if ListAndWatch didn't even try to initialize the watch.

func (*Reflector) Run added in v0.3.0

func (r *Reflector) Run(stopCh <-chan struct{})

Run repeatedly uses the reflector's ListAndWatch to fetch all the objects and subsequent deltas. Run will exit when stopCh is closed.

type ResourceEventHandler

type ResourceEventHandler interface {
	cache.ResourceEventHandler
	OnSync(obj interface{})
}

type ResourceEventHandlerFuncs

type ResourceEventHandlerFuncs struct {
	AddFunc    func(obj interface{})
	UpdateFunc func(oldObj, newObj interface{})
	DeleteFunc func(obj interface{})
	SyncFunc   func(obj interface{})
}

func (ResourceEventHandlerFuncs) OnAdd

func (r ResourceEventHandlerFuncs) OnAdd(obj interface{})

func (ResourceEventHandlerFuncs) OnDelete

func (r ResourceEventHandlerFuncs) OnDelete(obj interface{})

func (ResourceEventHandlerFuncs) OnSync

func (r ResourceEventHandlerFuncs) OnSync(obj interface{})

func (ResourceEventHandlerFuncs) OnUpdate

func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{})

type ResourceVersionInformer

type ResourceVersionInformer interface {
	Run(stopCh <-chan struct{})
	HasSynced() bool
}

func NewResourceVersionInformer

func NewResourceVersionInformer(name string, config InformerConfig) ResourceVersionInformer

type ResourceVersionStorage

type ResourceVersionStorage struct {
	// contains filtered or unexported fields
}

func NewResourceVersionStorage

func NewResourceVersionStorage() *ResourceVersionStorage

func (*ResourceVersionStorage) Add

func (c *ResourceVersionStorage) Add(obj interface{}) error

func (*ResourceVersionStorage) Delete

func (c *ResourceVersionStorage) Delete(obj interface{}) error

func (*ResourceVersionStorage) Get

func (c *ResourceVersionStorage) Get(obj interface{}) (string, bool, error)

func (*ResourceVersionStorage) GetByKey

func (c *ResourceVersionStorage) GetByKey(key string) (item interface{}, exists bool, err error)

func (*ResourceVersionStorage) ListKeys

func (c *ResourceVersionStorage) ListKeys() []string

func (*ResourceVersionStorage) Replace

func (c *ResourceVersionStorage) Replace(versions map[string]interface{}) error

func (*ResourceVersionStorage) Update

func (c *ResourceVersionStorage) Update(obj interface{}) error

type ResourceVersionUpdater added in v0.3.0

type ResourceVersionUpdater interface {
	// UpdateResourceVersion is called each time current resource version of the reflector
	// is updated.
	UpdateResourceVersion(resourceVersion string)
}

ResourceVersionUpdater is an interface that allows store implementation to track the current resource version of the reflector. This is especially important if storage bookmarks are enabled.

type TweakListOptionsFunc

type TweakListOptionsFunc func(*metav1.ListOptions)

type WatchErrorHandler added in v0.3.0

type WatchErrorHandler func(r *Reflector, err error)

The WatchErrorHandler is called whenever ListAndWatch drops the connection with an error. After calling this handler, the informer will back off and retry.

The default implementation looks at the error type and tries to log the error message at an appropriate level.

Implementations of this handler may display the error message in other ways. Implementations should return quickly - any expensive processing should be offloaded.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL