scheduler

package
v0.0.0-...-e5dd5f4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 29, 2025 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// This error is returned from the step at any time when the step should be skipped.
	ErrStepSkipped = errors.New("step skipped")
)

Functions

func MinMaxScale

func MinMaxScale(value, lowerBound, upperBound, activationLowerBound, activationUpperBound float64) float64

Min-max scale a value between lower and upper bounds and apply the given activation. Note: the resulting value is clamped between the activation bounds.

Types

type APIMonitor

type APIMonitor struct {
	// A histogram to measure how long the API requests take to run.
	ApiRequestsTimer *prometheus.HistogramVec
}

Collection of Prometheus metrics to monitor scheduler pipeline

func NewSchedulerMonitor

func NewSchedulerMonitor(registry *monitoring.Registry) APIMonitor

Create a new scheduler monitor and register the necessary Prometheus metrics.

func (*APIMonitor) Callback

func (m *APIMonitor) Callback(w http.ResponseWriter, r *http.Request, pattern string) MonitoredCallback

type ActivationFunction

type ActivationFunction struct{}

Mixin that can be embedded in a step to provide some activation function tooling.

func (*ActivationFunction) Apply

func (m *ActivationFunction) Apply(in, activations map[string]float64) map[string]float64

Apply the activation function to the weights map. All hosts that are not in the activations map are removed.

func (*ActivationFunction) NoEffect

func (m *ActivationFunction) NoEffect() float64

Get activations that will have no effect on the host.

func (*ActivationFunction) Norm

func (m *ActivationFunction) Norm(activation float64) float64

Normalize a single value using the activation function.

type BaseStep

type BaseStep[RequestType PipelineRequest, Opts StepOpts] struct {
	// Options to pass via yaml to this step.
	conf.JsonOpts[Opts]
	// The activation function to use.
	ActivationFunction
	// Database connection.
	DB db.DB
	// The alias of this step, if any.
	Alias string
}

Common base for all steps that provides some functionality that would otherwise be duplicated across all steps.

func (*BaseStep[RequestType, Opts]) GetAlias

func (s *BaseStep[RequestType, Opts]) GetAlias() string

Get the alias of this step.

func (*BaseStep[RequestType, Opts]) Init

func (s *BaseStep[RequestType, Opts]) Init(alias string, db db.DB, opts conf.RawOpts) error

Init the step with the database and options.

func (*BaseStep[RequestType, Opts]) PrepareResult

func (s *BaseStep[RequestType, Opts]) PrepareResult(request RequestType) *StepResult

Get a default result (no action) for the input weight keys given in the request.

func (*BaseStep[RequestType, Opts]) PrepareStats

func (s *BaseStep[RequestType, Opts]) PrepareStats(request PipelineRequest, unit string) StepStatistics

Get default statistics for the input weight keys given in the request.

type EmptyStepOpts

type EmptyStepOpts struct{}

Empty step opts conforming to the StepOpts interface (validation always succeeds).

func (EmptyStepOpts) Validate

func (EmptyStepOpts) Validate() error

type MonitoredCallback

type MonitoredCallback struct {
	// contains filtered or unexported fields
}

Helper to respond to the request with the given code and error. Adds monitoring for the time it took to handle the request.

func (MonitoredCallback) Respond

func (c MonitoredCallback) Respond(code int, err error, text string)

Respond to the request with the given code and error. Also log the time it took to handle the request.

type Pipeline

type Pipeline[RequestType PipelineRequest] interface {
	// Run the scheduling pipeline with the given request.
	Run(request RequestType) ([]string, error)
}

func NewPipeline

func NewPipeline[RequestType PipelineRequest](
	supportedSteps map[string]func() Step[RequestType],
	confedSteps []conf.SchedulerStepConfig,
	stepWrappers []StepWrapper[RequestType],
	database db.DB,
	monitor PipelineMonitor,
	mqttClient mqtt.Client,
	mqttTopic string,
) Pipeline[RequestType]

Create a new pipeline with steps contained in the configuration.

type PipelineMonitor

type PipelineMonitor struct {
	// The pipeline name is used to differentiate between different pipelines.
	PipelineName string
	// contains filtered or unexported fields
}

Collection of Prometheus metrics to monitor scheduler pipeline

func NewPipelineMonitor

func NewPipelineMonitor(registry *monitoring.Registry) PipelineMonitor

Create a new scheduler monitor and register the necessary Prometheus metrics.

func (PipelineMonitor) SubPipeline

func (m PipelineMonitor) SubPipeline(name string) PipelineMonitor

Get a copied pipeline monitor with the name set, after binding the metrics.

type PipelineRequest

type PipelineRequest interface {
	// Get the subjects that went in the pipeline.
	GetSubjects() []string
	// Get the weights for the subjects.
	GetWeights() map[string]float64

	// Get logging args to be used in the step's trace log.
	// Usually, this will be the request context including the request ID.
	GetTraceLogArgs() []slog.Attr

	// Return the selected pipeline.
	GetPipeline() string
	// Return a copy of the request with the pipeline name set.
	WithPipeline(pipeline string) PipelineRequest
}

type Premodifier

type Premodifier[RequestType PipelineRequest] interface {
	// Modify the request before it is sent to the pipeline.
	ModifyRequest(request *RequestType) error
}

type Step

type Step[RequestType PipelineRequest] interface {
	// Configure the step with a database and options.
	Init(alias string, db db.DB, opts conf.RawOpts) error
	// Run this step of the scheduling pipeline.
	// Return a map of keys to activation values. Important: keys that are
	// not in the map are considered as filtered out.
	// Provide a traceLog that contains the global request id and should
	// be used to log the step's execution.
	Run(traceLog *slog.Logger, request RequestType) (*StepResult, error)
	// Get the name of this step.
	// The name is used to identify the step in metrics, config, logs, and more.
	// Should be something like: "my_cool_scheduler_step".
	GetName() string
	// Get the alias of this step.
	GetAlias() string
}

Interface for a scheduler step.

type StepMonitor

type StepMonitor[RequestType PipelineRequest] struct {
	// Mixin that can be embedded in a step to provide some activation function tooling.
	ActivationFunction

	// The wrapped scheduler step to monitor.
	Step Step[RequestType]
	// contains filtered or unexported fields
}

Wraps a scheduler step to monitor its execution.

func MonitorStep

func MonitorStep[RequestType PipelineRequest](step Step[RequestType], m PipelineMonitor) *StepMonitor[RequestType]

Schedule using the wrapped step and measure the time it takes.

func (*StepMonitor[RequestType]) GetAlias

func (s *StepMonitor[RequestType]) GetAlias() string

Get the alias of the wrapped step.

func (*StepMonitor[RequestType]) GetName

func (s *StepMonitor[RequestType]) GetName() string

Get the name of the wrapped step.

func (*StepMonitor[RequestType]) Init

func (s *StepMonitor[RequestType]) Init(alias string, db db.DB, opts conf.RawOpts) error

Initialize the wrapped step with the database and options.

func (*StepMonitor[RequestType]) Run

func (s *StepMonitor[RequestType]) Run(traceLog *slog.Logger, request RequestType) (*StepResult, error)

Run the step and observe its execution.

type StepOpts

type StepOpts interface {
	// Validate the options for this step.
	Validate() error
}

Interface to which step options must conform.

type StepResult

type StepResult struct {
	// The activations calculated by this step.
	Activations map[string]float64

	// Step statistics like:
	//
	//	{
	//	  "max cpu contention": {
	//	     "unit": "cpu contention [%]",
	//	     "hosts": { "host 1": 10, "host 2": 10 }
	//	   },
	//	  "noisy projects": {
	//	     "unit": "vms of this project running on host [#]",
	//	     "hosts": { "host 1": 1, "host 2": 0 }
	//	   }
	//	}
	//
	// These statistics are used to display the step's effect on the hosts.
	// For example: max cpu contention: before [ 100%, 50%, 40% ], after [ 40%, 50%, 100% ]
	Statistics map[string]StepStatistics
}

type StepStatistics

type StepStatistics struct {
	// The unit of the statistic.
	Unit string
	// The subjects and their values.
	Subjects map[string]float64
}

type StepValidator

type StepValidator[RequestType PipelineRequest] struct {
	// The wrapped step to validate.
	Step Step[RequestType]
	// By default, we execute all validations. However, through the config,
	// we can also disable some validations if necessary.
	DisabledValidations disabledValidations
}

Wrapper for scheduler steps that validates them before/after execution.

func ValidateStep

func ValidateStep[RequestType PipelineRequest](step Step[RequestType], disabledValidations disabledValidations) *StepValidator[RequestType]

Validate the wrapped step with the database and options.

func (*StepValidator[RequestType]) GetAlias

func (s *StepValidator[RequestType]) GetAlias() string

Get the alias of the wrapped step.

func (*StepValidator[RequestType]) GetName

func (s *StepValidator[RequestType]) GetName() string

Get the name of the wrapped step.

func (*StepValidator[RequestType]) Init

func (s *StepValidator[RequestType]) Init(alias string, db db.DB, opts conf.RawOpts) error

Initialize the wrapped step with the database and options.

func (*StepValidator[RequestType]) Run

func (s *StepValidator[RequestType]) Run(traceLog *slog.Logger, request RequestType) (*StepResult, error)

Run the step and validate what happens.

type StepWrapper

type StepWrapper[RequestType PipelineRequest] func(Step[RequestType], conf.SchedulerStepConfig) Step[RequestType]

type TelemetryMessage

type TelemetryMessage[RequestType PipelineRequest] struct {
	Time    int64                         `json:"time"`
	Request RequestType                   `json:"request"`
	Order   []string                      `json:"order"`
	In      map[string]float64            `json:"in"`
	Steps   map[string]map[string]float64 `json:"steps"`
	Out     map[string]float64            `json:"out"`
}

Telemetry message as will be published to the mqtt broker.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL