scheduler

package
v0.0.0-...-e560ebb Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 13, 2021 License: BSD-3-Clause Imports: 12 Imported by: 0

Documentation

Overview

Package scheduler provides Scheduler, which is an implementation of the quotascheduler algorithm. The algorithm prioritizes and matches requests to workers, tracks account balances, and ensures consistency between the scheduler's estimate of Request and Worker states and the client-supplied authoritative state.

scheduler.Scheduler is an implementation of the reconciler.Scheduler interface.

See the provided example in this package's godoc or doc_test.go for usage.

Example
package main

import (
	"context"
	"fmt"
	"time"

	"infra/qscheduler/qslib/scheduler"

	"go.chromium.org/luci/common/data/stringset"
)

// HandleAssignments is a placeholder for caller-specific handling of the
// assignments returned by Scheduler.RunOnce; in this example it does nothing.
func HandleAssignments([]*scheduler.Assignment) {}

// IsOn prints one line reporting whether the scheduler currently considers
// the given request to be assigned to the given worker.
func IsOn(requestID scheduler.RequestID, workerID scheduler.WorkerID, s *scheduler.Scheduler) {
	assigned := s.IsAssigned(requestID, workerID)
	fmt.Printf("%s is on %s? %v\n", requestID, workerID, assigned)
}

// main walks through a typical scheduler lifecycle: creating an account,
// enqueueing a request, registering an idle worker, running the scheduling
// algorithm, and reconciling the scheduler's state with task notifications.
func main() {
	ctx := context.Background()

	// Create a scheduler.
	s := scheduler.New(time.Now())

	// Create a quota account with no initial balance.
	accountConfig := scheduler.NewAccountConfig(0, nil, 1, []float32{1, 2, 3}, false, "")
	accountID := scheduler.AccountID("Account1")
	s.AddAccount(ctx, accountID, accountConfig, nil)

	// Update time, causing quota accounts to accumulate quota.
	s.UpdateTime(ctx, time.Now())

	// Create a task request, and add it to the scheduler queue.
	requestID := scheduler.RequestID("Request1")
	request := scheduler.NewTaskRequest(requestID, accountID, stringset.NewFromSlice("Label1"), nil, time.Now())
	s.AddRequest(ctx, request, time.Now(), []string{"tag1", "tag2"}, scheduler.NullEventSink)

	// Inform the scheduler of the existence of an idle worker.
	workerID := scheduler.WorkerID("Worker1")
	s.MarkIdle(ctx, workerID, stringset.NewFromSlice("Label2"), time.Now(), scheduler.NullEventSink)

	// Prints false: Request1 has not been assigned to Worker1 yet.
	IsOn(requestID, workerID, s)

	// Run a round of the scheduling algorithm, after updating time and accounts
	// again.
	t := time.Now()
	s.UpdateTime(ctx, t)
	// This will return a match between Request1 and Worker1.
	assignments := s.RunOnce(ctx, scheduler.NullEventSink)

	// Prints true: RunOnce assigned Request1 to Worker1.
	IsOn(requestID, workerID, s)

	// Your code for handling these assignments goes here...
	HandleAssignments(assignments)

	// Update time, causing quota accounts to be charged for their running tasks.
	// In this case, Account1 will be charged for Request1 running on Worker1.
	s.UpdateTime(ctx, time.Now())

	// Notify the scheduler that the task has started running on that worker.
	// This is an acknowledgement of the above assignment.
	// Note: the account is already being charged for this task prior to the
	// notification. The notification ensures consistency of request and worker
	// state, but does not affect account state.
	s.NotifyTaskRunning(ctx, "Request1", "Worker1", time.Now(), scheduler.NullEventSink)

	// Still prints true: the notification agrees with the existing assignment.
	IsOn(requestID, workerID, s)

	// Update time, causing quota accounts to again accumulate quota or be charged
	// quota for their running tasks.
	s.UpdateTime(ctx, time.Now())

	// Notifications that contradict the scheduler's state estimate will cause
	// inconsistent records to be deleted from the state.

	// Notify the scheduler that a different task is now running on Worker1,
	// causing records about that worker and previous request to be deleted.
	// Note that this deletion will not affect the current balance of Account1;
	// quota that was spent already on Request1 will not be refunded.
	s.NotifyTaskRunning(ctx, "Request2", "Worker1", time.Now(), scheduler.NullEventSink)

	// Prints false: the record of Request1 running on Worker1 was deleted.
	IsOn(requestID, workerID, s)

}
Output:

Request1 is on Worker1? false
Request1 is on Worker1? true
Request1 is on Worker1? true
Request1 is on Worker1? false

Index

Examples

Constants

View Source
const (
	// FreeBucket is the free priority bucket, where jobs may run even if they have
	// no quota account or have an empty quota account. Its value is equal to
	// NumPriorities.
	FreeBucket Priority = NumPriorities

	// PromoteThreshold is the account balance at which the scheduler will consider
	// promoting jobs.
	PromoteThreshold = 600.0

	// DemoteThreshold is the account balance at which the scheduler will consider
	// demoting jobs. It is the negation of PromoteThreshold.
	DemoteThreshold = -600.0
)
View Source
const NumPriorities = 5

NumPriorities is the number of distinct priority buckets. For performance and code complexity reasons, this is a compile-time constant.

Variables

This section is empty.

Functions

This section is empty.

Types

type AccountConfig

// AccountConfig represents the configuration of a single quota account.
type AccountConfig struct {
	// ChargeRate holds the rates (per second) at which the per-priority
	// balances of this account grow.
	//
	// Conceptually this is the time-averaged number of workers that this account
	// may use, at each priority level.
	ChargeRate Balance
	// MaxChargeSeconds is the maximum amount of time over which this account can
	// accumulate quota before hitting its cap.
	//
	// Conceptually this sets the time window over which the time-averaged
	// utilization by this account is measured. Very bursty clients will need to
	// use a wider window, whereas very consistent clients will use a narrow one.
	MaxChargeSeconds float32
	// MaxFanout is the maximum number of concurrent paid jobs for each combination
	// of all task labels that this account will pay for (0 = no limit).
	//
	// Additional jobs beyond this may run if there is idle capacity, but they
	// will run in the FreeBucket priority level (except if DisableFreeTasks
	// is true, in which case they will not run).
	MaxFanout int32
	// PerLabelTaskLimits allows for extra limits to be enforced for any given
	// label, e.g. setting label-model:2 ensures that a maximum of two
	// concurrent jobs per model can run under this account.
	PerLabelTaskLimits map[string]int32
	// DisableFreeTasks, if true, prevents jobs for this account from starting
	// when they would be run at FreeBucket priority.
	DisableFreeTasks bool
	// Description is a human-readable description of the account's intended
	// purpose.
	Description string
}

AccountConfig represents a single account's config. It is the native struct version of the AccountConfig proto.

func NewAccountConfig

func NewAccountConfig(fanout int, labelLimits map[string]int32, chargeSeconds float32, chargeRate []float32, disableFreeTasks bool, desc string) *AccountConfig

NewAccountConfig creates a new AccountConfig instance.

func NewAccountConfigFromProto

func NewAccountConfigFromProto(c *protos.AccountConfig) *AccountConfig

NewAccountConfigFromProto creates a new AccountConfig instance from its proto representation.

type AccountID

type AccountID string

AccountID (a string) identifies an account.

type Assignment

// Assignment represents a scheduler decision to assign a task to a worker.
type Assignment struct {
	// Type describes which kind of assignment this represents.
	Type AssignmentType

	// WorkerID is the ID of the worker to assign a new task to (and whose
	// previous task is to be preempted, if this is an AssignmentPreemptWorker
	// assignment).
	WorkerID WorkerID

	// RequestID is the ID of the task to assign to that worker.
	RequestID RequestID

	// TaskToAbort is relevant only for the AssignmentPreemptWorker type.
	// It is the request ID of the task that should be preempted.
	TaskToAbort RequestID

	// Priority is the priority at which the task will run.
	Priority Priority

	// Time is the time at which this Assignment was determined.
	Time time.Time
}

An Assignment represents a scheduler decision to assign a task to a worker.

type AssignmentType

type AssignmentType int

AssignmentType is an enum of scheduler assignment types.

// Assignment types, numbered consecutively from 0 via iota.
const (
	// AssignmentIdleWorker indicates assigning a task to a currently idle worker.
	AssignmentIdleWorker AssignmentType = iota

	// AssignmentPreemptWorker indicates preempting a running task on a worker
	// and replacing it with a new task.
	AssignmentPreemptWorker
)

type Balance

type Balance [NumPriorities]float32

Balance is a vector that represents a cost or account balance.

func (Balance) Add

func (a Balance) Add(b *Balance) Balance

Add returns the sum of two vectors, as a new vector.

func (Balance) Less

func (a Balance) Less(b Balance) bool

Less determines whether vector a is less than b, based on a priority-ordered comparison.

func (Balance) Sub

func (a Balance) Sub(b *Balance) Balance

Sub returns the difference of two vectors, as a new vector.

type Config

// Config holds the configuration fields that affect the behavior of the quota
// scheduler for a pool.
type Config struct {
	// AccountConfigs maps each account ID to that account's AccountConfig.
	AccountConfigs map[AccountID]*AccountConfig

	// DisablePreemption, if true, causes the scheduler to never preempt
	// any tasks.
	DisablePreemption bool

	// BotExpiration is the duration after which a bot will no longer be
	// considered idle, if the scheduler doesn't receive any assignment requests
	// for it.
	//
	// If 0 or unspecified, defaults to 300 seconds.
	BotExpiration time.Duration
}

Config represents configuration fields that affect the behavior of the quota scheduler pool.

func NewConfig

func NewConfig() *Config

NewConfig creates and returns a new Config instance.

func NewConfigFromProto

func NewConfigFromProto(p *protos.SchedulerConfig) *Config

NewConfigFromProto creates and returns a new Config instance from a proto representation.

func (*Config) ToProto

func (c *Config) ToProto() *protos.SchedulerConfig

ToProto converts a config to its proto representation.

type EventSink

// EventSink is the interface for a type that records scheduler events, for
// metrics or analytics purposes.
type EventSink interface {
	// AddEvent emits a task event to this sink.
	AddEvent(*metrics.TaskEvent)

	// WithFields returns a child sink that will emit events with the given
	// field overrides.
	WithFields(isCallback bool) EventSink
}

EventSink defines the interface for a class that records scheduler events, for metrics or analytics purposes.

var NullEventSink EventSink = &nullEventSink{}

NullEventSink is a trivial EventSink that discards the events it receives.

type Priority

type Priority int

Priority is a qscheduler priority level.

func BestPriorityFor

func BestPriorityFor(b Balance) Priority

BestPriorityFor determines the highest available priority for a quota account, given its balance.

If the account is out of quota, or if the supplied balance is a nil pointer, then this returns FreeBucket.

type RequestID

type RequestID string

RequestID (a string) identifies a request.

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler encapsulates the state and configuration of a running quotascheduler for a single pool, and its methods provide an implementation of the quotascheduler algorithm.

func New

func New(t time.Time) *Scheduler

New returns a newly initialized Scheduler.

func NewFromProto

func NewFromProto(s *protos.Scheduler) *Scheduler

NewFromProto returns a new Scheduler from proto representation.

func NewWithConfig

func NewWithConfig(t time.Time, c *Config) *Scheduler

NewWithConfig returns a newly initialized Scheduler.

func (*Scheduler) AddAccount

func (s *Scheduler) AddAccount(ctx context.Context, id AccountID, config *AccountConfig, initialBalance []float32)

AddAccount creates a new account with the given id, config, and initialBalance (or zero balance if nil).

If an account with that id already exists, then it is overwritten.

func (*Scheduler) AddRequest

func (s *Scheduler) AddRequest(ctx context.Context, request *TaskRequest, t time.Time, tags []string, e EventSink)

AddRequest enqueues a new task request.

func (*Scheduler) Config

func (s *Scheduler) Config() *Config

Config returns the scheduler's config.

func (*Scheduler) DeleteAccount

func (s *Scheduler) DeleteAccount(aid AccountID)

DeleteAccount deletes the given account.

func (*Scheduler) GetBalances

func (s *Scheduler) GetBalances() map[AccountID]Balance

GetBalances returns the account balances.

func (*Scheduler) GetRequest

func (s *Scheduler) GetRequest(rid RequestID) (req *TaskRequest, ok bool)

GetRequest returns the (waiting or running) request for a given ID.

func (*Scheduler) GetWaitingRequests

func (s *Scheduler) GetWaitingRequests() map[RequestID]*TaskRequest

GetWaitingRequests returns the waiting requests.

func (*Scheduler) GetWorkers

func (s *Scheduler) GetWorkers() map[WorkerID]*Worker

GetWorkers returns the known workers.

func (*Scheduler) IsAssigned

func (s *Scheduler) IsAssigned(requestID RequestID, workerID WorkerID) bool

IsAssigned returns whether the given request is currently assigned to the given worker. It is provided for consistency checks.

func (*Scheduler) MarkIdle

func (s *Scheduler) MarkIdle(ctx context.Context, workerID WorkerID, labels stringset.Set, t time.Time, e EventSink)

MarkIdle marks the given worker as idle, with the given provisionable labels, as of the given time. If this call is contradicted by newer knowledge of state, then it does nothing.

Note: calls to MarkIdle come from bot reap calls from swarming.

func (*Scheduler) NotifyTaskAbsent

func (s *Scheduler) NotifyTaskAbsent(ctx context.Context, requestID RequestID, t time.Time, e EventSink)

NotifyTaskAbsent informs the scheduler authoritatively that the given request is stopped (not running on a worker, and not in the queue) at the given time.

Supplied requestID must not be "".

func (*Scheduler) NotifyTaskRunning

func (s *Scheduler) NotifyTaskRunning(ctx context.Context, requestID RequestID, workerID WorkerID, t time.Time, e EventSink)

NotifyTaskRunning informs the scheduler authoritatively that the given task was running on the given worker at the given time.

Supplied requestID and workerID must not be "".

func (*Scheduler) ResetBalance

func (s *Scheduler) ResetBalance(aid AccountID)

ResetBalance resets the given account's balance to 0, if it exists.

func (*Scheduler) RunOnce

func (s *Scheduler) RunOnce(ctx context.Context, e EventSink) []*Assignment

RunOnce performs a single round of the quota scheduler algorithm on a given state and config, and returns a slice of state mutations.

func (*Scheduler) RunningRequests

func (s *Scheduler) RunningRequests() int

RunningRequests gets the number of running task requests.

func (*Scheduler) ToProto

func (s *Scheduler) ToProto() *protos.Scheduler

ToProto returns a proto representation of the state and configuration of Scheduler.

func (*Scheduler) ToSnapshot

func (s *Scheduler) ToSnapshot(poolID string) *Snapshot

ToSnapshot returns the snapshot of the pool's scheduler state.

func (*Scheduler) Unassign

func (s *Scheduler) Unassign(ctx context.Context, requestID RequestID, workerID WorkerID, t time.Time, e EventSink) error

Unassign moves a request that was previously assigned to a worker back to the queue.

This is intended for internal use by reconciler, to heal scheduler state in cases where a worker was assigned a task but never successfully reaped it.

func (*Scheduler) UpdateTime

func (s *Scheduler) UpdateTime(ctx context.Context, t time.Time)

UpdateTime updates the current time for a quotascheduler, and updates quota account balances accordingly, based on running jobs, account policies, and the time elapsed since the last update.

If the provided time is earlier than the last update, this does nothing.

type Snapshot

// Snapshot groups the metrics records that describe a scheduler's state.
//
// NOTE(review): presumably Accounts, Tasks and Workers each hold one record per
// account, task and worker known to the scheduler — confirm against ToSnapshot.
type Snapshot struct {
	Accounts []*metrics.Account
	Tasks    []*metrics.Task
	Workers  []*metrics.Worker
}

Snapshot represents the scheduler state at a specified timestamp.

type TaskRequest

// TaskRequest represents a queued or running task request.
type TaskRequest struct {
	// ID is the ID of this request.
	ID RequestID

	// AccountID is the ID of the account that this request charges to.
	AccountID AccountID

	// EnqueueTime is the time at which the request was enqueued.
	EnqueueTime time.Time

	// ProvisionableLabels is the set of provisionable labels for this task.
	ProvisionableLabels []string

	// BaseLabels is the set of base labels for this task.
	BaseLabels []string
	// contains filtered or unexported fields
}

TaskRequest represents a queued or running task request.

func NewTaskRequest

func NewTaskRequest(id RequestID, accountID AccountID, provisionableLabels stringset.Set,
	baseLabels stringset.Set, enqueueTime time.Time) *TaskRequest

NewTaskRequest creates a new TaskRequest.

func (*TaskRequest) ConfirmedTime

func (t *TaskRequest) ConfirmedTime() time.Time

ConfirmedTime returns the latest time at which the task request's state was confirmed by source of truth (swarming).

type Worker

// Worker represents a running or idle worker capable of running tasks.
type Worker struct {
	// ID is the ID of this worker.
	ID WorkerID

	// Labels is the set of labels that this worker possesses.
	Labels stringset.Set
	// contains filtered or unexported fields
}

Worker represents a running or idle Worker capable of running tasks.

func (*Worker) ConfirmedTime

func (w *Worker) ConfirmedTime() time.Time

ConfirmedTime returns the latest time at which the worker's state was confirmed by source of truth (swarming).

func (*Worker) IsIdle

func (w *Worker) IsIdle() bool

IsIdle returns whether the given worker is currently idle.

func (*Worker) RunningPriority

func (w *Worker) RunningPriority() Priority

RunningPriority returns the priority of the currently running request.

func (*Worker) RunningRequest

func (w *Worker) RunningRequest() *TaskRequest

RunningRequest returns the currently running request for this worker.

type WorkerID

type WorkerID string

WorkerID (a string) identifies a worker.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL