Documentation ¶
Overview ¶
Package mr provides a local, concurrent computing model (MapReduce) using Sophie serialization.
A simple word count example looks like this:

	job := MrJob{
		Source: []Input{...},
		NewMapperF: func(src, part int) Mapper {
			return &MapperStruct{
				NewKeyF: sophie.NewRawString,
				NewValF: sophie.ReturnNULL,
				MapF: func(key, val sophie.SophieWriter, c PartCollector) error {
					line := key.(*sophie.RawString).String()
					words := strings.Split(line, " ")
					for _, word := range words {
						if err := c.CollectTo(0, sophie.RawString(word), sophie.VInt(1)); err != nil {
							return err
						}
					}
					return nil
				},
			}
		},
		NewReducerF: func(part int) Reducer {
			return &ReducerStruct{
				NewKeyF: sophie.NewRawString,
				NewValF: sophie.NewVInt,
				ReduceF: func(key sophie.SophieWriter, nextVal SophierIterator, c []sophie.Collector) error {
					var count sophie.VInt
					for {
						val, err := nextVal()
						if err == io.EOF {
							break
						}
						if err != nil {
							return err
						}
						count += *val.(*sophie.VInt)
					}
					return c[0].Collect(key, count)
				},
			}
		},
		Dest: []Output{...},
	}
	if err := job.Run(); err != nil {
		log.Fatalf("job.Run failed: %v", err)
	}
For simple jobs that need no reducing step, MapOnlyJob can be used instead.
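The map, sort, and reduce flow behind the word count example can be sketched without the package, using only the Go standard library. This is a hypothetical single-process illustration. `wordCount`, `kv`, and the use of `strings.Fields` are assumptions of the sketch and are not part of mr or sophie. It omits partitions, concurrency, and Sophie serialization.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// kv is a hypothetical key/value pair standing in for Sophie-serialized pairs.
type kv struct {
	key string
	val int
}

// wordCount runs a map step, a sort-by-key step and a reduce step in memory.
func wordCount(lines []string) map[string]int {
	// Map: emit (word, 1) for every word of every line.
	var pairs []kv
	for _, line := range lines {
		for _, word := range strings.Fields(line) {
			pairs = append(pairs, kv{word, 1})
		}
	}
	// Sort: bring all pairs with the same key next to each other.
	sort.Slice(pairs, func(i, j int) bool { return pairs[i].key < pairs[j].key })
	// Reduce: sum the values of each run of equal keys.
	counts := make(map[string]int)
	for i := 0; i < len(pairs); {
		j, sum := i, 0
		for ; j < len(pairs) && pairs[j].key == pairs[i].key; j++ {
			sum += pairs[j].val
		}
		counts[pairs[i].key] = sum
		i = j
	}
	return counts
}

func main() {
	fmt.Println(wordCount([]string{"a b a", "b c"})) // prints map[a:2 b:2 c:1]
}
```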
Index ¶
- Variables
- type FileSorter
- func (fs *FileSorter) ClosePartCollectors() (err error)
- func (fs *FileSorter) CollectTo(part int, key, val sophie.SophieWriter) error
- func (fs *FileSorter) NewPartCollector(int) (PartCollector, error)
- func (fs *FileSorter) NewReduceIterator(part int) (ReduceIterator, error)
- func (fs *FileSorter) ReduceParts() []int
- type Input
- type InputStruct
- type MapOnlyJob
- type Mapper
- type MapperStruct
- type MemSorters
- func (*MemSorters) ClosePartCollectors() error
- func (ms *MemSorters) CollectTo(part int, key, val sophie.SophieWriter) error
- func (ms *MemSorters) NewPartCollector(int) (PartCollector, error)
- func (ms *MemSorters) NewReduceIterator(part int) (ReduceIterator, error)
- func (ms *MemSorters) ReduceParts() []int
- type MrJob
- type OnlyMapper
- type OnlyMapperStruct
- type Output
- type OutputStruct
- type PartCollector
- type ReduceIterator
- type Reducer
- type ReducerStruct
- type SophierIterator
- type Sorter
Constants ¶
This section is empty.
Variables ¶
var (
	// EOM (end of map) is an error returned by Mapper.Map or OnlyMapper.Map
	// to stop further mapping.
	EOM = errors.New("EOM")
)
var NullOutput = &OutputStruct{}
NullOutput is a helper Output whose Collector returns the NullCollectCloser.
Functions ¶
This section is empty.
Types ¶
type FileSorter ¶
type FileSorter struct {
	sync.RWMutex

	TmpFolder sophie.FsPath
	// contains filtered or unexported fields
}
FileSorter is a Sorter that stores mapped kv pairs in files under TmpFolder. When reducing, it reads each partition into memory, sorts it, and reduces it.
func NewFileSorter ¶
func NewFileSorter(TmpFolder sophie.FsPath) *FileSorter
func (*FileSorter) ClosePartCollectors ¶
func (fs *FileSorter) ClosePartCollectors() (err error)
Sorter interface
func (*FileSorter) CollectTo ¶
func (fs *FileSorter) CollectTo(part int, key, val sophie.SophieWriter) error
PartCollector interface
func (*FileSorter) NewPartCollector ¶
func (fs *FileSorter) NewPartCollector(int) (PartCollector, error)
Sorter interface
func (*FileSorter) NewReduceIterator ¶
func (fs *FileSorter) NewReduceIterator(part int) (ReduceIterator, error)
Sorter interface
type Input ¶
type Input interface {
	// PartCount returns the number of partitions.
	PartCount() (int, error)
	// Iterator returns an iterator over the partition at index, which is in
	// the range [0, PartCount()).
	Iterator(index int) (sophie.IterateCloser, error)
}
Input represents a specified input source for an mr job.
type InputStruct ¶
type InputStruct struct {
	PartCountF func() (int, error)
	IteratorF  func(index int) (sophie.IterateCloser, error)
}
A struct implementing the Input interface by funcs.
func (*InputStruct) Iterator ¶
func (is *InputStruct) Iterator(index int) (sophie.IterateCloser, error)
Input interface
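InputStruct (like MapperStruct, OnlyMapperStruct, ReducerStruct and OutputStruct) implements an interface by forwarding each method to a func field. The pattern can be sketched in isolation. The `Input` below is a simplified, hypothetical stand-in for mr.Input: its Iterator returns a plain slice of records instead of a sophie.IterateCloser.

```go
package main

import "fmt"

// Input is a simplified, hypothetical version of mr.Input.
type Input interface {
	PartCount() (int, error)
	Iterator(index int) ([]string, error)
}

// InputStruct implements Input by delegating each method to a func field.
type InputStruct struct {
	PartCountF func() (int, error)
	IteratorF  func(index int) ([]string, error)
}

func (is *InputStruct) PartCount() (int, error)             { return is.PartCountF() }
func (is *InputStruct) Iterator(index int) ([]string, error) { return is.IteratorF(index) }

// Compile-time check that *InputStruct satisfies Input.
var _ Input = (*InputStruct)(nil)

func main() {
	parts := [][]string{{"a b"}, {"c"}}
	var in Input = &InputStruct{
		PartCountF: func() (int, error) { return len(parts), nil },
		IteratorF: func(i int) ([]string, error) {
			if i < 0 || i >= len(parts) {
				return nil, fmt.Errorf("partition %d out of range", i)
			}
			return parts[i], nil
		},
	}
	n, err := in.PartCount()
	if err != nil {
		panic(err)
	}
	for i := 0; i < n; i++ {
		recs, err := in.Iterator(i)
		if err != nil {
			panic(err)
		}
		fmt.Println(i, recs)
	}
}
```

The design lets a caller supply an Input inline with closures, without declaring a new named type for each source.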
type MapOnlyJob ¶
type MapOnlyJob struct {
	// The slice of Inputs
	Source []Input
	// The factory for OnlyMappers
	NewMapperF func(src, part int) OnlyMapper
	// The slice of Outputs
	Dest []Output
}
MapOnlyJob is a job with a mapping step only.
func (*MapOnlyJob) Run ¶
func (job *MapOnlyJob) Run() error
Run runs the job. If any of the mappers fails, one of the errors is returned.
type Mapper ¶
type Mapper interface {
	// NewKey returns a new instance of the key Sophier object.
	NewKey() sophie.Sophier
	// NewVal returns a new instance of the value Sophier object.
	NewVal() sophie.Sophier
	// Map converts the input kv pair to what the Reducer expects and sends
	// the results to the PartCollector.
	Map(key, val sophie.SophieWriter, c PartCollector) error
	// MapEnd is invoked after a partition of the Input is mapped.
	MapEnd(c PartCollector) error
}
Mapper defines the mapping stage of an MrJob.
type MapperStruct ¶
type MapperStruct struct {
	// Func for Mapper.NewKey
	NewKeyF func() sophie.Sophier
	// Func for Mapper.NewVal
	NewValF func() sophie.Sophier
	// Func for Mapper.Map
	MapF func(key, val sophie.SophieWriter, c PartCollector) error
	// Func for Mapper.MapEnd
	MapEndF func(c PartCollector) error
}
MapperStruct implements Mapper by funcs.
func (*MapperStruct) Map ¶
func (ms *MapperStruct) Map(key, val sophie.SophieWriter, c PartCollector) error
Mapper interface
func (*MapperStruct) MapEnd ¶
func (ms *MapperStruct) MapEnd(c PartCollector) error
Mapper interface
type MemSorters ¶
MemSorters is a Sorter that stores all kv pairs in memory.
func (*MemSorters) ClosePartCollectors ¶
func (*MemSorters) ClosePartCollectors() error
Sorter interface
func (*MemSorters) CollectTo ¶
func (ms *MemSorters) CollectTo(part int, key, val sophie.SophieWriter) error
PartCollector interface
func (*MemSorters) NewPartCollector ¶
func (ms *MemSorters) NewPartCollector(int) (PartCollector, error)
Sorter interface
func (*MemSorters) NewReduceIterator ¶
func (ms *MemSorters) NewReduceIterator(part int) (ReduceIterator, error)
Sorter interface
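An in-memory sorter in the spirit of MemSorters can be sketched as follows. Pairs are buffered per partition, the set of partitions is reported in ascending order (the role of ReduceParts), and each partition is sorted only when it is reduced. The names `memSorter`, `collectTo`, `reduceParts`, and `reducePart` are hypothetical. Keys and values are plain strings rather than Sophie objects.

```go
package main

import (
	"fmt"
	"sort"
)

type pair struct{ key, val string }

// memSorter is a hypothetical in-memory analogue of MemSorters.
type memSorter struct {
	parts map[int][]pair
}

func newMemSorter() *memSorter { return &memSorter{parts: make(map[int][]pair)} }

// collectTo buffers one pair in the given partition.
func (ms *memSorter) collectTo(part int, key, val string) {
	ms.parts[part] = append(ms.parts[part], pair{key, val})
}

// reduceParts returns, in ascending order, the indexes of all partitions
// that received at least one pair.
func (ms *memSorter) reduceParts() []int {
	idx := make([]int, 0, len(ms.parts))
	for p := range ms.parts {
		idx = append(idx, p)
	}
	sort.Ints(idx)
	return idx
}

// reducePart sorts one partition by key (stable, so values keep their
// collection order) and calls reduce once per distinct key.
func (ms *memSorter) reducePart(part int, reduce func(key string, vals []string)) {
	ps := ms.parts[part]
	sort.SliceStable(ps, func(i, j int) bool { return ps[i].key < ps[j].key })
	for i := 0; i < len(ps); {
		var vals []string
		j := i
		for ; j < len(ps) && ps[j].key == ps[i].key; j++ {
			vals = append(vals, ps[j].val)
		}
		reduce(ps[i].key, vals)
		i = j
	}
}

func main() {
	ms := newMemSorter()
	ms.collectTo(1, "b", "x")
	ms.collectTo(0, "a", "y")
	ms.collectTo(1, "a", "z")
	for _, p := range ms.reduceParts() {
		ms.reducePart(p, func(k string, vs []string) { fmt.Println(p, k, vs) })
	}
}
```

Keeping everything in memory avoids disk I/O, but the whole mapped data set must fit in RAM. That trade-off is what distinguishes it from the file-based FileSorter.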
type MrJob ¶
type MrJob struct {
	// The factory for Mappers
	NewMapperF func(src, part int) Mapper
	// The factory for Reducers
	NewReducerF func(part int) Reducer
	// The Sorter that sorts kv pairs mapped by Mappers and provides
	// SophierIterator for Reducers.
	Sorter Sorter
	// The source Inputs
	Source []Input
	// The destination Outputs
	Dest []Output
}
An MrJob contains a mapping step and a reducing step. In the reducing step, kv pairs are sorted by key, and the values of each key are reduced using the Reducer.
type OnlyMapper ¶
type OnlyMapper interface {
	// NewKey returns a new instance of key for reading from Source.
	NewKey() sophie.Sophier
	// NewVal returns a new instance of value for reading from Source.
	NewVal() sophie.Sophier
	// Map performs the map action for a key/val pair, collecting results to c.
	// NOTE the key-value pairs will be reused on the next call to Map, so make
	// a deep copy if you want to save the contents.
	// If EOM is returned, mapping stops (as success).
	// If any other non-nil error is returned, the job is aborted as a failure.
	// @param c the slice of Collectors, one for each element of Dest.
	Map(key, val sophie.SophieWriter, c []sophie.Collector) error
	// MapEnd performs the map action at the final stage, collecting results to c.
	// @param c the slice of Collectors, one for each element of Dest.
	MapEnd(c []sophie.Collector) error
}
OnlyMapper is an interface defining the map actions for MapOnlyJob.
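The note that key-value objects are reused between calls to Map can be illustrated with plain byte slices. In the sketch, `feed` is a hypothetical reader that overwrites one shared buffer for every record, standing in for how the framework reuses the objects created by NewKey/NewVal. Saving the argument directly keeps only an alias to that buffer. Copying it preserves the record.

```go
package main

import "fmt"

// feed is a hypothetical reader that reuses a single key buffer for every
// record it passes to mapF.
func feed(records []string, mapF func(key []byte)) {
	buf := make([]byte, 0, 16)
	for _, r := range records {
		buf = append(buf[:0], r...) // overwrite the shared buffer in place
		mapF(buf)
	}
}

func main() {
	records := []string{"ab", "cd"}

	// Wrong: saves a reference to the reused buffer.
	var aliased [][]byte
	feed(records, func(key []byte) { aliased = append(aliased, key) })

	// Right: deep-copies the key before saving it.
	var copied [][]byte
	feed(records, func(key []byte) { copied = append(copied, append([]byte(nil), key...)) })

	fmt.Printf("%s %s\n", aliased[0], copied[0]) // prints "cd ab"
}
```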
type OnlyMapperStruct ¶
type OnlyMapperStruct struct {
	NewKeyF func() sophie.Sophier
	NewValF func() sophie.Sophier
	MapF    func(key, val sophie.SophieWriter, c []sophie.Collector) error
	MapEndF func(c []sophie.Collector) error
}
OnlyMapperStruct implements OnlyMapper by funcs.
func (*OnlyMapperStruct) Map ¶
func (oms *OnlyMapperStruct) Map(key, val sophie.SophieWriter, c []sophie.Collector) error
OnlyMapper interface
func (*OnlyMapperStruct) MapEnd ¶
func (oms *OnlyMapperStruct) MapEnd(c []sophie.Collector) error
OnlyMapper interface
func (*OnlyMapperStruct) NewKey ¶
func (oms *OnlyMapperStruct) NewKey() sophie.Sophier
OnlyMapper interface
func (*OnlyMapperStruct) NewVal ¶
func (oms *OnlyMapperStruct) NewVal() sophie.Sophier
OnlyMapper interface
type Output ¶
type Output interface {
	// Collector generates a sophie.CollectCloser for collecting kv pairs.
	// index is an integer indicating the index of a partition.
	Collector(index int) (sophie.CollectCloser, error)
}
Output represents a specified output destination for an mr job.
type OutputStruct ¶
type OutputStruct struct {
CollectorF func(int) (sophie.CollectCloser, error)
}
OutputStruct is a struct whose pointer implements Output interface.
func (*OutputStruct) Collector ¶
func (o *OutputStruct) Collector(i int) (sophie.CollectCloser, error)
type PartCollector ¶
type PartCollector interface {
CollectTo(part int, key, val sophie.SophieWriter) error
}
PartCollector collects kv pairs into a specified partition.
type ReduceIterator ¶
type ReduceIterator interface {
	// Iterate calls Reducer.Reduce for each key.
	Iterate(c []sophie.Collector, r Reducer) error
}
ReduceIterator is used by a Sorter to call the Reducer for each key of a partition.
type Reducer ¶
type Reducer interface {
	// NewKey returns a new instance of the key Sophier object for reducing.
	NewKey() sophie.Sophier
	// NewVal returns a new instance of the value Sophier object for reducing.
	NewVal() sophie.Sophier
	// Reduce is called for each key. To get all values of the key:
	//
	//	for {
	//		val, err := nextVal()
	//		if errorsp.Cause(err) == io.EOF {
	//			break
	//		}
	//		if err != nil {
	//			return err
	//		}
	//		...
	//	}
	Reduce(key sophie.SophieWriter, nextVal SophierIterator, c []sophie.Collector) error
	// ReduceEnd is invoked after a partition of the reducing kv pairs is reduced.
	ReduceEnd(c []sophie.Collector) error
}
Reducer defines the reducing stage of an MrJob.
type ReducerStruct ¶
type ReducerStruct struct {
	NewKeyF    func() sophie.Sophier
	NewValF    func() sophie.Sophier
	ReduceF    func(key sophie.SophieWriter, nextVal SophierIterator, c []sophie.Collector) error
	ReduceEndF func(c []sophie.Collector) error
}
A struct implementing Reducer interface by funcs.
func (*ReducerStruct) Reduce ¶
func (rs *ReducerStruct) Reduce(key sophie.SophieWriter, nextVal SophierIterator, c []sophie.Collector) error
Reducer interface
type SophierIterator ¶
SophierIterator is an iterator for fetching a list of Sophiers. If io.EOF is returned as the error, no further Sophiers are available. Typical usage:

	var next SophierIterator
	for {
		vl, err := next()
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}
		// ... consume vl ...
	}
type Sorter ¶
type Sorter interface {
	// NewPartCollector returns a PartCollector for receiving kv pairs from
	// Mappers.
	NewPartCollector(inPart int) (PartCollector, error)
	// ClosePartCollectors closes all opened PartCollectors. It should be
	// called after all kv pairs have been collected.
	ClosePartCollectors() error
	// ReduceParts returns the indexes of all partitions.
	ReduceParts() []int
	// NewReduceIterator creates and returns a ReduceIterator for a partition.
	NewReduceIterator(part int) (ReduceIterator, error)
}
A Sorter is responsible for receiving all kv pairs from Mappers, sorting them, and sending them to Reducers.