github.com/grailbio/bigslice

Package bigslice

v0.0.0-...-4cae006

Published: Jul 27, 2020 | License: Apache-2.0 | Module: github.com/grailbio/bigslice

Overview

Package bigslice implements a distributed data processing system. Users compose computations by operating over large collections ("big slices") of data, transforming them with a handful of combinators. While users express computations using collections-style operations, bigslice takes care of the details of parallel execution and distribution across multiple machines.

Bigslice jobs can run locally, but use bigmachine for distribution among a cluster of compute nodes. In either case, user code does not change; the details of distribution are handled by the combination of bigmachine and bigslice.

Because Go cannot easily serialize code to be sent over the wire and executed remotely, bigslice programs have to be written with a few constraints:

1. All slices must be constructed by bigslice funcs (bigslice.Func), and all such functions must be instantiated before bigslice.Start is called. This rule is easy to follow: if funcs are global variables, and bigslice.Start is called from a program's main, then the program is compliant.

2. The driver program must be compiled for the same GOOS and GOARCH as the target architecture. When running locally, this is not a concern, but programs that require distribution must be run from a linux/amd64 binary. Bigslice also supports the fat binary format implemented by github.com/grailbio/base/fatbin. The bigslice tool (github.com/grailbio/bigslice/cmd/bigslice) uses this package to compile portable fat binaries.

Some Bigslice operations may be annotated with runtime pragmas: directives for the Bigslice runtime. See Pragma for details.

User provided functions in Bigslice

Functions provided to the various bigslice combinators (e.g., bigslice.Map) may take an additional argument of type context.Context. If specified, then the lifetime of the context is tied to that of the underlying bigslice task. Additionally, the context carries a metrics scope (github.com/grailbio/base/bigslice/metrics.Scope) which can be used to update metric values during data processing.

func FuncLocations

func FuncLocations() []string

FuncLocations returns a slice of strings that describe the locations of Func creation, in the same order as the Funcs registry. We use this to verify that worker processes have the same Funcs. Note that this is not a precisely correct verification, as it's possible to define multiple Funcs on the same line. However, it's good enough for the scenarios we have encountered or anticipate.

func FuncLocationsDiff

func FuncLocationsDiff(lhs, rhs []string) []string

FuncLocationsDiff returns a slice of strings that describes the differences between lhs and rhs locations slices as returned by FuncLocations. The slice is a unified diff between the slices, so if you print each element on a line, you'll get interpretable output. For example:

for _, edit := range FuncLocationsDiff([]string{"a", "b", "c"}, []string{"a", "c"}) {
    fmt.Println(edit)
}

will produce:

a
- b
c

If the slices are identical, it returns nil.

func Helper

func Helper()

Helper is used to mark a function as a helper function: names for newly created slices will be attributed to the caller of the function instead of the function itself.

func String

func String(slice Slice) string

String returns a string describing the slice and its type.

type Accumulator

type Accumulator interface {
	// Accumulate the provided columns of length n.
	Accumulate(in frame.Frame, n int)
	// Read a batch of accumulated values into keys and values. These
	// are slices of the key type and accumulator type respectively.
	Read(keys, values reflect.Value) (int, error)
}

An Accumulator represents a stateful accumulation of values of a certain type. Accumulators maintain their state in memory.

Accumulators should be read only after accumulation is complete.

type Dep

type Dep struct {
	Slice
	Shuffle     bool
	Partitioner Partitioner
	// Expand indicates that each shard of a shuffle dependency (i.e.,
	// all the shards of a given partition) should be expanded (i.e.,
	// not merged) when handed to the slice implementation. This is to
	// support merge-sorting of shards of the same partition.
	Expand bool
}

A Dep is a Slice dependency. Deps comprise a slice and a boolean flag determining whether this represents a shuffle dependency. Shuffle dependencies must perform a data shuffle step: the dependency must partition its output according to the Slice's partitioner, and, when the dependent Slice is computed, the evaluator must pass in Readers that read a single partition from all dependent shards. If Shuffle is true, then the provided partitioner determines how the output is partitioned. If it is nil, the default (hash by first column) partitioner is used.

type FuncValue

type FuncValue struct {
	// contains filtered or unexported fields
}

A FuncValue represents a Bigslice function, as returned by Func.

func Func

func Func(fn interface{}) *FuncValue

Func creates a bigslice function from the provided function value. Bigslice funcs must return a single Slice value. Funcs provide bigslice with a means of dynamic abstraction: since Funcs can be invoked remotely, dynamically created slices may be named across process boundaries.

func (*FuncValue) Apply

func (f *FuncValue) Apply(args ...interface{}) Slice

Apply invokes the function f with the provided arguments, returning the computed Slice. Apply panics with a type error if argument type or arity do not match.

func (*FuncValue) Exclusive

func (f *FuncValue) Exclusive() *FuncValue

Exclusive marks this func to require mutually exclusive machine allocation.

NOTE: This is an experimental API that may change.

func (*FuncValue) In

func (f *FuncValue) In(i int) reflect.Type

In returns the i'th argument type of function f.

func (*FuncValue) Invocation

func (f *FuncValue) Invocation(location string, args ...interface{}) Invocation

Invocation creates an invocation representing the function f applied to the provided arguments. Invocation panics with a type error if the provided arguments do not match in type or arity.

func (*FuncValue) NumIn

func (f *FuncValue) NumIn() int

NumIn returns the number of input arguments to f.

type Invocation

type Invocation struct {
	Index     uint64
	Func      uint64
	Args      []interface{}
	Exclusive bool
	Location  string
}

Invocation represents an invocation of a Bigslice func of the same binary. Invocations can be transmitted across process boundaries and thus may be invoked by remote executors.

Each invocation carries an invocation index, which is a unique index for invocations within a process namespace. It can thus be used to represent a particular function invocation from a driver process.

Invocations must be created by newInvocation.

func (Invocation) Invoke

func (i Invocation) Invoke() Slice

Invoke performs the Func invocation represented by this Invocation instance, returning the resulting slice.

func (Invocation) String

func (inv Invocation) String() string

type Name

type Name struct {
	// Op is the operation that the slice performs (e.g. "reduce", "map")
	Op string
	// File is the file in which the slice was defined.
	File string
	// Line is the line in File at which the slice was defined.
	Line int
	// Index disambiguates slices created on the same File and Line.
	Index int
}

Name is a unique name for a slice, constructed with useful context for diagnostic or status display.

func MakeName

func MakeName(op string) Name

func (Name) String

func (n Name) String() string

type Partitioner

type Partitioner func(ctx context.Context, frame frame.Frame, nshard int, shards []int)

A Partitioner is used to assign partitions to rows in a frame.
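
To make the Partitioner contract concrete, the following plain-Go sketch models it without importing bigslice: for each row, the partitioner writes a shard number in [0, nshard) into the corresponding element of shards. The []string of keys stands in for the first column of a frame.Frame, and partitionByHash and its FNV hash are assumptions chosen for illustration; they are not necessarily what bigslice's default partitioner does.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionByHash is a hypothetical stand-in for a Partitioner. It assigns
// each key a shard in [0, nshard) and stores the result in shards[i].
func partitionByHash(keys []string, nshard int, shards []int) {
	for i, k := range keys {
		h := fnv.New32a()
		h.Write([]byte(k))
		shards[i] = int(h.Sum32() % uint32(nshard))
	}
}

func main() {
	keys := []string{"a", "b", "a", "c"}
	shards := make([]int, len(keys))
	partitionByHash(keys, 3, shards)
	// Rows with equal keys are always assigned the same shard.
	fmt.Println(shards[0] == shards[2])
}
```

The property that matters for a shuffle is the one checked in main: rows with equal keys end up in the same partition, so a downstream shard sees every row for the keys it owns.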

type Pragma

type Pragma interface {
	// Procs returns the number of procs a slice task needs to run. It is
	// superseded by Exclusive and clamped to the maximum number of procs per
	// machine.
	Procs() int
	// Exclusive indicates that a slice task should be given
	// exclusive access to the underlying machine.
	Exclusive() bool
	// Materialize indicates that the result of the slice task should be
	// materialized, i.e. break pipelining.
	Materialize() bool
}

Pragma comprises runtime directives used during bigslice execution.

var Exclusive Pragma = exclusive{}

Exclusive is a Pragma that indicates the slice task should be given exclusive access to the machine that runs it. Exclusive takes precedence over Procs.

var ExperimentalMaterialize Pragma = materialize{}

ExperimentalMaterialize is a Pragma that indicates the slice task results should be materialized, i.e. not pipelined. You may want to use this to materialize and reuse results of tasks that would normally have been pipelined.

It is tagged "experimental" because we are considering other ways of achieving this.

TODO(jcharumilind): Consider doing this automatically for slices on which multiple slices depend.

func Procs

func Procs(n int) Pragma

Procs returns a pragma that sets the number of procs a slice task needs to run to n. It is superseded by Exclusive and clamped to the maximum number of procs per machine.

type Pragmas

type Pragmas []Pragma

Pragmas composes multiple underlying Pragmas.

func (Pragmas) Exclusive

func (p Pragmas) Exclusive() bool

Exclusive implements Pragma.

func (Pragmas) Materialize

func (p Pragmas) Materialize() bool

Materialize implements Pragma.

func (Pragmas) Procs

func (p Pragmas) Procs() int

Procs implements Pragma. If multiple tasks with Procs pragmas are pipelined, we allocate the maximum to the composed pipeline.
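
The composition rule can be modelled in a few lines of plain Go. This is only a sketch of the "take the maximum" behaviour described above: procsPragma and maxProcs are hypothetical names, and the sketch does not model Exclusive, clamping to the machine size, or whatever default bigslice applies when no Procs pragma is given.

```go
package main

import "fmt"

// procsPragma is a hypothetical, simplified stand-in for a Procs pragma:
// it only records the number of procs a task asks for.
type procsPragma int

// maxProcs models the composition rule: a pipeline composed of several
// tasks is allocated the maximum of the individual requests.
func maxProcs(prags []procsPragma) int {
	n := 0
	for _, p := range prags {
		if int(p) > n {
			n = int(p)
		}
	}
	return n
}

func main() {
	// Three pipelined tasks asking for 2, 8 and 4 procs respectively.
	fmt.Println(maxProcs([]procsPragma{2, 8, 4}))
}
```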

type ShardType

type ShardType int

ShardType indicates the type of sharding used by a Slice.

const (
	// HashShard Slices are partitioned by an (unspecified)
	// hash of a record. That is, the same record should
	// be assigned a stable shard number.
	HashShard ShardType = iota
	// RangeShard Slices are partitioned by the range of a key. The key
	// is always the first column of the slice.
	RangeShard
)

type Slice

type Slice interface {
	slicetype.Type

	// Name returns a unique (composite) name for this Slice that also has
	// useful context for diagnostic or status display.
	Name() Name

	// NumShard returns the number of shards in this Slice.
	NumShard() int
	// ShardType returns the sharding type of this Slice.
	ShardType() ShardType

	// NumDep returns the number of dependencies of this Slice.
	NumDep() int
	// Dep returns the i'th dependency for this Slice.
	Dep(i int) Dep

	// Combiner is an optional function that is used to combine multiple values
	// with the same key from the slice's output. No combination is performed
	// if nil.
	Combiner() slicefunc.Func

	// Reader returns a Reader for a shard of this Slice. The reader itself
	// computes the shard's values on demand. The caller must provide Readers
	// for all of this shard's dependencies, constructed according to the
	// dependency type (see Dep).
	Reader(shard int, deps []sliceio.Reader) sliceio.Reader
}

A Slice is a shardable, ordered dataset. Each slice consists of zero or more columns of data distributed over one or more shards. Slices may declare dependencies on other slices from which they are computed. In order to compute a slice, its dependencies must first be computed, and their resulting Readers are passed to the Slice's Reader method.

Since Go does not support generic typing, Slice combinators perform their own dynamic type checking. Schematically we write the n-ary slice with types t1, t2, ..., tn as Slice<t1, t2, ..., tn>.

Types that implement the Slice interface must be comparable.

func Cache

func Cache(ctx context.Context, slice Slice, prefix string) Slice

Cache caches the output of a slice to the given file prefix. Cached data are stored as "prefix-nnnn-of-mmmm" for shards nnnn of mmmm. When the slice is computed, each shard is encoded and written to a separate file with this prefix. If all shards exist, then Cache shortcuts computation and instead reads directly from the previously computed output. The user must guarantee cache consistency: if the cache could be invalid (e.g., because of code changes), the user is responsible for removing existing cached files, or picking a different prefix that correctly represents the operation to be cached.

Cache uses GRAIL's file library, so prefix may refer to URLs to a distributed object store such as S3.
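
As an illustration of the "prefix-nnnn-of-mmmm" naming scheme, the sketch below builds the per-shard names for a given prefix. shardPath is a hypothetical helper, and the four-digit zero padding and zero-based shard index are assumptions read off the shape of the pattern rather than confirmed details of the implementation; the S3 prefix is made up.

```go
package main

import "fmt"

// shardPath returns the name of the file holding one shard, following the
// "prefix-nnnn-of-mmmm" scheme. Padding width and zero-based indexing are
// assumptions made for this sketch.
func shardPath(prefix string, shard, nshard int) string {
	return fmt.Sprintf("%s-%04d-of-%04d", prefix, shard, nshard)
}

func main() {
	const nshard = 2
	for shard := 0; shard < nshard; shard++ {
		// The prefix is a hypothetical object store location.
		fmt.Println(shardPath("s3://example-bucket/cache/wordcount", shard, nshard))
	}
}
```

Because the shard count is part of every name, a cache written with one shard count is not picked up by a slice with a different one.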

Example

Code:

package main

import (
	"context"
	"fmt"
	"github.com/grailbio/base/log"
	"github.com/grailbio/bigslice"
	"github.com/grailbio/bigslice/slicetest"
	"io/ioutil"
	"os"
	"sync/atomic"
)

func main() {
	// Compute a slice that performs a mapping computation and uses Cache to
	// cache the result, showing that we had to execute the mapping computation.
	// Compute another slice that uses the cache, showing that we produced the
	// same result without executing the mapping computation again.
	dir, err := ioutil.TempDir("", "example-cache")
	if err != nil {
		log.Fatalf("could not create temp directory: %v", err)
	}
	defer os.RemoveAll(dir)
	slice := bigslice.Const(2, []int{0, 1, 2, 3})
	// slicetest.Print uses local evaluation, so we can use shared memory across
	// all shard computations.
	var computed atomic.Value
	computed.Store(false)
	slice = bigslice.Map(slice, func(x int) int {
		computed.Store(true)
		return x
	})
	// The first evaluation causes the map to be evaluated.
	slice0 := bigslice.Cache(context.Background(), slice, dir+"/")
	fmt.Println("# first evaluation")
	slicetest.Print(slice0)
	fmt.Printf("computed: %t\n", computed.Load().(bool))

	// Reset the computed state for our second evaluation. The second evaluation
	// will read from the cache that was written by the first evaluation, so the
	// map will not be evaluated.
	computed.Store(false)
	slice1 := bigslice.Cache(context.Background(), slice, dir+"/")
	fmt.Println("# second evaluation")
	slicetest.Print(slice1)
	fmt.Printf("computed: %t\n", computed.Load().(bool))
}

Output:

# first evaluation
0
1
2
3
computed: true
# second evaluation
0
1
2
3
computed: false

func CachePartial

func CachePartial(ctx context.Context, slice Slice, prefix string) Slice

CachePartial caches the output of the slice to the given file prefix (it uses the same file naming scheme as Cache). However, unlike Cache, if CachePartial finds incomplete cached results (from an earlier failed or interrupted run), it will use them and recompute only the missing data.

WARNING: The user is responsible for ensuring slice's contents are deterministic between bigslice runs. If keys are non-deterministic, for example due to pseudorandom seeding based on time, or reading the state of a modifiable file in S3, CachePartial produces corrupt results.

As with Cache, the user must guarantee cache consistency.
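
The difference between the two caching modes, as documented here, comes down to which shards are computed when some cache files are missing. The following plain-Go model captures just that decision; shardsToCompute is a hypothetical function and does not reflect how bigslice implements either operation.

```go
package main

import "fmt"

// shardsToCompute models the documented behaviour of the two caching modes.
// cached[i] reports whether shard i already has a cached file.
//
// With partial=false (Cache), the cache is used only if every shard exists;
// otherwise all shards are computed. With partial=true (CachePartial), only
// the missing shards are computed.
func shardsToCompute(cached []bool, partial bool) []int {
	var missing, all []int
	for i, ok := range cached {
		all = append(all, i)
		if !ok {
			missing = append(missing, i)
		}
	}
	if partial || len(missing) == 0 {
		return missing
	}
	return all
}

func main() {
	cached := []bool{true, false, true} // shard 1 was lost or never written
	fmt.Println("Cache:       ", shardsToCompute(cached, false))
	fmt.Println("CachePartial:", shardsToCompute(cached, true))
}
```

Reusing the surviving shards is only sound if recomputing the missing ones yields the rows that would have been there originally, which is why the determinism warning above applies to CachePartial in particular.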

Example

Code:

package main

import (
	"context"
	"fmt"
	"github.com/grailbio/base/log"
	"github.com/grailbio/bigslice"
	"github.com/grailbio/bigslice/slicetest"
	"io/ioutil"
	"os"
	"path/filepath"
	"sync/atomic"
)

func main() {
	// Compute a slice that performs a mapping computation and uses CachePartial to
	// cache the result, showing that we had to execute the mapping computation
	// for each row. Manually remove only part of the cached data. Compute
	// another slice that uses the cache, showing that we produced the same
	// result, only executing the mapping computation on the rows whose data we
	// removed from the cache.
	dir, err := ioutil.TempDir("", "example-cache-partial")
	if err != nil {
		log.Fatalf("could not create temp directory: %v", err)
	}
	defer os.RemoveAll(dir)
	slice := bigslice.Const(2, []int{0, 1, 2, 3})
	// slicetest.Print uses local evaluation, so we can use shared memory across
	// all shard computations.
	var computed int32
	slice = bigslice.Map(slice, func(x int) int {
		atomic.AddInt32(&computed, 1)
		return x
	})
	// The first evaluation causes the map to be evaluated.
	slice0 := bigslice.CachePartial(context.Background(), slice, dir+"/")
	fmt.Println("# first evaluation")
	slicetest.Print(slice0)
	fmt.Printf("computed: %d\n", computed)

	// Remove one of the cache files. This will leave us with a partial cache,
	// i.e. a cache with only some shards cached.
	infos, err := ioutil.ReadDir(dir)
	if err != nil {
		log.Fatalf("error reading temp dir %s: %v", dir, err)
	}
	path := filepath.Join(dir, infos[0].Name())
	if err = os.Remove(path); err != nil {
		log.Fatalf("error removing cache file %s: %v", path, err)
	}

	// Reset the computed state for our second evaluation. The second evaluation
	// will read from the partial cache that was written by the first
	// evaluation, so only some rows will need recomputation.
	computed = 0
	slice1 := bigslice.CachePartial(context.Background(), slice, dir+"/")
	fmt.Println("# second evaluation")
	slicetest.Print(slice1)
	fmt.Printf("computed: %d\n", computed)

	// Note that this example is fragile for a couple of reasons. First, it
	// relies on how the cache is stored in files. If that changes, we may need
	// to change how we construct a partial cache. Second, it relies on the
	// stability of the shard allocation. If that changes, we may end up with
	// different sharding and a different number of rows needing computation.

}

Output:

# first evaluation
0
1
2
3
computed: 4
# second evaluation
0
1
2
3
computed: 3

func Cogroup

func Cogroup(slices ...Slice) Slice

Cogroup returns a slice that, for each key in any slice, contains the group of values for that key, in each slice. Schematically:

Cogroup(Slice<tk1, ..., tkp, t11, ..., t1n>, Slice<tk1, ..., tkp, t21, ..., t2n>, ..., Slice<tk1, ..., tkp, tm1, ..., tmn>)
	Slice<tk1, ..., tkp, []t11, ..., []t1n, []t21, ..., []tmn>

It thus implements a form of generalized JOIN and GROUP.

Cogroup uses the prefix columns of each slice as its key; keys must be partitionable.
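
For readers who find the schematic hard to parse, here is an in-memory model of Cogroup over two inputs keyed by an int. It is a sketch of the semantics only: the group type and cogroup function are hypothetical, everything is held in a map rather than sharded, and the order of values within a group simply follows input order, which is not a statement about the order bigslice produces.

```go
package main

import (
	"fmt"
	"sort"
)

// group holds, for one key, the values collected from each input.
type group struct {
	Key     int
	Names   []string // values from the first input
	Squares []int    // values from the second input
}

// cogroup models Cogroup over two keyed inputs held in memory. Every key
// that appears in either input produces exactly one group.
func cogroup(k0 []int, v0 []string, k1 []int, v1 []int) []group {
	byKey := make(map[int]*group)
	get := func(k int) *group {
		g, ok := byKey[k]
		if !ok {
			g = &group{Key: k}
			byKey[k] = g
		}
		return g
	}
	for i, k := range k0 {
		g := get(k)
		g.Names = append(g.Names, v0[i])
	}
	for i, k := range k1 {
		g := get(k)
		g.Squares = append(g.Squares, v1[i])
	}
	out := make([]group, 0, len(byKey))
	for _, g := range byKey {
		out = append(out, *g)
	}
	// Sort by key so that the result is deterministic.
	sort.Slice(out, func(i, j int) bool { return out[i].Key < out[j].Key })
	return out
}

func main() {
	groups := cogroup(
		[]int{0, 1, 0}, []string{"zero", "one", "cero"},
		[]int{0, 2}, []int{0, 4},
	)
	for _, g := range groups {
		fmt.Println(g.Key, g.Names, g.Squares)
	}
}
```

A key missing from one input still yields a row, with an empty group for that input; this is what makes Cogroup usable as an outer join.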

TODO(marius): don't require spilling to disk when the input data set is small enough.

TODO(marius): consider providing a version that returns scanners in the returned slice, so that we can stream through. This would require some changes downstream, however, so that buffering and encoding functionality also know how to read scanner values.

Example

Code:

package main

import (
	"github.com/grailbio/bigslice"
	"github.com/grailbio/bigslice/slicetest"
)

func main() {
	slice0 := bigslice.Const(2,
		[]int{0, 1, 2, 3, 0, 1},
		[]string{"zero", "one", "two", "three", "cero", "uno"},
	)
	slice1 := bigslice.Const(2,
		[]int{0, 1, 2, 3, 4, 5, 6},
		[]int{0, 1, 4, 9, 16, 25, 36},
	)
	slice := bigslice.Cogroup(slice0, slice1)
	slicetest.Print(slice)
}

Output:

0 [cero zero] [0]
1 [one uno] [1]
2 [two] [4]
3 [three] [9]
4 [] [16]
5 [] [25]
6 [] [36]

Example (One)

Code:

package main

import (
	"github.com/grailbio/bigslice"
	"github.com/grailbio/bigslice/slicetest"
)

func main() {
	slice := bigslice.Const(2,
		[]int{0, 1, 2, 3, 0, 1},
		[]string{"zero", "one", "two", "three", "cero", "uno"},
	)
	slice = bigslice.Cogroup(slice)
	slicetest.Print(slice)
}

Output:

0 [cero zero]
1 [one uno]
2 [two]
3 [three]

func Const

func Const(nshard int, columns ...interface{}) Slice

Const returns a Slice representing the provided value. Each column of the Slice should be provided as a Go slice of the column's type. The value is split into nshard shards.

Example

Code:

package main

import (
	"github.com/grailbio/bigslice"
	"github.com/grailbio/bigslice/slicetest"
)

func main() {
	slice := bigslice.Const(2,
		[]int{0, 1, 2, 3},
		[]string{"zero", "one", "two", "three"},
	)
	slicetest.Print(slice)
}

Output:

0 zero
1 one
2 two
3 three

func Filter

func Filter(slice Slice, pred interface{}, prags ...Pragma) Slice

Filter returns a slice where the provided predicate is applied to each element in the given slice. The output slice contains only those entries for which the predicate is true.

The predicate function should receive each column of slice and return a single boolean value.

Schematically:

Filter(Slice<t1, t2, ..., tn>, func(t1, t2, ..., tn) bool) Slice<t1, t2, ..., tn>

Example

Code:

package main

import (
	"github.com/grailbio/bigslice"
	"github.com/grailbio/bigslice/slicetest"
)

func main() {
	slice := bigslice.Const(2,
		[]int{0, 1, 2, 3, 4, 5},
		[]string{"zero", "one", "two", "three", "four", "five"},
	)
	slice = bigslice.Filter(slice, func(x int, s string) bool {
		return x%2 == 0
	})
	slicetest.Print(slice)
}

Output:

0 zero
2 two
4 four

func Flatmap

func Flatmap(slice Slice, fn interface{}, prags ...Pragma) Slice

Flatmap returns a Slice that applies the function fn to each record in the slice, flattening the returned slice. That is, the function fn should be of the form:

func(in1 inType1, in2 inType2, ...) (out1 []outType1, out2 []outType2)

Schematically:

Flatmap(Slice<t1, t2, ..., tn>, func(v1 t1, v2 t2, ..., vn tn) ([]r1, []r2, ..., []rn)) Slice<r1, r2, ..., rn>

Example

Code:

package main

import (
	"github.com/grailbio/bigslice"
	"github.com/grailbio/bigslice/slicetest"
	"strings"
)

func main() {
	// Flatmap to split strings into words using different separators. The input
	// is of type Slice<string, string>:
	// - col0: the string
	// - col1: the separator
	//
	// The output is of type Slice<string, int>:
	// - col0: a word from the input strings
	// - col1: the length of the word
	slice := bigslice.Const(2,
		[]string{
			"Lorem ipsum dolor sit amet",
			"consectetur:adipiscing",
			"elit",
			"sed.do.eiusmod.tempor.incididunt",
		},
		[]string{" ", ":", ";", "."}, // Separators.
	)
	slice = bigslice.Flatmap(slice, func(s, sep string) ([]string, []int) {
		split := strings.Split(s, sep)
		lengths := make([]int, len(split))
		for i := range lengths {
			lengths[i] = len(split[i])
		}
		return split, lengths
	})
	slicetest.Print(slice)
}

Output:

Lorem 5
adipiscing 10
amet 4
consectetur 11
do 2
dolor 5
eiusmod 7
elit 4
incididunt 10
ipsum 5
sed 3
sit 3
tempor 6

func Fold

func Fold(slice Slice, fold interface{}) Slice

Fold returns a slice that aggregates values by the first column using a custom aggregation function. For an input slice Slice<t1, t2, ..., tn>, Fold requires that the provided accumulator function follow the form:

func(accum acctype, v2 t2, ..., vn tn) acctype

The function is invoked once for each slice element with the same value for column 1 (t1). On the first invocation, the accumulator is passed the zero value of its accumulator type.

Fold requires that the first column of the slice is partitionable. See the documentation for Keyer for more details.

Schematically:

Fold(Slice<t1, t2, ..., tn>, func(accum acctype, v2 t2, ..., vn tn) acctype) Slice<t1, acctype>
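
The accumulator rule (zero value on first use, then one call per row with the same key) can be modelled in plain Go as follows. foldInts is a hypothetical function specialised to a Slice<string, int> with an int accumulator; it keeps all state in a map and says nothing about how bigslice distributes the work.

```go
package main

import "fmt"

// foldInts models Fold for a Slice<string, int> with an int accumulator.
// The accumulator for each key starts at the zero value and is updated
// once for every row that carries that key.
func foldInts(keys []string, vals []int, fn func(acc, v int) int) map[string]int {
	accs := make(map[string]int)
	for i, k := range keys {
		// Reading a missing key yields 0, the accumulator's zero value.
		accs[k] = fn(accs[k], vals[i])
	}
	return accs
}

func main() {
	out := foldInts(
		[]string{"a", "b", "a"},
		[]int{1, 2, 3},
		func(acc, v int) int { return acc + v },
	)
	fmt.Println(out["a"], out["b"])
}
```

Because the accumulator always starts from the zero value, an aggregation whose identity is not zero (a product, for instance) has to track whether it has been initialized.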

BUG(marius): Fold does not yet support slice grouping

Example

Code:

package main

import (
	"github.com/grailbio/bigslice"
	"github.com/grailbio/bigslice/slicetest"
)

func main() {
	// Fold over the input Slice<string, int, string> to accumulate a struct
	// holding:
	// - the sum of the integers in col1.
	// - the product of the integers in col1.
	// - the longest string encountered in col2.
	slice := bigslice.Const(2,
		[]string{"c", "a", "b", "c", "c", "b", "a", "a", "a", "a", "c"},
		[]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
		[]string{
			"Lorem",
			"ipsum",
			"dolor",
			"sit",
			"amet",
			"consectetur",
			"adipiscing",
			"elit",
			"sed",
			"do",
			"eiusmod",
		},
	)
	type accum struct {
		ready bool
		// sum is the sum of integers in the second column.
		sum int
		// product is the product of integers in the second column.
		product int
		// longest is the longest string in the third column.
		longest string
	}
	slice = bigslice.Fold(slice, func(acc accum, i int, s string) accum {
		if !acc.ready {
			// Initialize product as the multiplicative identity, 1.
			acc.product = 1
			acc.ready = true
		}
		acc.sum += i
		acc.product *= i
		if len(acc.longest) < len(s) {
			acc.longest = s
		}
		return acc
	})
	slicetest.Print(slice)
}

Output:

a {true 36 10080 adipiscing}
b {true 9 18 consectetur}
c {true 21 220 eiusmod}

func Head

func Head(slice Slice, n int) Slice

Head returns a slice that returns at most the first n items from each shard of the underlying slice. Its type is the same as the provided slice.
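
Because the limit applies per shard, the result can contain up to n times the number of shards rows. The plain-Go model below makes that visible; headPerShard is a hypothetical function operating on in-memory shards, not part of bigslice.

```go
package main

import "fmt"

// headPerShard models Head: it keeps at most n rows from each shard, so the
// total number of rows kept is at most n * len(shards).
func headPerShard(shards [][]int, n int) [][]int {
	out := make([][]int, len(shards))
	for i, rows := range shards {
		if len(rows) > n {
			rows = rows[:n]
		}
		out[i] = rows
	}
	return out
}

func main() {
	shards := [][]int{{0, 1, 2, 3}, {4, 5}}
	// The first shard is truncated to 3 rows; the second has only 2 rows
	// and is kept whole.
	fmt.Println(headPerShard(shards, 3))
}
```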

Example

Code:

package main

import (
	"github.com/grailbio/bigslice"
	"github.com/grailbio/bigslice/slicetest"
)

func main() {
	// Use one shard, as Head operates per shard.
	slice := bigslice.Const(1,
		[]int{0, 1, 2, 3, 4, 5},
		[]string{"zero", "one", "two", "three", "four", "five"},
	)
	slice = bigslice.Head(slice, 3)
	slicetest.Print(slice)
}

Output:

0 zero
1 one
2 two

func Map

func Map(slice Slice, fn interface{}, prags ...Pragma) Slice

Map transforms a slice by invoking a function for each record. The type of slice must match the arguments of the function fn. The type of the returned slice is the set of columns returned by fn. The returned slice matches the input slice's sharding, but is always hash partitioned.

Schematically:

Map(Slice<t1, t2, ..., tn>, func(v1 t1, v2 t2, ..., vn tn) (r1, r2, ..., rn)) Slice<r1, r2, ..., rn>

Example

Code:

package main

import (
	"github.com/grailbio/bigslice"
	"github.com/grailbio/bigslice/slicetest"
)

func main() {
	// Map an input of Slice<int, string>...:
	// - col0: an integer
	// - col1: a label for that integer
	//
	// ... to a Slice<int, string, int, string>:
	// - col0: original integer
	// - col1: original label
	// - col2: square of original integer
	// - col3: original label with ".squared" appended
	slice := bigslice.Const(2,
		[]int{0, 1, 2, 3},
		[]string{"zero", "one", "two", "three"},
	)
	slice = bigslice.Map(slice, func(x int, s string) (int, string, int, string) {
		return x, s, x * x, s + ".squared"
	})
	slicetest.Print(slice)
}

Output:

0 zero 0 zero.squared
1 one 1 one.squared
2 two 4 two.squared
3 three 9 three.squared

func Prefixed

func Prefixed(slice Slice, prefix int) Slice

Prefixed returns a slice with the provided prefix. A prefix determines the number of columns (starting at 0) in the slice that compose the key values for that slice for operations like reduce.

Example

Code:

package main

import (
	"github.com/grailbio/bigslice"
	"github.com/grailbio/bigslice/slicetest"
)

func main() {
	// Count the number of pets of the same type and name by using Prefixed to
	// make a slice with the type and name columns as the key, then using Reduce
	// to count the number of elements that have that key.
	slice := bigslice.Const(2,
		[]string{
			"dog",
			"dog",
			"cat",
			"cat",
			"cat",
			"fish",
			"dog",
			"dog",
			"cat",
			"fish",
			"fish",
		},
		[]string{
			"spot",
			"spot",
			"mittens",
			"socks",
			"socks",
			"nemo",
			"lassie",
			"spot",
			"mittens",
			"nemo",
			"dory",
		},
		[]int{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1},
	)
	slice = bigslice.Prefixed(slice, 2)
	slice = bigslice.Reduce(slice, func(a, b int) int { return a + b })
	slicetest.Print(slice)
}

Output:

cat mittens 2
cat socks 2
dog lassie 1
dog spot 3
fish dory 1
fish nemo 2

func ReadCache

func ReadCache(ctx context.Context, typ slicetype.Type, numShard int, prefix string) Slice

ReadCache reads from an existing cache but does not write any cache itself. This may be useful if you want to reuse a cache from a previous computation and fail if it does not exist. typ is the type of the cached and returned slice. You may construct typ using slicetype.New or pass a Slice, which embeds slicetype.Type.

Example

Code:

package main

import (
	"context"
	"fmt"
	"github.com/grailbio/base/log"
	"github.com/grailbio/bigslice"
	"github.com/grailbio/bigslice/slicetest"
	"io/ioutil"
	"os"
)

func main() {
	// Compute a slice that uses Cache to cache the result. Use ReadCache to
	// read from that same cache. Observe that we get the same data.
	const numShards = 2
	dir, err := ioutil.TempDir("", "example-cache")
	if err != nil {
		log.Fatalf("could not create temp directory: %v", err)
	}
	defer os.RemoveAll(dir)
	slice0 := bigslice.Const(numShards, []int{0, 1, 2, 3})
	slice0 = bigslice.Cache(context.Background(), slice0, dir+"/")
	fmt.Println("# build cache")
	slicetest.Print(slice0)

	slice1 := bigslice.ReadCache(context.Background(), slice0, numShards, dir+"/")
	fmt.Println("# use ReadCache to read cache")
	slicetest.Print(slice1)
}

Output:

# build cache
0
1
2
3
# use ReadCache to read cache
0
1
2
3

func ReaderFunc

func ReaderFunc(nshard int, read interface{}, prags ...Pragma) Slice

ReaderFunc returns a Slice that uses the provided function to read data. The function read must be of the form:

func(shard int, state stateType, col1 []col1Type, col2 []col2Type, ..., colN []colNType) (int, error)

This returns a slice of the form:

Slice<col1Type, col2Type, ..., colNType>

The function is invoked to fill a vector of elements. col1, ..., colN are preallocated slices that should be filled by the reader function. The function should return the number of elements that were filled. The error EOF should be returned when no more data are available.

ReaderFunc provides the function with a zero-value state upon the first invocation of the function for a given shard. (If the state argument is a pointer, it is allocated.) Subsequent invocations of the function receive the same state value, thus permitting the reader to maintain local state across the read of a whole shard.
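
The calling convention described above (fill the preallocated buffers, return the count, signal the end with EOF, keep state between calls) can be exercised without bigslice. In the sketch below, read plays the role of a single-column reader function and drain plays the role of the caller; both names are hypothetical, the shard argument is omitted, and the standard library's io.EOF stands in for the EOF error a real reader function would return.

```go
package main

import (
	"fmt"
	"io"
)

// state is the per-shard reader state; it starts out as a zero value.
type state struct{ next int }

// read fills buf with consecutive integers below limit and reports how many
// elements it wrote. It returns io.EOF once the data are exhausted, possibly
// together with a final, partially filled batch.
func read(s *state, limit int, buf []int) (int, error) {
	for n := range buf {
		if s.next >= limit {
			return n, io.EOF
		}
		buf[n] = s.next
		s.next++
	}
	return len(buf), nil
}

// drain calls read repeatedly with a fixed-size buffer, as a caller of a
// reader function might, and collects everything that was produced.
// bufSize must be positive.
func drain(limit, bufSize int) []int {
	var (
		s   state
		out []int
		buf = make([]int, bufSize)
	)
	for {
		n, err := read(&s, limit, buf)
		out = append(out, buf[:n]...)
		if err == io.EOF {
			return out
		}
	}
}

func main() {
	fmt.Println(drain(5, 2))
}
```

Note that the caller consumes the n returned elements even when EOF is returned in the same call, so a reader may deliver its last partial batch and the end-of-data signal together.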

Example

Code:

package main

import (
	"github.com/grailbio/bigslice"
	"github.com/grailbio/bigslice/sliceio"
	"github.com/grailbio/bigslice/slicetest"
)

func main() {
	// Use ReaderFunc to make an evenly sharded Slice<int, string> from the
	// alphabet:
	// - col0: the 1-indexed index of the letter in the alphabet
	// - col1: the letter
	const numShards = 6
	const alphabet = "abcdefghijklmnopqrstuvwxyz"
	type state struct {
		// next is the index of the next element of the alphabet to be read.
		next int
	}
	slice := bigslice.ReaderFunc(numShards,
		func(shard int, s *state, is []int, ss []string) (int, error) {
			// Each shard will handle a portion of the alphabet.
			// Shard 0 reads letters 1, 7, 13, ....
			// Shard 1 reads letters 2, 8, 14, ....
			// ...
			// Shard 5 reads letters 6, 12, 18, ....
			if s.next == 0 {
				// This is the first call, so we initialize our state.
				s.next = shard + 1
			}
			for n := 0; ; n++ {
				if len(alphabet) < s.next {
					// Our shard is complete, so return EOF.
					return n, sliceio.EOF
				}
				if n == len(is) {
					// We have filled the passed buffers, so there is nothing
					// left to do in this invocation.
					return n, nil
				}
				is[n] = s.next
				ss[n] = string(alphabet[s.next-1])
				s.next += numShards
			}
		})
	slicetest.Print(slice)
}

Output:

1 a
2 b
3 c
4 d
5 e
6 f
7 g
8 h
9 i
10 j
11 k
12 l
13 m
14 n
15 o
16 p
17 q
18 r
19 s
20 t
21 u
22 v
23 w
24 x
25 y
26 z

func Reduce

func Reduce(slice Slice, reduce interface{}) Slice

Reduce returns a slice that reduces elements pairwise. Reduce operations must be commutative and associative. Schematically:

Reduce(Slice<k, v>, func(v1, v2 v) v) Slice<k, v>

The provided reducer function is invoked to aggregate values of type v. Reduce can perform map-side "combining", so that data are reduced to their aggregated value aggressively. This can often speed up computations significantly.

The slice to be reduced must have exactly 1 residual column: that is, its prefix must leave just one column as the value column to be aggregated.
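
The requirement that the reducer be commutative and associative is what makes map-side combining safe: partial results can be formed per shard, in any order, and then reduced again. The following plain-Go model illustrates the idea; row, reduceRows and reduceWithCombining are hypothetical names, and the model holds everything in memory.

```go
package main

import "fmt"

// row is one record of a Slice<string, int>.
type row struct {
	Key string
	Val int
}

// reduceRows reduces values pairwise per key using fn.
func reduceRows(rows []row, fn func(a, b int) int) map[string]int {
	out := make(map[string]int)
	for _, r := range rows {
		if acc, ok := out[r.Key]; ok {
			out[r.Key] = fn(acc, r.Val)
		} else {
			out[r.Key] = r.Val
		}
	}
	return out
}

// reduceWithCombining first reduces each shard on its own (the "map side")
// and then reduces the per-shard partial results. Map iteration order is
// unspecified, so the partials arrive in arbitrary order.
func reduceWithCombining(shards [][]row, fn func(a, b int) int) map[string]int {
	var partials []row
	for _, shard := range shards {
		for k, v := range reduceRows(shard, fn) {
			partials = append(partials, row{Key: k, Val: v})
		}
	}
	return reduceRows(partials, fn)
}

func main() {
	add := func(a, b int) int { return a + b }
	shards := [][]row{
		{{"a", 1}, {"b", 1}, {"a", 1}},
		{{"a", 1}, {"c", 1}},
	}
	fmt.Println(reduceWithCombining(shards, add))
}
```

With a non-associative or non-commutative function, the two-stage result could differ from a single pass over all rows, which is why such functions are not valid reducers.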

TODO(marius): Reduce currently maintains the working set of keys in memory, and is thus appropriate only where the working set can fit in memory. For situations where this is not the case, Cogroup should be used instead (at an overhead). Reduce should spill to disk when necessary.

TODO(marius): consider pushing combiners into task dependency definitions so that we can combine-read all partitions on one machine simultaneously.

Example

Code:

package main

import (
	"github.com/grailbio/bigslice"
	"github.com/grailbio/bigslice/slicetest"
)

func main() {
	slice := bigslice.Const(2,
		[]string{"c", "a", "b", "c", "c", "b", "a", "a", "a", "a", "c"},
		[]int{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1},
	)
	slice = bigslice.Reduce(slice, func(a, b int) int { return a + b })
	slicetest.Print(slice)
}
a 5
b 2
c 4

func Repartition

func Repartition(slice Slice, fn interface{}) Slice

Repartition (re-)partitions the slice according to the provided function fn, which is invoked for each record in the slice to assign that record's shard. The function is supplied with the number of shards to partition over as well as the column values; the assigned shard is returned.

Schematically:

Repartition(Slice<t1, t2, ..., tn>, func(nshard int, v1 t1, ..., vn tn) int) Slice<t1, t2, ..., tn>
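
A partition function of this shape is an ordinary Go function, so it can be developed and tested on its own. The following is a minimal sketch of a range partitioner for a single int column, assuming keys fall in a known range; rangePartition and maxKey are hypothetical names introduced for illustration, not part of bigslice.

```go
package main

import "fmt"

// maxKey is a hypothetical exclusive upper bound on key values.
const maxKey = 100

// rangePartition assigns keys in [0, maxKey) to contiguous, roughly equal
// ranges, one per shard. Out-of-range keys are clamped to the first or last
// shard so that the result is always a valid shard index.
func rangePartition(nshard, key int) int {
	if key < 0 {
		return 0
	}
	if key >= maxKey {
		return nshard - 1
	}
	return key * nshard / maxKey
}

func main() {
	for _, key := range []int{0, 24, 25, 50, 99} {
		fmt.Printf("key:%d shard:%d\n", key, rangePartition(4, key))
	}
}
```

Unlike partitioning by modulus, a range partitioner keeps neighboring keys in the same shard, at the cost of skew if keys are not spread evenly over the range.
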
Example

Code:

package main

import (
	"fmt"
	"github.com/grailbio/bigslice"
	"github.com/grailbio/bigslice/slicetest"
)

func main() {
	// Count rows per shard before and after using Repartition to get ideal
	// partitioning by taking advantage of the knowledge that our keys are
	// sequential integers.

	// countRowsPerShard is a utility that counts the number of rows per shard
	// and stores it in rowsPerShard.
	var rowsPerShard []int
	countRowsPerShard := func(numShards int, slice bigslice.Slice) bigslice.Slice {
		rowsPerShard = make([]int, numShards)
		return bigslice.WriterFunc(slice,
			func(shard int, _ struct{}, _ error, xs []int) error {
				rowsPerShard[shard] += len(xs)
				return nil
			},
		)
	}

	const numShards = 2
	slice := bigslice.Const(numShards, []int{1, 2, 3, 4, 5, 6})

	slice0 := countRowsPerShard(numShards, slice)
	fmt.Println("# default partitioning")
	fmt.Println("## slice contents")
	slicetest.Print(slice0)
	fmt.Println("## row count per shard")
	for shard, count := range rowsPerShard {
		fmt.Printf("shard:%d count:%d\n", shard, count)
	}

	slice1 := bigslice.Repartition(slice, func(nshard, x int) int {
		// We know our slice keys are sequential integers, so we partition
		// perfectly with mod.
		return x % nshard
	})
	slice1 = countRowsPerShard(numShards, slice1)
	fmt.Println("# repartitioned")
	// Note that the slice contents are unchanged.
	fmt.Println("## slice contents")
	slicetest.Print(slice1)
	// Note that the partitioning has changed.
	fmt.Println("## row count per shard")
	for shard, count := range rowsPerShard {
		fmt.Printf("shard:%d count:%d\n", shard, count)
	}
}
# default partitioning
## slice contents
1
2
3
4
5
6
## row count per shard
shard:0 count:4
shard:1 count:2
# repartitioned
## slice contents
1
2
3
4
5
6
## row count per shard
shard:0 count:3
shard:1 count:3

func Reshard

func Reshard(slice Slice, nshard int) Slice

Reshard returns a slice that is resharded to the given number of shards; this is done by re-shuffling to the provided number of shards.

Example

Code:

package main

import (
	"fmt"
	"github.com/grailbio/bigslice"
	"github.com/grailbio/bigslice/slicetest"
)

func main() {
	// Count rows per shard before and after using Reshard to change the number
	// of shards from 2 to 4.

	// countRowsPerShard is a utility that counts the number of rows per shard
	// and stores it in rowsPerShard.
	var rowsPerShard []int
	countRowsPerShard := func(numShards int, slice bigslice.Slice) bigslice.Slice {
		rowsPerShard = make([]int, numShards)
		return bigslice.WriterFunc(slice,
			func(shard int, _ struct{}, _ error, xs []int) error {
				rowsPerShard[shard] += len(xs)
				return nil
			},
		)
	}

	const beforeNumShards = 2
	slice := bigslice.Const(beforeNumShards, []int{1, 2, 3, 4, 5, 6})

	before := countRowsPerShard(beforeNumShards, slice)
	fmt.Println("# before")
	fmt.Println("## slice contents")
	slicetest.Print(before)
	fmt.Println("## row count per shard")
	for shard, count := range rowsPerShard {
		fmt.Printf("shard:%d count:%d\n", shard, count)
	}

	// Reshard to 4 shards.
	const afterNumShards = 4
	after := bigslice.Reshard(slice, afterNumShards)
	after = countRowsPerShard(afterNumShards, after)
	fmt.Println("# after")
	fmt.Println("## slice contents")
	slicetest.Print(after)
	fmt.Println("## row count per shard")
	for shard, count := range rowsPerShard {
		fmt.Printf("shard:%d count:%d\n", shard, count)
	}
}
# before
## slice contents
1
2
3
4
5
6
## row count per shard
shard:0 count:4
shard:1 count:2
# after
## slice contents
1
2
3
4
5
6
## row count per shard
shard:0 count:2
shard:1 count:1
shard:2 count:1
shard:3 count:2

func Reshuffle

func Reshuffle(slice Slice) Slice

Reshuffle returns a slice that shuffles rows by prefix so that all rows with equal prefix values end up in the same shard. Rows are not sorted within a shard.

The output slice has the same type as the input.

TODO: Add ReshuffleSort, which also sorts keys within each shard.
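
The guarantee that equal prefixes share a shard can be pictured as hash partitioning. The following is a minimal sketch in plain Go; the helpers shardOf and shuffle are hypothetical, and the choice of FNV-1a is an assumption made for the sketch, not a statement about the hash bigslice uses.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shardOf maps a key to a shard by hashing it. FNV-1a is an arbitrary choice
// for this sketch.
func shardOf(key string, nshard int) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error.
	return int(h.Sum32() % uint32(nshard))
}

// shuffle distributes keys into nshard buckets. Equal keys hash to the same
// value, so they always land in the same bucket. Order within a bucket is
// simply arrival order; nothing is sorted.
func shuffle(keys []string, nshard int) [][]string {
	shards := make([][]string, nshard)
	for _, k := range keys {
		s := shardOf(k, nshard)
		shards[s] = append(shards[s], k)
	}
	return shards
}

func main() {
	shards := shuffle([]string{"a", "b", "a", "c", "b", "a"}, 2)
	for i, rows := range shards {
		fmt.Printf("shard:%d rows:%v\n", i, rows)
	}
}
```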

Example

Code:

package main

import (
	"fmt"
	"github.com/grailbio/bigslice"
	"github.com/grailbio/bigslice/slicetest"
)

func main() {
	// Count rows per shard before and after a Reshuffle, showing same-keyed
	// rows all go to the same shard.

	// countRowsPerShard is a utility that counts the number of rows per shard
	// and stores it in rowsPerShard.
	var rowsPerShard []int
	countRowsPerShard := func(numShards int, slice bigslice.Slice) bigslice.Slice {
		rowsPerShard = make([]int, numShards)
		return bigslice.WriterFunc(slice,
			func(shard int, _ struct{}, _ error, xs []int) error {
				rowsPerShard[shard] += len(xs)
				return nil
			},
		)
	}

	const numShards = 2
	slice := bigslice.Const(numShards, []int{1, 2, 3, 4, 5, 6})
	slice = bigslice.Map(slice, func(_ int) int { return 0 })

	before := countRowsPerShard(numShards, slice)
	fmt.Println("# before")
	fmt.Println("## slice contents")
	slicetest.Print(before)
	fmt.Println("## row count per shard")
	for shard, count := range rowsPerShard {
		fmt.Printf("shard:%d count:%d\n", shard, count)
	}

	after := bigslice.Reshuffle(slice)
	after = countRowsPerShard(numShards, after)
	fmt.Println("# after")
	// We set all our keys to 0. After reshuffling, all rows will be in the same
	// shard.
	fmt.Println("## slice contents")
	slicetest.Print(after)
	fmt.Println("## row count per shard")
	for shard, count := range rowsPerShard {
		fmt.Printf("shard:%d count:%d\n", shard, count)
	}
}
# before
## slice contents
0
0
0
0
0
0
## row count per shard
shard:0 count:4
shard:1 count:2
# after
## slice contents
0
0
0
0
0
0
## row count per shard
shard:0 count:6
shard:1 count:0

func Scan

func Scan(slice Slice, scan func(shard int, scanner *sliceio.Scanner) error) Slice

Scan invokes a function for each shard of the input Slice. It returns a unit Slice: Scan is intended to be used for its side effects.

Example

Code:

package main

import (
	"bufio"
	"context"
	"fmt"
	"github.com/grailbio/base/log"
	"github.com/grailbio/bigslice"
	"github.com/grailbio/bigslice/sliceio"
	"github.com/grailbio/bigslice/slicetest"
	"io/ioutil"
	"os"
	"sort"
)

func init() {
	log.AddFlags()
}

func main() {
	// Use Scan to write a file for each shard of the input slice. Each file
	// will contain a line for each row in the shard.
	const numShards = 2
	slice := bigslice.Const(numShards,
		[]string{"a", "b", "c", "a", "b", "c"},
		[]int{3, 3, 2, 2, 1, 1},
	)
	// For this simple example, use shared memory to store the paths to these
	// files so that we can easily aggregate the files for output. If we were
	// distributing this computation across machines without access to shared
	// memory, we'd need to use a different mechanism, e.g. write files to a
	// common backing store with a known prefix.
	shardPaths := make([]string, numShards)
	slice = bigslice.Scan(slice,
		func(shard int, scanner *sliceio.Scanner) error {
			file, err := ioutil.TempFile("", "example-scan")
			if err != nil {
				return fmt.Errorf("could not open temp file: %v", err)
			}
			shardPaths[shard] = file.Name()
			var (
				s string
				x int
			)
			for scanner.Scan(context.Background(), &s, &x) {
				// Write a line in the file with the labeled elements of the
				// row.
				line := fmt.Sprintf("s:%s x:%d\n", s, x)
				if _, err = file.WriteString(line); err != nil {
					return fmt.Errorf("error writing file %s: %v", file.Name(), err)
				}
			}
			if err = file.Close(); err != nil {
				return fmt.Errorf("error closing file: %v", err)
			}
			return scanner.Err()
		},
	)
	// Print the resulting slice. This forces evaluation of the slice. Notice
	// that this prints no output because slice is empty. Scanning consumes the
	// slice.
	fmt.Println("# slice")
	slicetest.Print(slice)

	// slicetest.Print evaluates the slice, so we now make sure to clean up
	// after ourselves.
	for _, path := range shardPaths {
		defer os.Remove(path)
	}
	fmt.Println("# lines by shard")
	for shard, path := range shardPaths {
		fmt.Printf("## shard %d\n", shard)
		// Read and sort the lines for deterministic output.
		var lines []string
		file, err := os.Open(path)
		if err != nil {
			log.Fatalf("error opening %s for reading: %v", path, err)
		}
		scanner := bufio.NewScanner(file)
		for scanner.Scan() {
			lines = append(lines, scanner.Text())
		}
		if scannerErr := scanner.Err(); scannerErr != nil {
			log.Fatalf("error scanning %s: %v", path, scannerErr)
		}
		sort.Strings(lines)
		for _, line := range lines {
			fmt.Println(line)
		}
	}
}
# slice
# lines by shard
## shard 0
s:a x:2
s:a x:3
s:b x:3
s:c x:2
## shard 1
s:b x:1
s:c x:1

func ScanReader

func ScanReader(nshard int, reader func() (io.ReadCloser, error)) Slice

ScanReader returns a slice of strings that are scanned from the provided reader. ScanReader shards the file by lines. Note that since ScanReader is unaware of the underlying data layout, it may be inefficient for highly parallel access: each shard must read the full file, skipping over data not belonging to the shard.

Example

Code:

package main

import (
	"bytes"
	"fmt"
	"github.com/grailbio/bigslice"
	"github.com/grailbio/bigslice/slicetest"
	"io"
	"io/ioutil"
)

func main() {
	var b bytes.Buffer
	for i := 0; i < 10; i++ {
		fmt.Fprint(&b, i, "\n")
	}
	slice := bigslice.ScanReader(2, func() (io.ReadCloser, error) {
		return ioutil.NopCloser(bytes.NewReader(b.Bytes())), nil
	})
	slicetest.Print(slice)
}
0
1
2
3
4
5
6
7
8
9

func Unwrap

func Unwrap(slice Slice) Slice

Unwrap returns the underlying slice if the provided slice is used only to amend the type of the slice it composes.

TODO(marius): this is required to properly compile slices that use the prefix combinator; we should have a more general and robust solution to this.

func WriterFunc

func WriterFunc(slice Slice, write interface{}) Slice

WriterFunc returns a Slice that is functionally equivalent to the input Slice, allowing for computation with side effects by the provided write function. The write function must be of the form:

func(shard int, state stateType, err error, col1 []col1Type, col2 []col2Type, ..., colN []colNType) error

where the input slice is of the form:

Slice<col1Type, col2Type, ..., colNType>

The write function is invoked with every read of the input Slice. Each column slice will be of the same length and will be populated with the data from the read. For performance, the passed column slices share memory with the internal frame of the read. Do not modify the data in them, and assume that they will be modified once write returns.
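
Because the column slices are reused, a write function that wants to keep rows beyond a single call has to copy them. The following is a minimal sketch of that pattern in plain Go; collector and frame are hypothetical stand-ins for a writer's state and the runtime's internal buffer.

```go
package main

import "fmt"

// collector retains rows across calls to write.
type collector struct {
	rows []string
}

// write mimics receiving one column from a write function: ss is valid only
// for the duration of the call, so its elements are copied before being kept.
func (c *collector) write(ss []string) {
	// append copies the elements of ss into c.rows' own backing array.
	c.rows = append(c.rows, ss...)
}

func main() {
	// frame stands in for the internal buffer that is reused between reads.
	frame := make([]string, 2)
	var c collector

	copy(frame, []string{"a", "b"})
	c.write(frame)
	copy(frame, []string{"c", "d"}) // The next read overwrites the buffer.
	c.write(frame)

	fmt.Println(c.rows)
}
```

Storing ss itself (for example, c.saved = ss) would instead alias the reused buffer, and the retained rows would change on the next read.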

The write function should return a non-nil error if there is a problem writing, e.g. the write function encounters an error while writing to a file. It should otherwise return nil.

Any error from the read, including EOF, will be passed as err to the write function. Note that err may be EOF when column lengths are >0, similar to the semantics of sliceio.Reader.Read.

If the write function performs I/O, it is recommended that the I/O be buffered to allow downstream computations to progress.

WriterFunc provides the function with a zero-value state upon the first invocation of the function for a given shard. (If the state argument is a pointer, it is allocated.) Subsequent invocations of the function receive the same state value, thus permitting the writer to maintain local state across the write of the whole shard.

Example

Code:

package main

import (
	"bufio"
	"fmt"
	"github.com/grailbio/base/log"
	"github.com/grailbio/bigslice"
	"github.com/grailbio/bigslice/slicetest"
	"io/ioutil"
	"os"
	"sort"
)

func main() {
	// Use WriterFunc to write a file for each shard of the input slice. Each
	// file will contain a line for each row in the shard.
	const numShards = 2
	slice := bigslice.Const(numShards,
		[]string{"a", "b", "c", "a", "b", "c"},
		[]int{3, 3, 2, 2, 1, 1},
	)
	// For this simple example, use shared memory to store the paths to these
	// files so that we can easily aggregate the files for output. If we were
	// distributing this computation across machines without access to shared
	// memory, we'd need to use a different mechanism, e.g. write files to a
	// common backing store with a known prefix.
	shardPaths := make([]string, numShards)
	type writeState struct {
		file *os.File
	}
	slice = bigslice.WriterFunc(slice,
		func(shard int, state *writeState, readErr error, ss []string, xs []int) error {
			if state.file == nil {
				// First call; initialize state.
				var err error
				if state.file, err = ioutil.TempFile("", "example-writer-func"); err != nil {
					return fmt.Errorf("could not open temp file: %v", err)
				}
				shardPaths[shard] = state.file.Name()
			}
			for i := range ss {
				// We can safely assume that ss and xs are of equal length.
				s := ss[i]
				x := xs[i]
				// Write a line in the file with the labeled elements of the
				// row.
				line := fmt.Sprintf("s:%s x:%d\n", s, x)
				if _, err := state.file.WriteString(line); err != nil {
					return fmt.Errorf("error writing file: %v", err)
				}
			}
			if readErr != nil {
				// No more data is coming, so we close our file.
				if err := state.file.Close(); err != nil {
					return fmt.Errorf("error closing file: %v", err)
				}
			}
			return nil
		},
	)
	// Note that the slice passes through unadulterated.
	fmt.Println("# slice")
	slicetest.Print(slice)

	// slicetest.Print evaluates the slice, so we now make sure to clean up
	// after ourselves.
	for _, path := range shardPaths {
		defer os.Remove(path)
	}
	fmt.Println("# lines by shard")
	for shard, path := range shardPaths {
		fmt.Printf("## shard %d\n", shard)
		// Read and sort the lines for deterministic output.
		var lines []string
		file, err := os.Open(path)
		if err != nil {
			log.Fatalf("error opening %s for reading: %v", path, err)
		}
		scanner := bufio.NewScanner(file)
		for scanner.Scan() {
			lines = append(lines, scanner.Text())
		}
		if scannerErr := scanner.Err(); scannerErr != nil {
			log.Fatalf("error scanning %s: %v", path, scannerErr)
		}
		sort.Strings(lines)
		for _, line := range lines {
			fmt.Println(line)
		}
	}
}
# slice
a 2
a 3
b 1
b 3
c 1
c 2
# lines by shard
## shard 0
s:a x:2
s:a x:3
s:b x:3
s:c x:2
## shard 1
s:b x:1
s:c x:1

BUGs

  • Fold does not yet support slice grouping

Documentation was rendered with GOOS=linux and GOARCH=amd64.
