controller

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 6, 2026 License: MIT Imports: 25 Imported by: 0

Documentation

Overview

Package controller contains the building blocks for creating Kubernetes controllers.

Index

Constants

View Source
const (
	// StackFrames is the number of stack frames to capture.
	StackFrames = 16
	// StackBufferSize is the initial size of the buffer used to capture the
	// stack trace.
	StackBufferSize = StackFrames * 64
	// StackFramesSkip is the number of stack frames to skip when capturing a
	// stack trace. This includes the frames for capturing the stack trace and
	// to call the Stack function.
	StackFramesSkip = 5
)

Variables

View Source
var (
	// ErrQueue is a generic queue error.
	ErrQueue = errors.New("queue")
	// ErrMaxRetries is returned when an item has reached the maximum number
	// of retries.
	ErrMaxRetries = ErrQueue.New("max retries")
)
View Source
var ErrController = errors.New("controller")

ErrController is an operator error.

View Source
var ErrPanic = errors.New("panic")

ErrPanic is returned when a panic occurs during event processing.

Functions

func Stack

func Stack(skip int) []byte

Stack returns the current stack trace, skipping the given number of frames.

Types

type Config

type Config struct {
	// Name of the main controller resource.
	Name string
	// Workers is the number of concurrent workers the controller will run
	// processing events.
	Workers int
	// Sync is the interval in which the controller will process a re-capture
	// of the selected resources from the API server.
	Sync time.Duration
	// Retries is the number of times the controller will try to process an
	// resource event before returning a real error.
	Retries int
}

Config is the controller configuration.

type Controller

type Controller[T runtime.Object] interface {
	// Runnable enables the controller to be runnable.
	Runnable
	// Get retrieves an resource object given by its key.
	Get(key string) (T, error)
	// List retrieves all objects owned by the owner with the given namespace,
	// object name, and given uid. If name or uid are empty, they are ignored
	// during the ownership check. If the namespace is empty, all namespaces are
	// considered. If the namespace, name, and uid are empty, all objects are
	// returned.
	List(namespace, name string, uid types.UID) []T
	// AddHandler will add a new handler to the controller.
	AddHandler(handler Handler[T], recorder Recorder) error
}

Controller is the interface for managing the controller and accessing controller resources.

func New

func New[T runtime.Object, L runtime.Object](
	config *Config, retriever Retriever[L], indexers cache.Indexers,
) Controller[T]

New creates a new controller for given retriever using given configuration and indexers.

type DefaultRecorder

type DefaultRecorder struct {
	// contains filtered or unexported fields
}

DefaultRecorder implements the metrics recording in a prometheus registry.

func NewRecorder

func NewRecorder(
	config RecorderConfig, reg prometheus.Registerer,
) *DefaultRecorder

NewRecorder returns a new Prometheus implementation for a metrics recorder.

func (*DefaultRecorder) AddEvent

func (r *DefaultRecorder) AddEvent(
	_ context.Context, name string, isRequeue bool,
)

AddEvent satisfies controller.MetricsRecorder interface.

func (*DefaultRecorder) DoneEvent

func (r *DefaultRecorder) DoneEvent(
	_ context.Context, name string, success bool, start time.Time,
)

DoneEvent satisfies controller.MetricsRecorder interface.

func (*DefaultRecorder) GetEvent

func (r *DefaultRecorder) GetEvent(
	_ context.Context, name string, queued time.Time,
)

GetEvent satisfies controller.MetricsRecorder interface.

revive:disable-next-line:get-return // named after recorded event.

func (*DefaultRecorder) RegisterLen

func (r *DefaultRecorder) RegisterLen(
	controller string, call func(context.Context) int,
)

RegisterLen satisfies controller.MetricsRecorder interface.

type DefaultRunner

type DefaultRunner struct{}

DefaultRunner is the default implementation of a runner.

func (*DefaultRunner) Run

func (*DefaultRunner) Run(
	ctx context.Context, errch chan error, runnables ...Runnable,
)

Run runs the given set of controllers using the given context and error channel for reporting errors.

type Handler

type Handler[T runtime.Object] interface {
	// Handle knows how to handle resource events.
	Handle(ctx context.Context, obj runtime.Object) error
	// Notify is called to notify about errors during processing. The error
	// may be of type `ErrPanic`, to indicates that the controller loop has
	// panicked, or `nil` to indicate an informational message.
	Notify(ctx context.Context, key string, err error)
}

Handler is the interface the resource event handler.

func NewHandler

func NewHandler[T runtime.Object](
	handle func(ctx context.Context, obj T) error,
	base *errors.Error,
) Handler[T]

NewHandler creates a new resource event handler.

type LeaderConfig

type LeaderConfig struct {
	// Name is the basic name of the resource lock.
	Name string `default:"controller"`
	// Namespace is the namespace of the resource lock.
	Namespace string `default:"default"`
	// LockType is the type of resource lock to use. Supported types are
	// "endpoints", "configmaps", and "leases".
	LockType string `default:"leases"`

	// LeaseDuration is the duration that non-leader candidates will
	// wait to force acquire leadership. This is measured against time of
	// last observed ack.
	LeaseDuration time.Duration `default:"15s"`
	// RenewDeadline is the duration that the acting leader will retry
	// refreshing leadership before giving up.
	RenewDeadline time.Duration `default:"10s"`
	// RenewPeriod is the duration the leader elector clients should wait
	// between tries of actions.
	RenewPeriod time.Duration `default:"2s"`
}

LeaderConfig is the configuration for the leader election, i.e. lease duration, renew deadline, and retry period.

type LeaderRunner

type LeaderRunner struct {
	// contains filtered or unexported fields
}

LeaderRunner is the leader election default implementation.

func (*LeaderRunner) Run

func (r *LeaderRunner) Run(
	ctx context.Context, errch chan error, runnables ...Runnable,
)

Run runs the given set of controllers using the given context and error channel for reporting errors.

type MonitorQueue

type MonitorQueue[T comparable] struct {
	Queue[T]
	// contains filtered or unexported fields
}

MonitorQueue is a wrapper for a monitored queue.

func (*MonitorQueue[T]) Add

func (q *MonitorQueue[T]) Add(ctx context.Context, item T)

Add will add an item to the queue.

func (*MonitorQueue[T]) Get

func (q *MonitorQueue[T]) Get(ctx context.Context) (T, bool)

Get returns the first item in the queue. If the last item has not been finished, the request will block.

func (*MonitorQueue[T]) Requeue

func (q *MonitorQueue[T]) Requeue(ctx context.Context, item T) error

Requeue re-adds the given item to the queue in requeue mode.

type Processor

type Processor[T runtime.Object] struct {
	// contains filtered or unexported fields
}

Processor is the default implementation of a processor.

func NewProcessor

func NewProcessor[T runtime.Object](
	handler Handler[T], informer cache.SharedIndexInformer, workers int,
	queue Queue[string], recorder Recorder,
) *Processor[T]

NewProcessor creates a new processor.

func (*Processor[T]) Process

func (p *Processor[T]) Process(ctx context.Context)

Process will start a processing loop on event queue. The loop will run until the given context is done or the queue is shutdown.

func (*Processor[T]) Run

func (p *Processor[T]) Run(ctx context.Context)

Run will start the processing loop.

type Queue

type Queue[T comparable] interface {
	// Name returns the name of the queue.
	Name() string
	// Len returns the size of the queue.
	Len(ctx context.Context) int
	// Add will add an item to the queue.
	Add(ctx context.Context, item T)
	// Requeue will add an item to the queue in a requeue mode. If will
	// return an error if an item has reached max requeue attempts.
	Requeue(ctx context.Context, item T) error
	// Get is returning the first item in the queue. If the last item has not
	// been finished, the request will block until the queue is notified.
	Get(ctx context.Context) (item T, shutdown bool)
	// Done marks the item being used as done.
	Done(ctx context.Context, item T)
	// ShutDown stops the queue from accepting new items.
	ShutDown(ctx context.Context)
}

Queue is a generic interface that any queue should implement.

func NewDefaultQueue

func NewDefaultQueue(
	name string, retries int, recorder Recorder,
) Queue[string]

NewDefaultQueue creates a new default queue with given number of retries and given recorder for monitoring.

func NewMonitorQueue

func NewMonitorQueue[T comparable](
	queue Queue[T], mrec Recorder,
) Queue[T]

NewMonitorQueue creates a new monitored queue wrapping the given queue.

func NewRetryQueue

func NewRetryQueue[T comparable](
	name string, queue workqueue.TypedRateLimitingInterface[T], retries int,
) Queue[T]

NewRetryQueue creates a new rate limiting queue with the given work queue and max retries.

type Recorder

type Recorder interface {
	// AddEvent adds an event metric records of a queued event.
	AddEvent(ctx context.Context, name string, retry bool)
	// GetEvent measures how long an object stays in the queue. If the object
	// is already in queue it will be measured once, since the first time it
	// was added to the queue.
	GetEvent(ctx context.Context, name string, queued time.Time)
	// DoneEvent measures how long it takes to process a resources.
	DoneEvent(ctx context.Context, name string, success bool, start time.Time)
	// RegisterLen will register a function that will be called by the metrics
	// recorder to get the length of a queue at a given point in time.
	RegisterLen(name string, call func(context.Context) int)
}

Recorder knows how to record queue metrics.

type RecorderConfig

type RecorderConfig struct {
	// Namespace is the prometheus metrics namespace.
	Namespace string
	// Subsystem is the prometheus metrics subsystem.
	Subsystem string
	// QueueBuckets sets custom buckets for the duration/latency items in queue metrics.
	// Check https://godoc.org/github.com/prometheus/client_golang/prometheus#pkg-variables
	QueueBuckets []float64 `default:"[0.01,0.05,0.1,0.25,0.5,1,3,10,20,60,150,300]"`
	// ProcessBuckets sets custom buckets for the duration/latency processing metrics.
	// Check https://godoc.org/github.com/prometheus/client_golang/prometheus#pkg-variables
	ProcessBuckets []float64 `default:"[0.005,0.01,0.025,0.05,0.1,0.25,0.5,1,2.5,5,10,30]"`
}

RecorderConfig is the recorder configuration.

type ResourceEventHandler

type ResourceEventHandler[T runtime.Object] struct {
	// contains filtered or unexported fields
}

ResourceEventHandler is an event handler for handling resource events.

func NewResourceEventHandler

func NewResourceEventHandler[T runtime.Object](
	handler Handler[T],
	queue Queue[string],
) *ResourceEventHandler[T]

NewResourceEventHandler creates a new event handler.

func (*ResourceEventHandler[T]) OnAdd

func (r *ResourceEventHandler[T]) OnAdd(obj any, _ bool)

OnAdd is called when an object is added.

func (*ResourceEventHandler[T]) OnDelete

func (r *ResourceEventHandler[T]) OnDelete(obj any)

OnDelete is called when an object is deleted.

func (*ResourceEventHandler[T]) OnUpdate

func (r *ResourceEventHandler[T]) OnUpdate(_, obj any)

OnUpdate is called when an object is updated.

type Retriever

type Retriever[T runtime.Object] interface {
	// List retrieves the list of resources from the API server.
	List(ctx context.Context, options metav1.ListOptions) (T, error)
	// Watch starts watching for changes on the resources from the API server.
	Watch(
		ctx context.Context, options metav1.ListOptions,
	) (watch.Interface, error)
}

Retriever is the retriever interface for the service handling a resource.

func NewRetriever

func NewRetriever[L runtime.Object](
	iface Retriever[L], base *errors.Error,
) Retriever[L]

NewRetriever creates a new retriever adapter for given retriever interface.

type RetryQueue

type RetryQueue[T comparable] struct {
	// contains filtered or unexported fields
}

RetryQueue is a rate limiting queue implementation.

func (*RetryQueue[T]) Add

func (q *RetryQueue[T]) Add(_ context.Context, item T)

Add will add an item to the queue.

func (*RetryQueue[T]) Done

func (q *RetryQueue[T]) Done(_ context.Context, item T)

Done marks the item being used as done.

func (*RetryQueue[T]) Get

func (q *RetryQueue[T]) Get(_ context.Context) (item T, shutdown bool)

Get returns the first item in the queue. If the last item has not been finished, the request will block.

func (*RetryQueue[T]) Len

func (q *RetryQueue[T]) Len(_ context.Context) int

Len returns the size of the queue.

func (*RetryQueue[T]) Name

func (q *RetryQueue[T]) Name() string

Name returns the name of the queue.

func (*RetryQueue[T]) Requeue

func (q *RetryQueue[T]) Requeue(_ context.Context, item T) error

Requeue re-adds the given item to the queue if the max retries has not been reached yet.

func (*RetryQueue[T]) ShutDown

func (q *RetryQueue[T]) ShutDown(_ context.Context)

ShutDown stops the queue from accepting new items.

type Runnable

type Runnable interface {
	// Init initializes the controller runnable by starting the informer and
	// waiting until the cache is synced. This allows to coordinate multiple
	// controllers by initializing all controllers before running their
	// processors.
	Init(ctx context.Context, errch chan error)
	// Run starts the controller runnable by creating the workers that are
	// running the actual event processing.
	Run(ctx context.Context)
}

Runnable is the interface for controller runnables that can be first initialized and than started.

type Runner

type Runner interface {
	// Run runs the given set of controllers using the given context and error
	// channel for reporting errors.
	Run(ctx context.Context, errch chan error, runnables ...Runnable)
}

Runner knows how to run the processing queue.

func NewDefaultRunner

func NewDefaultRunner() Runner

NewDefaultRunner creates a new default runner that just runs the function.

func NewLeaderRunner

func NewLeaderRunner(
	id string, config *LeaderConfig, k8scli kubernetes.Interface,
) Runner

NewLeaderRunner returns a new leader election runner with given unique host identifier that can be used to run a function after acquiring leadership using the Kubernetes leader election mechanism. Make sure to use a unique host identifier for each instance of a leader eleaction runner, e.g. by adding a universal unique identifier to the hostname.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL