reconciler

package
v0.0.0-...-e560ebb Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 13, 2021 License: BSD-3-Clause Imports: 8 Imported by: 0

Documentation

Overview

Package reconciler provides a wrapper around a global state scheduler to be used by a per-worker pulling dispatcher.

The primary scheduler.Scheduler implementation intended to be used by reconciler is the quotascheduler algorithm as implemented in qslib/scheduler. The primary dispatcher client is intended to be swarming.

The reconciler tracks the queue of actions for workers that have pending actions (both the workers included in the most recent pull call from the client and those that were not). For each worker, the reconciler holds actions in the queue until they are acknowledged, and it orchestrates task preemption.

Example
package main

import (
	"context"
	"fmt"
	"time"

	"infra/qscheduler/qslib/reconciler"
	"infra/qscheduler/qslib/scheduler"

	"go.chromium.org/luci/common/data/stringset"
)

// main walks through the basic reconciler lifecycle: enqueue a task request,
// offer an idle worker, observe that the assignment is repeated until it is
// acknowledged, and then acknowledge it.
func main() {
	ctx := context.Background()

	// Create a scheduler and reconciler.
	s := scheduler.New(time.Now())
	r := reconciler.New()

	// Notify the reconciler of a newly enqueued task request.
	requestID := scheduler.RequestID("Request1")
	accountID := scheduler.AccountID("Account1")
	labels := stringset.NewFromSlice("label1")
	// The same timestamp is used for both EnqueueTime and Time below.
	t := time.Now()
	waitRequest := &reconciler.TaskWaitingRequest{
		AccountID:           accountID,
		RequestID:           requestID,
		ProvisionableLabels: labels,
		EnqueueTime:         t,
		Time:                t,
	}
	r.NotifyTaskWaiting(ctx, s, scheduler.NullEventSink, waitRequest)

	// Notify the reconciler of a new idle worker, and fetch an assignment
	// for it. This will fetch Request1 to run on it.
	workerID := scheduler.WorkerID("Worker1")
	idleWorker := &reconciler.IdleWorker{ID: workerID, Labels: labels}
	a := r.AssignTasks(ctx, s, time.Now(), scheduler.NullEventSink, idleWorker)

	// Indexing a[0] relies on an assignment having been returned for the
	// single idle worker offered above.
	fmt.Printf("%s was assigned %s.\n", a[0].WorkerID, a[0].RequestID)

	// A subsequent call for this worker will return the same task,
	// because the previous assignment has not yet been acknowledged.
	a = r.AssignTasks(ctx, s, time.Now(), scheduler.NullEventSink, idleWorker)

	fmt.Printf("%s was again assigned %s.\n", a[0].WorkerID, a[0].RequestID)

	// Acknowledge that the request is running on the worker.
	runningRequest := &reconciler.TaskRunningRequest{
		RequestID: requestID,
		WorkerID:  workerID,
		Time:      time.Now(),
	}
	r.NotifyTaskRunning(ctx, s, scheduler.NullEventSink, runningRequest)

	// Now, a subsequent AssignTasks call for this worker will return
	// nothing, as there are no other tasks in the queue.
	a = r.AssignTasks(ctx, s, time.Now(), scheduler.NullEventSink, idleWorker)
	fmt.Printf("After ACK, there were %d new assignments.\n", len(a))

}
Output:

Worker1 was assigned Request1.
Worker1 was again assigned Request1.
After ACK, there were 0 new assignments.

Index

Examples

Constants

View Source
// WorkerQueueTimeout is how long a task may stay assigned to a worker without
// being picked up before it is returned to the queue.
const WorkerQueueTimeout = 10 * time.Minute

WorkerQueueTimeout is the time after which a task will return to the queue if it was assigned to a worker but the worker never picked it up.

Variables

This section is empty.

Functions

This section is empty.

Types

type Assignment

// Assignment represents a scheduler-initiated operation to assign a task to a
// worker.
type Assignment struct {
	// WorkerID is the ID of the worker that is being assigned a task.
	WorkerID scheduler.WorkerID

	// RequestID is the ID of the task request that is being assigned.
	RequestID scheduler.RequestID

	// ProvisionRequired indicates whether the worker needs to be provisioned (in other
	// words, it is true if the worker does not possess the request's provisionable
	// labels).
	ProvisionRequired bool
}

Assignment represents a scheduler-initiated operation to assign a task to a worker.

type Cancellation

// Cancellation represents a scheduler-initiated operation to cancel a task on
// a worker.
//
// NOTE(review): the IDs here are plain strings, whereas Assignment uses
// scheduler.WorkerID and scheduler.RequestID for the same concepts.
type Cancellation struct {
	// WorkerID is the ID of the worker on which a task should be cancelled.
	WorkerID string

	// RequestID is the ID of the task that should be cancelled.
	RequestID string

	// ErrorMessage is a description of the error that caused the task to be
	// cancelled, if it was cancelled due to error.
	ErrorMessage string
}

Cancellation represents a scheduler-initiated operation to cancel a task on a worker. The worker should be aborted if and only if it is currently running the given task.

TODO: Consider unifying this with Assignment, since it is in fact the same content.

type IdleWorker

// IdleWorker represents a worker that is idle and wants to have a task
// assigned. Values of this type are passed to State.AssignTasks.
type IdleWorker struct {
	// ID is the ID of the idle worker.
	ID scheduler.WorkerID

	// Labels is the set of labels of the idle worker.
	Labels stringset.Set
}

IdleWorker represents a worker that is idle and wants to have a task assigned.

type State

// State is the state of a reconciler. Create one with New or NewFromProto;
// ToProto converts it back to its proto representation.
type State struct {
	// contains filtered or unexported fields
}

State is the state of a reconciler.

func New

func New() *State

New returns a new initialized State instance.

func NewFromProto

func NewFromProto(proto *protos.Reconciler) *State

NewFromProto returns a new State instance from a proto representation.

func (*State) AddTaskError

func (state *State) AddTaskError(requestID scheduler.RequestID, err error)

AddTaskError marks a given task as having failed due to an error, and in need of cancellation.

func (*State) AssignTasks

func (state *State) AssignTasks(ctx context.Context, s *scheduler.Scheduler, t time.Time, events scheduler.EventSink, workers ...*IdleWorker) []Assignment

AssignTasks accepts one or more idle workers, and returns tasks to be assigned to those workers (if there are tasks available).

func (*State) Cancellations

func (state *State) Cancellations(ctx context.Context) []Cancellation

Cancellations returns the set of workers and tasks that should be cancelled.

func (*State) NotifyTaskAbsent

func (state *State) NotifyTaskAbsent(ctx context.Context, s *scheduler.Scheduler, events scheduler.EventSink, update *TaskAbsentRequest)

NotifyTaskAbsent informs the quotascheduler about an absent task.

func (*State) NotifyTaskRunning

func (state *State) NotifyTaskRunning(ctx context.Context, s *scheduler.Scheduler, events scheduler.EventSink, update *TaskRunningRequest)

NotifyTaskRunning informs the quotascheduler about a running task.

func (*State) NotifyTaskWaiting

func (state *State) NotifyTaskWaiting(ctx context.Context, s *scheduler.Scheduler, events scheduler.EventSink, update *TaskWaitingRequest)

NotifyTaskWaiting informs the quotascheduler about a waiting task.

func (*State) ToProto

func (state *State) ToProto() *protos.Reconciler

ToProto converts a reconciler state to proto representation.

type TaskAbsentRequest

// TaskAbsentRequest encapsulates the arguments to NotifyTaskAbsent.
type TaskAbsentRequest struct {
	// RequestID of the request that is absent.
	RequestID scheduler.RequestID

	// Time at which the task was absent.
	Time time.Time

	// WorkerID of the worker associated with the absent task.
	// NOTE(review): the previous comment read "the worker that is running the
	// task", apparently copied from TaskRunningRequest; confirm the intended
	// meaning of this field for an absent task.
	WorkerID scheduler.WorkerID
}

TaskAbsentRequest encapsulates the arguments to NotifyTaskAbsent.

type TaskRunningRequest

// TaskRunningRequest encapsulates the arguments to NotifyTaskRunning.
type TaskRunningRequest struct {
	// RequestID of the request that is running.
	RequestID scheduler.RequestID

	// Time at which the task was running.
	Time time.Time

	// WorkerID of the worker that is running the task.
	WorkerID scheduler.WorkerID
}

TaskRunningRequest encapsulates the arguments to NotifyTaskRunning.

type TaskWaitingRequest

// TaskWaitingRequest encapsulates the arguments to NotifyTaskWaiting.
type TaskWaitingRequest struct {
	// AccountID for the request.
	AccountID scheduler.AccountID

	// BaseLabels of the request that is waiting.
	BaseLabels stringset.Set

	// EnqueueTime is the time at which the task was first enqueued.
	EnqueueTime time.Time

	// ProvisionableLabels of the request that is waiting.
	ProvisionableLabels stringset.Set

	// RequestID of the request that is waiting.
	RequestID scheduler.RequestID

	// Tags is the set of tags for the request.
	Tags []string

	// Time at which the task was waiting.
	Time time.Time
}

TaskWaitingRequest encapsulates the arguments to NotifyTaskWaiting.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL