controller

package
v0.0.5 Latest
Published: Mar 21, 2018 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Constants

const DefaultScalerInterval = 100 * time.Millisecond

DefaultScalerInterval is the default interval at which the scaling strategy runs.

Variables

This section is empty.

Functions

func DecorateWithDelayAndSmoothing

func DecorateWithDelayAndSmoothing(c Controller)

Types

type Controller

type Controller interface {
	// Run starts the controller, which keeps doing its job until an empty struct is sent on the close channel.
	Run(closeCh <-chan struct{})
	// SetScalingInterval changes the interval at which the controller recomputes the required number of replicas for functions.
	// It should not be called once the controller is running.
	SetScalingInterval(interval time.Duration)
}

Controller deploys and scales functions by monitoring the input lag of registered functions. To do so, it periodically runs its scaling logic and keeps track of registered and unregistered functions, topics and deployments.
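The Run / SetScalingInterval contract can be illustrated with a small stand-in. The sketch below is not the package's implementation: tickingController, newTickingController and runDemo are hypothetical names, and the "scaling logic" is reduced to counting passes. It only shows the interval being set before Run and Run returning once an empty struct arrives on the close channel.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// DefaultScalerInterval mirrors the documented constant.
const DefaultScalerInterval = 100 * time.Millisecond

// Controller mirrors the documented interface.
type Controller interface {
	Run(closeCh <-chan struct{})
	SetScalingInterval(interval time.Duration)
}

// tickingController is a hypothetical implementation for illustration only.
type tickingController struct {
	interval time.Duration
	passes   int64 // number of scaling passes performed so far
}

// Compile-time check that the stand-in satisfies the interface.
var _ Controller = (*tickingController)(nil)

func newTickingController() *tickingController {
	return &tickingController{interval: DefaultScalerInterval}
}

// SetScalingInterval must be called before Run; Run reads the interval once.
func (c *tickingController) SetScalingInterval(interval time.Duration) {
	c.interval = interval
}

// Run performs one "scaling pass" per tick until closeCh delivers a value.
func (c *tickingController) Run(closeCh <-chan struct{}) {
	ticker := time.NewTicker(c.interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			atomic.AddInt64(&c.passes, 1) // stand-in for the real scaling logic
		case <-closeCh:
			return
		}
	}
}

// runDemo runs the controller briefly, stops it, and reports the pass count.
func runDemo() int64 {
	c := newTickingController()
	c.SetScalingInterval(2 * time.Millisecond)

	closeCh := make(chan struct{})
	done := make(chan struct{})
	go func() {
		defer close(done)
		c.Run(closeCh)
	}()

	time.Sleep(100 * time.Millisecond)
	closeCh <- struct{}{} // an empty struct on the close channel stops Run
	<-done
	return atomic.LoadInt64(&c.passes)
}

func main() {
	fmt.Println("scaling passes:", runDemo())
}
```

The exact number of passes depends on scheduling, so the sketch prints it rather than asserting a fixed value.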

func New

func New(topicInformer informersV1.TopicInformer,
	functionInformer informersV1.FunctionInformer,
	deploymentInformer informersV1Beta1.DeploymentInformer,
	deployer Deployer,
	tracker LagTracker,
	port int) Controller

New initialises a new function controller, adding event handlers to the provided informers.

type Deployer

type Deployer interface {
	// Deploy requests that a function be initially deployed on k8s.
	Deploy(function *v1.Function) error

	// Undeploy is called when a function is unregistered.
	Undeploy(function *v1.Function) error

	// Update is called when a function is updated. The desired number of replicas of the function is provided.
	Update(function *v1.Function, replicas int) error

	// Scale is used to vary the number of replicas dedicated to a function, including going to zero.
	Scale(function *v1.Function, replicas int) error
}

Deployer allows the realisation of a function on k8s and its subsequent scaling to accommodate more or less load.
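For tests that should not touch a cluster, an in-memory Deployer can be handy. The following is a sketch under stated assumptions: Function is a local stand-in for v1.Function, fakeDeployer and newFakeDeployer are hypothetical, and the choices to start a deployment at zero replicas and to reject negative replica counts are illustrative, not documented behaviour.

```go
package main

import (
	"errors"
	"fmt"
)

// Function is a hypothetical stand-in for the v1.Function resource.
type Function struct{ Name string }

// Deployer mirrors the documented interface, using the local Function type.
type Deployer interface {
	Deploy(function *Function) error
	Undeploy(function *Function) error
	Update(function *Function, replicas int) error
	Scale(function *Function, replicas int) error
}

var errNotDeployed = errors.New("function is not deployed")

// fakeDeployer records the replica count per function name in memory.
type fakeDeployer struct {
	replicas map[string]int
}

var _ Deployer = (*fakeDeployer)(nil)

func newFakeDeployer() *fakeDeployer {
	return &fakeDeployer{replicas: map[string]int{}}
}

func (d *fakeDeployer) Deploy(f *Function) error {
	if _, ok := d.replicas[f.Name]; ok {
		return fmt.Errorf("%s: already deployed", f.Name)
	}
	d.replicas[f.Name] = 0 // assumption: a fresh deployment starts at zero replicas
	return nil
}

func (d *fakeDeployer) Undeploy(f *Function) error {
	if _, ok := d.replicas[f.Name]; !ok {
		return fmt.Errorf("%s: %w", f.Name, errNotDeployed)
	}
	delete(d.replicas, f.Name)
	return nil
}

// Update is treated like Scale here; a real deployer would also apply spec changes.
func (d *fakeDeployer) Update(f *Function, replicas int) error {
	return d.Scale(f, replicas)
}

func (d *fakeDeployer) Scale(f *Function, replicas int) error {
	if _, ok := d.replicas[f.Name]; !ok {
		return fmt.Errorf("%s: %w", f.Name, errNotDeployed)
	}
	if replicas < 0 {
		return fmt.Errorf("%s: invalid replica count %d", f.Name, replicas)
	}
	d.replicas[f.Name] = replicas // zero is allowed: scale to zero
	return nil
}

func main() {
	d := newFakeDeployer()
	f := &Function{Name: "square"}
	fmt.Println(d.Deploy(f), d.Scale(f, 3), d.replicas["square"])
}
```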

func NewDeployer

func NewDeployer(config *rest.Config, brokers []string) (Deployer, error)

type LagTracker

type LagTracker interface {
	// BeginTracking registers the given subscription for monitoring.
	BeginTracking(Subscription) error

	// StopTracking unregisters the given subscription from monitoring.
	StopTracking(Subscription) error

	// Compute returns the current per-partition offsets (and hence lags) for all tracked subscriptions.
	Compute() map[Subscription]PartitionedOffsets
}

LagTracker computes how many unprocessed messages each function still has to handle.
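To make the tracking lifecycle concrete, here is a minimal in-memory tracker. It is a sketch, not the tracker returned by NewLagTracker: staticTracker and newStaticTracker are hypothetical, the canned offsets stand in for whatever the real tracker reads from the brokers, and returning an error on double registration is an assumption.

```go
package main

import "fmt"

// The three types below mirror the documented declarations.
type Subscription struct {
	Topic string
	Group string
}

type Offsets struct {
	Current         int64
	End             int64
	PreviousCurrent int64
}

type PartitionedOffsets map[int32]Offsets

type LagTracker interface {
	BeginTracking(Subscription) error
	StopTracking(Subscription) error
	Compute() map[Subscription]PartitionedOffsets
}

// staticTracker is a hypothetical tracker backed by canned offsets.
type staticTracker struct {
	source  map[Subscription]PartitionedOffsets // stand-in for broker queries
	tracked map[Subscription]bool
}

var _ LagTracker = (*staticTracker)(nil)

func newStaticTracker(source map[Subscription]PartitionedOffsets) *staticTracker {
	return &staticTracker{source: source, tracked: map[Subscription]bool{}}
}

func (t *staticTracker) BeginTracking(s Subscription) error {
	if t.tracked[s] {
		return fmt.Errorf("already tracking %v", s) // assumption: duplicates are errors
	}
	t.tracked[s] = true
	return nil
}

func (t *staticTracker) StopTracking(s Subscription) error {
	if !t.tracked[s] {
		return fmt.Errorf("not tracking %v", s)
	}
	delete(t.tracked, s)
	return nil
}

// Compute reports offsets only for subscriptions that are currently tracked.
func (t *staticTracker) Compute() map[Subscription]PartitionedOffsets {
	result := make(map[Subscription]PartitionedOffsets, len(t.tracked))
	for s := range t.tracked {
		result[s] = t.source[s]
	}
	return result
}

func main() {
	s := Subscription{Topic: "numbers", Group: "square"}
	t := newStaticTracker(map[Subscription]PartitionedOffsets{
		s: {0: {Current: 3, End: 10}},
	})
	_ = t.BeginTracking(s)
	fmt.Println(t.Compute()[s][0].End) // prints 10
}
```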

func NewLagTracker

func NewLagTracker(brokers []string) LagTracker

type Offsets

type Offsets struct {
	Current         int64
	End             int64
	PreviousCurrent int64
}

Offsets gives per-partition information about current and end offsets.

func (Offsets) Activity

func (o Offsets) Activity() int64

Activity returns how many messages have been handled since a previous query of the tracker.

func (Offsets) Lag

func (o Offsets) Lag() int64

Lag returns how many messages are available that haven't been handled yet.
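The page does not show the method bodies, so the arithmetic below is an assumption inferred from the field names: lag as End minus Current, and activity as Current minus PreviousCurrent. The totalLag helper is hypothetical and simply sums the lag over all partitions.

```go
package main

import "fmt"

// Offsets mirrors the documented struct.
type Offsets struct {
	Current         int64
	End             int64
	PreviousCurrent int64
}

// Lag is assumed to be the distance between the end of the partition
// and the consumer group's current position.
func (o Offsets) Lag() int64 { return o.End - o.Current }

// Activity is assumed to be the progress made since the previous query.
func (o Offsets) Activity() int64 { return o.Current - o.PreviousCurrent }

// PartitionedOffsets mirrors the documented map type.
type PartitionedOffsets map[int32]Offsets

// totalLag is a hypothetical helper: the lag summed over all partitions.
func totalLag(po PartitionedOffsets) int64 {
	var sum int64
	for _, o := range po {
		sum += o.Lag()
	}
	return sum
}

func main() {
	po := PartitionedOffsets{
		0: {Current: 40, End: 50, PreviousCurrent: 25}, // 10 behind, 15 handled
		1: {Current: 7, End: 7, PreviousCurrent: 7},    // fully caught up, idle
	}
	fmt.Println(po[0].Lag(), po[0].Activity(), totalLag(po)) // prints: 10 15 10
}
```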

func (Offsets) String

func (o Offsets) String() string

type PartitionedOffsets

type PartitionedOffsets map[int32]Offsets

func (PartitionedOffsets) String

func (po PartitionedOffsets) String() string

type Subscription

type Subscription struct {
	Topic string
	Group string
}

Subscription describes a tracked tuple of topic and consumer group.
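Because Subscription contains only string fields, it is comparable and can be used directly as a map key, which is how Compute indexes its result. The short sketch below shows that property; uniqueSubscriptions is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// Subscription mirrors the documented struct.
type Subscription struct {
	Topic string
	Group string
}

// uniqueSubscriptions is a hypothetical helper that drops duplicate
// (topic, group) pairs while preserving first-seen order.
func uniqueSubscriptions(subs []Subscription) []Subscription {
	seen := make(map[Subscription]struct{}, len(subs))
	var out []Subscription
	for _, s := range subs {
		if _, ok := seen[s]; ok {
			continue
		}
		seen[s] = struct{}{}
		out = append(out, s)
	}
	return out
}

func main() {
	subs := []Subscription{
		{Topic: "numbers", Group: "square"},
		{Topic: "numbers", Group: "cube"},
		{Topic: "numbers", Group: "square"}, // duplicate of the first entry
	}
	fmt.Println(len(uniqueSubscriptions(subs))) // prints 2
}
```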
