Documentation ¶
Index ¶
- Variables
- func DefaultFloat64Comparator(a, b float64) int64
- func DefaultFloat64LessThanComparator(a, b float64) bool
- func DefaultInt64Comparator(a, b int64) int64
- func DefaultInt64LessThanComparator(a, b int64) bool
- func DefaultStringComparator(a, b string) int64
- func DefaultStringLessThanComparator(a, b string) bool
- func HashByKey(input reflect.Value, shard int) int
- func RegisterContextRunner(r ContextRunner)
- func RegisterTaskRunner(r TaskRunner)
- type ContextRunner
- type Dataset
- func (d *Dataset) Filter(f interface{}) *Dataset
- func (d *Dataset) GetShards() []*DatasetShard
- func (d *Dataset) Join(other *Dataset) *Dataset
- func (this *Dataset) JoinHashedSorted(that *Dataset, compareFunc interface{}, isLeftOuterJoin, isRightOuterJoin bool) (ret *Dataset)
- func (d *Dataset) LocalReduce(f interface{}) *Dataset
- func (d *Dataset) LocalReduceByKey(f interface{}) *Dataset
- func (d *Dataset) LocalSort(f interface{}) *Dataset
- func (d *Dataset) Map(f interface{}) *Dataset
- func (d *Dataset) MergeReduce(f interface{}) (ret *Dataset)
- func (d *Dataset) MergeSorted(f interface{}) (ret *Dataset)
- func (d *Dataset) Partition(shard int) *Dataset
- func (d *Dataset) Reduce(f interface{}) (ret *Dataset)
- func (d *Dataset) RunSelf(stepId int)
- func (d *Dataset) SetupShard(n int)
- func (d *Dataset) Sort(f interface{}) (ret *Dataset)
- type DatasetShard
- type FlowContext
- func (f *FlowContext) AddAllToOneStep(input *Dataset, output *Dataset) (s *Step)
- func (f *FlowContext) AddEveryNToOneStep(input *Dataset, m int, output *Dataset) (s *Step)
- func (f *FlowContext) AddOneToAllStep(input *Dataset, output *Dataset) (s *Step)
- func (f *FlowContext) AddOneToEveryNStep(input *Dataset, n int, output *Dataset) (s *Step)
- func (f *FlowContext) AddOneToOneStep(input *Dataset, output *Dataset) (s *Step)
- func (f *FlowContext) MergeDatasets1ShardTo1Step(inputs []*Dataset, output *Dataset) (s *Step)
- func (fc *FlowContext) Run()
- func (fc *FlowContext) Source(f interface{}, shard int) (ret *Dataset)
- func (fc *FlowContext) TextFile(fname string, shard int) (ret *Dataset)
- type Step
- type StepType
- type Task
- type TaskRunner
Constants ¶
This section is empty.
Variables ¶
var Contexts []*FlowContext
Referenced by the task runner in remote mode.
Functions ¶
func DefaultInt64Comparator ¶
func DefaultInt64Comparator(a, b int64) int64
func DefaultStringComparator ¶
func DefaultStringComparator(a, b string) int64
func RegisterContextRunner ¶
func RegisterContextRunner(r ContextRunner)
Invoked by the driver task runner.
func RegisterTaskRunner ¶
func RegisterTaskRunner(r TaskRunner)
Types ¶
type ContextRunner ¶
type ContextRunner interface {
	Run(fc *FlowContext)
	IsDriverMode() bool
}
type Dataset ¶
type Dataset struct {
	Id        int
	Type      reflect.Type
	Shards    []*DatasetShard
	Step      *Step
	ErrorChan chan error
	Generator func()
	// contains filtered or unexported fields
}
func NewDataset ¶
func NewDataset(context *FlowContext, t reflect.Type) *Dataset
func (*Dataset) GetShards ¶
func (d *Dataset) GetShards() []*DatasetShard
func (*Dataset) JoinHashedSorted ¶
func (this *Dataset) JoinHashedSorted(that *Dataset, compareFunc interface{}, isLeftOuterJoin, isRightOuterJoin bool) (ret *Dataset)
Join multiple datasets that are sharded by the same key and locally sorted within each shard. The "this" dataset is the driving dataset and should have more data than "that".
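The core of joining two key-sharded, locally sorted datasets is a sorted-merge walk per shard. Below is a minimal, self-contained sketch of that idea in plain Go; `Pair` and `mergeJoin` are illustrative stand-ins, not this package's API, and outer-join and duplicate-key handling are omitted.

```go
package main

import "fmt"

// Pair is a hypothetical key/value record; glow's actual element
// handling is reflection-based, so this is only an illustration.
type Pair struct {
	Key   int
	Value string
}

// mergeJoin walks two slices already sorted by Key and emits one
// joined record per matching key, mirroring the per-shard idea
// behind JoinHashedSorted (inner join only, no duplicate keys).
func mergeJoin(left, right []Pair) [][2]string {
	var out [][2]string
	i, j := 0, 0
	for i < len(left) && j < len(right) {
		switch {
		case left[i].Key < right[j].Key:
			i++
		case left[i].Key > right[j].Key:
			j++
		default:
			out = append(out, [2]string{left[i].Value, right[j].Value})
			i++
			j++
		}
	}
	return out
}

func main() {
	left := []Pair{{1, "a"}, {2, "b"}, {4, "d"}}
	right := []Pair{{2, "x"}, {3, "y"}, {4, "z"}}
	fmt.Println(mergeJoin(left, right)) // [[b x] [d z]]
}
```

Because both sides are sorted on the same key, each shard is joined in a single linear pass, which is why the datasets must be hashed and sorted consistently beforehand.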
func (*Dataset) LocalReduce ¶
f(V, V) V : reduce function that merges two values. The new Dataset contains V.
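A reducer of shape f(V, V) V is just a binary fold over the shard's values. The sketch below shows that fold in plain Go with V fixed to int; `reduceValues` is an illustrative helper, not part of this package, which accepts the function as `interface{}` and applies it via reflection.

```go
package main

import "fmt"

// reduceValues folds a slice with a glow-style reducer f(V, V) V.
// The accumulator starts at the first element, matching a pure
// pairwise reduce with no separate zero value.
func reduceValues(values []int, f func(a, b int) int) int {
	acc := values[0]
	for _, v := range values[1:] {
		acc = f(acc, v)
	}
	return acc
}

func main() {
	sum := reduceValues([]int{1, 2, 3, 4}, func(a, b int) int { return a + b })
	fmt.Println(sum) // 10
}
```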
func (*Dataset) LocalReduceByKey ¶
func (*Dataset) Map ¶
Map can work with multiple kinds of inputs and outputs:
1. If there are 2 inputs, the first input is the key and the second input is the value.
2. If there is 1 input, the input is the value.
3. If the last input is a channel, the output goes into the channel.
4. If there are 2 outputs, the first output is the key and the second output is the value.
5. If there is 1 output, the output is the value.
6. A map function may not necessarily have any output.
f(A, chan B) : the input type A is the same as the parent Dataset's type; the output channel's element type B is the same as the current Dataset's type.
func (*Dataset) MergeReduce ¶
func (*Dataset) MergeSorted ¶
func (*Dataset) Partition ¶
Partition hashes the data, or hashes by the data's key, and returns a new dataset. This is divided into 2 steps:
1. Each record is sharded to a local shard.
2. The destination shard collects its child shards and merges them into one.
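Step 1 above amounts to routing each record to a shard index by hashing its key, which is also what HashByKey does via reflection. The plain-string sketch below is illustrative only; `shardOf` and the FNV choice are assumptions, not this package's actual hash.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shardOf routes a key to one of `shards` buckets by hashing it.
// Any stable hash works; equal keys must always map to the same
// shard so that step 2 can merge them on one destination.
func shardOf(key string, shards int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32()) % shards
}

func main() {
	buckets := make([][]string, 3)
	for _, k := range []string{"apple", "banana", "cherry", "apple"} {
		buckets[shardOf(k, 3)] = append(buckets[shardOf(k, 3)], k)
	}
	// Equal keys always land in the same shard.
	fmt.Println(shardOf("apple", 3) == shardOf("apple", 3)) // true
}
```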
func (*Dataset) SetupShard ¶
type DatasetShard ¶
type FlowContext ¶
func NewContext ¶
func NewContext() (fc *FlowContext)
func (*FlowContext) AddAllToOneStep ¶
func (f *FlowContext) AddAllToOneStep(input *Dataset, output *Dataset) (s *Step)
The task should run on the destination dataset shard.
func (*FlowContext) AddEveryNToOneStep ¶
func (f *FlowContext) AddEveryNToOneStep(input *Dataset, m int, output *Dataset) (s *Step)
func (*FlowContext) AddOneToAllStep ¶
func (f *FlowContext) AddOneToAllStep(input *Dataset, output *Dataset) (s *Step)
The task should run on the source dataset shard. The input is nil for the initial source dataset.
func (*FlowContext) AddOneToEveryNStep ¶
func (f *FlowContext) AddOneToEveryNStep(input *Dataset, n int, output *Dataset) (s *Step)
func (*FlowContext) AddOneToOneStep ¶
func (f *FlowContext) AddOneToOneStep(input *Dataset, output *Dataset) (s *Step)
The tasks should run on the source dataset shard.
func (*FlowContext) MergeDatasets1ShardTo1Step ¶
func (f *FlowContext) MergeDatasets1ShardTo1Step(inputs []*Dataset, output *Dataset) (s *Step)
All datasets should have the same number of shards.
func (*FlowContext) Run ¶
func (fc *FlowContext) Run()
func (*FlowContext) Source ¶
func (fc *FlowContext) Source(f interface{}, shard int) (ret *Dataset)
f(chan A) : f writes the source records into the channel.
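A source of shape f(chan A) produces records by writing them into a channel the framework drains. The sketch below shows that contract in plain Go with A fixed to int; `runSource` is an illustrative stand-in for the framework side, not this package's implementation.

```go
package main

import "fmt"

// runSource drains a glow-style source function f(chan A): the
// function writes its records into the channel in a goroutine and
// the caller reads them until the channel is closed.
func runSource(f func(out chan int)) []int {
	out := make(chan int)
	go func() {
		f(out)
		close(out)
	}()
	var res []int
	for v := range out {
		res = append(res, v)
	}
	return res
}

func main() {
	data := runSource(func(out chan int) {
		for i := 1; i <= 3; i++ {
			out <- i
		}
	})
	fmt.Println(data) // [1 2 3]
}
```

In the real API the records would then be distributed across `shard` shards rather than collected into one slice.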
type Step ¶
type Task ¶
type Task struct {
	Id      int
	Inputs  []*DatasetShard
	Outputs []*DatasetShard
	Step    *Step
}
type TaskRunner ¶
type TaskRunner interface {
	Run(fc *FlowContext)
	IsTaskMode() bool
}
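Satisfying TaskRunner only requires the two methods above. The sketch below shows a hypothetical in-process runner; `FlowContext` here is an empty stand-in type so the example is self-contained, and `localRunner` is not part of this package.

```go
package main

import "fmt"

// FlowContext is a stand-in for this package's FlowContext, used
// only to make the interface-satisfaction sketch compile on its own.
type FlowContext struct{}

// TaskRunner as declared in this package.
type TaskRunner interface {
	Run(fc *FlowContext)
	IsTaskMode() bool
}

// localRunner is a hypothetical runner that executes tasks in-process.
type localRunner struct{ taskMode bool }

func (r *localRunner) Run(fc *FlowContext) { fmt.Println("running flow") }
func (r *localRunner) IsTaskMode() bool    { return r.taskMode }

func main() {
	var tr TaskRunner = &localRunner{taskMode: true}
	fmt.Println(tr.IsTaskMode()) // true
	// RegisterTaskRunner(tr) would hand a runner like this to the framework.
}
```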