flow

package
v0.0.0-...-90deddd
Warning

This package is not in the latest version of its module.

Published: Oct 18, 2023 License: Apache-2.0 Imports: 43 Imported by: 0

Documentation

Constants

This section is empty.

Variables

Digester is the Digester used throughout reflow. We use a SHA256 digest.

var Universe string

Universe is the global namespace for digest computation.

Functions

func ImageQualifiers

func ImageQualifiers(image string) (img string, aws, docker bool)

ImageQualifiers analyzes the given image for the presence of backdoor qualifiers, strips the image of them, and returns a boolean for each known qualifier if present.

func WithBackground

func WithBackground(ctx context.Context, wg WaitGroup) (context.Context, context.CancelFunc)

WithBackground returns a new context.Context with an affiliated Context, accessible via Background. The background context may be canceled with the returned cancellation function. The supplied WaitGroup is used to inform the caller of pending background operations: wg.Add(1) is called for each call to Background; wg.Done is called when the context returned from Background is disposed of through (Context).Complete.
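The accounting described above can be illustrated with a small, self-contained sketch. It is not the package's implementation: `bgContext`, `bgKey`, `bgState`, `withBackground`, and `background` are hypothetical stand-ins that only mirror the documented behavior (one `Add(1)` per call to Background, one `Done` per call to Complete).

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// waitGroup mirrors the documented WaitGroup interface.
type waitGroup interface {
	Add(int)
	Done()
}

// bgKey and bgState are hypothetical helpers used only by this sketch.
type bgKey struct{}

type bgState struct {
	ctx context.Context
	wg  waitGroup
}

// bgContext is a stand-in for flow.Context.
type bgContext struct {
	context.Context
	wg waitGroup
}

// Complete signals that the background operation has finished.
func (c bgContext) Complete() {
	if c.wg != nil {
		c.wg.Done()
	}
}

// withBackground attaches a cancelable background context to ctx.
func withBackground(ctx context.Context, wg waitGroup) (context.Context, context.CancelFunc) {
	bg, cancel := context.WithCancel(context.Background())
	return context.WithValue(ctx, bgKey{}, bgState{bg, wg}), cancel
}

// background returns the affiliated context and registers one pending
// operation; without an affiliated context it returns a fresh one.
func background(ctx context.Context) bgContext {
	s, ok := ctx.Value(bgKey{}).(bgState)
	if !ok {
		return bgContext{Context: context.Background()}
	}
	s.wg.Add(1)
	return bgContext{Context: s.ctx, wg: s.wg}
}

// runInBackground starts n background operations and waits for all of them.
func runInBackground(n int) int {
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		done int
	)
	ctx, cancel := withBackground(context.Background(), &wg)
	defer cancel()
	for i := 0; i < n; i++ {
		bg := background(ctx)
		go func() {
			defer bg.Complete()
			mu.Lock()
			done++
			mu.Unlock()
		}()
	}
	wg.Wait() // returns once every operation has called Complete
	return done
}

func main() {
	fmt.Println(runInBackground(3))
}
```

Because the wait group is supplied by the caller, the caller decides when to block on outstanding background work, for example just before process exit.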

Types

type Config

type Config struct {
	// HashV1 should be set to true if the flow should use the legacy
	// "v1" hash algorithm.
	HashV1 bool
}

Config stores flow configuration information. Configs modulate Flow behavior.

func (Config) IsZero

func (c Config) IsZero() bool

IsZero tells whether this config is the zero value, that is, whether it stores no non-default configuration.

func (*Config) Merge

func (c *Config) Merge(d Config)

Merge merges config d into config c.
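A minimal sketch of how IsZero and Merge might behave for a config with a single boolean field. The `config` type below is a stand-in, and the rule that a flag set in either config remains set after merging is an assumption, not something the documentation states.

```go
package main

import "fmt"

// config is a stand-in for flow.Config.
type config struct {
	HashV1 bool
}

// isZero reports whether c holds only default values.
func (c config) isZero() bool { return c == config{} }

// merge folds d into c.
// ASSUMPTION: a flag set in either config stays set after the merge.
func (c *config) merge(d config) { c.HashV1 = c.HashV1 || d.HashV1 }

// mergedHashV1 merges b into a and returns the resulting flag.
func mergedHashV1(a, b config) bool {
	a.merge(b)
	return a.HashV1
}

func main() {
	var c config
	fmt.Println(c.isZero())
	c.merge(config{HashV1: true})
	fmt.Println(c.isZero(), c.HashV1)
}
```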

func (Config) String

func (c Config) String() string

String returns a summary of the configuration c.

type Context

type Context struct {
	context.Context
	// contains filtered or unexported fields
}

Context is a context.Context that is used for background operations within reflow. In addition to providing a common background context for operations, it also carries a WaitGroup, so that the caller can wait for background operation completion.

func Background

func Background(ctx context.Context) Context

Background returns the Context associated with the given / parent context.Context. If there is no associated context, it returns a fresh Context without an affiliated WaitGroup.

func (Context) Complete

func (c Context) Complete()

Complete should be called when the operation is complete.

type Edge

type Edge struct {
	graph.Edge
	// contains filtered or unexported fields
}

Edge represents a dependency from a flow node to another.

func (Edge) Attributes

func (e Edge) Attributes() []encoding.Attribute

Attributes implements encoding.Attributer. Blue edges are dynamically explored edges.

type Eval

type Eval struct {
	// EvalConfig is the evaluation configuration used in this
	// evaluation.
	EvalConfig
	// contains filtered or unexported fields
}

Eval is an evaluator for Flows.

func NewEval

func NewEval(root *Flow, config EvalConfig) *Eval

NewEval creates and initializes a new evaluator using the provided evaluation configuration and root flow.

func (*Eval) CacheWrite

func (e *Eval) CacheWrite(ctx context.Context, f *Flow) error

CacheWrite writes the cache entry for flow f, with objects in the provided source repository. CacheWrite returns nil on success, or else the first error encountered.

func (*Eval) Do

func (e *Eval) Do(ctx context.Context) error

Do evaluates the root flow (as provided to NewEval) and returns an error if evaluation fails. The resulting value is available through (*Eval).Value.

There are two evaluation modes, configured by EvalConfig.BottomUp.

When BottomUp is true, the Flow is evaluated in bottom-up mode. Each node's dependencies are evaluated (recursively); a node is evaluated when all of its dependencies are complete (and error free). Before a node is run, its result is first looked up in the configured cache. If there is a cache hit, evaluation completes without any work being done. Only the node's value is downloaded; its objects are fetched lazily. When a node is ready to be evaluated, we check that all of the objects that it depends on are present in the executor's repository; missing objects are retrieved from cache. If these objects are not present in the cache (this can happen if the object is removed from the cache's repository after the cache lookup was done but before the transfer began), evaluation fails with a restartable error.

When BottomUp is false, the flow is evaluated first top-down, and then bottom-up. In this mode, objects are looked up first in the top-down phase; a node's dependencies are explored only on cache miss. Once this phase is complete, evaluation proceeds in bottom-up mode. Object retrieval is as in bottom-up mode.

Eval keeps track of the evaluation state of each node; these are described in the documentation for State.

Evaluation is performed by simplification: ready nodes are added to a todo list. Single-step evaluation yields either a fully evaluated node (where (*Flow).Value is set to its result) or a new Flow node (whose (*Flow).Parent is always set to its ancestor). Evaluations are restartable.

This provides a simple evaluation scheme that also does not leave any parallelism "on the ground".

Eval employs a conservative admission controller to ensure that we do not exceed available resources.

The root flow is canonicalized before evaluation.

TODO(marius): wait for all nodes to complete before returning (early) when cancelling...
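The two evaluation modes can be sketched with a toy evaluator. Everything here is hypothetical: `node`, `evaluator`, and the integer "values" are stand-ins, and plain recursion takes the place of the todo list, scheduling, and parallelism of the real evaluator. The sketch only shows the order of cache lookups: in the top-down phase a node's dependencies are explored only on a cache miss, after which evaluation proceeds bottom-up.

```go
package main

import "fmt"

// node is a hypothetical stand-in for a flow node.
type node struct {
	name string
	deps []*node
	done bool
	val  int
}

// evaluator is a toy; cache maps node names to previously computed values.
type evaluator struct {
	cache map[string]int
	ran   []string // names of nodes that were actually computed
}

// lookup marks n done on a cache hit.
func (e *evaluator) lookup(n *node) bool {
	v, ok := e.cache[n.name]
	if ok {
		n.val, n.done = v, true
	}
	return ok
}

// topDown looks nodes up starting from the root; a node's dependencies
// are explored only on a cache miss.
func (e *evaluator) topDown(n *node) {
	if n.done || e.lookup(n) {
		return
	}
	for _, d := range n.deps {
		e.topDown(d)
	}
}

// bottomUp evaluates a node's dependencies first, then the node itself.
// The "computation" here is simply 1 plus the sum of dependency values.
func (e *evaluator) bottomUp(n *node) {
	if n.done || e.lookup(n) {
		return
	}
	sum := 1
	for _, d := range n.deps {
		e.bottomUp(d)
		sum += d.val
	}
	n.val, n.done = sum, true
	e.ran = append(e.ran, n.name)
}

// do runs the optional top-down phase followed by bottom-up evaluation.
func (e *evaluator) do(root *node, bottomUpOnly bool) int {
	if !bottomUpOnly {
		e.topDown(root)
	}
	e.bottomUp(root)
	return root.val
}

func main() {
	leaf := &node{name: "leaf"}
	mid := &node{name: "mid", deps: []*node{leaf}}
	root := &node{name: "root", deps: []*node{mid}}
	e := &evaluator{cache: map[string]int{"mid": 2}}
	fmt.Println(e.do(root, false), e.ran)
}
```

With `mid` cached, the leaf is never visited and only the root is computed; with an empty cache every node is computed in dependency order.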

func (*Eval) Err

func (e *Eval) Err() error

Err returns the root evaluation error, if any.

func (*Eval) Flow

func (e *Eval) Flow() *Flow

Flow returns the root flow of this eval.

func (*Eval) LogFlow

func (e *Eval) LogFlow(ctx context.Context, f *Flow)

LogFlow logs flow f's state, and then tracks it for future logging.

func (*Eval) LogSummary

func (e *Eval) LogSummary(log *log.Logger)

LogSummary prints an execution summary to the provided logger.

func (*Eval) Mutate

func (e *Eval) Mutate(f *Flow, muts ...interface{})

Mutate safely applies a set of mutations which may be applied concurrently with each other.
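Since Mutate accepts `...interface{}`, each mutation is presumably distinguished by its dynamic type (the package defines Fork, Value, Mutation, SetReserved, and Status for this purpose). The sketch below shows one way such a dispatch could look. All types are lowercase stand-ins, the mutex and the returned error are assumptions of this sketch, and the real method has no return value.

```go
package main

import (
	"fmt"
	"sync"
)

// Stand-ins for the node and for the package's mutation argument types.
type (
	flowNode struct {
		mu       sync.Mutex
		Value    interface{}
		Cached   bool
		Reserved int
		Status   string
	}
	mutation    int
	value       struct{ Value interface{} }
	setReserved int
	status      string
)

const (
	invalid mutation = iota
	cached
)

// mutate applies each mutation to f under a lock, dispatching on the
// dynamic type of the argument. Returning an error for unknown types is
// a choice made for this sketch only.
func mutate(f *flowNode, muts ...interface{}) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	for _, m := range muts {
		switch arg := m.(type) {
		case mutation:
			if arg == cached {
				f.Cached = true
			}
		case value:
			f.Value = arg.Value
		case setReserved:
			f.Reserved = int(arg)
		case status:
			f.Status = string(arg)
		default:
			return fmt.Errorf("unknown mutation type %T", m)
		}
	}
	return nil
}

func main() {
	f := &flowNode{}
	err := mutate(f, cached, value{Value: 42}, setReserved(8), status("running"))
	fmt.Println(f.Cached, f.Value, f.Reserved, f.Status, err)
}
```

Using distinct named types for each kind of mutation keeps call sites short while letting a single variadic method cover every kind of update.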

func (*Eval) Requirements

func (e *Eval) Requirements() reflow.Requirements

Requirements returns the minimum and maximum resource requirements for this Eval's flow.

func (*Eval) Value

func (e *Eval) Value() values.T

Value returns the root value of this eval.

type EvalConfig

type EvalConfig struct {

	// Scheduler is used to run tasks.
	// The scheduler must use the same repository as the evaluator.
	Scheduler *sched.Scheduler

	// Predictor is used to predict the tasks' resource usage. It
	// will only be used if a Scheduler is defined.
	Predictor *predictor.Predictor

	// Snapshotter is used to snapshot source URLs into unloaded
	// filesets. If non-nil, then files are delay-loaded.
	Snapshotter Snapshotter

	// An (optional) logger to which the evaluation transcript is printed.
	Log *log.Logger

	// DotWriter is an (optional) writer where the evaluator will write the flowgraph to in dot format.
	DotWriter io.Writer

	// Status gets evaluation status reports.
	Status *status.Group

	// An (optional) logger to print evaluation trace.
	Trace *log.Logger

	// Repository is the main, shared repository between evaluations.
	Repository reflow.Repository

	// Assoc is the main, shared assoc that is used to store cache and
	// metadata associations.
	Assoc assoc.Assoc

	// AssertionGenerator is the implementation for generating assertions.
	AssertionGenerator reflow.AssertionGenerator

	// Assert is the policy to use for asserting cached Assertions.
	Assert reflow.Assert

	// RunID is a unique identifier for the run
	RunID taskdb.RunID

	// CacheMode determines whether the evaluator reads from
	// or writes to the cache. If CacheMode is nonzero, Assoc,
	// Repository, and Transferer must be non-nil.
	CacheMode infra2.CacheMode

	// RecomputeEmpty determines whether cached empty values
	// are recomputed.
	RecomputeEmpty bool

	// BottomUp determines whether we perform bottom-up only
	// evaluation, skipping the top-down phase.
	BottomUp bool

	// PostUseChecksum indicates whether input filesets are checksummed after use.
	PostUseChecksum bool

	// Config stores the flow config to be used.
	Config Config

	// ImageMap stores the canonical names of the images.
	// A canonical name has a fully qualified registry host,
	// and image digest instead of image tag.
	ImageMap map[string]string

	// CacheLookupTimeout is the timeout for cache lookups.
	// After the timeout expires, a cache lookup is considered
	// a miss.
	CacheLookupTimeout time.Duration

	// Invalidate is a function that determines whether or not f's cached
	// results should be invalidated.
	Invalidate func(f *Flow) bool

	// Labels is the labels for this run.
	Labels pool.Labels

	// MaxResources is the max resources that can be used for a single task for this evaluation
	MaxResources reflow.Resources
}

EvalConfig provides runtime configuration for evaluation instances.
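The CacheLookupTimeout field states that an expired lookup is considered a miss. A minimal sketch of that policy, using only the standard library; `lookupWithTimeout`, `slowLookup`, and `hitMillis` are hypothetical helpers, and the string-valued cache is a stand-in for the real assoc and repository.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// lookupWithTimeout runs lookup and treats an expired timeout as a miss.
func lookupWithTimeout(ctx context.Context, timeout time.Duration,
	lookup func(context.Context) (string, bool)) (string, bool) {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	type result struct {
		val string
		ok  bool
	}
	ch := make(chan result, 1) // buffered so the goroutine never blocks
	go func() {
		v, ok := lookup(ctx)
		ch <- result{v, ok}
	}()
	select {
	case r := <-ch:
		return r.val, r.ok
	case <-ctx.Done():
		return "", false // timed out or canceled: report a miss
	}
}

// slowLookup simulates a cache that takes d to answer.
func slowLookup(d time.Duration) func(context.Context) (string, bool) {
	return func(ctx context.Context) (string, bool) {
		select {
		case <-time.After(d):
			return "value", true
		case <-ctx.Done():
			return "", false
		}
	}
}

// hitMillis reports whether a lookup taking delayMs milliseconds hits
// within a timeout of timeoutMs milliseconds.
func hitMillis(delayMs, timeoutMs int) bool {
	_, ok := lookupWithTimeout(context.Background(),
		time.Duration(timeoutMs)*time.Millisecond,
		slowLookup(time.Duration(delayMs)*time.Millisecond))
	return ok
}

func main() {
	fmt.Println(hitMillis(1, 500), hitMillis(500, 5))
}
```

Treating a timeout as a miss trades some redundant computation for bounded latency: a slow cache can delay evaluation by at most the configured duration.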

func (EvalConfig) String

func (e EvalConfig) String() string

String returns a human-readable form of the evaluation configuration.

type ExecArg

type ExecArg struct {
	// Out tells whether this argument is an output argument.
	Out bool
	// Index is the dependency index represented by this argument.
	Index int
}

ExecArg indexes arguments to dependencies.

type Flow

type Flow struct {
	// The operation represented by this node. See Op
	// for definitions.
	Op Op

	// Parent is set when a node is Forked.
	Parent *Flow

	// Deps holds this Flow's data dependencies.
	Deps []*Flow

	// Config stores this Flow's config.
	Config Config

	Image   string                                 // OpExec
	Cmd     string                                 // OpExec
	URL     *url.URL                               // OpIntern, Extern
	Re      *regexp.Regexp                         // Groupby, Collect
	Repl    string                                 // Collect
	MapFunc func(*Flow) *Flow                      // Map, MapMerge
	MapFlow *Flow                                  // Map
	K       func(vs []values.T) *Flow              // K
	Kctx    func(ctx KContext, v []values.T) *Flow // Kctx
	Coerce  func(values.T) (values.T, error)       // Coerce

	// Argmap maps exec arguments to dependencies (OpExec).
	Argmap []ExecArg

	// OutputIsDir tells whether the output i is a directory.
	OutputIsDir []bool

	// Original fields if this Flow was rewritten with canonical values.
	OriginalImage string

	// Argstrs stores a symbolic argument name, used for pretty printing
	// and debugging.
	Argstrs []string

	// FlowDigest stores, for Val, K and Coerce, a digest representing
	// just the operation or value.
	FlowDigest digest.Digest

	// ExtraDigest is considered as an additional digestible material of this flow
	// and included in the flow's logical and physical digest computation.
	ExtraDigest digest.Digest

	// A human-readable identifier for the node, for use in
	// debugging output, etc.
	Ident string

	// Source code position of this node.
	Position string

	// State stores the evaluation state of the node; see State
	// for details.
	State State

	// Resources indicates the expected resource usage of this node.
	// Currently it is only defined for OpExec.
	Resources reflow.Resources

	// Reserved stores the amount of resources that have been reserved
	// on behalf of this node.
	Reserved reflow.Resources

	// FlowRequirements stores the requirements indicated by
	// Requirements.
	FlowRequirements reflow.Requirements

	// Value stores the Value to which the node was evaluated.
	Value values.T
	// Err stores any evaluation error that occurred during flow evaluation.
	Err *errors.Error

	// The total runtime for evaluating this node.
	Runtime time.Duration

	// The current owning executor of this Flow.
	Owner reflow.Executor

	// The exec working on this node.
	Exec reflow.Exec

	// Cached stores whether the flow was retrieved from cache.
	Cached bool

	// RunInfo stores a subset of the exec's inspect data, used for logging.
	RunInfo reflow.ExecRunInfo

	Tracked bool

	Status *status.Task

	Data []byte // Data

	// MustIntern is set to true if an OpIntern must be
	// fully interned and cannot be pre-resolved.
	MustIntern bool

	// Dirty is used by the evaluator to track which nodes are dirtied
	// by this node: once the node has been evaluated, these flows
	// may be eligible for evaluation.
	Dirty []*Flow

	// Pending maintains a map of this node's dependent nodes that
	// are pending evaluation. It is maintained by the evaluator to trigger
	// evaluation.
	Pending map[*Flow]bool

	// NonDeterministic, in the case of Execs, denotes if the exec is non-deterministic.
	NonDeterministic bool

	// ExecDepIncorrectCacheKeyBug is set for nodes that are known to be impacted by a bug
	// which causes the cache keys to be incorrectly computed.
	// See https://github.com/grailbio/reflow/pull/128 or T41260.
	ExecDepIncorrectCacheKeyBug bool
	// contains filtered or unexported fields
}

Flow defines an AST for data flows. It is a logical union of ops as defined by type Op. Child nodes witness computational dependencies and must therefore be evaluated before their parents.

func (*Flow) AbbrevCmd

func (f *Flow) AbbrevCmd() string

AbbrevCmd returns the abbreviated command line for an exec flow.

func (*Flow) CacheKeys

func (f *Flow) CacheKeys() []digest.Digest

CacheKeys returns all the valid cache keys for this flow node. They are returned in order from most concrete to least concrete.

func (*Flow) Canonicalize

func (f *Flow) Canonicalize(config Config) *Flow

Canonicalize returns a canonical version of Flow f, where semantically equivalent flows (as per Flow.Digest) are collapsed into one.

func (*Flow) Copy

func (f *Flow) Copy() *Flow

Copy performs a shallow copy of the Flow.

func (*Flow) DebugString

func (f *Flow) DebugString() string

DebugString returns a human readable representation of the flow appropriate for debugging.

func (*Flow) Digest

func (f *Flow) Digest() digest.Digest

Digest produces a digest of Flow f. The digest captures the entirety of the Flow's semantics: two flows with the same digest must evaluate to the same value. Map Flows are canonicalized by passing a no-op Flow to their MapFunc.

func (*Flow) ExecArg

func (f *Flow) ExecArg(i int) ExecArg

ExecArg returns the ith ExecArg. It is drawn from f.Argmap if it is defined, or else it is just the ith input argument.

ExecArg panics if i >= f.NExecArg().

func (*Flow) ExecConfig

func (f *Flow) ExecConfig() reflow.ExecConfig

ExecConfig returns the flow's exec configuration. The flow's dependencies must already be computed before invoking ExecConfig. ExecConfig is valid only for Intern, Extern, and Exec ops.

func (*Flow) ExecReset

func (f *Flow) ExecReset()

ExecReset resets all flow parameters related to running a single exec.

func (*Flow) ExecString

func (f *Flow) ExecString(cache bool) string

ExecString renders a string representing the operation performed by this node. Cache should be set to true if the result was retrieved from cache; in this case, values from dependencies are not rendered since they may not be available. The returned string has the following format:

how:digest(ident) shortvalue = execstring (runtime transfer rate)

where "execstring" is a string indicating the operation performed.

For example, the ExecString of a node that interns a directory of FASTQs looks like this:

ecae46a4(inputfastq) val<161216_E00472_0063_AH7LWNALXX/CNVS-LUAD-120587007-cfDNA-WGS-Rep1_S1_L001_R1_001.fastq.gz=87f7ca18, ...492.45GB> = intern "s3://grail-avalon/samples/CNVS-LUAD-120587007-cfDNA-WGS-Rep1/fastq/" (1h29m33.908587144s 93.83MB/s)

func (*Flow) Fork

func (f *Flow) Fork(flow *Flow)

Fork creates a new fork of this flow. The current version of Flow f becomes the parent flow.

func (*Flow) Label

func (f *Flow) Label(ident string)

Label labels this flow with ident. It then recursively labels its ancestors. Labeling stops when a node is already labeled.

func (*Flow) MapInit

func (f *Flow) MapInit()

MapInit initializes Flow.MapFlow from the supplied MapFunc.

func (*Flow) NExecArg

func (f *Flow) NExecArg() int

NExecArg returns the number of exec arguments of this node. If f.Argmap is defined, it returns the length of the argument map, or else the number of dependencies.
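The relationship between ExecArg and NExecArg can be sketched as follows. The types are stand-ins, and reading "defined" as "non-nil" is an assumption of this sketch.

```go
package main

import "fmt"

// execArg mirrors the documented ExecArg fields.
type execArg struct {
	Out   bool
	Index int
}

// flowNode is a stand-in with only the fields this sketch needs.
type flowNode struct {
	Deps   []*flowNode
	Argmap []execArg
}

// nExecArg returns the length of the argument map if one is defined
// (ASSUMPTION: non-nil), or else the number of dependencies.
func (f *flowNode) nExecArg() int {
	if f.Argmap != nil {
		return len(f.Argmap)
	}
	return len(f.Deps)
}

// execArg returns the ith argument: the Argmap entry if a map is
// defined, or else an input argument referring to dependency i.
func (f *flowNode) execArg(i int) execArg {
	if i >= f.nExecArg() {
		panic("exec argument index out of range")
	}
	if f.Argmap != nil {
		return f.Argmap[i]
	}
	return execArg{Index: i}
}

func main() {
	deps := []*flowNode{{}, {}}
	plain := &flowNode{Deps: deps}
	mapped := &flowNode{
		Deps:   deps,
		Argmap: []execArg{{Index: 1}, {Out: true, Index: 0}, {Index: 0}},
	}
	fmt.Println(plain.nExecArg(), plain.execArg(1))
	fmt.Println(mapped.nExecArg(), mapped.execArg(1))
}
```

An argument map lets the same dependency appear more than once, in any order, and lets output arguments be interleaved with inputs.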

func (*Flow) Requirements

func (f *Flow) Requirements() (req reflow.Requirements)

Requirements computes the minimum and maximum resource requirements for this flow. It currently assumes that the width of any map operation is infinite.

BUG(marius): Requirements cannot accurately determine disk space requirements as they depend on object liveness. They are currently treated as entirely ephemeral.

TODO(marius): include width hints on map nodes

TODO(marius): account for static parallelism too.

func (*Flow) String

func (f *Flow) String() string

String returns a shallow and human readable string representation of the flow.

func (*Flow) Visitor

func (f *Flow) Visitor() *FlowVisitor

Visitor returns a new FlowVisitor rooted at this node.

func (*Flow) WriteDigest

func (f *Flow) WriteDigest(w io.Writer)

WriteDigest writes the digestible material of f to w. The io.Writer is assumed to be produced by a Digester, and hence infallible. Errors are not checked.
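The reason errors can go unchecked is that writes to a `hash.Hash` never return an error. The sketch below shows the general pattern with SHA-256 from the standard library. The `flowNode` type and the choice of digestible material (op name, command, and dependency digests) are assumptions for illustration, not the package's actual digest format.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
)

// flowNode is a stand-in with a few digestible fields.
type flowNode struct {
	Op   string
	Cmd  string
	Deps []*flowNode
}

// writeDigest writes the digestible material of f to w. Errors are not
// checked because w is expected to be a hash, whose Write never fails.
func (f *flowNode) writeDigest(w io.Writer) {
	io.WriteString(w, f.Op)
	io.WriteString(w, "\x00") // separator between fields
	io.WriteString(w, f.Cmd)
	for _, d := range f.Deps {
		sum := d.digest()
		w.Write(sum[:])
	}
}

// digest computes the SHA-256 digest of f's digestible material.
func (f *flowNode) digest() [sha256.Size]byte {
	h := sha256.New()
	f.writeDigest(h)
	var sum [sha256.Size]byte
	copy(sum[:], h.Sum(nil))
	return sum
}

// sameDigest reports whether two nodes digest to the same value.
func sameDigest(a, b *flowNode) bool { return a.digest() == b.digest() }

func main() {
	in := &flowNode{Op: "intern", Cmd: "s3://bucket/prefix/"}
	a := &flowNode{Op: "exec", Cmd: "sort", Deps: []*flowNode{in}}
	sum := a.digest()
	fmt.Println(hex.EncodeToString(sum[:4]))
}
```

Including each dependency's digest makes the result depend on the whole subgraph, so two structurally identical flows digest to the same value.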

type FlowVisitor

type FlowVisitor struct {
	*Flow
	// contains filtered or unexported fields
}

FlowVisitor implements a convenient visitor for flow graphs.

func (*FlowVisitor) Push

func (v *FlowVisitor) Push(f *Flow)

Push pushes node f onto visitor stack.

func (*FlowVisitor) Reset

func (v *FlowVisitor) Reset()

Reset resets the flow visitor state.

func (*FlowVisitor) Visit

func (v *FlowVisitor) Visit()

Visit pushes the current node's children on to the visitor stack, including both data and control dependencies.

func (*FlowVisitor) Walk

func (v *FlowVisitor) Walk() bool

Walk visits the next flow node on the stack. Walk returns false when it runs out of nodes to visit; it also guarantees that each node is visited only once.
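The Push, Visit, and Walk methods suggest a loop of the form `for v := f.Visitor(); v.Walk(); v.Visit() { ... }`. The following sketch shows how such a visitor could work. The `visitor` and `flowNode` types are stand-ins, and only data dependencies are modeled.

```go
package main

import "fmt"

// flowNode is a stand-in with only the fields this sketch needs.
type flowNode struct {
	Name string
	Deps []*flowNode
}

// visitor is a stand-in for FlowVisitor; it embeds the current node.
type visitor struct {
	*flowNode
	stack []*flowNode
	seen  map[*flowNode]bool
}

// newVisitor returns a visitor rooted at root.
func newVisitor(root *flowNode) *visitor {
	v := &visitor{seen: map[*flowNode]bool{}}
	v.push(root)
	return v
}

// push pushes node f onto the visitor stack.
func (v *visitor) push(f *flowNode) { v.stack = append(v.stack, f) }

// visit pushes the current node's dependencies onto the stack.
func (v *visitor) visit() {
	for _, d := range v.flowNode.Deps {
		v.push(d)
	}
}

// walk advances to the next unvisited node. It returns false when the
// stack is exhausted; each node is visited at most once.
func (v *visitor) walk() bool {
	for len(v.stack) > 0 {
		n := v.stack[len(v.stack)-1]
		v.stack = v.stack[:len(v.stack)-1]
		if n == nil || v.seen[n] {
			continue
		}
		v.seen[n] = true
		v.flowNode = n
		return true
	}
	return false
}

// names collects the names of all nodes reachable from root.
func names(root *flowNode) []string {
	var out []string
	for v := newVisitor(root); v.walk(); v.visit() {
		out = append(out, v.Name)
	}
	return out
}

func main() {
	c := &flowNode{Name: "c"}
	a := &flowNode{Name: "a", Deps: []*flowNode{c}}
	b := &flowNode{Name: "b", Deps: []*flowNode{c}}
	root := &flowNode{Name: "root", Deps: []*flowNode{a, b}}
	fmt.Println(names(root))
}
```

Separating Walk from Visit lets the loop body decide whether to descend: skipping the call to Visit prunes the subgraph below the current node.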

type Fork

type Fork *Flow

Fork is an argument to (*Eval).Mutate to indicate a fork mutation.

type KContext

type KContext interface {
	// Context is supplied context.
	context.Context
	// Repository returns the repository.
	Repository() reflow.Repository
}

KContext is the context provided to a continuation (Kctx).

type Mutation

type Mutation int

Mutation is a type of mutation.

const (
	Invalid Mutation = iota
	// Cached is the mutation that sets the flow's Cached flag.
	Cached
	// Refresh is the mutation that refreshes the status of the flow node.
	Refresh
	// MustIntern sets the flow's MustIntern flag to true.
	MustIntern
	// NoStatus indicates that a flow node's status should not be updated.
	NoStatus
	// Propagate is the mutation that propagates a flow's dependency assertions
	// to the flow's result Fileset.  Results in a no-op if the flow has no result fileset.
	Propagate
)

func (Mutation) String

func (i Mutation) String() string

type Node

type Node struct {
	*Flow
}

Node is a flow node in the dot graph.

func (Node) Attributes

func (n Node) Attributes() []encoding.Attribute

Attributes implements encoding.Attributer.

func (Node) DOTID

func (n Node) DOTID() string

DOTID implements dot.Node.

func (Node) ID

func (n Node) ID() int64

ID is the unique identifier for this node. Implements graph.Node

type Op

type Op int

Op is an enum representing operations that may be performed in a Flow.

const (
	// Exec runs a command in a docker container on the inputs
	// represented by the Flow's dependencies.
	Exec Op = 1 + iota
	// Intern imports datasets from URLs.
	Intern
	// Extern exports values to URLs.
	Extern
	// Groupby applies a regular expression to group an input value.
	Groupby
	// Map applies a function (which returns a Flow) to each element
	// in the input.
	Map
	// Collect filters and rewrites values.
	Collect
	// Merge merges a set of flows.
	Merge
	// Val returns a value.
	Val
	// Pullup merges a set of results into one value.
	Pullup
	// K is a flow continuation.
	K
	// Coerce is a flow value coercion. (Errors allowed.)
	Coerce
	// Requirements modifies the flow's requirements.
	Requirements
	// Data evaluates to a literal (inline) piece of data.
	Data
	// Kctx is a flow continuation with access to the evaluator context.
	Kctx
)

func (Op) DigestString

func (i Op) DigestString() string

func (Op) External

func (o Op) External() bool

External returns whether the op requires external execution.

func (Op) String

func (o Op) String() string

type Repair

type Repair struct {
	// EvalConfig is the repair's configuration. Only Assoc and Repository need
	// to be configured.
	EvalConfig
	// GetLimit is applied to the assoc's get requests.
	GetLimit *limiter.Limiter
	// NumWrites is incremented for each new assoc entry written by the repair job.
	NumWrites int64
	// contains filtered or unexported fields
}

Repair performs cache-repair for flows. Repair can forward-migrate Reflow caches when more key types are added.

Repair works by simulating evaluation using logical cache keys (and performing direct evaluation on the cached metadata where it can) and then writing new cache keys back to the assoc.

func NewRepair

func NewRepair(config EvalConfig) *Repair

NewRepair returns a new repair configured with the provided EvalConfig.

The caller must call (*Repair).Go before submitting evaluations through (*Repair).Do.

func (*Repair) Do

func (r *Repair) Do(ctx context.Context, f *Flow)

Do repairs the flow f. Repair is performed by using cached evaluations to populate values, and, when the cache is missing entries and a value can be computed immediately (i.e., without consulting an executor), computing that value. Flows that are successfully evaluated this way (sustaining no errors) are written back with their completed set of cache keys.

Only OpExec flows are written back.

func (*Repair) Done

func (r *Repair) Done() error

Done should be called after all evaluation is complete. Done returns after all outstanding writebacks have been performed.

func (*Repair) Go

func (r *Repair) Go(ctx context.Context, concurrency int)

Go starts the repair's background writeback threads, writing back to the configured assoc with the provided maximum concurrency.
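The Go, Do, Done lifecycle resembles a bounded worker pool: start the writers, submit work, then close the queue and wait. A minimal sketch of that shape follows. The `repairer` type, its string keys, and the `write` callback are hypothetical, and unlike the real Done, this `done` returns the write count rather than an error.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// repairer is a stand-in that models only the writeback lifecycle.
type repairer struct {
	numWrites int64 // kept first so atomic access is 64-bit aligned
	writes    chan string
	wg        sync.WaitGroup
	write     func(ctx context.Context, key string) error
}

func newRepairer(write func(context.Context, string) error) *repairer {
	return &repairer{writes: make(chan string), write: write}
}

// goWorkers starts the background writeback goroutines; at most
// concurrency writes are in flight at any time.
func (r *repairer) goWorkers(ctx context.Context, concurrency int) {
	for i := 0; i < concurrency; i++ {
		r.wg.Add(1)
		go func() {
			defer r.wg.Done()
			for key := range r.writes {
				if r.write(ctx, key) == nil {
					atomic.AddInt64(&r.numWrites, 1)
				}
			}
		}()
	}
}

// do submits one key for writeback; goWorkers must be called first.
func (r *repairer) do(key string) { r.writes <- key }

// done closes the queue, waits for outstanding writebacks, and returns
// the number of successful writes.
func (r *repairer) done() int64 {
	close(r.writes)
	r.wg.Wait()
	return atomic.LoadInt64(&r.numWrites)
}

// repairAll writes back all keys with the given concurrency.
func repairAll(keys []string, concurrency int) int64 {
	r := newRepairer(func(context.Context, string) error { return nil })
	r.goWorkers(context.Background(), concurrency)
	for _, k := range keys {
		r.do(k)
	}
	return r.done()
}

func main() {
	fmt.Println(repairAll([]string{"k1", "k2", "k3"}, 2))
}
```

The unbuffered channel in this sketch is why the workers must be started before any submission: a send with no running worker would block forever.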

type SetReserved

type SetReserved reflow.Resources

SetReserved sets the flow's Reserved resources.

type Snapshotter

type Snapshotter interface {
	Snapshot(ctx context.Context, url string) (reflow.Fileset, error)
}

Snapshotter provides an interface for snapshotting source URL data into unloaded filesets.

type State

type State int64

State is an enum representing the state of a Flow node during evaluation.

const (
	// Init indicates that the flow is initialized but not evaluated
	Init State = iota

	// NeedLookup indicates that the evaluator should perform a
	// cache lookup on the flow node.
	NeedLookup
	// Lookup indicates that the evaluator is currently performing a
	// cache lookup of the flow node. After a successful cache lookup,
	// the node is transferred to Done, and the (cached) value is
	// attached to the flow node. The objects may not be transferred into
	// the evaluator's repository.
	Lookup

	// TODO indicates that the evaluator should consider the node
	// for evaluation once its dependencies are completed.
	TODO

	// Ready indicates that the node is ready for evaluation and should
	// be scheduled by the evaluator. A node is ready only once all of its
	// dependent objects are available in the evaluator's repository.
	Ready

	// NeedSubmit indicates the task is ready to be submitted to the
	// scheduler.
	NeedSubmit

	// Running indicates that the node is currently being evaluated by
	// the evaluator.
	Running

	Execing

	// Done indicates that the node has completed evaluation.
	Done

	// Max is the number of flow states.
	Max
)

State denotes a Flow node's state during evaluation. Flows begin their life in Init, where they remain until they are examined by the evaluator. The precise state transitions depend on the evaluation mode (whether it is evaluating bottom-up or top-down, and whether a cache is used), but generally follow the order in which they are laid out here.
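Because Max is the number of states, it can size an array indexed by State, which is convenient for per-state bookkeeping. A small sketch with a stand-in enum; the `st`-prefixed names and the `histogram` helper are hypothetical.

```go
package main

import "fmt"

// state is a stand-in for flow.State with the same ordering.
type state int64

const (
	stInit state = iota
	stNeedLookup
	stLookup
	stTODO
	stReady
	stNeedSubmit
	stRunning
	stExecing
	stDone
	stMax // number of states; not a state itself
)

// histogram counts how many nodes are in each state.
func histogram(states []state) [stMax]int {
	var counts [stMax]int
	for _, s := range states {
		counts[s]++
	}
	return counts
}

// allDone reports whether every node has reached the done state.
func allDone(states []state) bool {
	return histogram(states)[stDone] == len(states)
}

func main() {
	nodes := []state{stDone, stRunning, stDone, stInit}
	h := histogram(nodes)
	fmt.Println(h[stDone], h[stRunning], allDone(nodes))
}
```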

func (State) Name

func (s State) Name() string

Name returns the State's string name.

func (State) String

func (i State) String() string

type Status

type Status string

Status amends the task's status string.

type Value

type Value struct{ Value values.T }

Value is an argument to (*Eval).Mutate to indicate a set-value mutation.

type WaitGroup

type WaitGroup interface {
	Add(int)
	Done()
}

WaitGroup defines a subset of sync.WaitGroup's interface for use with Context.

Notes

Bugs

  • Requirements cannot accurately determine disk space requirements as they depend on object liveness. They are currently treated as entirely ephemeral.
