reflow

Version: v0.0.0-...-c95a20c
Published: Aug 10, 2021 License: Apache-2.0 Imports: 22 Imported by: 23

Reflow

Reflow is a system for incremental data processing in the cloud. Reflow enables scientists and engineers to compose existing tools (packaged in Docker images) using ordinary programming constructs. Reflow then evaluates these programs in a cloud environment, transparently parallelizing work and memoizing results. Reflow was created at GRAIL to manage our NGS (next generation sequencing) bioinformatics workloads on AWS, but has also been used for many other applications, including model training and ad-hoc data analyses.

Reflow comprises:

  • a functional, lazy, type-safe domain specific language for writing workflow programs;
  • a runtime for evaluating Reflow programs incrementally, coordinating cluster execution, and transparently memoizing results;
  • a cluster scheduler to dynamically provision and tear down resources from a cloud provider (AWS currently supported).

Reflow thus allows scientists and engineers to write straightforward programs and then have them transparently executed in a cloud environment. Programs are automatically parallelized and distributed across multiple machines, and redundant computations (even across runs and users) are eliminated by its memoization cache. Reflow evaluates its programs incrementally: whenever the input data or program changes, only those outputs that depend on the changed data or code are recomputed.

In addition to the default cluster computing mode, Reflow programs can also be run locally, making use of the local machine's Docker daemon (including Docker for Mac).

Reflow was designed to support sophisticated, large-scale bioinformatics workflows, but should be widely applicable to scientific and engineering computing workloads. It was built using Go.

Reflow joins a long list of systems designed to tackle bioinformatics workloads, but differs from these in important ways:

  • it is a vertically integrated system with a minimal set of external dependencies; this allows Reflow to be "plug-and-play": bring your cloud credentials, and you're off to the races;
  • it defines a strict data model which is used for transparent memoization and other optimizations;
  • it takes workflow software seriously: the Reflow DSL provides type checking, modularity, and other constructs that are commonplace in general purpose programming languages;
  • because of its high level data model and use of caching, Reflow computes incrementally: it is always able to compute the smallest set of operations given what has been computed previously.

Getting Reflow

You can get binaries (macOS/amd64, Linux/amd64) for the latest release at the GitHub release page.

If you are developing Reflow, or would like to build it yourself, please follow the instructions in the section "Developing and building Reflow."

Quickstart - AWS

Reflow is distributed with an EC2 cluster manager, and a memoization cache implementation based on S3. These must be configured before use. Reflow maintains a configuration file in $HOME/.reflow/config.yaml by default (this can be overridden with the -config option). Reflow's setup commands modify this file directly. After each step, the current configuration can be examined by running reflow config.

Note: Reflow must have access to AWS credentials and configuration in the environment (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION) while running these commands.
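
Before running the setup commands, it can help to confirm that the three variables named in the note are actually set. The following is a minimal pre-flight sketch; the helper name missing_aws_vars is ours (hypothetical) and is not part of Reflow.

```python
import os

# Variables the note above says must be present in the environment.
REQUIRED_VARS = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_REGION")


def missing_aws_vars(env=None):
    """Return the required variable names that are unset or empty in env.

    Hypothetical helper for illustration; Reflow does not provide it.
    """
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]


if __name__ == "__main__":
    missing = missing_aws_vars()
    if missing:
        print("missing:", ", ".join(missing))
    else:
        print("AWS environment looks complete")
```

An empty result only means the variables are set; it does not verify that the credentials are valid.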

% reflow setup-ec2
% reflow config
cluster: ec2cluster
ec2cluster:
  ami: ami-d0e54eb0
  diskspace: 100
  disktype: gp2
  instancetypes:
  - c1.medium
  - c1.xlarge
  - c3.2xlarge
  - c3.4xlarge
  - c3.8xlarge
  - c3.large
  - c3.xlarge
  - c4.2xlarge
  - c4.4xlarge
  - c4.8xlarge
  - c4.large
  - c4.xlarge
  - cc2.8xlarge
  - m1.large
  - m1.medium
  - m1.small
  - m1.xlarge
  - m2.2xlarge
  - m2.4xlarge
  - m2.xlarge
  - m3.2xlarge
  - m3.large
  - m3.medium
  - m3.xlarge
  - m4.16xlarge
  - m4.4xlarge
  - m4.xlarge
  - r4.xlarge
  - t1.micro
  - t2.large
  - t2.medium
  - t2.micro
  - t2.nano
  - t2.small
  keyname: ""
  maxinstances: 10
  region: us-west-2
  securitygroup: <a newly created security group here>
  sshkey: <your public SSH key here>
https: httpsca,$HOME/.reflow/reflow.pem

After running reflow setup-ec2, we see that Reflow created a new security group (associated with the account's default VPC), and configured the cluster to use some default settings. Feel free to edit the configuration file ($HOME/.reflow/config.yaml) to your taste. If you want to use spot instances, add a new key under ec2cluster: spot: true.

Reflow only configures one security group per account: Reflow will reuse a previously created security group if reflow setup-ec2 is run anew. See reflow setup-ec2 -help for more details.

Next, we'll set up a cache. This isn't strictly necessary, but we'll need it in order to use many of Reflow's sophisticated caching and incremental computation features. On AWS, Reflow implements a cache based on S3 and DynamoDB. A new S3-based cache is provisioned by reflow setup-s3-repository and reflow setup-dynamodb-assoc, each of which takes one argument naming the S3 bucket and DynamoDB table name to be used, respectively. The S3 bucket is used to store file objects while the DynamoDB table is used to store associations between logically named computations and their concrete output. Note that S3 bucket names are global, so pick a name that's likely to be unique.

% reflow setup-s3-repository reflow-quickstart-cache
reflow: creating s3 bucket reflow-quickstart-cache
reflow: created s3 bucket reflow-quickstart-cache
% reflow setup-dynamodb-assoc reflow-quickstart
reflow: creating DynamoDB table reflow-quickstart
reflow: created DynamoDB table reflow-quickstart
% reflow config
assoc: dynamodb,reflow-quickstart
repository: s3,reflow-quickstart-cache

<rest is same as before>

The setup commands created the S3 bucket and DynamoDB table as needed, and modified the configuration accordingly.
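
To make the division of labor between the two stores concrete, here is a small in-memory model: a repository that stores file objects by digest (the role of the S3 bucket) and an assoc that maps a logically named computation to its output digest (the role of the DynamoDB table). This is a conceptual sketch only; the class and function names are ours, and it says nothing about how Reflow lays out data in S3 or DynamoDB.

```python
import hashlib


class Repository:
    """Content-addressed blob store: digest -> bytes (plays the S3 bucket's role)."""

    def __init__(self):
        self._blobs = {}

    def put(self, data):
        digest = hashlib.sha256(data).hexdigest()
        self._blobs[digest] = data
        return digest

    def get(self, digest):
        return self._blobs[digest]


class Assoc:
    """Maps a computation's logical name to its output digest
    (plays the DynamoDB table's role)."""

    def __init__(self):
        self._table = {}

    def put(self, key, digest):
        self._table[key] = digest

    def get(self, key):
        return self._table.get(key)


def cached(key, compute, assoc, repo):
    """Return the output for key, calling compute() only on a cache miss."""
    digest = assoc.get(key)
    if digest is None:
        digest = repo.put(compute())
        assoc.put(key, digest)
    return repo.get(digest)


if __name__ == "__main__":
    assoc, repo = Assoc(), Repository()
    calls = []

    def compute():
        calls.append(1)
        return b"hello world\n"

    cached("hello.Main", compute, assoc, repo)
    cached("hello.Main", compute, assoc, repo)
    # The second lookup is served from the cache, so compute ran once.
    print("compute calls:", len(calls))
```

Keeping the two stores separate means identical outputs are stored once, no matter how many computations produce them.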

We're now ready to run our first "hello world" program!

Create a file called "hello.rf" with the following contents:

val Main = exec(image := "ubuntu", mem := GiB) (out file) {"
	echo hello world >>{{out}}
"}

and run it:

% reflow run hello.rf
reflow: run ID: 6da656d1
	ec2cluster: 0 instances:  (<=$0.0/hr), total{}, waiting{mem:1.0GiB cpu:1 disk:1.0GiB
reflow: total n=1 time=0s
        ident      n   ncache transfer runtime(m) cpu mem(GiB) disk(GiB) tmp(GiB)
        hello.Main 1   1      0B

a948904f

Here, Reflow started a new t2.small instance (Reflow matches the workload with available instance types), ran echo hello world inside of an Ubuntu container, placed the output in a file, and returned its SHA256 digest. (Reflow represents file contents using their SHA256 digest.)
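
The digest can be reproduced locally with a few lines of Python. This sketch assumes the output file contains exactly the bytes that echo writes ("hello world" followed by a newline) and that the short form printed by Reflow is the first eight hex characters of the digest, as the IDs elsewhere in this document suggest. The function names are ours.

```python
import hashlib


def file_digest(data):
    """SHA256 digest of a file's contents, as a hex string."""
    return hashlib.sha256(data).hexdigest()


def short_id(digest, n=8):
    """Abbreviated form of a digest (assumed: the first n hex characters)."""
    return digest[:n]


if __name__ == "__main__":
    # `echo hello world` writes the text plus a trailing newline.
    digest = file_digest(b"hello world\n")
    print(short_id(digest))
```

If those assumptions hold, the printed prefix should match the a948904f shown in the run output above.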

We're now ready to explore Reflow more fully.

Simple bioinformatics workflow

Let's explore some of Reflow's features through a simple task: aligning NGS read data from the 1000genomes project. Create a file called "align.rf" with the following. The code is commented inline for clarity.

// In order to align raw NGS data, we first need to construct an index
// against which to perform the alignment. We're going to be using
// the BWA aligner, and so we'll need to retrieve a reference sequence
// and create an index that's usable from BWA.

// g1kv37 is a human reference FASTA sequence. (All
// chromosomes.) Reflow has a static type system, but most type
// annotations can be omitted: they are inferred by Reflow. In this
// case, we're creating a file: a reference to the contents of the
// named URL. We're retrieving data from the public 1000genomes S3
// bucket.
val g1kv37 = file("s3://1000genomes/technical/reference/human_g1k_v37.fasta.gz")

// Here we create an indexed version of the g1kv37 reference. It is
// created using the "bwa index" command with the raw FASTA data as
// input. Here we encounter another way to produce data in reflow:
// the exec. An exec runs a (Bash) script inside of a Docker image,
// placing the output in files or directories (or both: execs can
// return multiple values). In this case, we're returning a
// directory since BWA stores multiple index files alongside the raw
// reference. We also declare that the image to be used is
// "biocontainers/bwa" (the BWA image maintained by the
// biocontainers project).
//
// Inside of an exec template (delimited by {" and "}) we refer to
// (interpolate) values in our environment by placing expressions
// inside of the {{ and }} delimiters. In this case we're referring
// to the file g1kv37 declared above, and our output, named out.
//
// Many types of expressions can be interpolated inside of an exec,
// for example strings, integers, files, and directories. Strings
// and integers are rendered using their normal representation,
// files and directories are materialized to a local path before
// starting execution. Thus, in this case, {{g1kv37}} is replaced at
// runtime by a path on disk with a file with the contents of the
// file g1kv37 (i.e.,
// s3://1000genomes/technical/reference/human_g1k_v37.fasta.gz)
val reference = exec(image := "biocontainers/bwa:v0.7.15_cv3", mem := 6*GiB, cpu := 1) (out dir) {"
	# Ignore failures here. The file from 1000genomes has a trailer
	# that isn't recognized by gunzip. (This is not recommended practice!)
	gunzip -c {{g1kv37}} > {{out}}/g1k_v37.fa || true
	cd {{out}}
	bwa index -a bwtsw g1k_v37.fa
"}

// Now that we have defined a reference, we can define a function to
// align a pair of reads against the reference, producing an output
// SAM-formatted file. Functions compute expressions over a set of
// abstract parameters, in this case, a pair of read files. Unlike almost
// everywhere else in Reflow, function parameters must be explicitly
// typed.
//
// (Note that we're using a syntactic short-hand here: parameter lists can 
// be abbreviated. "r1, r2 file" is equivalent to "r1 file, r2 file".)
//
// The implementation of align is a straightforward invocation of "bwa mem".
// Note that "r1" and "r2" inside of the exec refer to the function arguments,
// thus align can be invoked for any set of r1, r2.
func align(r1, r2 file) = 
	exec(image := "biocontainers/bwa:v0.7.15_cv3", mem := 20*GiB, cpu := 16) (out file) {"
        bwa mem -M -t 16 {{reference}}/g1k_v37.fa {{r1}} {{r2}} > {{out}}
	"}

// We're ready to test our workflow now. We pick an arbitrary read
// pair from the 1000genomes data set, and invoke align. There are a
// few things of note here. First is the identifier "Main". This
// names the expression that's evaluated by `reflow run` -- the
// entry point of the computation. Second, we've defined Main to be
// a block. A block is an expression that contains one or more
// definitions followed by an expression. The value of a block is the
// final expression. Finally, Main contains a @requires annotation.
// This instructs Reflow how many resources to reserve for the work
// being done. Note that, because Reflow is able to distribute work,
// if a single instance is too small to execute fully in parallel,
// Reflow will provision additional compute instances to help along.
// @requires thus denotes the smallest possible instance
// configuration that's required for the program.
@requires(cpu := 16, mem := 24*GiB, disk := 50*GiB)	
val Main = {
	r1 := file("s3://1000genomes/phase3/data/HG00103/sequence_read/SRR062640_1.filt.fastq.gz")
	r2 := file("s3://1000genomes/phase3/data/HG00103/sequence_read/SRR062640_2.filt.fastq.gz")
	align(r1, r2)
}

Now we're ready to run our module. First, let's run reflow doc. This does two things. First, it typechecks the module (and any dependent modules), and second, it prints documentation for the public declarations in the module. Identifiers that begin with an uppercase letter are public (and may be used from other modules); others are not.

% reflow doc align.rf
Declarations

val Main (out file)
    We're ready to test our workflow now. We pick an arbitrary read pair from the
    1000genomes data set, and invoke align. There are a few things of note here.
    First is the identifier "Main". This names the expression that's evaluated by
    `reflow run` -- the entry point of the computation. Second, we've defined Main
    to be a block. A block is an expression that contains one or more definitions
    followed by an expression. The value of a block is the final expression. Finally,
    Main contains a @requires annotation. This instructs Reflow how many resources
    to reserve for the work being done. Note that, because Reflow is able to
    distribute work, if a single instance is too small to execute fully in parallel,
    Reflow will provision additional compute instances to help along. @requires thus
    denotes the smallest possible instance configuration that's required for the
    program.

Then let's run it:

% reflow run align.rf
reflow: run ID: 82e63a7a
ec2cluster: 1 instances: c5.4xlarge:1 (<=$0.7/hr), total{mem:29.8GiB cpu:16 disk:250.0GiB intel_avx512:16}, waiting{}, pending{}
82e63a7a: elapsed: 2m30s, executing:1, completed: 3/5
  align.reference:  exec ..101f9a082e1679c16d23787c532a0107537c9c # Ignore failures here. The f..bwa index -a bwtsw g1k_v37.fa  2m4s

Reflow launched a new instance: the previously launched instance (a t2.small) was not big enough to fit the requirements of align.rf. Note also that Reflow assigns a run name for each reflow run invocation. This can be used to look up run details with the reflow info command. In this case:

% reflow info 82e63a7a
82e63a7aee201d137f8ade3d584c234b856dc6bdeba00d5d6efc9627bd988a68 (run)
    time:      Wed Dec 12 10:45:04 2018
    program:   /Users/you/align.rf
    phase:     Eval
    alloc:     ec2-34-213-42-76.us-west-2.compute.amazonaws.com:9000/5a0adaf6c879efb1
    resources: {mem:28.9GiB cpu:16 disk:245.1GiB intel_avx:16 intel_avx2:16 intel_avx512:16}
    log:       /Users/you/.reflow/runs/82e63a7aee201d137f8ade3d584c234b856dc6bdeba00d5d6efc9627bd988a68.execlog

Here we see that the run is currently being performed on the alloc named ec2-34-213-42-76.us-west-2.compute.amazonaws.com:9000/5a0adaf6c879efb1. An alloc is a resource reservation on a single machine. A run can make use of multiple allocs to distribute work across multiple machines. The alloc is a URI, and the first component is the real hostname. You can ssh into the host in order to inspect what's going on. Reflow launched the instance with your public SSH key (as long as it was set up by reflow setup-ec2, and $HOME/.ssh/id_rsa.pub existed at that time).

% ssh core@ec2-34-213-42-76.us-west-2.compute.amazonaws.com
...
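
The alloc name can be taken apart mechanically to obtain the SSH target. The sketch below assumes the host:port/id shape seen in the reflow info output and the core user from the ssh example; split_alloc and ssh_target are hypothetical helpers, not Reflow APIs.

```python
def split_alloc(alloc):
    """Split an alloc name of the form 'host:port/alloc-id'.

    The shape is inferred from the example output in this document.
    """
    hostport, slash, alloc_id = alloc.partition("/")
    host, colon, port = hostport.rpartition(":")
    if not (slash and colon and host and alloc_id and port.isdigit()):
        raise ValueError(f"unexpected alloc name: {alloc!r}")
    return host, int(port), alloc_id


def ssh_target(alloc, user="core"):
    """Build the user@host string to pass to ssh."""
    host, _, _ = split_alloc(alloc)
    return f"{user}@{host}"


if __name__ == "__main__":
    name = "ec2-34-213-42-76.us-west-2.compute.amazonaws.com:9000/5a0adaf6c879efb1"
    print(ssh_target(name))
```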

As the run progresses, Reflow prints execution status of each task on the console.

...
align.Main.r2:    intern s3://1000genomes/phase3/data/HG00103/sequence_read/SRR062640_2.filt.fastq.gz                         23s
align.Main.r1:    intern done 1.8GiB                                                                                          23s
align.g1kv37:     intern done 851.0MiB                                                                                        23s
align.reference:  exec ..101f9a082e1679c16d23787c532a0107537c9c # Ignore failures here. The f..bwa index -a bwtsw g1k_v37.fa  6s

Here, Reflow started downloading r1 and r2 in parallel with creating the reference. Creating the reference is an expensive operation. We can examine it while it's running with reflow ps:

% reflow ps 
3674721e align.reference 10:46AM 0:00 running 4.4GiB 1.0 6.5GiB bwa

This tells us that the only task that's currently running is bwa to compute the reference. It's currently using 4.4 GiB of memory, 1 core, and 6.5 GiB of disk space. By passing the -l option, reflow ps also prints the task's exec URI.

% reflow ps -l
3674721e align.reference 10:46AM 0:00 running 4.4GiB 1.0 6.5GiB bwa ec2-34-213-42-76.us-west-2.compute.amazonaws.com:9000/5a0adaf6c879efb1/3674721e2d9e80b325934b08973fb3b1d3028b2df34514c9238be466112eb86e
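
When scripting around reflow ps, the whitespace-separated columns can be split into named fields. The field names below are our own labels, inferred from the sample lines and the surrounding description (in particular, treating the fourth column as elapsed time is an assumption); Reflow does not document this layout here, so treat the sketch as illustrative.

```python
from typing import NamedTuple, Optional


class PsEntry(NamedTuple):
    exec_id: str        # abbreviated exec ID
    ident: str          # identifier, e.g. align.reference
    start: str          # start time as printed
    elapsed: str        # assumed meaning of the fourth column
    state: str
    mem: str            # memory in use, as printed
    cpu: float          # cores in use
    disk: str           # disk in use, as printed
    command: str        # process being run
    uri: Optional[str]  # exec URI, present only with -l


def parse_ps_line(line):
    """Parse one line of `reflow ps` (9 fields) or `reflow ps -l` (10 fields)."""
    fields = line.split()
    if len(fields) not in (9, 10):
        raise ValueError(f"unexpected number of fields: {len(fields)}")
    uri = fields[9] if len(fields) == 10 else None
    return PsEntry(fields[0], fields[1], fields[2], fields[3], fields[4],
                   fields[5], float(fields[6]), fields[7], fields[8], uri)


if __name__ == "__main__":
    entry = parse_ps_line(
        "3674721e align.reference 10:46AM 0:00 running 4.4GiB 1.0 6.5GiB bwa")
    print(entry.ident, entry.state, entry.mem, entry.cpu, entry.disk)
```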

An exec URI is a handle to the actual task being executed. It globally identifies all tasks, and can be examined with reflow info:

% reflow info ec2-34-213-42-76.us-west-2.compute.amazonaws.com:9000/5a0adaf6c879efb1/3674721e2d9e80b325934b08973fb3b1d3028b2df34514c9238be466112eb86e
ec2-34-213-42-76.us-west-2.compute.amazonaws.com:9000/5a0adaf6c879efb1/3674721e2d9e80b325934b08973fb3b1d3028b2df34514c9238be466112eb86e (exec)
    state: running
    type:  exec
    ident: align.reference
    image: index.docker.io/biocontainers/bwa@sha256:0529e39005e35618c4e52f8f56101f9a082e1679c16d23787c532a0107537c9c
    cmd:   "\n\t# Ignore failures here. The file from 1000genomes has a trailer\n\t# that isn't recognized by gunzip. (This is not recommended practice!)\n\tgunzip -c {{arg[0]}} > {{arg[1]}}/g1k_v37.fa || true\n\tcd {{arg[2]}}\n\tbwa index -a bwtsw g1k_v37.fa\n"
      arg[0]:
        .: sha256:8b6c538abf0dd92d3f3020f36cc1dd67ce004ffa421c2781205f1eb690bdb442 (851.0MiB)
      arg[1]: output 0
      arg[2]: output 0
    top:
         bwa index -a bwtsw g1k_v37.fa

Here, Reflow shows us the currently running process ("bwa index..."), its template command, and the SHA256 digests of its inputs. Programs often print helpful output to standard error while working; this output can be examined with reflow logs:

% reflow logs ec2-34-221-0-157.us-west-2.compute.amazonaws.com:9000/0061a20f88f57386/3674721e2d9e80b325934b08973fb3b1d3028b2df34514c9238be466112eb86e

gzip: /arg/0/0: decompression OK, trailing garbage ignored
[bwa_index] Pack FASTA... 18.87 sec
[bwa_index] Construct BWT for the packed sequence...
[BWTIncCreate] textLength=6203609478, availableWord=448508744
[BWTIncConstructFromPacked] 10 iterations done. 99999990 characters processed.
[BWTIncConstructFromPacked] 20 iterations done. 199999990 characters processed.
[BWTIncConstructFromPacked] 30 iterations done. 299999990 characters processed.
[BWTIncConstructFromPacked] 40 iterations done. 399999990 characters processed.
[BWTIncConstructFromPacked] 50 iterations done. 499999990 characters processed.
[BWTIncConstructFromPacked] 60 iterations done. 599999990 characters processed.
[BWTIncConstructFromPacked] 70 iterations done. 699999990 characters processed.
[BWTIncConstructFromPacked] 80 iterations done. 799999990 characters processed.
[BWTIncConstructFromPacked] 90 iterations done. 899999990 characters processed.
[BWTIncConstructFromPacked] 100 iterations done. 999999990 characters processed.
[BWTIncConstructFromPacked] 110 iterations done. 1099999990 characters processed.
[BWTIncConstructFromPacked] 120 iterations done. 1199999990 characters processed.
[BWTIncConstructFromPacked] 130 iterations done. 1299999990 characters processed.
[BWTIncConstructFromPacked] 140 iterations done. 1399999990 characters processed.
[BWTIncConstructFromPacked] 150 iterations done. 1499999990 characters processed.
[BWTIncConstructFromPacked] 160 iterations done. 1599999990 characters processed.
[BWTIncConstructFromPacked] 170 iterations done. 1699999990 characters processed.
[BWTIncConstructFromPacked] 180 iterations done. 1799999990 characters processed.
[BWTIncConstructFromPacked] 190 iterations done. 1899999990 characters processed.
[BWTIncConstructFromPacked] 200 iterations done. 1999999990 characters processed.
[BWTIncConstructFromPacked] 210 iterations done. 2099999990 characters processed.
[BWTIncConstructFromPacked] 220 iterations done. 2199999990 characters processed.
[BWTIncConstructFromPacked] 230 iterations done. 2299999990 characters processed.
[BWTIncConstructFromPacked] 240 iterations done. 2399999990 characters processed.
[BWTIncConstructFromPacked] 250 iterations done. 2499999990 characters processed.
[BWTIncConstructFromPacked] 260 iterations done. 2599999990 characters processed.
[BWTIncConstructFromPacked] 270 iterations done. 2699999990 characters processed.
[BWTIncConstructFromPacked] 280 iterations done. 2799999990 characters processed.
[BWTIncConstructFromPacked] 290 iterations done. 2899999990 characters processed.
[BWTIncConstructFromPacked] 300 iterations done. 2999999990 characters processed.
[BWTIncConstructFromPacked] 310 iterations done. 3099999990 characters processed.
[BWTIncConstructFromPacked] 320 iterations done. 3199999990 characters processed.
[BWTIncConstructFromPacked] 330 iterations done. 3299999990 characters processed.
[BWTIncConstructFromPacked] 340 iterations done. 3399999990 characters processed.
[BWTIncConstructFromPacked] 350 iterations done. 3499999990 characters processed.
[BWTIncConstructFromPacked] 360 iterations done. 3599999990 characters processed.
[BWTIncConstructFromPacked] 370 iterations done. 3699999990 characters processed.
[BWTIncConstructFromPacked] 380 iterations done. 3799999990 characters processed.
[BWTIncConstructFromPacked] 390 iterations done. 3899999990 characters processed.
[BWTIncConstructFromPacked] 400 iterations done. 3999999990 characters processed.
[BWTIncConstructFromPacked] 410 iterations done. 4099999990 characters processed.
[BWTIncConstructFromPacked] 420 iterations done. 4199999990 characters processed.
[BWTIncConstructFromPacked] 430 iterations done. 4299999990 characters processed.
[BWTIncConstructFromPacked] 440 iterations done. 4399999990 characters processed.
[BWTIncConstructFromPacked] 450 iterations done. 4499999990 characters processed.

At this point, it looks like everything is running as expected. There's not much more to do than wait. Note that, while creating an index takes a long time, Reflow only has to compute it once. When it's done, Reflow memoizes the result, uploading the resulting data directly to the configured S3 cache bucket. The next time the reference expression is encountered, Reflow will use the previously computed result. If the input file changes (e.g., we decide to use another reference sequence), Reflow will recompute the index. The same will happen if the command (or Docker image) that's used to compute the index changes. Reflow keeps track of all the dependencies for a particular subcomputation, and recomputes it only when those dependencies have changed. This way, we always know that what is being computed is correct (the result is the same as if we had computed it from scratch), but avoid paying the cost of redundant computation.
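
One way to picture this dependency tracking is as a cache key derived from everything the result depends on: the image, the command, and the digests of the inputs. The sketch below illustrates that idea with a hypothetical cache_key function; it is not Reflow's actual key derivation, which this document does not describe.

```python
import hashlib
import json


def cache_key(image, cmd, input_digests):
    """Derive a key that changes whenever the image, the command template,
    or any input digest changes. Illustrative only."""
    payload = json.dumps(
        {"image": image, "cmd": cmd, "inputs": list(input_digests)},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


if __name__ == "__main__":
    image = "biocontainers/bwa:v0.7.15_cv3"
    cmd = "bwa index -a bwtsw g1k_v37.fa"
    before = cache_key(image, cmd, ["sha256:8b6c538a"])
    same = cache_key(image, cmd, ["sha256:8b6c538a"])
    # A different reference file has a different digest, hence a different key.
    after = cache_key(image, cmd, ["sha256:00000000"])
    print(before == same, before == after)
```

Because the key covers every dependency, a lookup that hits can safely stand in for the computation, and any change to a dependency turns into a miss.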

After a little while, the reference will have finished generating, and Reflow begins alignment. Here, Reflow reports that the reference took 52 minutes to compute, and produced 8 GiB of output.

  align.reference:  exec done 8.0GiB                                                                                            52m37s
  align.align:      exec ..101f9a082e1679c16d23787c532a0107537c9c bwa mem -M -t 16 {{reference}..37.fa {{r1}} {{r2}} > {{out}}  4s

If we query ("info") the reference exec again, Reflow reports precisely what was produced:

% reflow info ec2-34-221-0-157.us-west-2.compute.amazonaws.com:9000/0061a20f88f57386/3674721e2d9e80b325934b08973fb3b1d3028b2df34514c9238be466112eb86e
ec2-34-221-0-157.us-west-2.compute.amazonaws.com:9000/0061a20f88f57386/3674721e2d9e80b325934b08973fb3b1d3028b2df34514c9238be466112eb86e (exec)
    state: complete
    type:  exec
    ident: align.reference
    image: index.docker.io/biocontainers/bwa@sha256:0529e39005e35618c4e52f8f56101f9a082e1679c16d23787c532a0107537c9c
    cmd:   "\n\t# Ignore failures here. The file from 1000genomes has a trailer\n\t# that isn't recognized by gunzip. (This is not recommended practice!)\n\tgunzip -c {{arg[0]}} > {{arg[1]}}/g1k_v37.fa || true\n\tcd {{arg[2]}}\n\tbwa index -a bwtsw g1k_v37.fa\n"
      arg[0]:
        .: sha256:8b6c538abf0dd92d3f3020f36cc1dd67ce004ffa421c2781205f1eb690bdb442 (851.0MiB)
      arg[1]: output 0
      arg[2]: output 0
    result:
      list[0]:
        g1k_v37.fa:     sha256:2f9cd9e853a9284c53884e6a551b1c7284795dd053f255d630aeeb114d1fa81f (2.9GiB)
        g1k_v37.fa.amb: sha256:dd51a07041a470925c1ebba45c2f534af91d829f104ade8fc321095f65e7e206 (6.4KiB)
        g1k_v37.fa.ann: sha256:68928e712ef48af64c5b6a443f2d2b8517e392ae58b6a4ab7191ef7da3f7930e (6.7KiB)
        g1k_v37.fa.bwt: sha256:2aec938930b8a2681eb0dfbe4f865360b98b2b6212c1fb9f7991bc74f72d79d8 (2.9GiB)
        g1k_v37.fa.pac: sha256:d62039666da85d859a29ea24af55b3c8ffc61ddf02287af4d51b0647f863b94c (739.5MiB)
        g1k_v37.fa.sa:  sha256:99eb6ff6b54fba663c25e2642bb2a6c82921c931338a7144327c1e3ee99a4447 (1.4GiB)

In this case, "bwa index" produced a number of auxiliary index files. These are the contents of the "reference" directory.

We can again query Reflow for running execs, and examine the alignment. We see now that the reference is passed in (argument 0), alongside the read pairs (arguments 1 and 2).

% reflow ps -l
6a6c36f5 align.align 5:12PM 0:00 running 5.9GiB 12.3 0B  bwa ec2-34-221-0-157.us-west-2.compute.amazonaws.com:9000/0061a20f88f57386/6a6c36f5da6ee387510b0b61d788d7e4c94244d61e6bc621b43f59a73443a755
% reflow info ec2-34-221-0-157.us-west-2.compute.amazonaws.com:9000/0061a20f88f57386/6a6c36f5da6ee387510b0b61d788d7e4c94244d61e6bc621b43f59a73443a755
ec2-34-221-0-157.us-west-2.compute.amazonaws.com:9000/0061a20f88f57386/6a6c36f5da6ee387510b0b61d788d7e4c94244d61e6bc621b43f59a73443a755 (exec)
    state: running
    type:  exec
    ident: align.align
    image: index.docker.io/biocontainers/bwa@sha256:0529e39005e35618c4e52f8f56101f9a082e1679c16d23787c532a0107537c9c
    cmd:   "\n\t\tbwa mem -M -t 16 {{arg[0]}}/g1k_v37.fa {{arg[1]}} {{arg[2]}} > {{arg[3]}}\n\t"
      arg[0]:
        g1k_v37.fa:     sha256:2f9cd9e853a9284c53884e6a551b1c7284795dd053f255d630aeeb114d1fa81f (2.9GiB)
        g1k_v37.fa.amb: sha256:dd51a07041a470925c1ebba45c2f534af91d829f104ade8fc321095f65e7e206 (6.4KiB)
        g1k_v37.fa.ann: sha256:68928e712ef48af64c5b6a443f2d2b8517e392ae58b6a4ab7191ef7da3f7930e (6.7KiB)
        g1k_v37.fa.bwt: sha256:2aec938930b8a2681eb0dfbe4f865360b98b2b6212c1fb9f7991bc74f72d79d8 (2.9GiB)
        g1k_v37.fa.pac: sha256:d62039666da85d859a29ea24af55b3c8ffc61ddf02287af4d51b0647f863b94c (739.5MiB)
        g1k_v37.fa.sa:  sha256:99eb6ff6b54fba663c25e2642bb2a6c82921c931338a7144327c1e3ee99a4447 (1.4GiB)
      arg[1]:
        .: sha256:0c1f85aa9470b24d46d9fc67ba074ca9695d53a0dee580ec8de8ed46ef347a85 (1.8GiB)
      arg[2]:
        .: sha256:47f5e749123d8dda92b82d5df8e32de85273989516f8e575d9838adca271f630 (1.7GiB)
      arg[3]: output 0
    top:
         /bin/bash -e -l -o pipefail -c ..bwa mem -M -t 16 /arg/0/0/g1k_v37.fa /arg/1/0 /arg/2/0 > /return/0 .
         bwa mem -M -t 16 /arg/0/0/g1k_v37.fa /arg/1/0 /arg/2/0

Note that the read pairs are files. Files in Reflow do not have names; they are just blobs of data. When Reflow runs a process that requires input files, those anonymous files are materialized on disk, but the filenames are not meaningful. In this case, we can see from the "top" output (these are the actual running processes, as reported by the OS), that r1 ended up being called "/arg/1/0" and r2 "/arg/2/0". The output is a file named "/return/0".
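
The relationship between the cmd template and the command seen in "top" can be mimicked with a small substitution. The path scheme below (/arg/N/0 for inputs, /return/N for outputs) is inferred solely from the output shown above, and render_cmd is a hypothetical helper, not Reflow's implementation.

```python
import re

_ARG = re.compile(r"\{\{arg\[(\d+)\]\}\}")


def render_cmd(cmd, outputs):
    """Replace {{arg[i]}} placeholders with on-disk paths.

    outputs maps an argument index to its output index, mirroring lines
    such as 'arg[3]: output 0' in the info listing.
    """
    def repl(match):
        i = int(match.group(1))
        if i in outputs:
            return f"/return/{outputs[i]}"
        return f"/arg/{i}/0"

    return _ARG.sub(repl, cmd)


if __name__ == "__main__":
    template = ("bwa mem -M -t 16 {{arg[0]}}/g1k_v37.fa "
                "{{arg[1]}} {{arg[2]}} > {{arg[3]}}")
    print(render_cmd(template, {3: 0}))
```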

Finally, alignment is complete. Aligning a single read pair took around 19m, and produced 13.2 GiB of output. Upon completion, Reflow prints runtime statistics and the result.

reflow: total n=5 time=1h9m57s
        ident           n   ncache transfer runtime(m) cpu            mem(GiB)    disk(GiB)      tmp(GiB)
        align.align     1   0      0B       17/17/17   15.6/15.6/15.6 7.8/7.8/7.8 12.9/12.9/12.9 0.0/0.0/0.0
        align.Main.r2   1   0      0B
        align.Main.r1   1   0      0B
        align.reference 1   0      0B       51/51/51   1.0/1.0/1.0    4.4/4.4/4.4 6.5/6.5/6.5    0.0/0.0/0.0
        align.g1kv37    1   0      0B

becb0485

Reflow represents file values by the SHA256 digest of the file's content. In this case, that's not very useful: you want the file, not its digest. Reflow provides mechanisms to export data. In this case let's copy the resulting file to an S3 bucket.

We'll make use of the "files" system module to copy the aligned file to an external S3 bucket. Modify align.rf's Main to the following (but pick an S3 bucket you own), and then run it again. Commentary is inline for clarity.

@requires(cpu := 16, mem := 24*GiB, disk := 50*GiB)	
val Main = {
	r1 := file("s3://1000genomes/phase3/data/HG00103/sequence_read/SRR062640_1.filt.fastq.gz")
	r2 := file("s3://1000genomes/phase3/data/HG00103/sequence_read/SRR062640_2.filt.fastq.gz")
	// Instantiate the system modules "files" (system modules begin
	// with $), assigning its instance to the "files" identifier. To
	// view the documentation for this module, run `reflow doc
	// $/files`.
	files := make("$/files")
	// As before.
	aligned := align(r1, r2)
	// Use the files module's Copy function to copy the aligned file to
	// the provided destination.
	files.Copy(aligned, "s3://marius-test-bucket/aligned.sam")
}

And run it again:

% reflow run align.rf
reflow: run ID: 9f0f3596
reflow: total n=2 time=1m9s
        ident         n   ncache transfer runtime(m) cpu mem(GiB) disk(GiB) tmp(GiB)
        align_2.align 1   1      0B
        align_2.Main  1   0      13.2GiB

val<.=becb0485 13.2GiB>

Here we see that Reflow did not need to recompute the aligned file; it is instead retrieved from cache. The reference index generation is skipped altogether. Status lines that indicate "xfer" (instead of "run") mean that Reflow is performing a cache transfer in place of running the computation. Reflow claims to have transferred a 13.2 GiB file to s3://marius-test-bucket/aligned.sam. Indeed it did:

% aws s3 ls s3://marius-test-bucket/aligned.sam
2018-12-13 16:29:49 14196491221 aligned.sam.
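
The byte count reported by aws s3 ls can be reconciled with Reflow's figure by converting to GiB (2^30 bytes). A small sketch follows; format_gib is our own helper, and the one-decimal formatting is chosen to resemble Reflow's output rather than taken from its source.

```python
def format_gib(n_bytes):
    """Format a byte count in GiB (1 GiB = 2**30 bytes) with one decimal."""
    return f"{n_bytes / 2**30:.1f}GiB"


if __name__ == "__main__":
    # Size of aligned.sam as listed by `aws s3 ls`.
    print(format_gib(14196491221))
```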

1000align

This code was modularized and generalized in 1000align. Here, fastq, bam, and alignment utilities are split into their own parameterized modules. The top-level module, 1000align, is instantiated from the command line. Command line invocations (reflow run) can pass module parameters through flags (strings, booleans, and integers):

% reflow run 1000align.rf -help
usage of 1000align.rf:
  -out string
        out is the target of the output merged BAM file (required)
  -sample string
        sample is the name of the 1000genomes phase 3 sample (required)

For example, to align the full sample from above, we can invoke 1000align.rf with the following arguments:

% reflow run 1000align.rf -sample HG00103 -out s3://marius-test-bucket/HG00103.bam

In this case, if your account limits allow it, Reflow will launch additional EC2 instances in order to further parallelize the work to be done (since we're aligning multiple pairs of FASTQ files). In this run, we can see that Reflow is aligning 5 pairs in parallel across 2 instances (four can fit on the initial m4.16xlarge instance).

% reflow ps -l
e74d4311 align.align.sam 11:45AM 0:00 running 10.9GiB 31.8 6.9GiB   bwa ec2-34-210-201-193.us-west-2.compute.amazonaws.com:9000/6a7ffa00d6b0d9e1/e74d4311708f1c9c8d3894a06b59029219e8a545c69aa79c3ecfedc1eeb898f6
59c561be align.align.sam 11:45AM 0:00 running 10.9GiB 32.7 6.4GiB   bwa ec2-34-210-201-193.us-west-2.compute.amazonaws.com:9000/6a7ffa00d6b0d9e1/59c561be5f627143108ce592d640126b88c23ba3d00974ad0a3c801a32b50fbe
ba688daf align.align.sam 11:47AM 0:00 running 8.7GiB  22.6 2.9GiB   bwa ec2-18-236-233-4.us-west-2.compute.amazonaws.com:9000/ae348d6c8a33f1c9/ba688daf5d50db514ee67972ec5f0a684f8a76faedeb9a25ce3d412e3c94c75c
0caece7f align.align.sam 11:47AM 0:00 running 8.7GiB  25.9 3.4GiB   bwa ec2-18-236-233-4.us-west-2.compute.amazonaws.com:9000/ae348d6c8a33f1c9/0caece7f38dc3d451d2a7411b1fcb375afa6c86a7b0b27ba7dd1f9d43d94f2f9
0b59e00c align.align.sam 11:47AM 0:00 running 10.4GiB 22.9 926.6MiB bwa ec2-18-236-233-4.us-west-2.compute.amazonaws.com:9000/ae348d6c8a33f1c9/0b59e00c848fa91e3b0871c30da3ed7e70fbc363bdc48fb09c3dfd61684c5fd9

When it completes, an approximately 17 GiB BAM file is deposited to S3:

% aws s3 ls s3://marius-test-bucket/HG00103.bam
2018-12-14 15:27:33 18761607096 HG00103.bam.

A note on Reflow's EC2 cluster manager

Reflow comes with a built-in cluster manager, which is responsible for elastically increasing or decreasing required compute resources. The AWS EC2 cluster manager keeps track of instance type availability and account limits, and uses these to launch the most appropriate set of instances for a given job. Instances terminate themselves after being idle for more than 10 minutes; idle instances are reused when possible.

The cluster manager may be configured under the "ec2cluster" key in Reflow's configuration. Its parameters are documented by godoc. (Formal documentation is forthcoming.)

Documentation

Developing and building Reflow

Reflow is implemented in Go, and its packages are go-gettable. Reflow is also a Go module and uses modules to fix its dependency graph.

After checking out the repository, the usual go commands should work, e.g.:

% go test ./...

The package github.com/grailbio/reflow/cmd/reflow (or subdirectory cmd/reflow in the repository) defines the main command for Reflow. Because Reflow relies on being able to distribute its current build, the binary must be built using the buildreflow tool instead of the ordinary Go tooling. Command buildreflow acts like go build, but also cross compiles the binary for the remote target (Linux/amd64 currently supported), and embeds the cross-compiled binary.

% cd $CHECKOUT/cmd/reflow
% go install github.com/grailbio/reflow/cmd/buildreflow
% buildreflow

Debugging Reflow runs

The $HOME/.reflow/runs directory contains logs, traces and other information for each Reflow run. If the run you're looking for is no longer there, the info and cat tools can be used if you have the run ID:

% reflow info 2fd5a9b6
runid    user       start   end    ExecLog  SysLog   EvalGraph Trace
2fd5a9b6 username   4:41PM  4:41PM 29a4b506 41a8594d 90f40bfc  4ec75aac

% reflow cat 29a4b506 > /tmp/29a4b506.execlog

For more information about tracing, see: doc/tracing.md.

Support and community

Please join us on Gitter or on the mailing list to discuss Reflow.

Documentation

Overview

Package reflow implements the core data structures and (abstract) runtime for Reflow.

Reflow is a system for distributed program execution. The programs are described by Flows, which are an abstract specification of the program's execution. Each Flow node can take any number of other Flows as dependent inputs and perform some (local) execution over these inputs in order to compute some output value.

Reflow supports a limited form of dynamic dependencies: a Flow may evaluate to a list of values, each of which may be executed independently. This mechanism also provides parallelism.

The system orchestrates Flow execution by evaluating the flow in the manner of an abstract syntax tree; see Eval for more details.

Index

Constants

This section is empty.

Variables

Functions

func AssertExact

func AssertExact(_ context.Context, source, target []*Assertions) bool

AssertExact implements Assert for an exact match. That is, for each key in target, the value should match exactly what's in src and target can't contain keys missing in src.
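
The exact-match rule described above can be sketched as follows. This is a minimal illustration only: the types key and assertions and the function exactMatch are simplified stand-ins invented for this example, not Reflow's actual Assertions implementation.

```go
package main

import "fmt"

// key is a simplified stand-in for reflow.AssertionKey.
type key struct{ Subject, Namespace string }

// assertions is a simplified stand-in for reflow.Assertions:
// a mapping from key to property name/value pairs.
type assertions map[key]map[string]string

// exactMatch reports whether every key and property in target is present
// in src with an identical value. A key missing from src is a mismatch.
func exactMatch(src, target assertions) bool {
	for k, props := range target {
		srcProps, ok := src[k]
		if !ok {
			return false
		}
		for name, want := range props {
			if got, ok := srcProps[name]; !ok || got != want {
				return false
			}
		}
	}
	return true
}

func main() {
	k := key{Subject: "s3://bucket/a", Namespace: "blob"}
	src := assertions{k: {"etag": "v1"}}
	fmt.Println(exactMatch(src, assertions{k: {"etag": "v1"}})) // true
	fmt.Println(exactMatch(src, assertions{k: {"etag": "v2"}})) // false
	// The check is directional: an empty target is compatible with src,
	// but src is not compatible with an empty source.
	fmt.Println(exactMatch(src, assertions{})) // true
	fmt.Println(exactMatch(assertions{}, src)) // false
}
```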

func AssertNever

func AssertNever(_ context.Context, _, _ []*Assertions) bool

AssertNever implements Assert for an always match (ie, never assert).

func PrettyDiff

func PrettyDiff(lefts, rights []*Assertions) string

PrettyDiff returns a pretty-printable string representing the differences between the set of Assertions in lefts and rights. Specifically only these differences are relevant:

- any key present in any of the rights but not in lefts.
- any entry (in any of the rights) with a mismatching assertion (in any of the lefts).

TODO(swami): Add unit tests.

func SetFilesetOpConcurrencyLimit

func SetFilesetOpConcurrencyLimit(limit int)

SetFilesetOpConcurrencyLimit sets the limit of concurrent fileset operations. For a successful reset of the limit, this should be called before a call is made to GetFilesetOpLimiter, ie before any reflow evaluations have started, otherwise this will panic.
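
The "set before first use" contract described above might look like the sketch below. The opLimit type, its methods, and the default of 8 are hypothetical and exist only for illustration; Reflow's actual limiter may be implemented differently.

```go
package main

import (
	"fmt"
	"sync"
)

// opLimit is a hypothetical stand-in for a package-level operation limit.
type opLimit struct {
	mu     sync.Mutex
	limit  int
	handed bool // true once get has been called
}

// set changes the limit. It panics if the limit has already been handed out.
func (o *opLimit) set(n int) {
	o.mu.Lock()
	defer o.mu.Unlock()
	if o.handed {
		panic("limit already in use; set must be called before get")
	}
	o.limit = n
}

// get returns the limit and freezes it against further changes.
func (o *opLimit) get() int {
	o.mu.Lock()
	defer o.mu.Unlock()
	o.handed = true
	return o.limit
}

func main() {
	o := &opLimit{limit: 8} // 8 is an arbitrary default for this sketch
	o.set(4)                // fine: nothing has read the limit yet
	fmt.Println(o.get())    // 4

	defer func() { fmt.Println("recovered:", recover()) }()
	o.set(16) // panics: the limit was already handed out
}
```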

Types

type Arg

type Arg struct {
	// Out is true if this is an output argument.
	Out bool
	// Fileset is the fileset used as an input argument.
	Fileset *Fileset `json:",omitempty"`
	// Index is the output argument index.
	Index int
}

Arg represents an exec argument (either input or output).

type Assert

type Assert func(ctx context.Context, source, target []*Assertions) bool

Assert asserts whether the target set of assertions are compatible with the src set. Compatibility is directional and this strictly determines if the target is compatible with src and Assert(target, src) may not yield the same result.

type AssertionGenerator

type AssertionGenerator interface {
	// Generate computes assertions for a given AssertionKey.
	Generate(ctx context.Context, key AssertionKey) (*Assertions, error)
}

AssertionGenerator generates assertions based on an AssertionKey. Implementations are specific to a namespace and generate assertions for a given subject.

type AssertionGeneratorMux

type AssertionGeneratorMux map[string]AssertionGenerator

AssertionGeneratorMux multiplexes a number of AssertionGenerator implementations based on the namespace.

func (AssertionGeneratorMux) Generate

Generate implements the AssertionGenerator interface for AssertionGeneratorMux.
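
As a rough illustration of namespace-based multiplexing, the sketch below dispatches on the key's Namespace. The types key, generator, mux, and fixed are simplified assumptions made for this example (no context argument, plain maps instead of *Assertions), and the error returned for an unknown namespace is also an assumption.

```go
package main

import "fmt"

// key is a simplified stand-in for reflow.AssertionKey.
type key struct{ Subject, Namespace string }

// generator is a simplified stand-in for reflow.AssertionGenerator.
type generator interface {
	Generate(k key) (map[string]string, error)
}

// mux maps a namespace to the generator responsible for it.
type mux map[string]generator

// Generate dispatches to the generator registered for the key's namespace.
func (m mux) Generate(k key) (map[string]string, error) {
	g, ok := m[k.Namespace]
	if !ok {
		return nil, fmt.Errorf("no generator for namespace %q", k.Namespace)
	}
	return g.Generate(k)
}

// fixed is a hypothetical generator returning the same properties for any subject.
type fixed map[string]string

func (f fixed) Generate(key) (map[string]string, error) { return f, nil }

func main() {
	m := mux{"blob": fixed{"etag": "v1"}}
	props, err := m.Generate(key{Subject: "s3://bucket/a", Namespace: "blob"})
	fmt.Println(props, err)
	_, err = m.Generate(key{Subject: "ubuntu", Namespace: "docker"})
	fmt.Println(err)
}
```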

type AssertionKey

type AssertionKey struct {
	Subject, Namespace string
}

AssertionKey represents a subject within a namespace whose properties can be asserted.

- Subject represents the unique entity within the Namespace to which this Assertion applies (eg: full path to blob object, a Docker Image, etc).
- Namespace represents the namespace to which the subject of this Assertion belongs (eg: "blob" for blob objects, "docker" for docker images, etc).

func (AssertionKey) Less

func (a AssertionKey) Less(b AssertionKey) bool

Less returns whether the given AssertionKey is lexicographically smaller than this one.

type Assertions

type Assertions struct {
	// contains filtered or unexported fields
}

Assertions represent a collection of AssertionKeys with specific values for various properties of theirs. Assertions are immutable and constructed in one of the following ways:

- NewAssertions: creates an empty Assertions and is typically used when subsequent operations are to AddFrom.
- AssertionsFromEntry: creates Assertions from a single entry mapping an AssertionKey to various properties (within the key's Namespace) of the named Subject in the key.
- AssertionsFromMap: creates Assertions from a mapping of AssertionKey to properties.
- MergeAssertions: merges a list of Assertions into a single Assertions.

func AssertionsFromEntry

func AssertionsFromEntry(k AssertionKey, v map[string]string) *Assertions

AssertionsFromEntry creates an Assertions from a single entry. It is similar to AssertionsFromMap and exists for convenience.

func AssertionsFromMap

func AssertionsFromMap(m map[AssertionKey]map[string]string) *Assertions

AssertionsFromMap creates an Assertions from a given mapping of AssertionKey to a map representing its property names and corresponding values.

func DistinctAssertions

func DistinctAssertions(list ...*Assertions) ([]*Assertions, int)

DistinctAssertions returns the distinct list of non-empty Assertions from the given list and a total size.

func MergeAssertions

func MergeAssertions(list ...*Assertions) (*Assertions, error)

MergeAssertions merges a list of Assertions into a single Assertions. Returns an error if the same key maps to a conflicting value as a result of the merge.
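
The merge-with-conflict-detection behavior described above can be sketched as below. The key and assertions types and the merge function are simplified stand-ins invented for this example; the real Assertions type is opaque and may merge differently.

```go
package main

import "fmt"

// key is a simplified stand-in for reflow.AssertionKey.
type key struct{ Subject, Namespace string }

// assertions is a simplified stand-in for reflow.Assertions.
type assertions map[key]map[string]string

// merge combines all inputs into one set. It fails if the same key and
// property name would map to two different values.
func merge(list ...assertions) (assertions, error) {
	out := assertions{}
	for _, a := range list {
		for k, props := range a {
			if out[k] == nil {
				out[k] = map[string]string{}
			}
			for name, v := range props {
				if old, ok := out[k][name]; ok && old != v {
					return nil, fmt.Errorf("conflict for %v %s: %q vs %q", k, name, old, v)
				}
				out[k][name] = v
			}
		}
	}
	return out, nil
}

func main() {
	k := key{Subject: "s3://bucket/a", Namespace: "blob"}
	merged, err := merge(assertions{k: {"etag": "v1"}}, assertions{k: {"size": "10"}})
	fmt.Println(len(merged[k]), err) // 2 <nil>

	_, err = merge(assertions{k: {"etag": "v1"}}, assertions{k: {"etag": "v2"}})
	fmt.Println(err != nil) // true
}
```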

func NewAssertions

func NewAssertions() *Assertions

NewAssertions creates a new (empty) Assertions object.

func (*Assertions) Digest

func (s *Assertions) Digest() digest.Digest

Digest returns the assertions' digest.

func (*Assertions) Equal

func (s *Assertions) Equal(t *Assertions) bool

Equal returns whether the given Assertions is equal to this one.

func (*Assertions) IsEmpty

func (s *Assertions) IsEmpty() bool

IsEmpty returns whether this is empty, which it is if it is a nil reference or has no entries.

func (*Assertions) PrettyDiff

func (s *Assertions) PrettyDiff(t *Assertions) string

PrettyDiff returns a pretty-printable string representing the differences in the given Assertions that conflict with this one. Specifically only these differences are relevant:

- any key present in t but not in s.
- any entry with a mismatching assertion in t and s.

func (*Assertions) Short

func (s *Assertions) Short() string

Short returns a short, string representation of assertions.

func (*Assertions) String

func (s *Assertions) String() string

String returns a full, human-readable string representing the assertions.

type Cache

type Cache interface {
	// Lookup returns the value associated with a (digest) key.
	// Lookup returns an error flagged errors.NotExist when there
	// is no such value.
	//
	// Lookup should also check to make sure that the objects
	// actually exist, and provide a reasonable guarantee that they'll
	// be available for transfer.
	//
	// TODO(marius): allow the caller to maintain a lease on the desired
	// objects so that garbage collection can (safely) be run
	// concurrently with flows. This isn't a correctness concern (the
	// flows may be restarted), but rather one of efficiency.
	Lookup(context.Context, digest.Digest) (Fileset, error)

	// Transfer transmits the file objects associated with value v
	// (usually retrieved by Lookup) to the repository dst. Transfer
	// should be used in place of direct (cache) repository access since
	// it may apply additional policies (e.g., rate limiting, etc.)
	Transfer(ctx context.Context, dst Repository, v Fileset) error

	// NeedTransfer returns the set of files in the Fileset v that are absent
	// in the provided repository.
	NeedTransfer(ctx context.Context, dst Repository, v Fileset) ([]File, error)

	// Write stores the Value v, whose file objects exist in Repository repo,
	// under the key id. If the repository is nil no objects are transferred.
	Write(ctx context.Context, id digest.Digest, v Fileset, repo Repository) error

	// Delete removes the value named by id from this cache.
	Delete(ctx context.Context, id digest.Digest) error

	// Repository returns this cache's underlying repository. It should
	// not be used for data transfer during the course of evaluation; see
	// Transfer.
	Repository() Repository
}

A Cache stores Values and their associated File objects for later retrieval. Caches may be temporary: objects are not guaranteed to persist.

type Exec

type Exec interface {
	// ID returns the digest of the exec. This is equivalent to the Digest of the value computed
	// by the Exec.
	ID() digest.Digest

	// URI names execs in a process-agnostic fashion.
	URI() string

	// Result returns the exec's result after it has been completed.
	Result(ctx context.Context) (Result, error)

	// Inspect inspects the exec. It can be called at any point in the Exec's lifetime.
	// If a repo is provided, Inspect will also marshal this exec's inspect (but only if the exec is complete),
	// into the given repo and returns the digest of the marshaled contents.
	Inspect(ctx context.Context, repo *url.URL) (ExecInspect, digest.Digest, error)

	// Wait awaits completion of the Exec.
	Wait(ctx context.Context) error

	// Logs returns the standard error and/or standard output of the Exec.
	// If it is called during execution, and if follow is true, it follows
	// the logs until completion of execution.
	// Completed Execs return the full set of available logs.
	Logs(ctx context.Context, stdout, stderr, follow bool) (io.ReadCloser, error)

	// RemoteLogs returns the remote location of the logs (if available).
	// Returns the standard error (if 'stdout' is false) or standard output of the Exec.
	// The location is just a reference and the content must be retrieved as appropriate for the type.
	// RemoteLogs may (or may not) return a valid location during execution.
	RemoteLogs(ctx context.Context, stdout bool) (RemoteLogs, error)

	// Shell invokes /bin/bash inside an Exec. It can be invoked only when
	// the Exec is executing. r provides the shell input. The returned read
	// closer has the shell output. The caller has to close the read closer
	// once done.
	// TODO(pgopal) - Implement shell for zombie execs.
	Shell(ctx context.Context) (io.ReadWriteCloser, error)

	// Promote installs this exec's objects into the alloc's repository.
	// Promote assumes that the Exec is complete. i.e. Wait returned successfully.
	Promote(context.Context) error
}

An Exec computes a Value. It is created from an ExecConfig; the Exec interface permits waiting on completion, and inspection of results as well as ongoing execution.

type ExecConfig

type ExecConfig struct {
	// The type of exec: "exec", "intern", "extern"
	Type string

	// A human-readable name for the exec.
	Ident string

	// intern, extern: the URL from which data is fetched or to which
	// data is pushed.
	URL string

	// exec: the docker image used to perform an exec
	Image string

	// The docker image that is specified by the user
	OriginalImage string

	// exec: the Sprintf-able command that is to be run inside of the
	// Docker image.
	Cmd string

	// exec: the set of arguments (one per %s in Cmd) passed to the command
	// extern: the single argument which is to be exported
	Args []Arg

	// exec: the resource requirements for the exec
	Resources

	// NeedAWSCreds indicates the exec needs AWS credentials defined in
	// its environment: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and
	// AWS_SESSION_TOKEN will be available with the user's default
	// credentials.
	NeedAWSCreds bool

	// NeedDockerAccess indicates that the exec needs access to the host docker daemon
	NeedDockerAccess bool

	// OutputIsDir tells whether an output argument (by index)
	// is a directory.
	OutputIsDir []bool `json:",omitempty"`
}

ExecConfig contains all the necessary information to perform an exec.
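
The Cmd field is described as a Sprintf-able string with one argument per %s. A minimal sketch of that substitution, assuming each argument has already been resolved to a path, is shown below. The render helper and the example paths are hypothetical; how Reflow actually materializes arguments inside the container is not covered here.

```go
package main

import "fmt"

// render substitutes one path per %s in cmd, in order.
// It is a hypothetical helper, not part of the reflow package.
func render(cmd string, paths []string) string {
	args := make([]interface{}, len(paths))
	for i, p := range paths {
		args[i] = p
	}
	return fmt.Sprintf(cmd, args...)
}

func main() {
	// The paths below are made up for illustration.
	fmt.Println(render("bwa mem %s > %s", []string{"/in/reads.fastq", "/out/aligned.sam"}))
	// prints "bwa mem /in/reads.fastq > /out/aligned.sam"
}
```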

func (ExecConfig) String

func (e ExecConfig) String() string

type ExecInspect

type ExecInspect struct {
	Created time.Time
	Config  ExecConfig
	State   string        // "created", "waiting", "running", .., "zombie"
	Status  string        // human readable status
	Error   *errors.Error `json:",omitempty"` // non-nil runtime on error
	Profile Profile

	// Gauges are used to export realtime exec stats. They are used only
	// while the Exec is in running state.
	Gauges Gauges
	// Commands running from top, for live inspection.
	Commands []string
	// Docker inspect output.
	Docker types.ContainerJSON
	// ExecError stores exec result errors.
	ExecError *errors.Error `json:",omitempty"`
}

ExecInspect describes the current state of an Exec.

func (ExecInspect) Runtime

func (e ExecInspect) Runtime() time.Duration

Runtime computes the exec's runtime based on Docker's timestamps.

type Executor

type Executor interface {
	// Put creates a new Exec at id. It is idempotent.
	Put(ctx context.Context, id digest.Digest, exec ExecConfig) (Exec, error)

	// Get retrieves the Exec named id.
	Get(ctx context.Context, id digest.Digest) (Exec, error)

	// Remove deletes an Exec.
	Remove(ctx context.Context, id digest.Digest) error

	// Execs lists all Execs known to the Executor.
	Execs(ctx context.Context) ([]Exec, error)

	// Load fetches missing files into the executor's repository. Load fetches
	// resolved files from the specified backing repository and unresolved files
	// directly from the source. The resolved fileset is returned and is available
	// on the executor on successful return. The client has to explicitly unload the
	// files to free them.
	Load(ctx context.Context, repo *url.URL, fileset Fileset) (Fileset, error)

	// VerifyIntegrity verifies the integrity of the given set of files
	VerifyIntegrity(ctx context.Context, fileset Fileset) error

	// Unload the data from the executor's repository. Any use of the unloaded files
	// after the successful return of Unload is undefined.
	Unload(ctx context.Context, fileset Fileset) error

	// Resources indicates the total amount of resources available at the Executor.
	Resources() Resources

	// Repository returns the Repository associated with this Executor.
	Repository() Repository
}

Executor manages Execs and their values.

type File

type File struct {
	// The digest of the contents of the file.
	ID digest.Digest

	// The size of the file.
	Size int64

	// Source stores a URL for the file from which it may
	// be retrieved.
	Source string `json:",omitempty"`

	// ETag stores an optional entity tag for the Source file.
	ETag string `json:",omitempty"`

	// LastModified stores the file's last modified time.
	LastModified time.Time `json:",omitempty"`

	// ContentHash is the digest of the file contents and can be present
	// for unresolved (ie reference) files.
	// ContentHash is expected to equal ID once this file is resolved.
	ContentHash digest.Digest `json:",omitempty"`

	// Assertions are the set of assertions representing the state
	// of all the dependencies that went into producing this file.
	// Unlike Etag/Size etc which are properties of this File,
	// Assertions can include properties of other subjects that
	// contributed to producing this File.
	// In order to include Assertions when converting to/from JSON,
	// the custom Fileset.WriteJSON and Fileset.ReadJSON methods must be used.
	// The standard JSON library (and probably most third party ones) will ignore this field.
	Assertions *Assertions `json:"-"`
}

File represents a File inside of Reflow. A file is said to be resolved if it contains the digest of the file's contents (ID). Otherwise, a File is said to be a reference, in which case it must contain a source and etag and may contain a ContentHash. Any type of File (resolved or reference) can contain Assertions. TODO(swami): Split into resolved/reference files explicitly.

func (File) Digest

func (f File) Digest() digest.Digest

Digest returns the file's digest: if the file is a reference and its ContentHash is unset, the digest comprises the reference, source, etag and assertions. Reference files will return ContentHash if set (which is assumed to be the digest of the file's contents). Resolved files return ID which is the digest of the file's contents.

func (File) Equal

func (f File) Equal(g File) bool

Equal returns whether files f and g represent the same content. Since equality is a property of the file's contents, assertions are ignored.

func (File) IsRef

func (f File) IsRef() bool

IsRef returns whether this file is a file reference.
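
The distinction between resolved and reference files, and the way it selects a digest, can be sketched as follows. This is an assumption-laden illustration: the file type uses plain strings for digests, an empty ID stands for "unresolved", and the fallback hash covers only source and etag (the real Digest also includes assertions).

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// file is a simplified stand-in for reflow.File with string digests.
type file struct {
	ID          string // content digest; empty when the file is a reference
	Source      string
	ETag        string
	ContentHash string // optional content digest for reference files
}

// isRef reports whether the file is a reference (has no content digest).
func (f file) isRef() bool { return f.ID == "" }

// digest picks the identity of the file: ID when resolved, ContentHash when
// set on a reference, otherwise a hash over the reference's source and etag.
func (f file) digest() string {
	switch {
	case !f.isRef():
		return f.ID
	case f.ContentHash != "":
		return f.ContentHash
	default:
		sum := sha256.Sum256([]byte(f.Source + "\x00" + f.ETag))
		return hex.EncodeToString(sum[:])
	}
}

func main() {
	resolved := file{ID: "abc123"}
	ref := file{Source: "s3://bucket/key", ETag: "e1"}
	fmt.Println(resolved.isRef(), resolved.digest()) // false abc123
	fmt.Println(ref.isRef(), len(ref.digest()))      // true 64
}
```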

func (File) Short

func (f File) Short() string

func (File) String

func (f File) String() string

type Fileset

type Fileset struct {
	List []Fileset       `json:",omitempty"`
	Map  map[string]File `json:"Fileset,omitempty"`
}

Fileset is the result of an evaluated flow. Values may either be lists of values or Filesets. Filesets are a map of paths to Files.

func (*Fileset) AddAssertions

func (v *Fileset) AddAssertions(a *Assertions) error

AddAssertions adds the given assertions to all files in this Fileset.

func (Fileset) AnyEmpty

func (v Fileset) AnyEmpty() bool

AnyEmpty tells whether this value, or any of its constituent values contain no files.

func (Fileset) Assertions

func (v Fileset) Assertions() *Assertions

Assertions returns all the assertions across all the Files in this Fileset.

func (Fileset) Diff

func (v Fileset) Diff(w Fileset) (string, bool)

Diff deep-compares the values of two filesets, assuming they have the same structure, and returns a pretty-diff of the differences (if any) and a boolean indicating whether they differ.

func (Fileset) Digest

func (v Fileset) Digest() digest.Digest

Digest returns a digest representing the value. Digests preserve semantics: two values with the same digest are considered to be equivalent.

func (Fileset) Empty

func (v Fileset) Empty() bool

Empty tells whether this value is empty, that is, it contains no files.

func (Fileset) Equal

func (v Fileset) Equal(w Fileset) bool

Equal reports whether v is equal to w.

func (Fileset) File

func (v Fileset) File() (File, error)

File returns the only file expected to be contained in this Fileset. Returns error if the fileset does not contain only one file.

func (Fileset) Files

func (v Fileset) Files() []File

Files returns the set of Files that comprise the value.

func (Fileset) Flatten

func (v Fileset) Flatten() []Fileset

Flatten is a convenience function to flatten (shallowly) the value v, returning a list of Values. If the value is a list value, the list is returned; otherwise a unary list of the value v is returned.

func (*Fileset) MapAssertionsByFile

func (v *Fileset) MapAssertionsByFile(files []File)

MapAssertionsByFile maps the assertions from the given set of files to the corresponding same file (based on file.Digest()), if any, in this fileset.

func (Fileset) N

func (v Fileset) N() int

N returns the number of files (not necessarily unique) in this value.

func (Fileset) Pullup

func (v Fileset) Pullup() Fileset

Pullup merges this value (tree) into a single toplevel fileset.
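
To make the tree structure of a Fileset concrete, the sketch below mirrors N, Flatten, and Pullup on a simplified local type. The file and fileset types and their methods are stand-ins written for this example; in particular, letting later entries overwrite earlier ones on a path collision in pullup is an assumption.

```go
package main

import "fmt"

type file struct {
	ID   string
	Size int64
}

// fileset is a simplified stand-in for reflow.Fileset: either a list of
// filesets, a map of paths to files, or both.
type fileset struct {
	List []fileset
	Map  map[string]file
}

// n counts files in the whole tree (not necessarily unique).
func (v fileset) n() int {
	n := len(v.Map)
	for _, w := range v.List {
		n += w.n()
	}
	return n
}

// flatten returns the list members, or a one-element list holding v itself.
func (v fileset) flatten() []fileset {
	if v.List != nil {
		return v.List
	}
	return []fileset{v}
}

// collect copies every path in the tree into m.
func (v fileset) collect(m map[string]file) {
	for path, f := range v.Map {
		m[path] = f
	}
	for _, w := range v.List {
		w.collect(m)
	}
}

// pullup merges the tree into a single toplevel fileset.
func (v fileset) pullup() fileset {
	out := fileset{Map: map[string]file{}}
	v.collect(out.Map)
	return out
}

func main() {
	v := fileset{List: []fileset{
		{Map: map[string]file{"a.fastq": {ID: "1", Size: 10}}},
		{Map: map[string]file{"b.fastq": {ID: "2", Size: 20}}},
	}}
	fmt.Println(v.n(), len(v.flatten()), len(v.pullup().Map)) // 2 2 2
}
```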

func (*Fileset) ReadJSON

func (v *Fileset) ReadJSON(r io.Reader) error

ReadJSON reads (unmarshals) JSON from the given Reader into this Fileset. ReadJSON is a lot more efficient than `json.Unmarshal` and specifically it will unmarshal Assertions (if any) for every File within this Fileset.

func (*Fileset) Replace

func (v *Fileset) Replace(f func(file File) File)

Replace replaces each file in this Fileset as per function f.

func (Fileset) Short

func (v Fileset) Short() string

Short returns a short, human-readable string representing the value. Its intended use is for pretty-printed output. In particular, hashes are abbreviated, and lists display only the first member, followed by ellipsis. For example, a list of values is printed as:

list<val<sample.fastq.gz=f2c59c40>, ...50MB>

func (Fileset) Size

func (v Fileset) Size() int64

Size returns the total size of this value.

func (Fileset) String

func (v Fileset) String() string

String returns a full, human-readable string representing the value v. Unlike Short, string is fully descriptive: it contains the full digest and lists are complete. For example:

list<sample.fastq.gz=sha256:f2c59c40a1d71c0c2af12d38a2276d9df49073c08360d72320847efebc820160>,
  sample2.fastq.gz=sha256:59eb82c49448e349486b29540ad71f4ddd7f53e5a204d50997f054d05c939adb>>

func (Fileset) Subst

func (v Fileset) Subst(sub map[digest.Digest]File) (out Fileset, resolved bool)

Subst substitutes the files in the fileset using the provided mapping of File object digests to Files. Subst returns whether the fileset is fully resolved after substitution. That is, any unresolved file f in this fileset tree will be substituted by sub[f.Digest()].

func (Fileset) WriteDigest

func (v Fileset) WriteDigest(w io.Writer)

WriteDigest writes the digestible material for v to w. The io.Writer is assumed to be produced by a Digester, and hence infallible. Errors are not checked.

func (*Fileset) WriteJSON

func (v *Fileset) WriteJSON(w io.Writer) error

WriteJSON writes (marshals) this Fileset in JSON format to the given writer. WriteJSON is more efficient than `json.Marshal` and more specifically it includes the Assertions of every File within this Fileset.

type FilesetLimiter

type FilesetLimiter struct {
	*limiter.Limiter
	// contains filtered or unexported fields
}

func GetFilesetOpLimiter

func GetFilesetOpLimiter() *FilesetLimiter

func (*FilesetLimiter) Limit

func (l *FilesetLimiter) Limit() int

type Gauges

type Gauges map[string]float64

Gauges stores a set of named gauges.

func (Gauges) Snapshot

func (g Gauges) Snapshot() Gauges

Snapshot returns a snapshot of the gauge values g.

type Profile

type Profile map[string]struct {
	Max, Mean, Var float64
	N              int64
	First, Last    time.Time
}

Profile stores keyed statistical summaries (currently: mean, max, N).

func (Profile) String

func (p Profile) String() string

type RWAssertions

type RWAssertions struct {
	// contains filtered or unexported fields
}

RWAssertions are a mutable representation of Assertions.

func NewRWAssertions

func NewRWAssertions(a *Assertions) *RWAssertions

NewRWAssertions creates a new RWAssertions with the given Assertions.

func (*RWAssertions) AddFrom

func (s *RWAssertions) AddFrom(list ...*Assertions) error

AddFrom adds to this RWAssertions from the given list of Assertions. Returns an error if the same key maps to a conflicting value as a result of the adding. AddFrom panics if s is nil.

func (*RWAssertions) Filter

func (s *RWAssertions) Filter(t *Assertions) (*Assertions, []AssertionKey)

Filter returns new Assertions mapping keys from t with values from s (panics if s is nil) and a list of AssertionKeys that exist in t but are missing in s.

type RemoteLogs

type RemoteLogs struct {
	Type RemoteLogsType

	// LogGroupName is the log group name (applicable if Type is 'Cloudwatch')
	LogGroupName string
	// LogStreamName is the log stream name (applicable if Type is 'Cloudwatch')
	LogStreamName string
}

RemoteLogs is a description of remote logs primarily useful for storing a reference. It is expected to contain basic details necessary to retrieve the logs but does not provide the means to do so.

func (RemoteLogs) String

func (r RemoteLogs) String() string

type RemoteLogsType

type RemoteLogsType string
const (
	RemoteLogsTypeUnknown    RemoteLogsType = "Unknown"
	RemoteLogsTypeCloudwatch RemoteLogsType = "cloudwatch"
)

type Repository

type Repository interface {
	// Collect removes from this repository any objects not in the
	// Liveset
	Collect(context.Context, liveset.Liveset) error

	// CollectWithThreshold removes from this repository any objects not in the live set and
	// is either in the dead set or its creation times are not more recent than the threshold time.
	CollectWithThreshold(ctx context.Context, live liveset.Liveset, dead liveset.Liveset, threshold time.Time, dryrun bool) error

	// Stat returns the File metadata for the blob with the given digest.
	// It returns errors.NotExist if the blob does not exist in this
	// repository.
	Stat(context.Context, digest.Digest) (File, error)

	// Get streams the blob named by the given Digest.
	// If it does not exist in this repository, an error with code
	// errors.NotExist will be returned.
	Get(context.Context, digest.Digest) (io.ReadCloser, error)

	// Put streams a blob to the repository and returns its
	// digest when completed.
	Put(context.Context, io.Reader) (digest.Digest, error)

	// WriteTo writes a blob identified by a Digest directly to a
	// foreign repository named by a URL. If the repository is
	// unable to write directly to the foreign repository, an error
	// with flag errors.NotSupported is returned.
	WriteTo(context.Context, digest.Digest, *url.URL) error

	// ReadFrom reads a blob identified by a Digest directly from a
	// foreign repository named by a URL. If the repository is
	// unable to read directly from the foreign repository, an error
	// with flag errors.NotSupported is returned.
	ReadFrom(context.Context, digest.Digest, *url.URL) error

	// URL returns the URL of this repository, or nil if it does not
	// have one. The returned URL may be used for direct transfers via
	// WriteTo or ReadFrom.
	URL() *url.URL
}

Repository defines an interface used for servicing blobs of data that are named-by-hash.

type Requirements

type Requirements struct {
	// Min is the smallest amount of resources that must be allocated
	// to satisfy the requirements.
	Min Resources
	// Width is the width of the requirements. A width of zero indicates
	// a "narrow" job: minimum describes the exact resources needed.
	// Widths greater than zero require a multiple (ie, 1 + Width) of the minimum requirement.
	Width int
}

Requirements stores resource requirements, comprising the minimum amount of acceptable resources and a width.

func (*Requirements) Add

func (r *Requirements) Add(s Requirements)

Add adds the provided requirements s to the requirements r. R's minimum requirements are set to the larger of the two; the two widths are added.

func (*Requirements) AddParallel

func (r *Requirements) AddParallel(s Resources)

AddParallel adds the provided resources s to the requirements, and also increases the requirement's width by one.

func (*Requirements) AddSerial

func (r *Requirements) AddSerial(s Resources)

AddSerial adds the provided resources s to the requirements.

func (Requirements) Equal

func (r Requirements) Equal(s Requirements) bool

Equal reports whether r and s represent the same requirements.

func (*Requirements) Max

func (r *Requirements) Max() Resources

Max is the maximum amount of resources represented by this resource request.
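
Given the field comment that a width greater than zero requires a multiple (1 + Width) of the minimum, the maximum can be sketched as a simple scaling. The resources and requirements types below are local stand-ins for illustration, and the scaling rule is inferred from that comment rather than from the implementation.

```go
package main

import "fmt"

// resources is a simplified stand-in for reflow.Resources.
type resources map[string]float64

// requirements is a simplified stand-in for reflow.Requirements.
type requirements struct {
	Min   resources
	Width int
}

// max scales the minimum by 1+Width in every dimension.
func (r requirements) max() resources {
	out := resources{}
	for k, v := range r.Min {
		out[k] = v * float64(1+r.Width)
	}
	return out
}

func main() {
	narrow := requirements{Min: resources{"cpu": 2, "mem": 4}}
	wide := requirements{Min: resources{"cpu": 2, "mem": 4}, Width: 2}
	fmt.Println(narrow.max()["cpu"], wide.max()["cpu"]) // 2 6
}
```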

func (Requirements) String

func (r Requirements) String() string

String renders a human-readable representation of r.

type Resources

type Resources map[string]float64

Resources describes a set of labeled resources. Each resource is described by a string label and assigned a value. The zero value of Resources represents the resources with zeros for all labels.

func (*Resources) Add

func (r *Resources) Add(x, y Resources) *Resources

Add sets r to the sum x[key]+y[key] for all keys and returns r.

func (Resources) Available

func (r Resources) Available(s Resources) bool

Available tells if s resources are available from r.
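
Because Resources is a map of labels to values with missing labels treated as zero, per-key arithmetic and availability checks are straightforward. The sketch below uses a local resources type and free functions add and available as stand-ins; it returns new maps rather than mutating a receiver as the real methods do.

```go
package main

import "fmt"

// resources is a simplified stand-in for reflow.Resources.
type resources map[string]float64

// add returns the per-key sum of x and y; missing keys count as zero.
func add(x, y resources) resources {
	out := resources{}
	for k, v := range x {
		out[k] += v
	}
	for k, v := range y {
		out[k] += v
	}
	return out
}

// available reports whether r can satisfy s in every dimension of s.
func available(r, s resources) bool {
	for k, v := range s {
		if r[k] < v {
			return false
		}
	}
	return true
}

func main() {
	node := resources{"cpu": 8, "mem": 32}
	job := resources{"cpu": 4, "mem": 16}
	fmt.Println(available(node, job))           // true
	fmt.Println(available(node, add(job, job))) // true
	fmt.Println(available(node, resources{"disk": 1})) // false
}
```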

func (Resources) Div

func (r Resources) Div(s Resources) map[string]float64

Div returns a mapping of the intersection of keys in r and s to the fraction r[key]/s[key]. Since the returned value cannot be treated as Resources, Div simply returns a map.

func (Resources) Equal

func (r Resources) Equal(s Resources) bool

Equal tells whether the resources r and s are equal in all dimensions of both r and s.

func (*Resources) Max

func (r *Resources) Max(x, y Resources) *Resources

Max sets r to the maximum max(x[key], y[key]) for all keys and returns r.

func (Resources) MaxRatio

func (r Resources) MaxRatio(s Resources) (max float64)

MaxRatio computes the max across ratios of values in r to s for the intersection of keys in r and s.
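
Div and MaxRatio both operate on the intersection of keys. A minimal sketch with local stand-in names (resources, div, maxRatio) is shown below; handling of zero divisors is left to ordinary floating-point rules here, which is an assumption.

```go
package main

import "fmt"

// resources is a simplified stand-in for reflow.Resources.
type resources map[string]float64

// div returns r[key]/s[key] for every key present in both r and s.
func div(r, s resources) map[string]float64 {
	out := map[string]float64{}
	for k, rv := range r {
		if sv, ok := s[k]; ok {
			out[k] = rv / sv
		}
	}
	return out
}

// maxRatio returns the largest ratio over the shared keys (0 if none).
func maxRatio(r, s resources) float64 {
	max := 0.0
	for _, q := range div(r, s) {
		if q > max {
			max = q
		}
	}
	return max
}

func main() {
	r := resources{"cpu": 4, "mem": 8}
	s := resources{"cpu": 2, "mem": 16, "disk": 1}
	fmt.Println(div(r, s))      // map[cpu:2 mem:0.5]
	fmt.Println(maxRatio(r, s)) // 2
}
```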

func (*Resources) Min

func (r *Resources) Min(x, y Resources) *Resources

Min sets r to the minimum min(x[key], y[key]) for all keys and returns r.

func (*Resources) Scale

func (r *Resources) Scale(s Resources, factor float64) *Resources

Scale sets r to the scaled resources s[key]*factor for all keys and returns r.

func (Resources) ScaledDistance

func (r Resources) ScaledDistance(u Resources) float64

ScaledDistance returns the distance between two resources computed as a sum of the differences in memory, cpu and disk with some predefined scaling.

func (*Resources) Set

func (r *Resources) Set(s Resources) *Resources

Set sets r[key]=s[key] for all keys and returns r.

func (Resources) String

func (r Resources) String() string

String renders a Resources. All nonzero-valued labels are included; mem, cpu, and disk are always included regardless of their value.

func (*Resources) Sub

func (r *Resources) Sub(x, y Resources) *Resources

Sub sets r to the difference x[key]-y[key] for all keys and returns r.

type Result

type Result struct {
	// Fileset is the fileset produced by an exec.
	Fileset Fileset `json:",omitempty"`

	// Err is error produced by an exec.
	Err *errors.Error `json:",omitempty"`
}

Result is the result of an exec.

func (Result) Equal

func (r Result) Equal(s Result) bool

Equal tells whether r is equal to s.

func (Result) Short

func (r Result) Short() string

Short renders an abbreviated human-readable string of this result.

func (Result) String

func (r Result) String() string

String renders a human-readable string of this result.

type StringDigest

type StringDigest struct {
	// contains filtered or unexported fields
}

StringDigest holds any string and its digest.

func NewStringDigest

func NewStringDigest(s string) StringDigest

NewStringDigest creates a StringDigest based on the given string.

func (StringDigest) Digest

func (i StringDigest) Digest() digest.Digest

Digest returns the digest of the underlying string.

func (StringDigest) IsValid

func (i StringDigest) IsValid() bool

IsValid returns whether this StringDigest is valid (i.e., the underlying string is non-empty).

func (StringDigest) String

func (i StringDigest) String() string

String returns the underlying string.
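The idea of pairing a string with a precomputed digest can be sketched with a stand-in type. Here `stringDigest` and `newStringDigest` are hypothetical local names, and SHA-256 from the standard library is an assumption; the real type returns a `digest.Digest`.

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// stringDigest is a hypothetical stand-in for reflow.StringDigest:
// it stores a string together with a digest computed once up front.
type stringDigest struct {
	s      string
	digest [sha256.Size]byte
}

// newStringDigest computes the digest at construction time
// (SHA-256 is an assumption of this sketch).
func newStringDigest(s string) stringDigest {
	return stringDigest{s: s, digest: sha256.Sum256([]byte(s))}
}

func (d stringDigest) String() string            { return d.s }
func (d stringDigest) IsValid() bool             { return d.s != "" }
func (d stringDigest) Digest() [sha256.Size]byte { return d.digest }

func main() {
	a := newStringDigest("ubuntu:20.04")
	b := newStringDigest("ubuntu:20.04")
	fmt.Println(a.IsValid(), a.Digest() == b.Digest()) // true true
	fmt.Println(stringDigest{}.IsValid())              // false
}
```

Equal strings yield equal digests, so the digest can stand in for the string wherever a content-derived key is needed.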

type Transferer

type Transferer interface {
	// Transfer transfers a set of files from the src to the dst
	// repository. A transfer manager may apply policies (e.g., rate
	// limits and concurrency limits) to these transfers.
	Transfer(ctx context.Context, dst, src Repository, files ...File) error

	NeedTransfer(ctx context.Context, dst Repository, files ...File) ([]File, error)
}

Transferer defines an interface used for management of transfers between multiple repositories.

Directories

Path Synopsis
Package assoc defines data types for associative maps used within Reflow.
dydbassoc
Package dydbassoc implements an assoc.Assoc based on AWS's DynamoDB.
Package batch implements support for running batches of reflow (stateful) evaluations.
Package blob implements a set of generic interfaces used to implement blob storage implementations such as S3, GCS, and local file system implementations.
s3blob
Package s3blob implements the blob interfaces for S3.
testblob
Package testblob implements a blobstore appropriate for testing.
client
Package client implements a remote client for the reflow bootstrap.
cmd
Package ec2authenticator implements Docker repository authentication for ECR using an AWS SDK session and a root.
Package ec2cluster implements support for maintaining elastic clusters of Reflow instances on EC2.
volume
Package volume implements support for maintaining (EBS) volumes on an EC2 instance by watching the disk usage of the underlying disk device and resizing the EBS volumes whenever necessary based on provided parameters.
Package errors provides a standard error definition for use in Reflow.
Package lang implements the reflow language.
pooltest
Package pooltest tests pools.
testutil
Package testutil provides utilities for testing code that involves pools.
Package localcluster implements a runner.Cluster using the local machine's docker.
Package log implements leveling and teeing on top of Go's standard logs package.
Package pool implements resource pools for reflow.
client
Package client implements a remoting client for reflow pools.
server
Package server exposes a pool implementation for remote access.
Package repository provides common ways to dial reflow.Repository implementations; it also provides some common utilities for working with repositories.
blobrepo
Package blobrepo implements a generic reflow.Repository on top of a blob.Bucket.
client
Package client implements repository REST client.
filerepo
Package filerepo implements a filesystem-backed repository.
s3
server
Package server implements a Repository REST server.
Package rest provides a framework for serving and accessing hierarchical resource-based APIs.
Package sched implements task scheduling for Reflow.
Package syntax implements the Reflow language.
Package taskdb defines interfaces and data types for storing and querying reflow runs, tasks, pools and allocs.
dynamodbtask
Package dynamodbtask implements the taskdb.TaskDB interface for AWS dynamodb backend.
test
flow
Package flow contains a number of constructors for Flow nodes that are convenient for testing.
Package trace provides a tracing system for Reflow events.
Package types contains data structures and algorithms for dealing with value types in Reflow.
Package values defines data structures for representing (runtime) values in Reflow.
Package wg implements a channel-enabled WaitGroup.
internal
ecrauth
Package ecrauth provides an interface and utilities for authenticating AWS EC2 ECR Docker repositories.
fs
scanner
Package scanner provides a scanner and tokenizer for UTF-8-encoded text.
