Bigslice is a serverless cluster data processing system for Go. Bigslice exposes a composable API that lets the user express data processing tasks as a series of data transformations that invoke user code. The Bigslice runtime then transparently parallelizes and distributes the work, using the Bigmachine library to create an ad hoc cluster on a cloud provider.

Developing Bigslice

Bigslice uses Go modules to capture its dependencies; no tooling other than the base Go install is required.

$ git clone https://github.com/grailbio/bigslice
$ cd bigslice
$ GO111MODULE=on go test

If tests fail with socket: too many open files errors, try increasing the maximum number of open files by running

$ ulimit -n 2000



Package bigslice implements a distributed data processing system. Users compose computations by operating over large collections ("big slices") of data, transforming them with a handful of combinators. While users express computations using collections-style operations, bigslice takes care of the details of parallel execution and distribution across multiple machines.

Bigslice jobs can run locally, but Bigslice uses bigmachine for distribution among a cluster of compute nodes. In either case, user code does not change; the details of distribution are handled by the combination of bigmachine and bigslice.

Because Go cannot easily serialize code to be sent over the wire and executed remotely, bigslice programs have to be written with a few constraints:

1. All slices must be constructed by bigslice funcs (bigslice.Func), and all such functions must be instantiated before bigslice.Start is called. This rule is easy to follow: if funcs are global variables, and bigslice.Start is called from a program's main, then the program is compliant.

2. The driver program must be compiled for the same GOOS and GOARCH as the target architecture. When running locally, this is not a concern, but programs that require distribution must be run from a linux/amd64 binary. Bigslice also supports a fat binary format that bundles binaries for multiple platforms; the bigslice tool uses this to compile portable fat binaries.

Some Bigslice operations may be annotated with runtime pragmas: directives for the Bigslice runtime. See Pragma for details.

User-provided functions in Bigslice

Functions provided to the various bigslice combinators (e.g., bigslice.Map) may take an additional argument of type context.Context. If specified, then the lifetime of the context is tied to that of the underlying bigslice task. Additionally, the context carries a metrics scope, which can be used to update metric values during data processing.





func FuncLocations

func FuncLocations() []string

FuncLocations returns a slice of strings that describe the locations of Func creation, in the same order as the Funcs registry. We use this to verify that worker processes have the same Funcs. Note that this is not a precisely correct verification, as it's possible to define multiple Funcs on the same line. However, it's good enough for the scenarios we have encountered or anticipate.

func FuncLocationsDiff

func FuncLocationsDiff(lhs, rhs []string) []string

FuncLocationsDiff returns a slice of strings that describes the differences between lhs and rhs locations slices as returned by FuncLocations. The slice is a unified diff between the slices, so if you print each element on a line, you'll get interpretable output. For example:

for _, edit := range FuncLocationsDiff([]string{"a", "b", "c"}, []string{"a", "c"}) {
	fmt.Println(edit)
}

will produce:

- b

If the slices are identical, it returns nil.

func Helper

func Helper()

Helper is used to mark a function as a helper function: names for newly created slices will be attributed to the caller of the function instead of the function itself.

func String

func String(slice Slice) string

String returns a string describing the slice and its type.


type Accumulator

type Accumulator interface {
	// Accumulate the provided columns of length n.
	Accumulate(in frame.Frame, n int)
	// Read a batch of accumulated values into keys and values. These
	// are slices of the key type and accumulator type respectively.
	Read(keys, values reflect.Value) (int, error)
}

An Accumulator represents a stateful accumulation of values of a certain type. Accumulators maintain their state in memory.

Accumulators should be read only after accumulation is complete.

type Dep

type Dep struct {
	Shuffle     bool
	Partitioner Partitioner
	// Expand indicates that each shard of a shuffle dependency (i.e.,
	// all the shards of a given partition) should be expanded (i.e.,
	// not merged) when handed to the slice implementation. This is to
	// support merge-sorting of shards of the same partition.
	Expand bool
}

A Dep is a Slice dependency. Deps comprise a slice and a boolean flag determining whether this represents a shuffle dependency. Shuffle dependencies must perform a data shuffle step: the dependency must partition its output according to the Slice's partitioner, and, when the dependent Slice is computed, the evaluator must pass in Readers that read a single partition from all dependent shards. If Shuffle is true, then the provided partitioner determines how the output is partitioned. If it is nil, the default (hash by first column) partitioner is used.
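The default hash-by-first-column behavior can be pictured with a stdlib-only sketch (an illustration, not Bigslice's actual implementation, which operates on frame.Frame values): hash each row's key and take the remainder modulo the partition count, so rows with equal keys always land in the same partition.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionByFirstColumn mimics hash-by-first-column partitioning:
// each row is assigned a partition from a stable hash of its key,
// so rows with equal keys land in the same partition.
func partitionByFirstColumn(keys []string, nshard int, shards []int) {
	for i, key := range keys {
		h := fnv.New32a()
		h.Write([]byte(key))
		shards[i] = int(h.Sum32()) % nshard
	}
}

func main() {
	keys := []string{"a", "b", "a", "c"}
	shards := make([]int, len(keys))
	partitionByFirstColumn(keys, 4, shards)
	// Rows 0 and 2 share the key "a", so they share a partition.
	fmt.Println(shards[0] == shards[2])
}
```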

type FuncValue

type FuncValue struct {
	// contains filtered or unexported fields
}

A FuncValue represents a Bigslice function, as returned by Func.

func Func

func Func(fn interface{}) *FuncValue

Func creates a bigslice function from the provided function value. Bigslice funcs must return a single Slice value. Funcs provide bigslice with a means of dynamic abstraction: since Funcs can be invoked remotely, dynamically created slices may be named across process boundaries.

func (*FuncValue) Apply

func (f *FuncValue) Apply(args ...interface{}) Slice

Apply invokes the function f with the provided arguments, returning the computed Slice. Apply panics with a type error if the argument types or arity do not match.

func (*FuncValue) Exclusive

func (f *FuncValue) Exclusive() *FuncValue

Exclusive marks this func to require mutually exclusive machine allocation.

NOTE: This is an experimental API that may change.

func (*FuncValue) In

func (f *FuncValue) In(i int) reflect.Type

In returns the i'th argument type of function f.

func (*FuncValue) Invocation

func (f *FuncValue) Invocation(location string, args ...interface{}) Invocation

Invocation creates an invocation representing the function f applied to the provided arguments. Invocation panics with a type error if the provided arguments do not match in type or arity.

func (*FuncValue) NumIn

func (f *FuncValue) NumIn() int

NumIn returns the number of input arguments to f.

type Invocation

type Invocation struct {
	Index     uint64
	Func      uint64
	Args      []interface{}
	Exclusive bool
	Location  string
}

Invocation represents an invocation of a Bigslice func in the same binary. Invocations can be transmitted across process boundaries and thus may be invoked by remote executors.

Each invocation carries an invocation index, which is a unique index for invocations within a process namespace. It can thus be used to represent a particular function invocation from a driver process.

Invocations must be created by newInvocation.

func (Invocation) Invoke

func (i Invocation) Invoke() Slice

Invoke performs the Func invocation represented by this Invocation instance, returning the resulting slice.

func (Invocation) String

func (inv Invocation) String() string

type Name

type Name struct {
	// Op is the operation that the slice performs (e.g. "reduce", "map")
	Op string
	// File is the file in which the slice was defined.
	File string
	// Line is the line in File at which the slice was defined.
	Line int
	// Index disambiguates slices created on the same File and Line.
	Index int
}

Name is a unique name for a slice, constructed with useful context for diagnostic or status display.

func MakeName

func MakeName(op string) Name

func (Name) String

func (n Name) String() string

type Partitioner

type Partitioner func(ctx context.Context, frame frame.Frame, nshard int, shards []int)

A Partitioner is used to assign partitions to rows in a frame.

type Pragma

type Pragma interface {
	// Procs returns the number of procs a slice task needs to run. It is
	// superseded by Exclusive and clamped to the maximum number of procs per
	// machine.
	Procs() int
	// Exclusive indicates that a slice task should be given
	// exclusive access to the underlying machine.
	Exclusive() bool
	// Materialize indicates that the result of the slice task should be
	// materialized, i.e. break pipelining.
	Materialize() bool
}

Pragma comprises runtime directives used during bigslice execution.

var Exclusive Pragma = exclusive{}

Exclusive is a Pragma that indicates the slice task should be given exclusive access to the machine that runs it. Exclusive takes precedence over Procs.

var ExperimentalMaterialize Pragma = materialize{}

ExperimentalMaterialize is a Pragma that indicates the slice task results should be materialized, i.e. not pipelined. You may want to use this to materialize and reuse results of tasks that would normally have been pipelined.

It is tagged "experimental" because we are considering other ways of achieving this.

TODO(jcharumilind): Consider doing this automatically for slices on which multiple slices depend.

func Procs

func Procs(n int) Pragma

Procs returns a pragma that sets the number of procs a slice task needs to run to n. It is superseded by Exclusive and clamped to the maximum number of procs per machine.

type Pragmas

type Pragmas []Pragma

Pragmas composes multiple underlying Pragmas.

func (Pragmas) Exclusive

func (p Pragmas) Exclusive() bool

Exclusive implements Pragma.

func (Pragmas) Materialize

func (p Pragmas) Materialize() bool

Materialize implements Pragma.

func (Pragmas) Procs

func (p Pragmas) Procs() int

Procs implements Pragma. If multiple tasks with Procs pragmas are pipelined, we allocate the maximum to the composed pipeline.

type ShardType

type ShardType int

ShardType indicates the type of sharding used by a Slice.

const (
	// HashShard Slices are partitioned by an (unspecified)
	// hash of a record. That is, the same record should
	// be assigned a stable shard number.
	HashShard ShardType = iota
	// RangeShard Slices are partitioned by the range of a key. The key
	// is always the first column of the slice.
	RangeShard
)

type Slice

type Slice interface {

	// Name returns a unique (composite) name for this Slice that also has
	// useful context for diagnostic or status display.
	Name() Name

	// NumShard returns the number of shards in this Slice.
	NumShard() int
	// ShardType returns the sharding type of this Slice.
	ShardType() ShardType

	// NumDep returns the number of dependencies of this Slice.
	NumDep() int
	// Dep returns the i'th dependency for this Slice.
	Dep(i int) Dep

	// Combiner is an optional function that is used to combine multiple values
	// with the same key from the slice's output. No combination is performed
	// if nil.
	Combiner() slicefunc.Func

	// Reader returns a Reader for a shard of this Slice. The reader itself
	// computes the shard's values on demand. The caller must provide Readers
	// for all of this shard's dependencies, constructed according to the
	// dependency type (see Dep).
	Reader(shard int, deps []sliceio.Reader) sliceio.Reader
}

A Slice is a shardable, ordered dataset. Each slice consists of zero or more columns of data distributed over one or more shards. Slices may declare dependencies on other slices from which they are computed. In order to compute a slice, its dependencies must first be computed, and their resulting Readers are passed to the Slice's Reader method.

Since Go does not support generic typing, Slice combinators perform their own dynamic type checking. Schematically we write the n-ary slice with types t1, t2, ..., tn as Slice<t1, t2, ..., tn>.

Types that implement the Slice interface must be comparable.

func Cache

func Cache(ctx context.Context, slice Slice, prefix string) Slice

Cache caches the output of a slice to the given file prefix. Cached data are stored as "prefix-nnnn-of-mmmm" for shards nnnn of mmmm. When the slice is computed, each shard is encoded and written to a separate file with this prefix. If all shards exist, then Cache shortcuts computation and instead reads directly from the previously computed output. The user must guarantee cache consistency: if the cache could be invalid (e.g., because of code changes), the user is responsible for removing existing cached files, or picking a different prefix that correctly represents the operation to be cached.

Cache uses GRAIL's file library, so prefix may refer to URLs to a distributed object store such as S3.
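The "prefix-nnnn-of-mmmm" naming scheme can be sketched as follows; the zero-padded four-digit width is an illustrative assumption, not taken from the package:

```go
package main

import "fmt"

// cacheShardPath builds a cache file name of the form
// "prefix-nnnn-of-mmmm" for shard n of m shards. The four-digit,
// zero-padded width shown here is an illustrative assumption.
func cacheShardPath(prefix string, shard, nshard int) string {
	return fmt.Sprintf("%s-%04d-of-%04d", prefix, shard, nshard)
}

func main() {
	// A prefix may be a local path or an object-store URL such as S3.
	fmt.Println(cacheShardPath("s3://bucket/myjob", 3, 128))
}
```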

func CachePartial

func CachePartial(ctx context.Context, slice Slice, prefix string) Slice

CachePartial caches the output of the slice to the given file prefix (it uses the same file naming scheme as Cache). However, unlike Cache, if CachePartial finds incomplete cached results (from an earlier failed or interrupted run), it will use them and recompute only the missing data.

WARNING: The user is responsible for ensuring slice's contents are deterministic between bigslice runs. If keys are non-deterministic, for example due to pseudorandom seeding based on time, or reading the state of a modifiable file in S3, CachePartial produces corrupt results.

As with Cache, the user must guarantee cache consistency.

func Cogroup

func Cogroup(slices ...Slice) Slice

Cogroup returns a slice that, for each key in any slice, contains the group of values for that key, in each slice. Schematically:

Cogroup(Slice<tk1, ..., tkp, t11, ..., t1n>, Slice<tk1, ..., tkp, t21, ..., t2n>, ..., Slice<tk1, ..., tkp, tm1, ..., tmn>)
	Slice<tk1, ..., tkp, []t11, ..., []t1n, []t21, ..., []tmn>

It thus implements a form of generalized JOIN and GROUP.

Cogroup uses the prefix columns of each slice as its key; keys must be partitionable.

TODO(marius): don't require spilling to disk when the input data set is small enough.

TODO(marius): consider providing a version that returns scanners in the returned slice, so that we can stream through. This would require some changes downstream, however, so that buffering and encoding functionality also know how to read scanner values.

func Const

func Const(nshard int, columns ...interface{}) Slice

Const returns a Slice representing the provided value. Each column of the Slice should be provided as a Go slice of the column's type. The value is split into nshard shards.
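One plausible way to split a column into nshard shards can be sketched with plain Go (Bigslice's actual splitting may differ): contiguous chunks whose lengths differ by at most one.

```go
package main

import "fmt"

// splitShards divides a column into nshard contiguous chunks whose
// lengths differ by at most one, a plausible way a Const slice's
// data could be split across shards.
func splitShards(col []int, nshard int) [][]int {
	shards := make([][]int, nshard)
	for i := range shards {
		lo := i * len(col) / nshard
		hi := (i + 1) * len(col) / nshard
		shards[i] = col[lo:hi]
	}
	return shards
}

func main() {
	fmt.Println(splitShards([]int{1, 2, 3, 4, 5}, 2))
}
```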

func Filter

func Filter(slice Slice, pred interface{}, prags ...Pragma) Slice

Filter returns a slice where the provided predicate is applied to each element in the given slice. The output slice contains only those entries for which the predicate is true.

The predicate function should receive each column of slice and return a single boolean value.


Filter(Slice<t1, t2, ..., tn>, func(t1, t2, ..., tn) bool) Slice<t1, t2, ..., tn>

func Flatmap

func Flatmap(slice Slice, fn interface{}, prags ...Pragma) Slice

Flatmap returns a Slice that applies the function fn to each record in the slice, flattening the returned slice. That is, the function fn should be of the form:

func(in1 inType1, in2 inType2, ...) (out1 []outType1, out2 []outType2)


Flatmap(Slice<t1, t2, ..., tn>, func(v1 t1, v2 t2, ..., vn tn) ([]r1, []r2, ..., []rn)) Slice<r1, r2, ..., rn>

func Fold

func Fold(slice Slice, fold interface{}) Slice

Fold returns a slice that aggregates values by the first column using a custom aggregation function. For an input slice Slice<t1, t2, ..., tn>, Fold requires that the provided accumulator function follow the form:

func(accum acctype, v2 t2, ..., vn tn) acctype

The function is invoked once for each slice element with the same value for column 1 (t1). On the first invocation, the accumulator is passed the zero value of its accumulator type.

Fold requires that the first column of the slice is partitionable. See the documentation for Keyer for more details.


Fold(Slice<t1, t2, ..., tn>, func(accum acctype, v2 t2, ..., vn tn) acctype) Slice<t1, acctype>

BUG(marius): Fold does not yet support slice grouping
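Fold's per-key accumulation semantics can be illustrated with a plain-Go sketch (a local stand-in, not the distributed implementation): each key's accumulator starts at the zero value of its type, and the fold function is applied once per element.

```go
package main

import "fmt"

// foldByKey mimics Fold's semantics for a Slice<string, int> with
// fold func(accum, v int) int: each key's accumulator starts at the
// zero value and is updated once per element with that key.
func foldByKey(keys []string, vals []int, fold func(accum, v int) int) map[string]int {
	out := make(map[string]int)
	for i, k := range keys {
		out[k] = fold(out[k], vals[i]) // out[k] is the zero value on first use
	}
	return out
}

func main() {
	sum := func(accum, v int) int { return accum + v }
	fmt.Println(foldByKey([]string{"a", "b", "a"}, []int{1, 2, 3}, sum))
}
```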

func Head

func Head(slice Slice, n int) Slice

Head returns a slice that returns at most the first n items from each shard of the underlying slice. Its type is the same as the provided slice.

func Map

func Map(slice Slice, fn interface{}, prags ...Pragma) Slice

Map transforms a slice by invoking a function for each record. The type of slice must match the arguments of the function fn. The type of the returned slice is the set of columns returned by fn. The returned slice matches the input slice's sharding, but is always hash partitioned.


Map(Slice<t1, t2, ..., tn>, func(v1 t1, v2 t2, ..., vn tn) (r1, r2, ..., rn)) Slice<r1, r2, ..., rn>

func Prefixed

func Prefixed(slice Slice, prefix int) Slice

Prefixed returns a slice with the provided prefix. A prefix determines the number of columns (starting at 0) in the slice that compose the key values for that slice for operations like reduce.

func ReadCache

func ReadCache(ctx context.Context, typ slicetype.Type, numShard int, prefix string) Slice

ReadCache reads from an existing cache but does not write any cache itself. This may be useful if you want to reuse a cache from a previous computation and fail if it does not exist. typ is the type of the cached and returned slice. You may construct typ using slicetype.New or pass a Slice, which embeds slicetype.Type.

func ReaderFunc

func ReaderFunc(nshard int, read interface{}, prags ...Pragma) Slice

ReaderFunc returns a Slice that uses the provided function to read data. The function read must be of the form:

func(shard int, state stateType, col1 []col1Type, col2 []col2Type, ..., colN []colNType) (int, error)

This returns a slice of the form:

Slice<col1Type, col2Type, ..., colNType>

The function is invoked to fill a vector of elements. col1, ..., colN are preallocated slices that should be filled by the reader function. The function should return the number of elements that were filled. The error EOF should be returned when no more data are available.

ReaderFunc provides the function with a zero-value state upon the first invocation of the function for a given shard. (If the state argument is a pointer, it is allocated.) Subsequent invocations of the function receive the same state value, thus permitting the reader to maintain local state across the read of a whole shard.

func Reduce

func Reduce(slice Slice, reduce interface{}) Slice

Reduce returns a slice that reduces elements pairwise. Reduce operations must be commutative and associative. Schematically:

Reduce(Slice<k, v>, func(v1, v2 v) v) Slice<k, v>

The provided reducer function is invoked to aggregate values of type v. Reduce can perform map-side "combining", so that data are reduced to their aggregated value aggressively. This can often speed up computations significantly.

The slice to be reduced must have exactly 1 residual column: that is, its prefix must leave just one column as the value column to be aggregated.
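Why the reducer must be commutative and associative can be seen in a stdlib-only sketch of map-side combining (an illustration of the idea, not Bigslice's implementation): each shard pre-aggregates its values locally, and the partial results are reduced again when merged, so the final value must not depend on grouping or order.

```go
package main

import "fmt"

// combine reduces values per key within one shard (map-side combining).
func combine(keys []string, vals []int, reduce func(a, b int) int) map[string]int {
	out := make(map[string]int)
	for i, k := range keys {
		if v, ok := out[k]; ok {
			out[k] = reduce(v, vals[i])
		} else {
			out[k] = vals[i]
		}
	}
	return out
}

// merge reduces the partial results of two shards; because reduce is
// commutative and associative, this equals reducing all values at once.
func merge(a, b map[string]int, reduce func(x, y int) int) map[string]int {
	out := make(map[string]int)
	for k, v := range a {
		out[k] = v
	}
	for k, v := range b {
		if w, ok := out[k]; ok {
			out[k] = reduce(w, v)
		} else {
			out[k] = v
		}
	}
	return out
}

func main() {
	sum := func(a, b int) int { return a + b }
	shard1 := combine([]string{"a", "a", "b"}, []int{1, 2, 3}, sum)
	shard2 := combine([]string{"a", "b"}, []int{4, 5}, sum)
	fmt.Println(merge(shard1, shard2, sum))
}
```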

TODO(marius): Reduce currently maintains the working set of keys in memory, and is thus appropriate only where the working set can fit in memory. For situations where this is not the case, Cogroup should be used instead (at an overhead). Reduce should spill to disk when necessary.

TODO(marius): consider pushing combiners into task dependency definitions so that we can combine-read all partitions on one machine simultaneously.

func Repartition

func Repartition(slice Slice, fn interface{}) Slice

Repartition (re-)partitions the slice according to the provided function fn, which is invoked for each record in the slice to assign that record's shard. The function is supplied with the number of shards to partition over as well as the column values; the assigned shard is returned.


Repartition(Slice<t1, t2, ..., tn> func(nshard int, v1 t1, ..., vn tn) int)  Slice<t1, t2, ..., tn>
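A repartition function in the documented shape can be sketched in plain Go, for example range-partitioning integer keys (the key range [0, 100) here is an assumption for illustration):

```go
package main

import "fmt"

// byRange is a repartition function of the documented shape
// func(nshard int, v1 t1, ...) int. It range-partitions integer keys
// assumed to lie in [0, 100), dividing the key space evenly.
func byRange(nshard int, key int) int {
	s := key * nshard / 100
	if s >= nshard {
		s = nshard - 1
	}
	return s
}

func main() {
	for _, key := range []int{10, 40, 99} {
		fmt.Println(key, "->", byRange(4, key))
	}
}
```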

func Reshard

func Reshard(slice Slice, nshard int) Slice

Reshard returns a slice that is resharded to the given number of shards; this is done by re-shuffling to the provided number of shards.

func Reshuffle

func Reshuffle(slice Slice) Slice

Reshuffle returns a slice that shuffles rows by prefix so that all rows with equal prefix values end up in the same shard. Rows are not sorted within a shard.

The output slice has the same type as the input.

TODO: Add ReshuffleSort, which also sorts keys within each shard.

func Scan

func Scan(slice Slice, scan func(shard int, scanner *sliceio.Scanner) error) Slice

Scan invokes a function for each shard of the input Slice. It returns a unit Slice: Scan is intended to be used for its side effects.

func ScanReader

func ScanReader(nshard int, reader func() (io.ReadCloser, error)) Slice

ScanReader returns a slice of strings that are scanned from the provided reader. ScanReader shards the file by lines. Note that since ScanReader is unaware of the underlying data layout, it may be inefficient for highly parallel access: each shard must read the full file, skipping over data not belonging to the shard.
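The line-sharding cost model described above can be sketched with a plain-Go stand-in: every shard scans the entire stream but keeps only the lines assigned to it (assignment by line index modulo nshard is an assumption for illustration).

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// shardLines mimics ScanReader's cost model: the shard reads the
// whole input and skips lines not assigned to it. Assignment by
// line index modulo nshard is an illustrative assumption.
func shardLines(input string, shard, nshard int) []string {
	var out []string
	scanner := bufio.NewScanner(strings.NewReader(input))
	for i := 0; scanner.Scan(); i++ {
		if i%nshard == shard {
			out = append(out, scanner.Text())
		}
	}
	return out
}

func main() {
	// Shard 1 of 2 still scans all five lines but keeps only two.
	fmt.Println(shardLines("l0\nl1\nl2\nl3\nl4", 1, 2))
}
```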

func Unwrap

func Unwrap(slice Slice) Slice

Unwrap returns the underlying slice if the provided slice is used only to amend the type of the slice it composes.

TODO(marius): this is required to properly compile slices that use the prefix combinator; we should have a more general and robust solution to this.

func WriterFunc

func WriterFunc(slice Slice, write interface{}) Slice

WriterFunc returns a Slice that is functionally equivalent to the input Slice, allowing for computation with side effects by the provided write function. The write function must be of the form:

func(shard int, state stateType, err error, col1 []col1Type, col2 []col2Type, ..., colN []colNType) error

where the input slice is of the form:

Slice<col1Type, col2Type, ..., colNType>

The write function is invoked with every read of the input Slice. Each column slice will be of the same length and will be populated with the data from the read. For performance, the passed column slices share memory with the internal frame of the read. Do not modify the data in them, and assume that they will be modified once write returns.

Any error from the read, including EOF, will be passed as err to the write function. Note that err may be EOF when column lengths are >0, similar to the semantics of sliceio.Reader.Read.

If the write function performs I/O, it is recommended that the I/O be buffered to allow downstream computations to progress.

WriterFunc provides the function with a zero-value state upon the first invocation of the function for a given shard. (If the state argument is a pointer, it is allocated.) Subsequent invocations of the function receive the same state value, thus permitting the writer to maintain local state across the write of the whole shard.





Path Synopsis
archive/tarslice Package tarslice implements bigslice operations for reading tar archives.
cmd/badfuncs Badfuncs is a binary that tests various scenarios of Func creation that may fail to satisfy the invariant that all workers share common definitions of Funcs.
cmd/bigslice/bigslicecmd Package bigslicecmd provides the core functionality of the bigslice command as a package for easier integration into external toolchains and setups.
cmd/slicer Slicer is a binary used to test and stress multiple aspects of Bigslice.
cmd/urls Urls is a bigslice demo program that uses the GDELT public data set to aggregate counts by domain names mentioned in news event reports.
exec Package exec implements compilation, evaluation, and execution of Bigslice slice operations.
frame Package frame implements a typed, columnar data structure that represents data vectors throughout Bigslice.
internal/zero Package zero provides facilities for efficiently zeroing Go values.
metrics Package metrics defines a set of primitives for declaring and managing metrics within Bigslice.
sliceconfig Package sliceconfig provides a mechanism to create a bigslice session from a shared configuration.
slicefunc Package slicefunc provides types and code to call user-defined functions with Bigslice.
sliceio Package sliceio provides utilities for managing I/O for Bigslice operations.
slicetest Package slicetest provides utilities for testing Bigslice user code.
slicetype Package slicetype implements data types and utilities to describe Bigslice types: Slices, Frames, and Tasks all carry slicetype.Types.
sortio Package sortio provides facilities for sorting slice outputs and merging and reducing sorted record streams.
stats Package stats provides collections of counters.
typecheck Package typecheck contains a number of typechecking and inference utilities for bigslice operators.