run

package
v0.0.0-...-211e212
Published: Dec 28, 2021 License: GPL-3.0 Imports: 13 Imported by: 0

Documentation

Overview

Package run contains machinery for working with runnable tasks and objects which control tasks.

Constants

This section is empty.

Variables

var ErrNoAgentsRetry = errors.New("No agents available to handle the request; retrying")
var ErrNoAgentsRunLocal = errors.New("No agents available to handle the request; running locally")
var (
	ErrUnsupportedTask = errors.New("Task not supported")
)

Functions

func IsCompilerError

func IsCompilerError(err error) bool

func RunAsync

func RunAsync(t Task) <-chan error

RunAsync runs the given task and returns a channel that will eventually receive the task's error value, after which the channel is closed. Tasks are responsible for their own cancellation: a task that should be canceled must take the necessary actions itself and report the error context.Canceled.
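
The channel contract described above can be sketched as follows. This is a minimal illustration and not the package's implementation: runAsync, ctxTask, canceledRun and channelCloses are hypothetical names, and the buffered channel is an assumption of the sketch.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Task mirrors the Task interface documented on this page.
type Task interface {
	Run()
	Err() error
}

// ctxTask is a hypothetical task that handles cancellation by itself.
type ctxTask struct {
	ctx context.Context
	err error
}

// Run checks the context before doing any work and records
// context.Canceled if the context is already done.
func (t *ctxTask) Run() {
	if t.ctx.Err() != nil {
		t.err = context.Canceled
		return
	}
	// The real work would happen here.
}

func (t *ctxTask) Err() error { return t.err }

// runAsync is a stand-in for RunAsync. The channel is buffered so the
// goroutine never blocks when nobody is reading yet.
func runAsync(t Task) <-chan error {
	ch := make(chan error, 1)
	go func() {
		defer close(ch)
		t.Run()
		ch <- t.Err()
	}()
	return ch
}

// canceledRun reports whether a task with an already-canceled context
// delivers context.Canceled on the channel.
func canceledRun() bool {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return errors.Is(<-runAsync(&ctxTask{ctx: ctx}), context.Canceled)
}

// channelCloses reports whether the channel is closed after its single value.
func channelCloses() bool {
	ch := runAsync(&ctxTask{ctx: context.Background()})
	<-ch
	_, open := <-ch
	return !open
}

func main() {
	fmt.Println(canceledRun(), channelCloses())
}
```

Because the channel is closed after the single value, a caller can either receive once or range over it.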

func RunWait

func RunWait(t Task) error

RunWait will run the task, wait for it to complete, then return its error.

Types

type ArgParser

type ArgParser interface {
	Parse()
	CanRunRemote() bool
	DeepCopy() ArgParser
}

ArgParser is a high-level interface that represents an object capable of parsing a set of command-line arguments, and determining whether the associated request can be run remotely based on the arguments given. The concrete object should manage its own data, but should not perform any parsing until the Parse function is called. CanRunRemote will always be called after Parse.
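
A concrete ArgParser might look like the sketch below. Everything except the interface itself is an assumption: flagParser, its fields, and the "--local-only" flag are invented for illustration and do not correspond to any real toolchain.

```go
package main

import "fmt"

// ArgParser mirrors the interface documented on this page.
type ArgParser interface {
	Parse()
	CanRunRemote() bool
	DeepCopy() ArgParser
}

// flagParser is a hypothetical ArgParser. It stores its arguments at
// construction time and does no work until Parse is called.
type flagParser struct {
	args   []string
	parsed bool
	remote bool
}

func newFlagParser(args []string) *flagParser {
	return &flagParser{args: args}
}

// Parse scans the arguments. The "--local-only" flag is made up for
// this example.
func (p *flagParser) Parse() {
	p.remote = true
	for _, a := range p.args {
		if a == "--local-only" {
			p.remote = false
		}
	}
	p.parsed = true
}

// CanRunRemote is only meaningful after Parse has been called.
func (p *flagParser) CanRunRemote() bool { return p.parsed && p.remote }

// DeepCopy duplicates the argument slice so the copy shares no state.
func (p *flagParser) DeepCopy() ArgParser {
	return &flagParser{
		args:   append([]string(nil), p.args...),
		parsed: p.parsed,
		remote: p.remote,
	}
}

// canRunRemote parses the arguments and reports the result.
func canRunRemote(args ...string) bool {
	var p ArgParser = newFlagParser(args)
	p.Parse()
	return p.CanRunRemote()
}

// copyIsIndependent reports whether mutating the original's arguments
// leaves a deep copy unaffected.
func copyIsIndependent() bool {
	orig := newFlagParser([]string{"-c", "main.c"})
	cp := orig.DeepCopy()
	orig.args[0] = "--local-only"
	cp.Parse()
	return cp.CanRunRemote()
}

func main() {
	fmt.Println(canRunRemote("-c", "main.c"), canRunRemote("--local-only"), copyIsIndependent())
}
```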

type CompilerError

type CompilerError struct {
	// contains filtered or unexported fields
}

func NewCompilerError

func NewCompilerError(text string) *CompilerError

func (*CompilerError) Error

func (e *CompilerError) Error() string

type Controller

type Controller interface {
	// With transforms the Controller into a ToolchainController by providing
	// it with a concrete toolchain instance.
	With(*types.Toolchain) ToolchainController
}

Controller represents an object that can control requests for a particular toolchain, when provided with a concrete instance of such a toolchain with parameters set correctly for the host.

type DelegatingExecutor

type DelegatingExecutor struct {
	// contains filtered or unexported fields
}

DelegatingExecutor is an executor that does not run a worker pool, runs all tasks as soon as possible, and is always available. It will report that all of its tasks are Delegated, and will not report counts for queued or running tasks.

func NewDelegatingExecutor

func NewDelegatingExecutor() *DelegatingExecutor

func (*DelegatingExecutor) CompleteTaskStatus

func (x *DelegatingExecutor) CompleteTaskStatus(stat *metrics.TaskStatus)

func (*DelegatingExecutor) CompleteUsageLimits

func (x *DelegatingExecutor) CompleteUsageLimits(stat *metrics.UsageLimits)

func (*DelegatingExecutor) Exec

func (x *DelegatingExecutor) Exec(task Task) error

func (*DelegatingExecutor) ExecAsync

func (x *DelegatingExecutor) ExecAsync(task Task) <-chan error

type ExecCommandTask

type ExecCommandTask struct {
	TaskOptions
	util.NullableError
	// contains filtered or unexported fields
}

func (*ExecCommandTask) Run

func (t *ExecCommandTask) Run()

type Executor

type Executor interface {
	metrics.UsageLimitsCompleter
	metrics.TaskStatusCompleter
	Exec(task Task) error
}

An Executor is an object which runs tasks.

type ExecutorOption

type ExecutorOption func(*ExecutorOptions)

func WithUsageLimits

func WithUsageLimits(cfg *metrics.UsageLimits) ExecutorOption

type ExecutorOptions

type ExecutorOptions struct {
	UsageLimits *metrics.UsageLimits
}

func (*ExecutorOptions) Apply

func (o *ExecutorOptions) Apply(opts ...ExecutorOption)

type ExecutorStatus

type ExecutorStatus int

type NoRunnerForKind

type NoRunnerForKind struct{}

func (NoRunnerForKind) Error

func (e NoRunnerForKind) Error() string

type PackagedRequest

type PackagedRequest struct {
	util.NullableError
	// contains filtered or unexported fields
}

PackagedRequest is a runnable closure which can invoke a RequestManager's Process method with the provided arguments when desired.

func PackageRequest

func PackageRequest(
	rm RequestManager,
	ctx PairContext,
	request interface{},
) PackagedRequest

PackageRequest creates a PackagedRequest object and returns it. It does not run the request.

func (*PackagedRequest) Response

func (pr *PackagedRequest) Response() chan interface{}

func (*PackagedRequest) Run

func (pr *PackagedRequest) Run()

Run will run the PackagedRequest by calling the packaged RequestManager's Process method with the arguments given at the time of its creation.
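
The "runnable closure" idea can be sketched with local stand-ins. The types packaged and echoManager, the pack and demo helpers, and the buffered response channel are assumptions of this sketch; the unexported fields of the real PackagedRequest are not documented here.

```go
package main

import (
	"context"
	"fmt"
)

// PairContext and RequestManager mirror the declarations on this page.
type PairContext struct {
	ServerContext context.Context
	ClientContext context.Context
}

type RequestManager interface {
	Process(ctx PairContext, request interface{}) (response interface{}, err error)
}

// echoManager is a hypothetical RequestManager that returns its request.
type echoManager struct{ calls int }

func (m *echoManager) Process(_ PairContext, req interface{}) (interface{}, error) {
	m.calls++
	return req, nil
}

// packaged is a stand-in for PackagedRequest.
type packaged struct {
	rm       RequestManager
	ctx      PairContext
	request  interface{}
	response chan interface{}
	err      error
}

// pack captures the arguments without calling Process.
func pack(rm RequestManager, ctx PairContext, req interface{}) *packaged {
	return &packaged{rm: rm, ctx: ctx, request: req, response: make(chan interface{}, 1)}
}

// Run invokes Process with the captured arguments and publishes the
// response. It must be called at most once in this sketch.
func (p *packaged) Run() {
	resp, err := p.rm.Process(p.ctx, p.request)
	p.err = err
	p.response <- resp
	close(p.response)
}

func (p *packaged) Response() chan interface{} { return p.response }
func (p *packaged) Err() error                 { return p.err }

// demo returns the manager's call count before Run and the response after.
func demo() (int, string) {
	m := &echoManager{}
	bg := context.Background()
	p := pack(m, PairContext{ServerContext: bg, ClientContext: bg}, "compile main.c")
	before := m.calls
	p.Run()
	resp, _ := (<-p.Response()).(string)
	return before, resp
}

func main() {
	fmt.Println(demo())
}
```

Since the packaged value has Run and Err methods, it has the shape of a Task and could be handed to an executor.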

type PairContext

type PairContext struct {
	ServerContext context.Context
	ClientContext context.Context
}

PairContext is a pair of client and server contexts.

func (PairContext) Deadline

func (pc PairContext) Deadline() (deadline time.Time, ok bool)

func (PairContext) Done

func (pc PairContext) Done() <-chan struct{}

func (PairContext) Err

func (pc PairContext) Err() error

func (PairContext) Value

func (pc PairContext) Value(key interface{}) interface{}
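
The four methods above give PairContext the same method set as context.Context. How the two contexts are combined is not documented on this page, so the following is only one plausible sketch: the rules chosen here (earliest deadline, done when either side is done, server error first, client values first) are all assumptions.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// pairContext is a stand-in for PairContext with the same two fields.
type pairContext struct {
	ServerContext context.Context
	ClientContext context.Context
}

// Deadline returns the earlier of the two deadlines, if any is set.
func (pc pairContext) Deadline() (time.Time, bool) {
	sd, sok := pc.ServerContext.Deadline()
	cd, cok := pc.ClientContext.Deadline()
	switch {
	case sok && cok:
		if cd.Before(sd) {
			return cd, true
		}
		return sd, true
	case sok:
		return sd, true
	default:
		return cd, cok
	}
}

// Done returns a channel that is closed once either context is done.
// The helper goroutine lives until one of the contexts is done.
func (pc pairContext) Done() <-chan struct{} {
	ch := make(chan struct{})
	go func() {
		defer close(ch)
		select {
		case <-pc.ServerContext.Done():
		case <-pc.ClientContext.Done():
		}
	}()
	return ch
}

// Err returns the server context's error if set, else the client's.
func (pc pairContext) Err() error {
	if err := pc.ServerContext.Err(); err != nil {
		return err
	}
	return pc.ClientContext.Err()
}

// Value looks the key up in the client context first, then the server's.
func (pc pairContext) Value(key interface{}) interface{} {
	if v := pc.ClientContext.Value(key); v != nil {
		return v
	}
	return pc.ServerContext.Value(key)
}

// Compile-time check that pairContext satisfies context.Context.
var _ context.Context = pairContext{}

// clientCancelObserved cancels only the client side and reports whether
// the pair becomes done and carries context.Canceled.
func clientCancelObserved() bool {
	client, cancel := context.WithCancel(context.Background())
	pc := pairContext{ServerContext: context.Background(), ClientContext: client}
	cancel()
	<-pc.Done()
	return pc.Err() == context.Canceled
}

func main() {
	fmt.Println(clientCancelObserved())
}
```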

type ProcessOptions

type ProcessOptions struct {
	Stdout  io.Writer
	Stderr  io.Writer
	Stdin   io.Reader
	Env     []string
	Args    []string
	WorkDir string
	UID     uint32
	GID     uint32
}

type QueuedExecutor

type QueuedExecutor struct {
	ExecutorOptions
	// contains filtered or unexported fields
}

func NewQueuedExecutor

func NewQueuedExecutor(opts ...ExecutorOption) *QueuedExecutor

func (*QueuedExecutor) CompleteTaskStatus

func (x *QueuedExecutor) CompleteTaskStatus(stat *metrics.TaskStatus)

func (*QueuedExecutor) CompleteUsageLimits

func (x *QueuedExecutor) CompleteUsageLimits(stat *metrics.UsageLimits)

func (*QueuedExecutor) Exec

func (x *QueuedExecutor) Exec(task Task) error

func (*QueuedExecutor) ExecAsync

func (x *QueuedExecutor) ExecAsync(task Task) <-chan error

func (*QueuedExecutor) SetUsageLimits

func (x *QueuedExecutor) SetUsageLimits(cfg *metrics.UsageLimits)

type RequestManager

type RequestManager interface {
	// Process consumes the request and blocks until it is complete, returning
	// a matching response object and an error. Errors returned from this function
	// do not indicate the request has failed (where "failure" is specific to and
	// defined by the request itself), rather it would indicate that the request
	// could not be completed, either due to a network error, an internal error,
	// or a similar issue. Responses should encode success or failure within
	// the response type itself.
	Process(ctx PairContext, request interface{}) (response interface{}, err error)
}

RequestManager represents an entity that is responsible for the entire lifecycle of a request (currently either a RunRequest or a CompileRequest) by creating and running tasks.
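
The distinction between a returned error and a failed request can be illustrated with a sketch. The compileResponse type, errUnavailable, fakeManager and classify are hypothetical; only the Process signature is taken from this page.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// PairContext mirrors the struct documented on this page.
type PairContext struct {
	ServerContext context.Context
	ClientContext context.Context
}

// compileResponse is a hypothetical response type that encodes the
// request's own success or failure.
type compileResponse struct {
	Success bool
	Output  string
}

// errUnavailable is a hypothetical infrastructure error.
var errUnavailable = errors.New("agent unavailable")

// fakeManager is a hypothetical RequestManager.
type fakeManager struct{ online bool }

// Process returns an error only when the request could not be completed.
// A request that completed but failed is reported inside the response.
func (m fakeManager) Process(_ PairContext, request interface{}) (interface{}, error) {
	if !m.online {
		return nil, errUnavailable
	}
	src, _ := request.(string)
	if src == "" {
		return compileResponse{Success: false, Output: "error: empty source"}, nil
	}
	return compileResponse{Success: true}, nil
}

// classify shows how a caller might interpret the two return values.
func classify(m fakeManager, request interface{}) string {
	bg := context.Background()
	resp, err := m.Process(PairContext{ServerContext: bg, ClientContext: bg}, request)
	if err != nil {
		return "could not complete"
	}
	if r, ok := resp.(compileResponse); ok && r.Success {
		return "succeeded"
	}
	return "failed"
}

func main() {
	fmt.Println(classify(fakeManager{online: true}, "int main(){}"))
	fmt.Println(classify(fakeManager{online: true}, ""))
	fmt.Println(classify(fakeManager{online: false}, "int main(){}"))
}
```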

type Resizer

type Resizer interface {
	// Resize should set the target number of contained resources to the given
	// value. It should block until the resize operation is complete.
	Resize(int64)
}

A Resizer is an object that is able to dynamically resize its contained resources.

type ResizerManager

type ResizerManager interface {
	Manage(Resizer)
}

A ResizerManager is an object that can "take ownership" of a Resizer and be expected to manage its resource count.
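
The ownership relationship between the two interfaces can be sketched in a few lines. The slots and fixedManager types and the managedSize helper are hypothetical; a real manager would presumably adjust the size over time rather than set it once.

```go
package main

import "fmt"

// Resizer and ResizerManager mirror the interfaces documented on this page.
type Resizer interface {
	Resize(int64)
}

type ResizerManager interface {
	Manage(Resizer)
}

// slots is a hypothetical Resizer that only records its target size.
type slots struct{ n int64 }

func (s *slots) Resize(n int64) { s.n = n }

// fixedManager is a hypothetical ResizerManager that pins every Resizer
// it manages to a constant size.
type fixedManager struct{ size int64 }

func (m fixedManager) Manage(r Resizer) { r.Resize(m.size) }

// managedSize hands a fresh Resizer to a manager and returns its size.
func managedSize(size int64) int64 {
	s := &slots{}
	var m ResizerManager = fixedManager{size: size}
	m.Manage(s)
	return s.n
}

func main() {
	fmt.Println(managedSize(4))
}
```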

type ResultOptions

type ResultOptions struct {
	OutputWriter io.Writer
	OutputVar    interface{}
	NoTempFile   bool
}

type SchedulerClientStream

type SchedulerClientStream interface {
	LoadNewStream(types.Scheduler_StreamOutgoingTasksClient)
	Compile(*types.CompileRequest) (*types.CompileResponse, error)
}

type StoreAddFunc

type StoreAddFunc func(*ToolchainRunnerStore)

type Task

type Task interface {
	// Run will run the task. It will block until the task is completed.
	Run()
	// Err will return the task's error value once it has completed.
	// If called before Run() returns, it should panic.
	Err() error
}

Task represents a single runnable action.
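
The rule that Err should panic before Run has returned can be sketched as follows. The failingTask type and the errPanics and runWait helpers are hypothetical; the atomic flag is one possible way to track completion, not necessarily what the package does.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// Task mirrors the interface documented on this page.
type Task interface {
	Run()
	Err() error
}

// failingTask is a hypothetical Task that always finishes with an error.
type failingTask struct {
	done int32 // set to 1 once Run has finished
	err  error
}

func (t *failingTask) Run() {
	t.err = errors.New("exit status 1")
	atomic.StoreInt32(&t.done, 1)
}

// Err panics if the task has not finished yet.
func (t *failingTask) Err() error {
	if atomic.LoadInt32(&t.done) == 0 {
		panic("Err called before Run returned")
	}
	return t.err
}

// errPanics reports whether calling Err on the task panics.
func errPanics(t Task) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	_ = t.Err()
	return false
}

// runWait is a stand-in for RunWait: run, then return the error.
func runWait(t Task) error {
	t.Run()
	return t.Err()
}

func main() {
	task := &failingTask{}
	fmt.Println(errPanics(task))
	fmt.Println(runWait(task))
	fmt.Println(errPanics(task))
}
```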

func NewExecCommandTask

func NewExecCommandTask(tc *types.Toolchain, opts ...TaskOption) Task

type TaskOption

type TaskOption func(*TaskOptions)

func InPlace

func InPlace(inPlace bool) TaskOption

func WithArgs

func WithArgs(args []string) TaskOption

func WithContext

func WithContext(ctx context.Context) TaskOption

func WithEnv

func WithEnv(env []string) TaskOption

func WithLog

func WithLog(lg *zap.SugaredLogger) TaskOption

func WithOutputStreams

func WithOutputStreams(stdout, stderr io.Writer) TaskOption

func WithOutputVar

func WithOutputVar(v interface{}) TaskOption

func WithOutputWriter

func WithOutputWriter(w io.Writer) TaskOption

func WithStdin

func WithStdin(stdin io.Reader) TaskOption

func WithUidGid

func WithUidGid(uid, gid uint32) TaskOption

func WithWorkDir

func WithWorkDir(dir string) TaskOption

type TaskOptions

type TaskOptions struct {
	ProcessOptions
	ResultOptions

	Context context.Context
	Log     *zap.SugaredLogger
}

func (*TaskOptions) Apply

func (o *TaskOptions) Apply(opts ...TaskOption)
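
TaskOption and TaskOptions.Apply follow the functional options pattern. The sketch below reproduces the pattern with local, lowercase stand-ins so that it is self-contained; the trimmed field set, the newOptions helper, and the "." default are assumptions.

```go
package main

import "fmt"

// taskOptions is a trimmed-down stand-in for TaskOptions.
type taskOptions struct {
	Args    []string
	WorkDir string
}

// taskOption mutates a taskOptions value, like TaskOption.
type taskOption func(*taskOptions)

// Apply runs every option against the receiver, in order.
func (o *taskOptions) Apply(opts ...taskOption) {
	for _, opt := range opts {
		opt(o)
	}
}

func withArgs(args []string) taskOption {
	return func(o *taskOptions) { o.Args = args }
}

func withWorkDir(dir string) taskOption {
	return func(o *taskOptions) { o.WorkDir = dir }
}

// newOptions shows how a constructor might collect its options.
func newOptions(opts ...taskOption) taskOptions {
	o := taskOptions{WorkDir: "."} // hypothetical default
	o.Apply(opts...)
	return o
}

func main() {
	o := newOptions(withArgs([]string{"-c", "main.c"}), withWorkDir("/tmp/build"))
	fmt.Println(o.Args, o.WorkDir)
}
```

Later options override earlier ones, so callers can layer defaults and overrides in a single call.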

type ToolchainController

type ToolchainController interface {
	// RunLocal returns a RequestManager which can handle a request locally
	// without dispatching any or all of its tasks to a remote agent.
	RunLocal(ArgParser) RequestManager
	// SendRemote returns a RequestManager which will dispatch some or all of
	// its tasks to be processed by a remote agent. This RequestManager is
	// only responsible for local tasks associated with the request (if any),
	// and sending/waiting on tasks using the provided client.
	SendRemote(ArgParser, SchedulerClientStream) RequestManager
	// RecvRemote returns a RequestManager which will run its tasks locally
	// under the assumption that it is running them on behalf of a consumer
	// somewhere else on the network.
	RecvRemote() RequestManager
	// NewArgParser returns a new concrete ArgParser for this toolchain.
	NewArgParser(ctx context.Context, args []string) ArgParser
}

A ToolchainController is an object capable of managing the entire lifecycle of requests for a given toolchain.

type ToolchainRunnerStore

type ToolchainRunnerStore struct {
	// contains filtered or unexported fields
}

func NewToolchainRunnerStore

func NewToolchainRunnerStore() *ToolchainRunnerStore

func (*ToolchainRunnerStore) Add

func (*ToolchainRunnerStore) Get

type UpstreamQueue

type UpstreamQueue interface {
	Select() (Task, bool)
}

func SingularQueue

func SingularQueue(ch chan Task) UpstreamQueue

type WorkerPool

type WorkerPool struct {
	*util.PauseController
	// contains filtered or unexported fields
}

WorkerPool is a dynamic pool of worker goroutines which run on a shared task queue. The number of workers can be changed at any time, and the pool can be paused and unpaused to temporarily stop and start all workers.

func NewWorkerPool

func NewWorkerPool(taskQueue UpstreamQueue, opts ...WorkerPoolOption) *WorkerPool

NewWorkerPool creates a new WorkerPool with the provided task queue.

func (*WorkerPool) Resize

func (wp *WorkerPool) Resize(count int64)

Resize sets the target number of workers that should be running in the pool. When decreasing the number of workers, only workers which are not currently running a task will be stopped. If all workers are busy, the pool will stop the next available workers when they have finished their current task.
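
The resize behavior described above, where running tasks are never interrupted, can be sketched with a quit channel that workers only read between tasks. This is a simplified illustration under stated assumptions, not the package's implementation: the pool type, its fields, and runTasks are hypothetical, pausing is omitted, and the queue must never be closed.

```go
package main

import (
	"fmt"
	"sync"
)

// pool is a stand-in for WorkerPool.
type pool struct {
	mu    sync.Mutex
	size  int64
	queue <-chan func()
	quit  chan struct{}
}

func newPool(queue <-chan func()) *pool {
	return &pool{queue: queue, quit: make(chan struct{})}
}

// worker runs tasks until it receives a quit token. It only looks at the
// quit channel between tasks, so a running task always finishes.
func (p *pool) worker() {
	for {
		select {
		case <-p.quit:
			return
		case task := <-p.queue:
			task()
		}
	}
}

// Resize starts or stops workers until the pool has count workers.
// Shrinking blocks until enough workers have accepted a quit token.
func (p *pool) Resize(count int64) {
	if count < 0 {
		count = 0
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	for ; p.size < count; p.size++ {
		go p.worker()
	}
	for ; p.size > count; p.size-- {
		p.quit <- struct{}{}
	}
}

func (p *pool) Size() int64 {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.size
}

// runTasks resizes a pool up and down, then runs n tasks on what is left.
func runTasks(n int) (sizes []int64, completed int) {
	queue := make(chan func())
	p := newPool(queue)
	p.Resize(3)
	sizes = append(sizes, p.Size())
	p.Resize(1)
	sizes = append(sizes, p.Size())

	var wg sync.WaitGroup
	var mu sync.Mutex
	for i := 0; i < n; i++ {
		wg.Add(1)
		queue <- func() {
			defer wg.Done()
			mu.Lock()
			completed++
			mu.Unlock()
		}
	}
	wg.Wait()
	p.Resize(0)
	sizes = append(sizes, p.Size())
	return sizes, completed
}

func main() {
	fmt.Println(runTasks(5))
}
```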

func (*WorkerPool) Size

func (wp *WorkerPool) Size() int64

type WorkerPoolOption

type WorkerPoolOption func(*WorkerPoolOptions)

func DefaultPaused

func DefaultPaused() WorkerPoolOption

DefaultPaused indicates the worker pool should start in the paused state. This should be used instead of starting the pool and immediately pausing it to avoid race conditions.

func WithRunner

func WithRunner(f func(Task)) WorkerPoolOption

WithRunner sets the function that will be run by a worker when processing a task.

type WorkerPoolOptions

type WorkerPoolOptions struct {
	// contains filtered or unexported fields
}

func (*WorkerPoolOptions) Apply

func (o *WorkerPoolOptions) Apply(opts ...WorkerPoolOption)
