v0.0.0-...-30c4c12 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: May 19, 2023 License: Apache-2.0 Imports: 59 Imported by: 1



Package exec implements compilation, evaluation, and execution of Bigslice slice operations.



View Source
const BigmachineStatusGroup = "bigmachine"
View Source
const DefaultMaxLoad = 0.95

DefaultMaxLoad is the default machine max load.


View Source
var DoShuffleReaders = true

DoShuffleReaders determines whether reader tasks should be shuffled in order to avoid potential thundering herd issues. This should only be used in testing when deterministic ordering matters.

TODO(marius): make this a session option instead.

View Source
var ErrTaskLost = errors.New("task was lost")

ErrTaskLost indicates that a Task was in TaskLost state.

View Source
var ProbationTimeout = 30 * time.Second

ProbationTimeout is the amount of time that a machine will remain in probation without being explicitly marked healthy.


func Eval

func Eval(ctx context.Context, executor Executor, roots []*Task, group *status.Group) error

Eval simultaneously evaluates a set of task graphs from the provided set of roots. Eval uses the provided executor to dispatch tasks when their dependencies have been satisfied. Eval returns on evaluation error or else when all roots are fully evaluated.

TODO(marius): we can often stream across shuffle boundaries. This would complicate scheduling, but may be worth doing.


type CompileEnv

type CompileEnv struct {
	// Writable is true if this environment is writable. It is only
	// exported so that it can be gob-{en,dec}oded.
	Writable bool
	// Cached indicates whether a task operation's results can be read from
	// cache. An "operation" is one of the pipelined elements that a task
	// may perform. It is only exported so that it can be gob-{en,dec}oded.
	Cached map[taskOp]bool

CompileEnv is the environment for compilation. This environment should capture all external state that can affect compilation of an invocation. It is shared across compilations of the same invocation (e.g. on worker nodes) to guarantee consistent compilation results. This is a requirement of bigslice's computation model, as we assume that all nodes share the same view of the task graph. It must be gob-encodable for transport to workers.

func (*CompileEnv) Freeze

func (e *CompileEnv) Freeze()

Freeze freezes the state, marking e no longer writable.

func (CompileEnv) IsCached

func (e CompileEnv) IsCached(n TaskName, opIdx int) bool

IsCached returns whether the (task, operation) given by (n, opIdx) is cached.

func (CompileEnv) IsWritable

func (e CompileEnv) IsWritable() bool

IsWritable returns whether this environment is writable.

func (CompileEnv) MarkCached

func (e CompileEnv) MarkCached(n TaskName, opIdx int)

MarkCached marks the (task, operation) given by (n, opIdx) as cached.

type Executor

type Executor interface {
	// Name returns a human-friendly name for this executor.
	Name() string

	// Start starts the executor. It is called before evaluation has started
	// and after all funcs have been registered. Start need not return:
	// for example, the Bigmachine implementation of Executor uses
	// Start as an entry point for worker processes.
	Start(*Session) (shutdown func())

	// Run runs a task. The executor sets the state of the task as it
	// progresses. The task should enter in state TaskWaiting; by the
	// time Run returns the task state is >= TaskOk.

	// Reader returns a locally accessible ReadCloser for the requested task.
	Reader(*Task, int) sliceio.ReadCloser

	// Discard discards the storage resources held by a computed task.
	// Discarding is best-effort, so no error is returned.
	Discard(context.Context, *Task)

	// Eventer returns the eventer used to log events relevant to this executor.
	Eventer() eventlog.Eventer

	// HandleDebug adds executor-specific debug handlers to the provided
	// http.ServeMux. This is used to serve diagnostic information relating
	// to the executor.
	HandleDebug(handler *http.ServeMux)

Executor defines an interface used to provide implementations of task runners. An Executor is responsible for running single tasks, partitioning their outputs, and instantiating readers to retrieve the output of any given task.

type Option

type Option func(s *Session)

An Option represents a session configuration parameter value.

var Local Option = func(s *Session) {
	s.executor = newLocalExecutor()

Local configures a session with the local in-binary executor.

var MachineCombiners Option = func(s *Session) {
	s.machineCombiners = true

MachineCombiners is a session option that turns on machine-local combine buffers. If turned on, each combiner task that belongs to the same shard-set and runs on the same machine combines values into a single, machine-local combine buffer. This can be a big performance optimization for tasks that have low key cardinality, or a key-set with very hot keys. However, due to the way it is implemented, error recovery is currently not implemented for such tasks.

func Bigmachine

func Bigmachine(system bigmachine.System, params ...bigmachine.Param) Option

Bigmachine configures a session using the bigmachine executor configured with the provided system. If any params are provided, they are applied to each bigmachine allocated by Bigslice.

func Eventer

func Eventer(e eventlog.Eventer) Option

Eventer configures the session with an Eventer that will be used to log session events (for analytics).

func MaxLoad

func MaxLoad(maxLoad float64) Option

MaxLoad configures the session with the provided max machine load.

func Parallelism

func Parallelism(p int) Option

Parallelism configures the session with the provided target parallelism.

func Status

func Status(status *status.Status) Option

Status configures the session with a status object to which run statuses are reported.

func TracePath

func TracePath(path string) Option

TracePath configures the path to which a trace event file for the session will be written on shutdown.

type Result

type Result struct {
	// contains filtered or unexported fields

A Result is the output of a Slice evaluation. It is the only type implementing bigslice.Slice that is a legal argument to a bigslice.Func.

func (*Result) Discard

func (r *Result) Discard(ctx context.Context)

Discard discards the storage resources held by the subgraph of tasks used to compute r. This should be used to discard results that are no longer needed. If the results are needed by another computation, they will be recomputed. Discarding is best-effort, so no error is returned.

func (*Result) Scanner

func (r *Result) Scanner() *sliceio.Scanner

Scanner returns a scanner that scans the output. If the output contains multiple shards, they are scanned sequentially. You must call Close on the returned scanner when you are done scanning. You may get and scan multiple scanners concurrently from r.

func (*Result) Scope

func (r *Result) Scope() *metrics.Scope

Scope returns the merged metrics scope for the entire task graph represented by the result r. Scope relies on the local values in the scopes of the task graph, and thus are not precise.

TODO(marius): flow and merge scopes along with data to provide precise metrics.

type Session

type Session struct {
	// contains filtered or unexported fields

Session represents a Bigslice compute session. A session shares a binary and executor, and is valid for the run of the binary. A session can run multiple bigslice functions, allowing for iterative computing.

A session is started by the Start method. Some executors use may launch multiple copies of the binary: these additional binaries are called workers and Start in these Start does not return.

All functions must be created before Start is called, and must be created in a deterministic order. This is provided by default when functions are created as part of package initialization. Registering toplevel functions this way is both safe and encouraged:

var Computation = bigslice.Func(func(..) (slice Slice) {
	// Build up the computation, parameterized by the function.
	slice = ...
	slice = ...
	return slice

// Possibly in another package:
func main() {
	sess := exec.Start()
	if err := sess.Run(ctx, Computation, args...); err != nil {
	// Success!

func Start

func Start(options ...Option) *Session

Start creates and starts a new bigslice session, configuring it according to the provided options. Only one session may be created in a single binary invocation. The returned session remains valid for the lifetime of the binary. If no executor is configured, the session is configured to use the bigmachine executor.

func (*Session) Discard

func (s *Session) Discard(ctx context.Context, roots []*Task)

Discard discards the storage resources held by the subgraph given by roots. This should be used to discard tasks whose results are no longer needed. If the task results are needed by another computation, they will be recomputed. Discarding is best-effort, so no error is returned.

func (*Session) HandleDebug

func (s *Session) HandleDebug(handler *http.ServeMux)

func (*Session) MaxLoad

func (s *Session) MaxLoad() float64

MaxLoad returns the maximum load on each allocated machine.

func (*Session) Must

func (s *Session) Must(ctx context.Context, funcv *bigslice.FuncValue, args ...interface{}) *Result

Must is a version of Run that panics if the computation fails.

func (*Session) Parallelism

func (s *Session) Parallelism() int

Parallelism returns the desired amount of evaluation parallelism.

func (*Session) Run

func (s *Session) Run(ctx context.Context, funcv *bigslice.FuncValue, args ...interface{}) (*Result, error)

Run evaluates the slice returned by the bigslice func funcv applied to the provided arguments. Tasks are run by the session's executor. Run returns when the computation has completed, or else on error. It is safe to make concurrent calls to Run; the underlying computation will be performed in parallel.

func (*Session) Shutdown

func (s *Session) Shutdown()

Shutdown tears down resources associated with this session. It should be called when the session is discarded.

func (*Session) Status

func (s *Session) Status() *status.Status

Status returns the session's status aggregator.

type Store

type Store interface {
	// Create returns a writer that populates data for the given
	// task name and partition. The data is not be available
	// to Open until the returned closer has been closed.
	// TODO(marius): should we allow writes to be discarded as well?
	Create(ctx context.Context, task TaskName, partition int) (writeCommitter, error)

	// Open returns a ReadCloser from which the stored contents of the named task
	// and partition can be read. If the task and partition are not stored, an
	// error with kind errors.NotExist is returned. The offset specifies the byte
	// position from which to read.
	Open(ctx context.Context, task TaskName, partition int, offset int64) (io.ReadCloser, error)

	// Stat returns metadata for the stored slice.
	Stat(ctx context.Context, task TaskName, partition int) (sliceInfo, error)

	// Discard discards the data stored for task and partition. Subsequent calls
	// to Open for the given (task, partition) will fail. ReadClosers that
	// already exist may start returning errors, depending on the
	// implementation. If no such (task, partition) is stored, returns a non-nil
	// error.
	Discard(ctx context.Context, task TaskName, partition int) error

Store is an abstraction that stores partitioned data as produced by a task.

type Task

type Task struct {
	// Invocation is the task's invocation, i.e. the Func invocation
	// from which this task was compiled.
	Invocation execInvocation
	// Name is the name of the task. Tasks are named uniquely inside each
	// Bigslice session.
	Name TaskName
	// Do starts computation for this task, returning a reader that
	// computes batches of values on demand. Do is invoked with readers
	// for the task's dependencies.
	Do func([]sliceio.Reader) sliceio.Reader
	// Deps are the task's dependencies. See TaskDep for details.
	Deps []TaskDep

	// Partitioner is used to partition the task's output. It will only
	// be called when NumPartition > 1.
	Partitioner bigslice.Partitioner
	// NumPartition is the number of partitions that are output by this task.
	// If NumPartition > 1, then the task must also define a partitioner.
	NumPartition int

	// Combiner specifies an (optional) combiner to use for this task's output.
	// If a Combiner is not Nil, CombineKey names the combine buffer used:
	// each combine buffer contains combiner outputs from multiple tasks.
	// If CombineKey is not set, then per-task buffers are used instead.
	Combiner   slicefunc.Func
	CombineKey string

	// Pragma comprises the pragmas of all slice operations that
	// are pipelined into this task.

	// Slices is the set of slices to which this task directly contributes.
	Slices []bigslice.Slice

	// Group stores an ordered list of peer tasks. If Group is nonempty,
	// it is guaranteed that these sets of tasks constitute a shuffle
	// dependency, and share a set of shuffle dependencies. This allows
	// the evaluator to perform optimizations while tracking such
	// dependencies.
	Group []*Task

	// Scopes is the metrics scope for this task. It is populated with the
	// metrics produced during execution of this task.
	Scope metrics.Scope


	// Status is a status object to which task status is reported.
	Status *status.Task
	// contains filtered or unexported fields

A Task represents a concrete computational task. Tasks form graphs through dependencies; task graphs are compiled from slices.

Tasks also maintain executor state, and are used to coordinate execution between concurrent evaluators and a single executor (which may be evaluating many tasks concurrently). Tasks thus embed a mutex for coordination and provide a context-aware conditional variable to coordinate runtime state changes.

func (*Task) All

func (t *Task) All() []*Task

All returns all tasks reachable from t. The returned set of tasks is unique.

func (*Task) Broadcast

func (t *Task) Broadcast()

Broadcast notifies waiters of a state change. Broadcast must only be called while the task's lock is held.

func (*Task) Err

func (t *Task) Err() error

Err returns an error if the task's state is >= TaskErr. When the state is > TaskErr, Err returns an error describing the task's failed state, otherwise, t.err is returned.

func (*Task) Error

func (t *Task) Error(err error)

Error sets the task's state to TaskErr and its error to the provided error. Waiters are notified.

func (*Task) Errorf

func (t *Task) Errorf(format string, v ...interface{})

Errorf formats an error message using fmt.Errorf, sets the task's state to TaskErr and its err to the resulting error message.

func (*Task) GraphString

func (t *Task) GraphString() string

GraphString returns a schematic string of the task graph rooted at t.

func (*Task) Head

func (t *Task) Head() *Task

Head returns the head task of this task's phase. If the task does not belong to a phase, Head returns the task t.

func (*Task) Phase

func (t *Task) Phase() []*Task

Phase returns the phase to which this task belongs.

func (*Task) Set

func (t *Task) Set(state TaskState)

Set sets the task's state to the provided state and notifies any waiters.

func (*Task) State

func (t *Task) State() TaskState

State returns the task's current state.

func (*Task) String

func (t *Task) String() string

String returns a short, human-readable string describing the task's state.

func (*Task) Subscribe

func (t *Task) Subscribe(s *TaskSubscriber)

Subscribe subscribes s to be notified of any changes to t's state. If s has already been subscribed, no-op.

func (*Task) Unsubscribe

func (t *Task) Unsubscribe(s *TaskSubscriber)

Unsubscribe unsubscribes previously subscribe s. s will on longer receive task state change notifications. No-op if s was never subscribed.

func (*Task) Wait

func (t *Task) Wait(ctx context.Context) error

Wait returns after the next call to Broadcast, or if the context is complete. The task's lock must be held when calling Wait.

func (*Task) WaitState

func (t *Task) WaitState(ctx context.Context, state TaskState) (TaskState, error)

WaitState returns when the task's state is at least the provided state, or else when the context is done.

func (*Task) WriteGraph

func (t *Task) WriteGraph(w io.Writer)

WriteGraph writes a schematic string of the task graph rooted at t into w.

type TaskDep

type TaskDep struct {
	// Head holds the underlying task that represents this dependency.
	// For shuffle dependencies, that task is the head task of the
	// phase, and the evaluator must expand the phase.
	Head      *Task
	Partition int

	// Expand indicates that the task's dependencies for a given
	// partition should not be merged, but rather passed individually to
	// the task implementation.
	Expand bool

	// CombineKey is an optional label that names the combination key to
	// be used by this dependency. It is used to name a single combiner
	// buffer from which is read a number of combined tasks.
	// CombineKeys must be provided to tasks that contain combiners.
	CombineKey string

A TaskDep describes a single dependency for a task. A dependency comprises one or more tasks and the partition number of the task set that must be read at run time.

func (TaskDep) NumTask

func (d TaskDep) NumTask() int

NumTask returns the number of tasks that are comprised by this dependency.

func (TaskDep) Task

func (d TaskDep) Task(i int) *Task

Task returns the i'th task comprised by this dependency.

type TaskName

type TaskName struct {
	// InvIndex is the index of the invocation for which the task was compiled.
	InvIndex uint64
	// Op is a unique string describing the operation that is provided
	// by the task.
	Op string
	// Shard and NumShard describe the shard processed by this task
	// and the total number of shards to be processed.
	Shard, NumShard int

A TaskName uniquely names a task by its constituent components. Tasks with 0 shards are taken to be combiner tasks: they are machine-local buffers of combiner outputs for some (non-overlapping) subset of shards for a task.

func (TaskName) IsCombiner

func (n TaskName) IsCombiner() bool

IsCombiner returns whether the named task is a combiner task.

func (TaskName) String

func (n TaskName) String() string

String returns a canonical representation of the task name, formatted as:


type TaskState

type TaskState int

TaskState represents the runtime state of a Task. TaskState values are defined so that their magnitudes correspond with task progression.

const (
	// TaskInit is the initial state of a task. Tasks in state TaskInit
	// have usually not yet been seen by an executor.
	TaskInit TaskState = iota

	// TaskWaiting indicates that a task has been scheduled for
	// execution (it is runnable) but has not yet been allocated
	// resources by the executor.
	// TaskRunning is the state of a task that's currently being run or
	// discarded. After a task is in state TaskRunning, it can only enter a
	// larger-valued state.

	// TaskOk indicates that a task has successfully completed;
	// the task's results are available to dependent tasks.
	// All TaskState values greater than TaskOk indicate task
	// errors.

	// TaskErr indicates that the task experienced a failure while
	// running.
	// TaskLost indicates that the task was lost, usually because
	// the machine to which the task was assigned failed.

func (TaskState) String

func (s TaskState) String() string

String returns the task's state as an upper-case string.

type TaskSubscriber

type TaskSubscriber struct {
	// contains filtered or unexported fields

TaskSubscriber is subscribed to a Task using Subscribe. It is then notified whenever the Task state changes. This is useful for efficiently observing the state changes of many tasks.

func NewTaskSubscriber

func NewTaskSubscriber() *TaskSubscriber

NewTaskSubscriber returns a new TaskSubscriber. It needs to be subscribed to a Task with Subscribe for it to be notified of task state changes.

func (*TaskSubscriber) Notify

func (s *TaskSubscriber) Notify(task *Task)

Notify notifies s of a task whose state has changed.

func (*TaskSubscriber) Ready

func (s *TaskSubscriber) Ready() <-chan struct{}

Ready returns a channel that is closed if a subsequent call to Tasks will return a non-nil slice.

func (*TaskSubscriber) Tasks

func (s *TaskSubscriber) Tasks() []*Task

Tasks returns the tasks whose state has changed since the last call to Tasks.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL