Documentation
¶
Index ¶
- Variables
- func MinMaxScale(...) float64
- type APIMonitor
- type ActivationFunction
- type BaseStep
- func (s *BaseStep[RequestType, Opts]) GetAlias() string
- func (s *BaseStep[RequestType, Opts]) Init(alias string, db db.DB, opts conf.RawOpts) error
- func (s *BaseStep[RequestType, Opts]) PrepareResult(request RequestType) *StepResult
- func (s *BaseStep[RequestType, Opts]) PrepareStats(request PipelineRequest, unit string) StepStatistics
- type EmptyStepOpts
- type MonitoredCallback
- type Pipeline
- type PipelineMonitor
- type PipelineRequest
- type Premodifier
- type Step
- type StepMonitor
- type StepOpts
- type StepResult
- type StepStatistics
- type StepValidator
- func (s *StepValidator[RequestType]) GetAlias() string
- func (s *StepValidator[RequestType]) GetName() string
- func (s *StepValidator[RequestType]) Init(alias string, db db.DB, opts conf.RawOpts) error
- func (s *StepValidator[RequestType]) Run(traceLog *slog.Logger, request RequestType) (*StepResult, error)
- type StepWrapper
- type TelemetryMessage
Constants ¶
This section is empty.
Variables ¶
var ( // This error is returned from the step at any time when the step should be skipped. ErrStepSkipped = errors.New("step skipped") )
Functions ¶
func MinMaxScale ¶
func MinMaxScale(value, lowerBound, upperBound, activationLowerBound, activationUpperBound float64) float64
Min-max scale a value between lower and upper bounds and apply the given activation. Note: the resulting value is clamped between the activation bounds.
Types ¶
type APIMonitor ¶
type APIMonitor struct { // A histogram to measure how long the API requests take to run. ApiRequestsTimer *prometheus.HistogramVec }
Collection of Prometheus metrics to monitor the scheduler API requests.
func NewSchedulerMonitor ¶
func NewSchedulerMonitor(registry *monitoring.Registry) APIMonitor
Create a new scheduler monitor and register the necessary Prometheus metrics.
func (*APIMonitor) Callback ¶
func (m *APIMonitor) Callback(w http.ResponseWriter, r *http.Request, pattern string) MonitoredCallback
type ActivationFunction ¶
type ActivationFunction struct{}
Mixin that can be embedded in a step to provide some activation function tooling.
func (*ActivationFunction) Apply ¶
func (m *ActivationFunction) Apply(in, activations map[string]float64) map[string]float64
Apply the activation function to the weights map. All hosts that are not in the activations map are removed.
func (*ActivationFunction) NoEffect ¶
func (m *ActivationFunction) NoEffect() float64
Get the activation value that will have no effect on the host.
func (*ActivationFunction) Norm ¶
func (m *ActivationFunction) Norm(activation float64) float64
Normalize a single value using the activation function.
type BaseStep ¶
type BaseStep[RequestType PipelineRequest, Opts StepOpts] struct { // Options to pass via yaml to this step. conf.JsonOpts[Opts] // The activation function to use. ActivationFunction // Database connection. DB db.DB // The alias of this step, if any. Alias string }
Common base for all steps that provides some functionality that would otherwise be duplicated across all steps.
func (*BaseStep[RequestType, Opts]) PrepareResult ¶
func (s *BaseStep[RequestType, Opts]) PrepareResult(request RequestType) *StepResult
Get a default result (no action) for the input weight keys given in the request.
func (*BaseStep[RequestType, Opts]) PrepareStats ¶
func (s *BaseStep[RequestType, Opts]) PrepareStats(request PipelineRequest, unit string) StepStatistics
Get default statistics for the input weight keys given in the request.
type EmptyStepOpts ¶
type EmptyStepOpts struct{}
Empty step opts conforming to the StepOpts interface (validation always succeeds).
func (EmptyStepOpts) Validate ¶
func (EmptyStepOpts) Validate() error
type MonitoredCallback ¶
type MonitoredCallback struct {
// contains filtered or unexported fields
}
Helper to respond to the request with the given code and error. Adds monitoring for the time it took to handle the request.
type Pipeline ¶
type Pipeline[RequestType PipelineRequest] interface { // Run the scheduling pipeline with the given request. Run(request RequestType) ([]string, error) }
func NewPipeline ¶
func NewPipeline[RequestType PipelineRequest]( supportedSteps map[string]func() Step[RequestType], confedSteps []conf.SchedulerStepConfig, stepWrappers []StepWrapper[RequestType], database db.DB, monitor PipelineMonitor, mqttClient mqtt.Client, mqttTopic string, ) Pipeline[RequestType]
Create a new pipeline with steps contained in the configuration.
type PipelineMonitor ¶
type PipelineMonitor struct { // The pipeline name is used to differentiate between different pipelines. PipelineName string // contains filtered or unexported fields }
Collection of Prometheus metrics to monitor scheduler pipeline
func NewPipelineMonitor ¶
func NewPipelineMonitor(registry *monitoring.Registry) PipelineMonitor
Create a new pipeline monitor and register the necessary Prometheus metrics.
func (PipelineMonitor) SubPipeline ¶
func (m PipelineMonitor) SubPipeline(name string) PipelineMonitor
Get a copied pipeline monitor with the name set, after binding the metrics.
type PipelineRequest ¶
type PipelineRequest interface { // Get the subjects that went in the pipeline. GetSubjects() []string // Get the weights for the subjects. GetWeights() map[string]float64 // Get logging args to be used in the step's trace log. // Usually, this will be the request context including the request ID. GetTraceLogArgs() []slog.Attr // Return the selected pipeline. GetPipeline() string // Return a copy of the request with the pipeline name set. WithPipeline(pipeline string) PipelineRequest }
type Premodifier ¶
type Premodifier[RequestType PipelineRequest] interface { // Modify the request before it is sent to the pipeline. ModifyRequest(request *RequestType) error }
type Step ¶
type Step[RequestType PipelineRequest] interface { // Configure the step with a database and options. Init(alias string, db db.DB, opts conf.RawOpts) error // Run this step of the scheduling pipeline. // Return a map of keys to activation values. Important: keys that are // not in the map are considered as filtered out. // Provide a traceLog that contains the global request id and should // be used to log the step's execution. Run(traceLog *slog.Logger, request RequestType) (*StepResult, error) // Get the name of this step. // The name is used to identify the step in metrics, config, logs, and more. // Should be something like: "my_cool_scheduler_step". GetName() string // Get the alias of this step. GetAlias() string }
Interface for a scheduler step.
type StepMonitor ¶
type StepMonitor[RequestType PipelineRequest] struct { // Mixin that can be embedded in a step to provide some activation function tooling. ActivationFunction // The wrapped scheduler step to monitor. Step Step[RequestType] // contains filtered or unexported fields }
Wraps a scheduler step to monitor its execution.
func MonitorStep ¶
func MonitorStep[RequestType PipelineRequest](step Step[RequestType], m PipelineMonitor) *StepMonitor[RequestType]
Wrap the given step in a StepMonitor so that its execution is measured with the given pipeline monitor.
func (*StepMonitor[RequestType]) GetAlias ¶
func (s *StepMonitor[RequestType]) GetAlias() string
Get the alias of the wrapped step.
func (*StepMonitor[RequestType]) GetName ¶
func (s *StepMonitor[RequestType]) GetName() string
Get the name of the wrapped step.
func (*StepMonitor[RequestType]) Run ¶
func (s *StepMonitor[RequestType]) Run(traceLog *slog.Logger, request RequestType) (*StepResult, error)
Run the step and observe its execution.
type StepOpts ¶
type StepOpts interface { // Validate the options for this step. Validate() error }
Interface to which step options must conform.
type StepResult ¶
type StepResult struct { // The activations calculated by this step. Activations map[string]float64 // Step statistics like: // // { // "max cpu contention": { // "unit": "cpu contention [%]", // "hosts": { "host 1": 10, "host 2": 10 } // }, // "noisy projects": { // "unit": "vms of this project running on host [#]", // "hosts": { "host 1": 1, "host 2": 0 } // } // } // // These statistics are used to display the step's effect on the hosts. // For example: max cpu contention: before [ 100%, 50%, 40% ], after [ 40%, 50%, 100% ] Statistics map[string]StepStatistics }
type StepStatistics ¶
type StepValidator ¶
type StepValidator[RequestType PipelineRequest] struct { // The wrapped step to validate. Step Step[RequestType] // By default, we execute all validations. However, through the config, // we can also disable some validations if necessary. DisabledValidations disabledValidations }
Wrapper for scheduler steps that validates them before/after execution.
func ValidateStep ¶
func ValidateStep[RequestType PipelineRequest](step Step[RequestType], disabledValidations disabledValidations) *StepValidator[RequestType]
Wrap the given step in a StepValidator that validates it before/after execution, skipping the given disabled validations.
func (*StepValidator[RequestType]) GetAlias ¶
func (s *StepValidator[RequestType]) GetAlias() string
Get the alias of the wrapped step.
func (*StepValidator[RequestType]) GetName ¶
func (s *StepValidator[RequestType]) GetName() string
Get the name of the wrapped step.
func (*StepValidator[RequestType]) Init ¶
func (s *StepValidator[RequestType]) Init(alias string, db db.DB, opts conf.RawOpts) error
Initialize the wrapped step with the database and options.
func (*StepValidator[RequestType]) Run ¶
func (s *StepValidator[RequestType]) Run(traceLog *slog.Logger, request RequestType) (*StepResult, error)
Run the step and validate what happens.
type StepWrapper ¶
type StepWrapper[RequestType PipelineRequest] func(Step[RequestType], conf.SchedulerStepConfig) Step[RequestType]
type TelemetryMessage ¶
type TelemetryMessage[RequestType PipelineRequest] struct { Time int64 `json:"time"` Request RequestType `json:"request"` Order []string `json:"order"` In map[string]float64 `json:"in"` Steps map[string]map[string]float64 `json:"steps"` Out map[string]float64 `json:"out"` }
Telemetry message as will be published to the mqtt broker.