Documentation
¶
Overview ¶
Package controller contains the building blocks for creating Kubernetes controllers.
Index ¶
- Constants
- Variables
- func Stack(skip int) []byte
- type Config
- type Controller
- type DefaultRecorder
- func (r *DefaultRecorder) AddEvent(_ context.Context, name string, isRequeue bool)
- func (r *DefaultRecorder) DoneEvent(_ context.Context, name string, success bool, start time.Time)
- func (r *DefaultRecorder) GetEvent(_ context.Context, name string, queued time.Time)
- func (r *DefaultRecorder) RegisterLen(controller string, call func(context.Context) int)
- type DefaultRunner
- type Handler
- type LeaderConfig
- type LeaderRunner
- type MonitorQueue
- type Processor
- type Queue
- type Recorder
- type RecorderConfig
- type ResourceEventHandler
- type Retriever
- type RetryQueue
- func (q *RetryQueue[T]) Add(_ context.Context, item T)
- func (q *RetryQueue[T]) Done(_ context.Context, item T)
- func (q *RetryQueue[T]) Get(_ context.Context) (item T, shutdown bool)
- func (q *RetryQueue[T]) Len(_ context.Context) int
- func (q *RetryQueue[T]) Name() string
- func (q *RetryQueue[T]) Requeue(_ context.Context, item T) error
- func (q *RetryQueue[T]) ShutDown(_ context.Context)
- type Runnable
- type Runner
Constants ¶
const ( // StackFrames is the number of stack frames to capture. StackFrames = 16 // StackBufferSize is the initial size of the buffer used to capture the // stack trace. StackBufferSize = StackFrames * 64 // StackFramesSkip is the number of stack frames to skip when capturing a // stack trace. This includes the frames for capturing the stack trace and // to call the Stack function. StackFramesSkip = 5 )
Variables ¶
var ( // ErrQueue is a generic queue error. ErrQueue = errors.New("queue") // ErrMaxRetries is returned when an item has reached the maximum number // of retries. ErrMaxRetries = ErrQueue.New("max retries") )
var ErrController = errors.New("controller")
ErrController is an operator error.
var ErrPanic = errors.New("panic")
ErrPanic is returned when a panic occurs during event processing.
Functions ¶
Types ¶
type Config ¶
type Config struct {
// Name of the main controller resource.
Name string
// Workers is the number of concurrent workers the controller will run
// processing events.
Workers int
// Sync is the interval in which the controller will process a re-capture
// of the selected resources from the API server.
Sync time.Duration
// Retries is the number of times the controller will try to process a
// resource event before returning a real error.
Retries int
}
Config is the controller configuration.
type Controller ¶
type Controller[T runtime.Object] interface { // Runnable enables the controller to be runnable. Runnable // Get retrieves a resource object given by its key. Get(key string) (T, error) // List retrieves all objects owned by the owner with the given namespace, // object name, and given uid. If name or uid are empty, they are ignored // during the ownership check. If the namespace is empty, all namespaces are // considered. If the namespace, name, and uid are empty, all objects are // returned. List(namespace, name string, uid types.UID) []T // AddHandler will add a new handler to the controller. AddHandler(handler Handler[T], recorder Recorder) error }
Controller is the interface for managing the controller and accessing controller resources.
type DefaultRecorder ¶
type DefaultRecorder struct {
// contains filtered or unexported fields
}
DefaultRecorder implements the metrics recording in a prometheus registry.
func NewRecorder ¶
func NewRecorder( config RecorderConfig, reg prometheus.Registerer, ) *DefaultRecorder
NewRecorder returns a new Prometheus implementation for a metrics recorder.
func (*DefaultRecorder) AddEvent ¶
func (r *DefaultRecorder) AddEvent( _ context.Context, name string, isRequeue bool, )
AddEvent satisfies the controller.Recorder interface.
func (*DefaultRecorder) DoneEvent ¶
func (r *DefaultRecorder) DoneEvent( _ context.Context, name string, success bool, start time.Time, )
DoneEvent satisfies the controller.Recorder interface.
func (*DefaultRecorder) GetEvent ¶
GetEvent satisfies the controller.Recorder interface.
revive:disable-next-line:get-return // named after recorded event.
func (*DefaultRecorder) RegisterLen ¶
func (r *DefaultRecorder) RegisterLen( controller string, call func(context.Context) int, )
RegisterLen satisfies the controller.Recorder interface.
type DefaultRunner ¶
type DefaultRunner struct{}
DefaultRunner is the default implementation of a runner.
type Handler ¶
type Handler[T runtime.Object] interface { // Handle knows how to handle resource events. Handle(ctx context.Context, obj runtime.Object) error // Notify is called to notify about errors during processing. The error // may be of type `ErrPanic`, to indicate that the controller loop has // panicked, or `nil` to indicate an informational message. Notify(ctx context.Context, key string, err error) }
Handler is the interface for the resource event handler.
type LeaderConfig ¶
type LeaderConfig struct {
// Name is the basic name of the resource lock.
Name string `default:"controller"`
// Namespace is the namespace of the resource lock.
Namespace string `default:"default"`
// LockType is the type of resource lock to use. Supported types are
// "endpoints", "configmaps", and "leases".
LockType string `default:"leases"`
// LeaseDuration is the duration that non-leader candidates will
// wait to force acquire leadership. This is measured against time of
// last observed ack.
LeaseDuration time.Duration `default:"15s"`
// RenewDeadline is the duration that the acting leader will retry
// refreshing leadership before giving up.
RenewDeadline time.Duration `default:"10s"`
// RenewPeriod is the duration the leader elector clients should wait
// between tries of actions.
RenewPeriod time.Duration `default:"2s"`
}
LeaderConfig is the configuration for the leader election, i.e. lease duration, renew deadline, and renew period.
type LeaderRunner ¶
type LeaderRunner struct {
// contains filtered or unexported fields
}
LeaderRunner is the leader election default implementation.
type MonitorQueue ¶
type MonitorQueue[T comparable] struct { Queue[T] // contains filtered or unexported fields }
MonitorQueue is a wrapper for a monitored queue.
func (*MonitorQueue[T]) Add ¶
func (q *MonitorQueue[T]) Add(ctx context.Context, item T)
Add will add an item to the queue.
type Processor ¶
Processor is the default implementation of a processor.
func NewProcessor ¶
func NewProcessor[T runtime.Object]( handler Handler[T], informer cache.SharedIndexInformer, workers int, queue Queue[string], recorder Recorder, ) *Processor[T]
NewProcessor creates a new processor.
type Queue ¶
type Queue[T comparable] interface { // Name returns the name of the queue. Name() string // Len returns the size of the queue. Len(ctx context.Context) int // Add will add an item to the queue. Add(ctx context.Context, item T) // Requeue will add an item to the queue in a requeue mode. It will // return an error if an item has reached max requeue attempts. Requeue(ctx context.Context, item T) error // Get is returning the first item in the queue. If the last item has not // been finished, the request will block until the queue is notified. Get(ctx context.Context) (item T, shutdown bool) // Done marks the item being used as done. Done(ctx context.Context, item T) // ShutDown stops the queue from accepting new items. ShutDown(ctx context.Context) }
Queue is a generic interface that any queue should implement.
func NewDefaultQueue ¶
NewDefaultQueue creates a new default queue with given number of retries and given recorder for monitoring.
func NewMonitorQueue ¶
func NewMonitorQueue[T comparable]( queue Queue[T], mrec Recorder, ) Queue[T]
NewMonitorQueue creates a new monitored queue wrapping the given queue.
func NewRetryQueue ¶
func NewRetryQueue[T comparable]( name string, queue workqueue.TypedRateLimitingInterface[T], retries int, ) Queue[T]
NewRetryQueue creates a new rate limiting queue with the given work queue and max retries.
type Recorder ¶
type Recorder interface {
// AddEvent records an event metric for a queued event.
AddEvent(ctx context.Context, name string, retry bool)
// GetEvent measures how long an object stays in the queue. If the object
// is already in queue it will be measured once, since the first time it
// was added to the queue.
GetEvent(ctx context.Context, name string, queued time.Time)
// DoneEvent measures how long it takes to process a resource.
DoneEvent(ctx context.Context, name string, success bool, start time.Time)
// RegisterLen will register a function that will be called by the metrics
// recorder to get the length of a queue at a given point in time.
RegisterLen(name string, call func(context.Context) int)
}
Recorder knows how to record queue metrics.
type RecorderConfig ¶
type RecorderConfig struct {
// Namespace is the prometheus metrics namespace.
Namespace string
// Subsystem is the prometheus metrics subsystem.
Subsystem string
// QueueBuckets sets custom buckets for the duration/latency items in queue metrics.
// Check https://godoc.org/github.com/prometheus/client_golang/prometheus#pkg-variables
QueueBuckets []float64 `default:"[0.01,0.05,0.1,0.25,0.5,1,3,10,20,60,150,300]"`
// ProcessBuckets sets custom buckets for the duration/latency processing metrics.
// Check https://godoc.org/github.com/prometheus/client_golang/prometheus#pkg-variables
ProcessBuckets []float64 `default:"[0.005,0.01,0.025,0.05,0.1,0.25,0.5,1,2.5,5,10,30]"`
}
RecorderConfig is the recorder configuration.
type ResourceEventHandler ¶
ResourceEventHandler is an event handler for handling resource events.
func NewResourceEventHandler ¶
func NewResourceEventHandler[T runtime.Object]( handler Handler[T], queue Queue[string], ) *ResourceEventHandler[T]
NewResourceEventHandler creates a new event handler.
func (*ResourceEventHandler[T]) OnAdd ¶
func (r *ResourceEventHandler[T]) OnAdd(obj any, _ bool)
OnAdd is called when an object is added.
func (*ResourceEventHandler[T]) OnDelete ¶
func (r *ResourceEventHandler[T]) OnDelete(obj any)
OnDelete is called when an object is deleted.
func (*ResourceEventHandler[T]) OnUpdate ¶
func (r *ResourceEventHandler[T]) OnUpdate(_, obj any)
OnUpdate is called when an object is updated.
type Retriever ¶
type Retriever[T runtime.Object] interface { // List retrieves the list of resources from the API server. List(ctx context.Context, options metav1.ListOptions) (T, error) // Watch starts watching for changes on the resources from the API server. Watch( ctx context.Context, options metav1.ListOptions, ) (watch.Interface, error) }
Retriever is the retriever interface for the service handling a resource.
type RetryQueue ¶
type RetryQueue[T comparable] struct { // contains filtered or unexported fields }
RetryQueue is a rate limiting queue implementation.
func (*RetryQueue[T]) Add ¶
func (q *RetryQueue[T]) Add(_ context.Context, item T)
Add will add an item to the queue.
func (*RetryQueue[T]) Done ¶
func (q *RetryQueue[T]) Done(_ context.Context, item T)
Done marks the item being used as done.
func (*RetryQueue[T]) Get ¶
func (q *RetryQueue[T]) Get(_ context.Context) (item T, shutdown bool)
Get returns the first item in the queue. If the last item has not been finished, the request will block.
func (*RetryQueue[T]) Len ¶
func (q *RetryQueue[T]) Len(_ context.Context) int
Len returns the size of the queue.
func (*RetryQueue[T]) Name ¶
func (q *RetryQueue[T]) Name() string
Name returns the name of the queue.
func (*RetryQueue[T]) Requeue ¶
func (q *RetryQueue[T]) Requeue(_ context.Context, item T) error
Requeue re-adds the given item to the queue if the max retries has not been reached yet.
func (*RetryQueue[T]) ShutDown ¶
func (q *RetryQueue[T]) ShutDown(_ context.Context)
ShutDown stops the queue from accepting new items.
type Runnable ¶
type Runnable interface {
// Init initializes the controller runnable by starting the informer and
// waiting until the cache is synced. This allows to coordinate multiple
// controllers by initializing all controllers before running their
// processors.
Init(ctx context.Context, errch chan error)
// Run starts the controller runnable by creating the workers that are
// running the actual event processing.
Run(ctx context.Context)
}
Runnable is the interface for controller runnables that can be first initialized and then started.
type Runner ¶
type Runner interface {
// Run runs the given set of controllers using the given context and error
// channel for reporting errors.
Run(ctx context.Context, errch chan error, runnables ...Runnable)
}
Runner knows how to run the processing queue.
func NewDefaultRunner ¶
func NewDefaultRunner() Runner
NewDefaultRunner creates a new default runner that just runs the function.
func NewLeaderRunner ¶
func NewLeaderRunner( id string, config *LeaderConfig, k8scli kubernetes.Interface, ) Runner
NewLeaderRunner returns a new leader election runner with given unique host identifier that can be used to run a function after acquiring leadership using the Kubernetes leader election mechanism. Make sure to use a unique host identifier for each instance of a leader election runner, e.g. by adding a universal unique identifier to the hostname.