Documentation ¶
Overview ¶
Package simplemr provides a simple map reduce framework for use by command-line and other tools; consequently, it can only be used from within a single process. It is specifically not intended to support large datasets, but mappers are run concurrently so that long-running tasks (e.g. external shell commands) will be run in parallel. The current implementation supports only a single reducer; however, future implementations are likely to run multiple reducers, and hence reducers should be coded accordingly.
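The model described above (concurrent mappers feeding a single reducer inside one process) can be sketched in plain Go. This is only an illustration of the idea and not the package's implementation; `wordCount` and its parameters are hypothetical names chosen for the sketch.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// wordCount is a hypothetical helper, not part of simplemr. It runs
// numMappers goroutines over the input lines and then reduces the grouped
// intermediate values in the calling goroutine (a single reducer).
func wordCount(lines []string, numMappers int) map[string]int {
	if numMappers < 1 {
		numMappers = 1
	}
	input := make(chan string)

	var mu sync.Mutex
	grouped := make(map[string][]int) // intermediate data: word -> emitted values

	var wg sync.WaitGroup
	for i := 0; i < numMappers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Map phase: emit (word, 1) for every word in the line.
			for line := range input {
				for _, w := range strings.Fields(line) {
					mu.Lock()
					grouped[w] = append(grouped[w], 1)
					mu.Unlock()
				}
			}
		}()
	}
	for _, l := range lines {
		input <- l
	}
	close(input) // no more input: mappers drain the channel and exit
	wg.Wait()

	// Reduce phase: a single reducer sums the values for each key.
	totals := make(map[string]int, len(grouped))
	for k, vals := range grouped {
		for _, v := range vals {
			totals[k] += v
		}
	}
	return totals
}

func main() {
	totals := wordCount([]string{"a b a", "b c", "a"}, 2)
	fmt.Println(totals["a"], totals["b"], totals["c"])
}
```

Because the reducer in this sketch only ever sees all values for one key at a time, the same reduce logic would still be valid if keys were later partitioned across several reducers.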
Index ¶
- Variables
- type Identity
- type MR
- func (mr *MR) Cancel()
- func (mr *MR) CancelCh() <-chan struct{}
- func (mr *MR) Error() error
- func (mr *MR) IsCancelled() bool
- func (mr *MR) MapOut(key string, values ...interface{})
- func (mr *MR) ReduceOut(key string, values ...interface{})
- func (mr *MR) Run(input <-chan *Record, output chan<- *Record, mapper Mapper, reducer Reducer) error
- type Mapper
- type Record
- type Reducer
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrMRCancelled = errors.New("MR cancelled")
Functions ¶
This section is empty.
Types ¶
type MR ¶
type MR struct {
	// The number of concurrent mappers to use. A value of 0 instructs
	// the implementation to use an appropriate number, such as the number
	// of available CPUs.
	NumMappers int

	// The time to wait for the map reduce to complete. A value of 0 implies
	// no timeout - i.e. an infinite wait.
	Timeout time.Duration
	// contains filtered or unexported fields
}
MR represents the Map Reduction.
Example ¶
package main

import (
	"fmt"

	"v.io/x/lib/simplemr"
)

func main() {
	in, out := make(chan *simplemr.Record, 2), make(chan *simplemr.Record, 2)
	mr := &simplemr.MR{}
	identity := &simplemr.Identity{}
	go mr.Run(in, out, identity, identity)
	in <- &simplemr.Record{"1", []interface{}{"hello\n"}}
	in <- &simplemr.Record{"2", []interface{}{"world\n"}}
	close(in)
	k := <-out
	fmt.Printf("%s: %s", k.Key, k.Values[0].(string))
	k = <-out
	fmt.Printf("%s: %s", k.Key, k.Values[0].(string))
	if err := mr.Error(); err != nil {
		fmt.Printf("mr failed: %v", err)
	}
}
Output:
1: hello
2: world
func (*MR) Cancel ¶
func (mr *MR) Cancel()
Cancel closes the channel intended to be used for monitoring cancellation requests. If Cancel is called before any reducers have been run then no reducers will be run. It can only be called after mr.Run has been called, generally by a mapper or a reducer.
func (*MR) CancelCh ¶
func (mr *MR) CancelCh() <-chan struct{}
CancelCh returns a channel that will be closed when the Cancel method is called. It should only be called by a mapper or reducer.
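The cancellation pattern described for Cancel, CancelCh and IsCancelled (a channel that is closed to signal cancellation) can be sketched with a stand-in type. `canceller` and `processItems` are hypothetical names, and guarding the close with `sync.Once` is a choice made for this sketch, not a statement about how simplemr implements it.

```go
package main

import (
	"fmt"
	"sync"
)

// canceller is a hypothetical stand-in for the cancellation part of MR.
type canceller struct {
	once sync.Once
	ch   chan struct{}
}

func newCanceller() *canceller { return &canceller{ch: make(chan struct{})} }

// Cancel closes the channel; sync.Once keeps a second call from panicking.
func (c *canceller) Cancel() { c.once.Do(func() { close(c.ch) }) }

// CancelCh returns a channel that is closed once Cancel has been called.
func (c *canceller) CancelCh() <-chan struct{} { return c.ch }

// IsCancelled reports whether the channel has been closed, without blocking.
func (c *canceller) IsCancelled() bool {
	select {
	case <-c.ch:
		return true
	default:
		return false
	}
}

// processItems is a hypothetical worker: it checks for cancellation before
// each item and cancels everything when it meets the item named cancelOn.
// It returns the number of items it fully processed.
func processItems(c *canceller, items []string, cancelOn string) int {
	done := 0
	for _, it := range items {
		select {
		case <-c.CancelCh():
			return done // cancelled: stop without touching the rest
		default:
		}
		if it == cancelOn {
			c.Cancel()
			continue
		}
		done++
	}
	return done
}

func main() {
	c := newCanceller()
	n := processItems(c, []string{"a", "b", "stop", "c"}, "stop")
	fmt.Println(n, c.IsCancelled())
}
```

A mapper or reducer that shells out to a long-running command could place the same `select` on the cancellation channel alongside the channel that reports the command's completion.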
func (*MR) Error ¶
func (mr *MR) Error() error
Error returns any error that was returned by the Run method. It is safe to read its value once the output channel passed to Run has been closed.
func (*MR) IsCancelled ¶
func (mr *MR) IsCancelled() bool
IsCancelled returns true if this MR has been cancelled.
func (*MR) MapOut ¶
func (mr *MR) MapOut(key string, values ...interface{})
MapOut outputs the key and associated values for subsequent processing by a Reducer. It should only be called from a mapper.
func (*MR) ReduceOut ¶
func (mr *MR) ReduceOut(key string, values ...interface{})
ReduceOut outputs the key and associated values to the specified output stream. It should only be called from a reducer.
func (*MR) Run ¶
func (mr *MR) Run(input <-chan *Record, output chan<- *Record, mapper Mapper, reducer Reducer) error
Run runs the map reduction using the supplied mapper and reducer reading from input and writing to output. The caller must close the input channel when there is no more input data. The implementation of Run will close the output channel when the Reducer has processed all intermediate data. Run may only be called once per MR receiver.
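The channel contract described for Run (the caller closes input, the implementation closes output) can be sketched with stand-ins. `record`, `run` and `collect` are hypothetical and only mimic the shape of the documented API; `run` simply upper-cases keys rather than performing a real map reduction.

```go
package main

import (
	"fmt"
	"strings"
)

// record is a hypothetical stand-in for simplemr.Record.
type record struct {
	Key    string
	Values []interface{}
}

// run is a hypothetical stand-in for MR.Run: it consumes input until the
// caller closes it, then closes output to signal that it has finished.
func run(input <-chan *record, output chan<- *record) error {
	defer close(output)
	for rec := range input {
		output <- &record{Key: strings.ToUpper(rec.Key), Values: rec.Values}
	}
	return nil
}

// collect drives run the way a caller would: feed the input, close it,
// drain the output until it is closed, and only then look at the error.
func collect(keys []string) ([]string, error) {
	input, output := make(chan *record), make(chan *record)
	errCh := make(chan error, 1)
	go func() { errCh <- run(input, output) }()
	go func() {
		for _, k := range keys {
			input <- &record{Key: k}
		}
		close(input) // the caller, not run, closes the input channel
	}()
	var got []string
	for rec := range output { // ends when run closes output
		got = append(got, rec.Key)
	}
	return got, <-errCh
}

func main() {
	got, err := collect([]string{"a", "b"})
	fmt.Println(got, err)
}
```

Ranging over the output channel until it is closed, instead of reading a fixed number of records, means the caller does not need to know in advance how many records the reducer will emit.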
type Mapper ¶
type Mapper interface {
	// Map is called by the framework for every key, value pair read
	// from the specified input.
	Map(mr *MR, key string, value interface{}) error
}
Mapper is the interface that must be implemented by all mappers.
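A mapper might look like the following sketch, which splits a line into words and emits each one via MapOut. To keep the example self-contained, `fakeMR` is a hypothetical stand-in for *simplemr.MR that only records MapOut calls, and `wordMapper` is an illustrative name; a real mapper would take *simplemr.MR as its first argument.

```go
package main

import (
	"fmt"
	"strings"
)

// fakeMR is a hypothetical stand-in for *simplemr.MR. It only collects what
// is passed to MapOut and is not safe for concurrent use.
type fakeMR struct {
	out map[string][]interface{}
}

// MapOut has the same parameter list as the documented MR.MapOut.
func (mr *fakeMR) MapOut(key string, values ...interface{}) {
	mr.out[key] = append(mr.out[key], values...)
}

// wordMapper mirrors the documented Map signature, with fakeMR in place
// of *simplemr.MR.
type wordMapper struct{}

// Map emits (word, 1) for every whitespace-separated word in value.
func (wordMapper) Map(mr *fakeMR, key string, value interface{}) error {
	line, ok := value.(string)
	if !ok {
		return fmt.Errorf("key %q: expected a string value, got %T", key, value)
	}
	for _, w := range strings.Fields(line) {
		mr.MapOut(w, 1)
	}
	return nil
}

func main() {
	mr := &fakeMR{out: make(map[string][]interface{})}
	if err := (wordMapper{}).Map(mr, "line1", "to be or not to be"); err != nil {
		fmt.Println("map failed:", err)
		return
	}
	fmt.Println(len(mr.out["to"]), len(mr.out["be"]), len(mr.out["or"]))
}
```

Since the framework runs mappers concurrently, a real Map implementation should avoid unsynchronized shared state of its own and pass results on only through MapOut.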