Package exec implements compilation, evaluation, and execution of Bigslice slice operations.



    const BigmachineStatusGroup = "bigmachine"
    const DefaultMaxLoad = 0.95

      DefaultMaxLoad is the default machine max load.


      var DoShuffleReaders = true

        DoShuffleReaders determines whether reader tasks should be shuffled in order to avoid potential thundering herd issues. It should be set to false only in testing, when deterministic ordering matters.

        TODO(marius): make this a session option instead.

        var ErrTaskLost = errors.New("task was lost")

          ErrTaskLost indicates that a Task was in TaskLost state.

          var ProbationTimeout = 30 * time.Second

            ProbationTimeout is the amount of time that a machine will remain in probation without being explicitly marked healthy.


            func Eval

            func Eval(ctx context.Context, executor Executor, roots []*Task, group *status.Group) error

              Eval simultaneously evaluates a set of task graphs from the provided set of roots. Eval uses the provided executor to dispatch tasks when their dependencies have been satisfied. Eval returns on evaluation error or else when all roots are fully evaluated.

              TODO(marius): we can often stream across shuffle boundaries. This would complicate scheduling, but may be worth doing.
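The dispatch rule described for Eval (run a task only once its dependencies are satisfied, and report an evaluation error) can be illustrated with a small standard-library model. This is a sketch under stated assumptions, not the package's evaluator: `node`, `eval`, and `evalAll` are hypothetical names, the graph is assumed to be acyclic, and unlike Eval this model waits for every root before returning the first error.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// node is a hypothetical stand-in for a task: a name, dependencies, and work.
type node struct {
	name string
	deps []*node
	run  func() error

	once sync.Once // guarantees the node runs at most once
	err  error     // result of the single run
}

// eval runs n after all of its dependencies have completed successfully.
// A node shared by several dependents is still run only once.
func eval(n *node) error {
	n.once.Do(func() {
		errc := make(chan error, len(n.deps))
		for _, d := range n.deps {
			go func(d *node) { errc <- eval(d) }(d)
		}
		for range n.deps {
			if err := <-errc; err != nil && n.err == nil {
				n.err = err
			}
		}
		if n.err == nil {
			n.err = n.run()
		}
	})
	return n.err
}

// evalAll evaluates all roots concurrently and returns the first error seen.
func evalAll(roots []*node) error {
	errc := make(chan error, len(roots))
	for _, r := range roots {
		go func(r *node) { errc <- eval(r) }(r)
	}
	var first error
	for range roots {
		if err := <-errc; err != nil && first == nil {
			first = err
		}
	}
	return first
}

// runDiamond evaluates d, which depends on b and c, which both depend on a.
func runDiamond() ([]string, error) {
	var (
		mu    sync.Mutex
		order []string
	)
	mk := func(name string, deps ...*node) *node {
		return &node{name: name, deps: deps, run: func() error {
			mu.Lock()
			defer mu.Unlock()
			order = append(order, name)
			return nil
		}}
	}
	a := mk("a")
	b, c := mk("b", a), mk("c", a)
	d := mk("d", b, c)
	err := evalAll([]*node{d})
	return order, err
}

// runFailing shows that a dependent is not run when a dependency fails.
func runFailing() (ran bool, err error) {
	bad := &node{name: "bad", run: func() error { return errors.New("boom") }}
	top := &node{name: "top", deps: []*node{bad}, run: func() error { ran = true; return nil }}
	err = evalAll([]*node{top})
	return ran, err
}

func main() {
	order, err := runDiamond()
	fmt.Println(order, err) // "a" is always first and "d" always last
	fmt.Println(runFailing())
}
```

In this model the relative order of b and c is not fixed, which mirrors the fact that independent tasks may be dispatched concurrently.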


              type CompileEnv

              type CompileEnv struct {
              	// Writable is true if this environment is writable. It is only exported so
              	// that it can be gob-{en,dec}oded.
              	Writable bool
              	// TaskCached indicates whether a task's results can be read from cache. It
              	// is only exported so that it can be gob-{en,dec}oded.
              	TaskCached map[TaskName]bool
              }

                CompileEnv is the environment for compilation. This environment should capture all external state that can affect compilation of an invocation. It is shared across compilations of the same invocation (e.g. on worker nodes) to guarantee consistent compilation results. This is a requirement of bigslice's computation model, as we assume that all nodes share the same view of the task graph.

                func (*CompileEnv) Freeze

                func (e *CompileEnv) Freeze()

                  Freeze freezes the state, marking e no longer writable.

                  func (CompileEnv) IsCached

                  func (e CompileEnv) IsCached(n TaskName) bool

                    IsCached returns whether the task named n is cached.

                    func (CompileEnv) IsWritable

                    func (e CompileEnv) IsWritable() bool

                      IsWritable returns whether this environment is writable.

                      func (CompileEnv) MarkCached

                      func (e CompileEnv) MarkCached(n TaskName)

                        MarkCached marks the task named n as cached.
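The receiver types in the signatures above matter: Freeze changes a field of the struct and so needs a pointer receiver, while MarkCached can use a value receiver because a copied struct still refers to the same map. The sketch below models this with a hypothetical `env` type that has the same two fields. How the real MarkCached treats a frozen environment is not stated here; rejecting the write and returning false is purely this sketch's choice.

```go
package main

import "fmt"

// env is a hypothetical stand-in for CompileEnv with the same two fields.
type env struct {
	Writable   bool
	TaskCached map[string]bool
}

// freeze needs a pointer receiver: it changes a field of the struct itself.
func (e *env) freeze() { e.Writable = false }

// markCached can use a value receiver: the copy of e still refers to the
// same underlying map, so the write is visible to the caller.
// Rejecting writes on a frozen env is an assumption of this sketch.
func (e env) markCached(name string) bool {
	if !e.Writable {
		return false
	}
	e.TaskCached[name] = true
	return true
}

// isCached reports whether the named task has been marked as cached.
func (e env) isCached(name string) bool { return e.TaskCached[name] }

func main() {
	e := env{Writable: true, TaskCached: map[string]bool{}}
	fmt.Println(e.markCached("map_0")) // true
	e.freeze()
	fmt.Println(e.markCached("map_1"))                     // false
	fmt.Println(e.isCached("map_0"), e.isCached("map_1")) // true false
}
```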

                        type Executor

                        type Executor interface {
                        	// Name returns a human-friendly name for this executor.
                        	Name() string
                        	// Start starts the executor. It is called before evaluation has started
                        	// and after all funcs have been registered. Start need not return:
                        	// for example, the Bigmachine implementation of Executor uses
                        	// Start as an entry point for worker processes.
                        	Start(*Session) (shutdown func())
                        	// Run runs a task. The executor sets the state of the task as it
                        	// progresses. The task should enter in state TaskWaiting; by the
                        	// time Run returns the task state is >= TaskOk.
                        	Run(*Task)
                        	// Reader returns a locally accessible ReadCloser for the requested task.
                        	Reader(*Task, int) sliceio.ReadCloser
                        	// Discard discards the storage resources held by a computed task.
                        	// Discarding is best-effort, so no error is returned.
                        	Discard(context.Context, *Task)
                        	// Eventer returns the eventer used to log events relevant to this executor.
                        	Eventer() eventlog.Eventer
                        	// HandleDebug adds executor-specific debug handlers to the provided
                        	// http.ServeMux. This is used to serve diagnostic information relating
                        	// to the executor.
                        	HandleDebug(handler *http.ServeMux)
                        }

                          Executor defines an interface used to provide implementations of task runners. An Executor is responsible for running single tasks, partitioning their outputs, and instantiating readers to retrieve the output of any given task.

                          type Option

                          type Option func(s *Session)

                            An Option represents a session configuration parameter value.

                            var Local Option = func(s *Session) {
                            	s.executor = newLocalExecutor()
                            }

                              Local configures a session with the local in-binary executor.

                              var MachineCombiners Option = func(s *Session) {
                              	s.machineCombiners = true
                              }

                                MachineCombiners is a session option that turns on machine-local combine buffers. If turned on, each combiner task that belongs to the same shard-set and runs on the same machine combines values into a single, machine-local combine buffer. This can be a big performance optimization for tasks that have low key cardinality, or a key-set with very hot keys. However, due to the way it is implemented, error recovery is currently not implemented for such tasks.

                                func Bigmachine

                                func Bigmachine(system bigmachine.System, params ...bigmachine.Param) Option

                                  Bigmachine configures a session using the bigmachine executor configured with the provided system. If any params are provided, they are applied to each bigmachine allocated by Bigslice.

                                  func Eventer

                                  func Eventer(e eventlog.Eventer) Option

                                    Eventer configures the session with an Eventer that will be used to log session events (for analytics).

                                    func MaxLoad

                                    func MaxLoad(maxLoad float64) Option

                                      MaxLoad configures the session with the provided max machine load.

                                      func Parallelism

                                      func Parallelism(p int) Option

                                        Parallelism configures the session with the provided target parallelism.

                                        func Status

                                        func Status(status *status.Status) Option

                                          Status configures the session with a status object to which run statuses are reported.

                                          func TracePath

                                          func TracePath(path string) Option

                                            TracePath configures the path to which a trace event file for the session will be written on shutdown.
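Option has the shape of Go's functional options pattern: each option is a function that mutates the session being configured. A minimal model of that pattern follows. The `session` type, the lower-case constructors, and the default parallelism of 1 are hypothetical; only the 0.95 default load is taken from DefaultMaxLoad above.

```go
package main

import "fmt"

// session is a hypothetical stand-in for Session with two settings.
type session struct {
	parallelism int
	maxLoad     float64
}

// option mirrors the shape of Option: a function that mutates the session.
type option func(s *session)

// parallelism and maxLoad each return an option that sets one field.
func parallelism(p int) option { return func(s *session) { s.parallelism = p } }
func maxLoad(l float64) option { return func(s *session) { s.maxLoad = l } }

// start applies defaults first, then each option in order, so later
// options override earlier ones.
func start(opts ...option) *session {
	s := &session{parallelism: 1, maxLoad: 0.95}
	for _, opt := range opts {
		opt(s)
	}
	return s
}

func main() {
	s := start(parallelism(8), maxLoad(0.5))
	fmt.Println(s.parallelism, s.maxLoad) // 8 0.5
}
```

Because options are plain functions, new settings can be added without changing the signature of the constructor.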

                                            type Result

                                            type Result struct {
                                            	// contains filtered or unexported fields
                                            }

                                              A Result is the output of a Slice evaluation. It is the only type implementing bigslice.Slice that is a legal argument to a bigslice.Func.

                                              func (*Result) Discard

                                              func (r *Result) Discard(ctx context.Context)

                                                Discard discards the storage resources held by the subgraph of tasks used to compute r. This should be used to discard results that are no longer needed. If the results are needed by another computation, they will be recomputed. Discarding is best-effort, so no error is returned.

                                                func (*Result) Scanner

                                                func (r *Result) Scanner() *sliceio.Scanner

                                                  Scanner returns a scanner that scans the output. If the output contains multiple shards, they are scanned sequentially. You must call Close on the returned scanner when you are done scanning. You may get and scan multiple scanners concurrently from r.

                                                  func (*Result) Scope

                                                  func (r *Result) Scope() *metrics.Scope

                                                    Scope returns the merged metrics scope for the entire task graph represented by the result r. Scope relies on the local values in the scopes of the task graph, and its values are thus not precise.

                                                    TODO(marius): flow and merge scopes along with data to provide precise metrics.

                                                    type Session

                                                    type Session struct {
                                                    	// contains filtered or unexported fields
                                                    }

                                                      Session represents a Bigslice compute session. A session shares a binary and executor, and is valid for the run of the binary. A session can run multiple bigslice functions, allowing for iterative computing.

                                                      A session is started by the Start method. Some executors may launch multiple copies of the binary: these additional binaries are called workers, and in them Start does not return.

                                                      All functions must be created before Start is called, and must be created in a deterministic order. This is provided by default when functions are created as part of package initialization. Registering toplevel functions this way is both safe and encouraged:

                                                      var Computation = bigslice.Func(func(..) (slice Slice) {
                                                      	// Build up the computation, parameterized by the function.
                                                      	slice = ...
                                                      	slice = ...
                                                      	return slice
                                                      })

                                                      // Possibly in another package:
                                                      func main() {
                                                      	sess := exec.Start()
                                                      	if _, err := sess.Run(ctx, Computation, args...); err != nil {
                                                      		log.Fatal(err)
                                                      	}
                                                      	// Success!
                                                      }

                                                      func Start

                                                      func Start(options ...Option) *Session

                                                        Start creates and starts a new bigslice session, configuring it according to the provided options. Only one session may be created in a single binary invocation. The returned session remains valid for the lifetime of the binary. If no executor is configured, the session is configured to use the bigmachine executor.

                                                        func (*Session) Discard

                                                        func (s *Session) Discard(ctx context.Context, roots []*Task)

                                                          Discard discards the storage resources held by the subgraph given by roots. This should be used to discard tasks whose results are no longer needed. If the task results are needed by another computation, they will be recomputed. Discarding is best-effort, so no error is returned.

                                                          func (*Session) HandleDebug

                                                          func (s *Session) HandleDebug(handler *http.ServeMux)

                                                          func (*Session) MaxLoad

                                                          func (s *Session) MaxLoad() float64

                                                            MaxLoad returns the maximum load on each allocated machine.

                                                            func (*Session) Must

                                                            func (s *Session) Must(ctx context.Context, funcv *bigslice.FuncValue, args ...interface{}) *Result

                                                              Must is a version of Run that panics if the computation fails.

                                                              func (*Session) Parallelism

                                                              func (s *Session) Parallelism() int

                                                                Parallelism returns the desired amount of evaluation parallelism.

                                                                func (*Session) Run

                                                                func (s *Session) Run(ctx context.Context, funcv *bigslice.FuncValue, args ...interface{}) (*Result, error)

                                                                  Run evaluates the slice returned by the bigslice func funcv applied to the provided arguments. Tasks are run by the session's executor. Run returns when the computation has completed, or else on error. It is safe to make concurrent calls to Run; the underlying computation will be performed in parallel.

                                                                  func (*Session) Shutdown

                                                                  func (s *Session) Shutdown()

                                                                    Shutdown tears down resources associated with this session. It should be called when the session is discarded.

                                                                    func (*Session) Status

                                                                    func (s *Session) Status() *status.Status

                                                                      Status returns the session's status aggregator.

                                                                      type Store

                                                                      type Store interface {
                                                                      	// Create returns a writer that populates data for the given
                                                                      	// task name and partition. The data is not available
                                                                      	// to Open until the returned closer has been closed.
                                                                      	// TODO(marius): should we allow writes to be discarded as well?
                                                                      	Create(ctx context.Context, task TaskName, partition int) (writeCommitter, error)
                                                                      	// Open returns a ReadCloser from which the stored contents of the named task
                                                                      	// and partition can be read. If the task and partition are not stored, an
                                                                      	// error with kind errors.NotExist is returned. The offset specifies the byte
                                                                      	// position from which to read.
                                                                      	Open(ctx context.Context, task TaskName, partition int, offset int64) (io.ReadCloser, error)
                                                                      	// Stat returns metadata for the stored slice.
                                                                      	Stat(ctx context.Context, task TaskName, partition int) (sliceInfo, error)
                                                                      	// Discard discards the data stored for task and partition. Subsequent calls
                                                                      	// to Open for the given (task, partition) will fail. ReadClosers that
                                                                      	// already exist may start returning errors, depending on the
                                                                      	// implementation. If no such (task, partition) is stored, returns a non-nil
                                                                      	// error.
                                                                      	Discard(ctx context.Context, task TaskName, partition int) error
                                                                      }

                                                                        Store is an abstraction that stores partitioned data as produced by a task.
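The contract in the comments above (data becomes visible only once the writer is closed, Open reads from a byte offset, and Discard fails for unknown entries) can be modeled in memory with the standard library alone. Everything below is hypothetical: `memStore`, `key`, and `errNotExist` are invented names, the sketch uses a plain io.WriteCloser in place of the unexported writeCommitter, and it omits contexts and Stat.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"sync"
)

// key identifies stored data by task name and partition.
type key struct {
	task      string
	partition int
}

var errNotExist = errors.New("not exist")

// memStore is a hypothetical in-memory model of the Store contract.
type memStore struct {
	mu   sync.Mutex
	data map[key][]byte
}

// writer buffers writes and publishes them to the store only on Close.
type writer struct {
	bytes.Buffer
	commit func([]byte)
}

func (w *writer) Close() error {
	w.commit(w.Bytes())
	return nil
}

func (s *memStore) create(task string, partition int) io.WriteCloser {
	return &writer{commit: func(p []byte) {
		s.mu.Lock()
		defer s.mu.Unlock()
		s.data[key{task, partition}] = append([]byte(nil), p...)
	}}
}

// open returns a reader positioned at offset (assumed non-negative).
func (s *memStore) open(task string, partition int, offset int64) (io.ReadCloser, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	p, ok := s.data[key{task, partition}]
	if !ok {
		return nil, errNotExist
	}
	if offset > int64(len(p)) {
		offset = int64(len(p))
	}
	return io.NopCloser(bytes.NewReader(p[offset:])), nil
}

func (s *memStore) discard(task string, partition int) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	k := key{task, partition}
	if _, ok := s.data[k]; !ok {
		return errNotExist
	}
	delete(s.data, k)
	return nil
}

// demo writes, reads from an offset, and discards one partition.
func demo() (before error, got string, after error) {
	s := &memStore{data: map[key][]byte{}}
	w := s.create("task", 0)
	io.WriteString(w, "hello world")
	_, before = s.open("task", 0, 0) // writer not closed yet, so not visible
	w.Close()

	r, err := s.open("task", 0, 6) // skip "hello "
	if err != nil {
		return before, "", err
	}
	defer r.Close()
	b, _ := io.ReadAll(r)

	s.discard("task", 0)
	_, after = s.open("task", 0, 0)
	return before, string(b), after
}

func main() {
	fmt.Println(demo()) // not exist world not exist
}
```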

                                                                        type Task

                                                                        type Task struct {
                                                                        	// Invocation is the task's invocation, i.e. the Func invocation
                                                                        	// from which this task was compiled.
                                                                        	Invocation execInvocation
                                                                        	// Name is the name of the task. Tasks are named uniquely inside each
                                                                        	// Bigslice session.
                                                                        	Name TaskName
                                                                        	// Do starts computation for this task, returning a reader that
                                                                        	// computes batches of values on demand. Do is invoked with readers
                                                                        	// for the task's dependencies.
                                                                        	Do func([]sliceio.Reader) sliceio.Reader
                                                                        	// Deps are the task's dependencies. See TaskDep for details.
                                                                        	Deps []TaskDep
                                                                        	// Partitioner is used to partition the task's output. It will only
                                                                        	// be called when NumPartition > 1.
                                                                        	Partitioner bigslice.Partitioner
                                                                        	// NumPartition is the number of partitions that are output by this task.
                                                                        	// If NumPartition > 1, then the task must also define a partitioner.
                                                                        	NumPartition int
                                                                        	// Combiner specifies an (optional) combiner to use for this task's output.
                                                                        	// If Combiner is not nil, CombineKey names the combine buffer used:
                                                                        	// each combine buffer contains combiner outputs from multiple tasks.
                                                                        	// If CombineKey is not set, then per-task buffers are used instead.
                                                                        	Combiner   slicefunc.Func
                                                                        	CombineKey string
                                                                        	// Pragma comprises the pragmas of all slice operations that
                                                                        	// are pipelined into this task.
                                                                        	bigslice.Pragma
                                                                        	// Slices is the set of slices to which this task directly contributes.
                                                                        	Slices []bigslice.Slice
                                                                        	// Group stores an ordered list of peer tasks. If Group is nonempty,
                                                                        	// it is guaranteed that these sets of tasks constitute a shuffle
                                                                        	// dependency, and share a set of shuffle dependencies. This allows
                                                                        	// the evaluator to perform optimizations while tracking such
                                                                        	// dependencies.
                                                                        	Group []*Task
                                                                        	// Scope is the metrics scope for this task. It is populated with the
                                                                        	// metrics produced during execution of this task.
                                                                        	Scope metrics.Scope
                                                                        	// Status is a status object to which task status is reported.
                                                                        	Status *status.Task
                                                                        	// contains filtered or unexported fields
                                                                        }

                                                                          A Task represents a concrete computational task. Tasks form graphs through dependencies; task graphs are compiled from slices.

                                                                          Tasks also maintain executor state, and are used to coordinate execution between concurrent evaluators and a single executor (which may be evaluating many tasks concurrently). Tasks thus embed a mutex for coordination and provide a context-aware condition variable to coordinate runtime state changes.

                                                                          func (*Task) All

                                                                          func (t *Task) All() []*Task

                                                                            All returns all tasks reachable from t. The returned set of tasks is unique.
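Collecting a unique set of reachable tasks is a graph traversal with a visited set. The sketch below shows one way to do it on a simplified, hypothetical `task` type whose dependencies are direct pointers. The real Task uses TaskDep values, and whether All includes t itself or returns tasks in any particular order is an assumption here.

```go
package main

import "fmt"

// task is a hypothetical, simplified task with direct dependencies.
type task struct {
	name string
	deps []*task
}

// all returns every task reachable from t (including t), each exactly once,
// in depth-first preorder.
func (t *task) all() []*task {
	seen := map[*task]bool{}
	var out []*task
	var visit func(*task)
	visit = func(n *task) {
		if seen[n] {
			return // already collected via another path
		}
		seen[n] = true
		out = append(out, n)
		for _, d := range n.deps {
			visit(d)
		}
	}
	visit(t)
	return out
}

// names extracts task names for printing.
func names(ts []*task) []string {
	var ns []string
	for _, t := range ts {
		ns = append(ns, t.name)
	}
	return ns
}

func main() {
	a := &task{name: "a"}
	b := &task{name: "b", deps: []*task{a}}
	c := &task{name: "c", deps: []*task{a}}
	d := &task{name: "d", deps: []*task{b, c}}
	fmt.Println(names(d.all())) // [d b a c]
}
```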

                                                                            func (*Task) Broadcast

                                                                            func (t *Task) Broadcast()

                                                                              Broadcast notifies waiters of a state change. Broadcast must only be called while the task's lock is held.

                                                                              func (*Task) Err

                                                                              func (t *Task) Err() error

                                                                                Err returns an error if the task's state is >= TaskErr. When the state is > TaskErr, Err returns an error describing the task's failed state; otherwise, t.err is returned.

                                                                                func (*Task) Error

                                                                                func (t *Task) Error(err error)

                                                                                  Error sets the task's state to TaskErr and its error to the provided error. Waiters are notified.

                                                                                  func (*Task) Errorf

                                                                                  func (t *Task) Errorf(format string, v ...interface{})

                                                                                    Errorf formats an error message using fmt.Errorf, sets the task's state to TaskErr and its err to the resulting error message.

                                                                                    func (*Task) GraphString

                                                                                    func (t *Task) GraphString() string

                                                                                      GraphString returns a schematic string of the task graph rooted at t.

                                                                                      func (*Task) Head

                                                                                      func (t *Task) Head() *Task

                                                                                        Head returns the head task of this task's phase. If the task does not belong to a phase, Head returns the task t.

                                                                                        func (*Task) Phase

                                                                                        func (t *Task) Phase() []*Task

                                                                                          Phase returns the phase to which this task belongs.

                                                                                          func (*Task) Set

                                                                                          func (t *Task) Set(state TaskState)

                                                                                            Set sets the task's state to the provided state and notifies any waiters.

                                                                                            func (*Task) State

                                                                                            func (t *Task) State() TaskState

                                                                                              State returns the task's current state.

                                                                                              func (*Task) String

                                                                                              func (t *Task) String() string

                                                                                                String returns a short, human-readable string describing the task's state.

                                                                                                func (*Task) Subscribe

                                                                                                func (t *Task) Subscribe(s *TaskSubscriber)

                                                                                                  Subscribe subscribes s to be notified of any changes to t's state. If s has already been subscribed, no-op.

                                                                                                  func (*Task) Unsubscribe

                                                                                                  func (t *Task) Unsubscribe(s *TaskSubscriber)

                                                                                                    Unsubscribe unsubscribes the previously subscribed s. s will no longer receive task state change notifications. No-op if s was never subscribed.

                                                                                                    func (*Task) Wait

                                                                                                    func (t *Task) Wait(ctx context.Context) error

                                                                                                      Wait returns after the next call to Broadcast, or if the context is complete. The task's lock must be held when calling Wait.

                                                                                                      func (*Task) WaitState

                                                                                                      func (t *Task) WaitState(ctx context.Context, state TaskState) (TaskState, error)

                                                                                                        WaitState returns when the task's state is at least the provided state, or else when the context is done.
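A sync.Cond cannot be interrupted by a context, so a context-aware wait is commonly built from a channel that is closed on every state change. The sketch below shows that technique on a hypothetical `cell` type with integer states. It is not the package's implementation, and unlike Wait it takes the lock itself rather than requiring the caller to hold it.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// cell is a hypothetical holder of a state that only moves forward.
type cell struct {
	mu      sync.Mutex
	state   int
	changed chan struct{} // closed and replaced on every state change
}

func newCell() *cell { return &cell{changed: make(chan struct{})} }

// set updates the state and wakes all current waiters.
func (c *cell) set(state int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.state = state
	close(c.changed)
	c.changed = make(chan struct{})
}

// waitState returns once the state is at least want, or when ctx is done.
func (c *cell) waitState(ctx context.Context, want int) (int, error) {
	for {
		c.mu.Lock()
		state, changed := c.state, c.changed
		c.mu.Unlock()
		if state >= want {
			return state, nil
		}
		select {
		case <-changed: // state changed; loop and re-check
		case <-ctx.Done():
			return state, ctx.Err()
		}
	}
}

// demo waits for a state that is reached, then for one that never is.
func demo() (reached int, timedOut bool) {
	c := newCell()
	go func() {
		c.set(1)
		c.set(2)
	}()
	reached, _ = c.waitState(context.Background(), 2)

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	_, err := c.waitState(ctx, 3) // nobody sets state 3
	return reached, err == context.DeadlineExceeded
}

func main() {
	fmt.Println(demo()) // 2 true
}
```

Reading the state and the channel under the same lock is what prevents a missed wakeup: any later change closes exactly the channel the waiter is selecting on.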

                                                                                                        func (*Task) WriteGraph

                                                                                                        func (t *Task) WriteGraph(w io.Writer)

                                                                                                          WriteGraph writes a schematic string of the task graph rooted at t into w.

                                                                                                          type TaskDep

                                                                                                          type TaskDep struct {
                                                                                                          	// Head holds the underlying task that represents this dependency.
                                                                                                          	// For shuffle dependencies, that task is the head task of the
                                                                                                          	// phase, and the evaluator must expand the phase.
                                                                                                          	Head      *Task
                                                                                                          	Partition int
                                                                                                          	// Expand indicates that the task's dependencies for a given
                                                                                                          	// partition should not be merged, but rather passed individually to
                                                                                                          	// the task implementation.
                                                                                                          	Expand bool
                                                                                                          	// CombineKey is an optional label that names the combination key to
                                                                                                          	// be used by this dependency. It names a single combiner buffer
                                                                                                          	// from which a number of combined tasks are read.
                                                                                                          	// CombineKeys must be provided to tasks that contain combiners.
                                                                                                          	CombineKey string
                                                                                                          }

                                                                                                            A TaskDep describes a single dependency for a task. A dependency comprises one or more tasks and the partition number of the task set that must be read at run time.

                                                                                                            func (TaskDep) NumTask

                                                                                                            func (d TaskDep) NumTask() int

                                                                                                              NumTask returns the number of tasks that make up this dependency.

                                                                                                              func (TaskDep) Task

                                                                                                              func (d TaskDep) Task(i int) *Task

                                                                                                                Task returns the i'th task in this dependency.
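
NumTask and Task are naturally used together to visit every task in a dependency. The following is a minimal sketch of that loop, not the package's own code: `task`, `dep`, and `sliceDep` are hypothetical stand-ins defined here so the example is self-contained, and how the real TaskDep stores its tasks is not shown in this documentation.

```go
package main

import "fmt"

// task is a hypothetical stand-in for exec.Task; only a label is kept.
type task struct{ name string }

// dep captures the two accessors documented for TaskDep. It is an
// illustrative interface and is not part of the package.
type dep interface {
	NumTask() int
	Task(i int) *task
}

// sliceDep is a hypothetical dependency backed by a plain slice.
type sliceDep struct{ tasks []*task }

func (d sliceDep) NumTask() int     { return len(d.tasks) }
func (d sliceDep) Task(i int) *task { return d.tasks[i] }

// taskNames visits each task in d by index and collects its label.
func taskNames(d dep) []string {
	names := make([]string, 0, d.NumTask())
	for i := 0; i < d.NumTask(); i++ {
		names = append(names, d.Task(i).name)
	}
	return names
}

func main() {
	d := sliceDep{tasks: []*task{{name: "a"}, {name: "b"}}}
	fmt.Println(taskNames(d))
}
```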

                                                                                                                type TaskName

                                                                                                                type TaskName struct {
                                                                                                                	// InvIndex is the index of the invocation for which the task was compiled.
                                                                                                                	InvIndex uint64
                                                                                                                	// Op is a unique string describing the operation that is provided
                                                                                                                	// by the task.
                                                                                                                	Op string
                                                                                                                	// Shard and NumShard describe the shard processed by this task
                                                                                                                	// and the total number of shards to be processed.
                                                                                                                	Shard, NumShard int
                                                                                                                }

                                                                                                                  A TaskName uniquely names a task by its constituent components. Tasks with 0 shards are taken to be combiner tasks: they are machine-local buffers of combiner outputs for some (non-overlapping) subset of shards for a task.

                                                                                                                  func (TaskName) IsCombiner

                                                                                                                  func (n TaskName) IsCombiner() bool

                                                                                                                    IsCombiner returns whether the named task is a combiner task.
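
To make the "0 shards" rule concrete, here is a small sketch using a local mirror of the documented fields. It assumes that "0 shards" means `NumShard == 0`; the package's actual check is not shown in this documentation and may differ. The type `taskName` and the method `isCombiner` are illustrative names only.

```go
package main

import "fmt"

// taskName mirrors the documented fields of TaskName for illustration.
type taskName struct {
	InvIndex        uint64
	Op              string
	Shard, NumShard int
}

// isCombiner assumes that a task with zero shards is a combiner task.
// This is a reading of the documentation, not the package's code.
func (n taskName) isCombiner() bool { return n.NumShard == 0 }

func main() {
	sharded := taskName{InvIndex: 1, Op: "reduce", Shard: 3, NumShard: 8}
	combiner := taskName{InvIndex: 1, Op: "reduce"}
	fmt.Println(sharded.isCombiner(), combiner.isCombiner())
}
```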

                                                                                                                    func (TaskName) String

                                                                                                                    func (n TaskName) String() string

                                                                                                                      String returns a canonical representation of the task name.


                                                                                                                      type TaskState

                                                                                                                      type TaskState int

                                                                                                                        TaskState represents the runtime state of a Task. TaskState values are defined so that their magnitudes correspond with task progression.

                                                                                                                        const (
                                                                                                                        	// TaskInit is the initial state of a task. Tasks in state TaskInit
                                                                                                                        	// have usually not yet been seen by an executor.
                                                                                                                        	TaskInit TaskState = iota
                                                                                                                        	// TaskWaiting indicates that a task has been scheduled for
                                                                                                                        	// execution (it is runnable) but has not yet been allocated
                                                                                                                        	// resources by the executor.
                                                                                                                        	TaskWaiting
                                                                                                                        	// TaskRunning is the state of a task that's currently being run or
                                                                                                                        	// discarded. After a task is in state TaskRunning, it can only enter a
                                                                                                                        	// larger-valued state.
                                                                                                                        	TaskRunning
                                                                                                                        	// TaskOk indicates that a task has successfully completed;
                                                                                                                        	// the task's results are available to dependent tasks.
                                                                                                                        	//
                                                                                                                        	// All TaskState values greater than TaskOk indicate task
                                                                                                                        	// errors.
                                                                                                                        	TaskOk
                                                                                                                        	// TaskErr indicates that the task experienced a failure while
                                                                                                                        	// running.
                                                                                                                        	TaskErr
                                                                                                                        	// TaskLost indicates that the task was lost, usually because
                                                                                                                        	// the machine to which the task was assigned failed.
                                                                                                                        	TaskLost
                                                                                                                        )

                                                                                                                        func (TaskState) String

                                                                                                                        func (s TaskState) String() string

                                                                                                                          String returns the task's state as an upper-case string.
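
Because TaskState magnitudes follow task progression, and every value greater than TaskOk is an error, state checks reduce to integer comparisons. The sketch below mirrors the enumeration locally to show this. The lower-case constant names are taken from the comments in the const block, and the `failed` helper is a hypothetical name, not part of the package.

```go
package main

import "fmt"

// taskState is a local mirror of TaskState, declared in the same order
// as the documented constants so that larger values mean later states.
type taskState int

const (
	taskInit taskState = iota
	taskWaiting
	taskRunning
	taskOk
	taskErr
	taskLost
)

// failed reports whether s is an error state. It relies on the documented
// rule that all values greater than TaskOk indicate task errors.
func failed(s taskState) bool { return s > taskOk }

func main() {
	for _, s := range []taskState{taskInit, taskWaiting, taskRunning, taskOk, taskErr, taskLost} {
		fmt.Printf("state %d failed=%v\n", s, failed(s))
	}
}
```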

                                                                                                                          type TaskSubscriber

                                                                                                                          type TaskSubscriber struct {
                                                                                                                          	// contains filtered or unexported fields
                                                                                                                          }

                                                                                                                            TaskSubscriber is subscribed to a Task using Subscribe. It is then notified whenever the Task state changes. This is useful for efficiently observing the state changes of many tasks.

                                                                                                                            func NewTaskSubscriber

                                                                                                                            func NewTaskSubscriber() *TaskSubscriber

                                                                                                                              NewTaskSubscriber returns a new TaskSubscriber. It needs to be subscribed to a Task with Subscribe for it to be notified of task state changes.

                                                                                                                              func (*TaskSubscriber) Notify

                                                                                                                              func (s *TaskSubscriber) Notify(task *Task)

                                                                                                                                Notify notifies s of a task whose state has changed.

                                                                                                                                func (*TaskSubscriber) Ready

                                                                                                                                func (s *TaskSubscriber) Ready() <-chan struct{}

                                                                                                                                  Ready returns a channel that is closed if a subsequent call to Tasks will return a non-nil slice.

                                                                                                                                  func (*TaskSubscriber) Tasks

                                                                                                                                  func (s *TaskSubscriber) Tasks() []*Task

                                                                                                                                    Tasks returns the tasks whose state has changed since the last call to Tasks.
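
The Notify, Ready, and Tasks contract described above can be sketched with a mutex, a pending list, and a channel that is replaced after each drain. This is one possible implementation under those assumptions, not the package's actual code. `task`, `subscriber`, and `newSubscriber` are hypothetical names, and the real Subscribe step on Task is omitted.

```go
package main

import (
	"fmt"
	"sync"
)

// task is a hypothetical stand-in for exec.Task.
type task struct{ name string }

// subscriber accumulates tasks whose state changed since the last drain.
type subscriber struct {
	mu      sync.Mutex
	pending []*task
	ready   chan struct{}
}

func newSubscriber() *subscriber {
	return &subscriber{ready: make(chan struct{})}
}

// Notify records t. The ready channel is closed on the first
// notification after a drain, so it is never closed twice.
func (s *subscriber) Notify(t *task) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if len(s.pending) == 0 {
		close(s.ready)
	}
	s.pending = append(s.pending, t)
}

// Ready returns a channel that is closed once Tasks would return
// a non-nil slice.
func (s *subscriber) Ready() <-chan struct{} {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.ready
}

// Tasks returns the tasks recorded since the last call and resets the
// subscriber. It returns nil when nothing is pending.
func (s *subscriber) Tasks() []*task {
	s.mu.Lock()
	defer s.mu.Unlock()
	tasks := s.pending
	if tasks != nil {
		s.pending = nil
		s.ready = make(chan struct{})
	}
	return tasks
}

func main() {
	s := newSubscriber()
	s.Notify(&task{name: "a"})
	s.Notify(&task{name: "b"})
	<-s.Ready() // does not block: changes are pending
	for _, t := range s.Tasks() {
		fmt.Println(t.name)
	}
	fmt.Println(s.Tasks() == nil)
}
```

A single subscriber can be notified about many tasks, so one goroutine can wait on Ready and drain Tasks instead of polling each task's state individually.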