Version: v0.0.0-...-4413804

Published: Sep 14, 2015 License: Apache-2.0 Imports: 7 Imported by: 0






var Contexts []*FlowContext

Contexts is referenced by the task runner in remote mode.


func DefaultFloat64Comparator

func DefaultFloat64Comparator(a, b float64) int64

func DefaultFloat64LessThanComparator

func DefaultFloat64LessThanComparator(a, b float64) bool

func DefaultInt64Comparator

func DefaultInt64Comparator(a, b int64) int64

func DefaultInt64LessThanComparator

func DefaultInt64LessThanComparator(a, b int64) bool

func DefaultStringComparator

func DefaultStringComparator(a, b string) int64

func DefaultStringLessThanComparator

func DefaultStringLessThanComparator(a, b string) bool
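The default comparators presumably follow the usual tri-valued contract (negative if a < b, zero if equal, positive if a > b), with the less-than variants derived from it. A minimal stdlib sketch of that contract, using hypothetical names `stringComparator` and `stringLessThan` rather than the package's own functions:

```go
package main

import (
	"fmt"
	"strings"
)

// stringComparator sketches the tri-valued comparator contract:
// negative if a < b, zero if a == b, positive if a > b.
func stringComparator(a, b string) int64 {
	return int64(strings.Compare(a, b))
}

// stringLessThan derives the less-than predicate from the comparator.
func stringLessThan(a, b string) bool {
	return stringComparator(a, b) < 0
}

func main() {
	fmt.Println(stringComparator("apple", "banana")) // -1
	fmt.Println(stringLessThan("banana", "apple"))   // false
}
```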

func HashByKey

func HashByKey(input reflect.Value, shard int) int
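The real HashByKey takes a reflect.Value, but the idea is simply to map a record's key deterministically onto one of `shard` partitions. A hypothetical stdlib sketch of that idea (not the library's actual hash):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// hashByKey maps a string key deterministically to one of `shard`
// partitions. This is an illustrative stand-in, not glow's implementation.
func hashByKey(key string, shard int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % uint32(shard))
}

func main() {
	for _, k := range []string{"apple", "banana", "cherry"} {
		fmt.Printf("%s -> shard %d\n", k, hashByKey(k, 3))
	}
}
```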

func RegisterContextRunner

func RegisterContextRunner(r ContextRunner)

Invoked by the driver's task runner.

func RegisterTaskRunner

func RegisterTaskRunner(r TaskRunner)


type ContextRunner

type ContextRunner interface {
	Run(fc *FlowContext)
	IsDriverMode() bool
}

type Dataset

type Dataset struct {
	Id int

	Type   reflect.Type
	Shards []*DatasetShard
	Step   *Step

	ErrorChan chan error
	Generator func()
	// contains filtered or unexported fields
}

func NewDataset

func NewDataset(context *FlowContext, t reflect.Type) *Dataset

func (*Dataset) Filter

func (d *Dataset) Filter(f interface{}) *Dataset


func (*Dataset) GetShards

func (d *Dataset) GetShards() []*DatasetShard

func (*Dataset) Join

func (d *Dataset) Join(other *Dataset) *Dataset

Assumes nothing about these two datasets.

func (*Dataset) JoinHashedSorted

func (this *Dataset) JoinHashedSorted(that *Dataset,
	compareFunc interface{}, isLeftOuterJoin, isRightOuterJoin bool,
) (ret *Dataset)

Join multiple datasets that are sharded by the same key and locally sorted within each shard. The "this" dataset is the driving dataset and should have more data than "that".
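Because both inputs are sorted by the same key, matching pairs can be found in a single linear pass. A minimal sketch of that merge-join core (inner join only; outer-join handling and duplicate keys are omitted, and `pair`/`mergeJoin` are illustrative names, not the library's API):

```go
package main

import "fmt"

type pair struct {
	Key string
	Val int
}

// mergeJoin pairs up matching keys from two key-sorted inputs in one pass.
func mergeJoin(left, right []pair) [][3]interface{} {
	var out [][3]interface{}
	i, j := 0, 0
	for i < len(left) && j < len(right) {
		switch {
		case left[i].Key < right[j].Key:
			i++ // left key has no match yet; advance left
		case left[i].Key > right[j].Key:
			j++ // right key has no match yet; advance right
		default:
			out = append(out, [3]interface{}{left[i].Key, left[i].Val, right[j].Val})
			i++
			j++
		}
	}
	return out
}

func main() {
	l := []pair{{"a", 1}, {"b", 2}, {"d", 4}}
	r := []pair{{"b", 20}, {"c", 30}, {"d", 40}}
	fmt.Println(mergeJoin(l, r)) // [[b 2 20] [d 4 40]]
}
```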

func (*Dataset) LocalReduce

func (d *Dataset) LocalReduce(f interface{}) *Dataset

f(V, V) V : reduce function. The new Dataset contains V.
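A function of shape f(V, V) V folds a shard's values pairwise into one result. A stdlib sketch of that folding, with a hypothetical `reduceShard` helper standing in for the framework:

```go
package main

import "fmt"

// reduceShard folds a shard's values with a user-supplied f(V, V) V,
// the same shape LocalReduce expects.
func reduceShard(values []int, f func(a, b int) int) int {
	acc := values[0]
	for _, v := range values[1:] {
		acc = f(acc, v)
	}
	return acc
}

func main() {
	sum := func(a, b int) int { return a + b }
	fmt.Println(reduceShard([]int{1, 2, 3, 4}, sum)) // 10
}
```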

func (*Dataset) LocalReduceByKey

func (d *Dataset) LocalReduceByKey(f interface{}) *Dataset

func (*Dataset) LocalSort

func (d *Dataset) LocalSort(f interface{}) *Dataset

f(V, V) bool : less-than function. The new Dataset contains K,V.
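The f(V, V) bool shape is the standard less-than predicate; sort.Slice in the stdlib demonstrates the same contract on a plain slice (the `sortInts` wrapper is illustrative, not part of the package):

```go
package main

import (
	"fmt"
	"sort"
)

// sortInts sorts a slice with a user-supplied f(V, V) bool less-than
// predicate, the same shape LocalSort expects.
func sortInts(data []int, lessThan func(a, b int) bool) []int {
	sort.Slice(data, func(i, j int) bool { return lessThan(data[i], data[j]) })
	return data
}

func main() {
	fmt.Println(sortInts([]int{3, 1, 2}, func(a, b int) bool { return a < b })) // [1 2 3]
}
```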

func (*Dataset) Map

func (d *Dataset) Map(f interface{}) *Dataset

Map can work with multiple kinds of inputs and outputs:
1. If there are 2 inputs, the first input is the key and the second input is the value.
2. If there is 1 input, the input is the value.
3. If the last input is a channel, the output goes into the channel.
4. If there are 2 outputs, the first output is the key and the second output is the value.
5. If there is 1 output, the output is the value.
6. A map function may not necessarily have any output.

f(A, chan B) : the input type A is the same as the parent Dataset's type; the output channel's element type B is the same as the current Dataset's type.
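These conventions imply the framework inspects the user function with reflection. A minimal sketch of how such inspection could classify a mapper, assuming a hypothetical `describeMapper` helper (not the library's actual dispatch):

```go
package main

import (
	"fmt"
	"reflect"
)

// describeMapper classifies a map function by the conventions above:
// a trailing channel input means output goes into the channel,
// two outputs mean (key, value), one output means value only.
func describeMapper(f interface{}) string {
	t := reflect.TypeOf(f)
	if t.NumIn() > 0 && t.In(t.NumIn()-1).Kind() == reflect.Chan {
		return "emits via channel"
	}
	switch t.NumOut() {
	case 2:
		return "emits (key, value)"
	case 1:
		return "emits value"
	}
	return "emits nothing"
}

func main() {
	fmt.Println(describeMapper(func(line string) (string, int) { return line, 1 })) // emits (key, value)
	fmt.Println(describeMapper(func(n int) int { return n * n }))                   // emits value
	fmt.Println(describeMapper(func(n int, out chan int) {}))                       // emits via channel
}
```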

func (*Dataset) MergeReduce

func (d *Dataset) MergeReduce(f interface{}) (ret *Dataset)

func (*Dataset) MergeSorted

func (d *Dataset) MergeSorted(f interface{}) (ret *Dataset)

func (*Dataset) Partition

func (d *Dataset) Partition(shard int) *Dataset

Hash the data, or hash by the data's key, and return a new dataset. This is divided into 2 steps:
1. Each record is sharded to a local shard.
2. The destination shard collects its child shards and merges them into one.
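The two steps above can be sketched in miniature: assign each record to a shard, then let each destination collect its records. This single-process sketch (hypothetical `partition` helper, integer keys hashed by modulus) ignores the distributed shuffle:

```go
package main

import "fmt"

// partition assigns each record to a shard by key (step 1), and each
// destination shard accumulates what was routed to it (step 2).
func partition(records []int, shards int) [][]int {
	out := make([][]int, shards)
	for _, r := range records {
		s := r % shards            // step 1: pick the destination shard
		out[s] = append(out[s], r) // step 2: destination collects and merges
	}
	return out
}

func main() {
	fmt.Println(partition([]int{1, 2, 3, 4, 5, 6}, 3)) // [[3 6] [1 4] [2 5]]
}
```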

func (*Dataset) Reduce

func (d *Dataset) Reduce(f interface{}) (ret *Dataset)

func (*Dataset) RunSelf

func (d *Dataset) RunSelf(stepId int)

func (*Dataset) SetupShard

func (d *Dataset) SetupShard(n int)

func (*Dataset) Sort

func (d *Dataset) Sort(f interface{}) (ret *Dataset)

type DatasetShard

type DatasetShard struct {
	Id        int
	Parent    *Dataset
	ReadChan  chan reflect.Value
	WriteChan reflect.Value
}

type FlowContext

type FlowContext struct {
	Id       int
	Steps    []*Step
	Datasets []*Dataset
}

func NewContext

func NewContext() (fc *FlowContext)

func (*FlowContext) AddAllToOneStep

func (f *FlowContext) AddAllToOneStep(input *Dataset, output *Dataset) (s *Step)

The task should run on the destination dataset shard.

func (*FlowContext) AddEveryNToOneStep

func (f *FlowContext) AddEveryNToOneStep(input *Dataset, m int, output *Dataset) (s *Step)

func (*FlowContext) AddOneToAllStep

func (f *FlowContext) AddOneToAllStep(input *Dataset, output *Dataset) (s *Step)

The task should run on the source dataset shard. Input is nil for the initial source dataset.

func (*FlowContext) AddOneToEveryNStep

func (f *FlowContext) AddOneToEveryNStep(input *Dataset, n int, output *Dataset) (s *Step)

func (*FlowContext) AddOneToOneStep

func (f *FlowContext) AddOneToOneStep(input *Dataset, output *Dataset) (s *Step)

The tasks should run on the source dataset shard.

func (*FlowContext) MergeDatasets1ShardTo1Step

func (f *FlowContext) MergeDatasets1ShardTo1Step(inputs []*Dataset, output *Dataset) (s *Step)

All datasets should have the same number of shards.

func (*FlowContext) Run

func (fc *FlowContext) Run()

func (*FlowContext) Source

func (fc *FlowContext) Source(f interface{}, shard int) (ret *Dataset)

f(chan A)
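A source function of shape f(chan A) produces elements by writing them into the channel. A stdlib sketch of that pattern, consumed by a hypothetical `collect` helper standing in for the framework (channel lifecycle management in glow may differ; here the source closes the channel itself):

```go
package main

import "fmt"

// numbers has the f(chan A) shape: it writes elements into the channel,
// then closes it to signal the end of the stream.
func numbers(out chan int) {
	for i := 1; i <= 3; i++ {
		out <- i
	}
	close(out)
}

// collect drains a source function's output, as the framework would.
func collect(src func(chan int)) []int {
	ch := make(chan int)
	go src(ch)
	var got []int
	for n := range ch {
		got = append(got, n)
	}
	return got
}

func main() {
	fmt.Println(collect(numbers)) // [1 2 3]
}
```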

func (*FlowContext) TextFile

func (fc *FlowContext) TextFile(fname string, shard int) (ret *Dataset)

type Step

type Step struct {
	Id       int
	Inputs   []*Dataset
	Output   *Dataset
	Function func(*Task)
	Tasks    []*Task
	Type     StepType
}

func (*Step) Run

func (s *Step) Run()

type StepType

type StepType int
const (
	Local StepType = 1 + iota
)

type Task

type Task struct {
	Id      int
	Inputs  []*DatasetShard
	Outputs []*DatasetShard
	Step    *Step
}

func (*Task) InputChan

func (t *Task) InputChan() chan reflect.Value

func (*Task) Run

func (t *Task) Run()

type TaskRunner

type TaskRunner interface {
	Run(fc *FlowContext)
	IsTaskMode() bool
}
