mr

package
Version: v0.0.0-...-731618e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 13, 2016 License: BSD-2-Clause Imports: 11 Imported by: 27

Documentation

Overview

Package mr provides a local concurrent computing model (MapReduce) using Sophie serialization.

A simple word count example is like this:

job := MrJob {
	Source: []Input{...},

	NewMapperF: func(src, part int) Mapper {
		return &MapperStruct {
			NewKeyF: sophie.NewString,
			NewValF: sophie.ReturnNULL,
			MapF: func(key, val sophie.SophieWriter, c PartCollector) error {
				line := key.(*RawString).String()
				words := strings.Split(line, " ")
				for _, word := range words {
					c.CollectTo(0, sophie.RawString(word), sophie.VInt(1))
				}
				return nil
			},
		}
	},

	NewReducerF: func(part int) Reducer {
		return &ReducerStruct {
			NewKeyF: sophie.NewRawString,
			NewValF: sophie.NewVInt,
			ReduceF: func(key sophie.SophieWriter, nextVal SophierIterator,
				c []sophie.Collector) error {
				var count sophie.VInt
				for {
					val, err := nextVal()
					if err == io.EOF {
						break
					}
					if err != nil {
						return err
					}
					count += val.(*sophie.VInt).Val()
				}
				return c[0].Collect(key, count)
			},
		}
	},

	Dest: []Output{...},
}

if err := job.Run(); err != nil {
	log.Fatalf("job.Run failed: %v", err)
}

One can also use MapOnlyJob for simple jobs.

Index

Constants

This section is empty.

Variables

View Source
var (
	// EOM (end of map) is an error returned by Mapper.Map or OnlyMapper.Map
	// to indicate that mapping should stop.
	EOM = errors.New("EOM")
)
View Source
var NullOutput = &OutputStruct{}

A helper variable with an Output returning the NullCollectCloser

Functions

This section is empty.

Types

type FileSorter

type FileSorter struct {
	sync.RWMutex
	TmpFolder sophie.FsPath
	// contains filtered or unexported fields
}

FileSorter is a Sorter that stores mapped kv pairs in TmpFolder, then reads them back into memory to sort and reduce them.

func NewFileSorter

func NewFileSorter(TmpFolder sophie.FsPath) *FileSorter

func (*FileSorter) ClosePartCollectors

func (fs *FileSorter) ClosePartCollectors() (err error)

Sorter interface

func (*FileSorter) CollectTo

func (fs *FileSorter) CollectTo(part int, key, val sophie.SophieWriter) error

PartCollector interface

func (*FileSorter) NewPartCollector

func (fs *FileSorter) NewPartCollector(int) (PartCollector, error)

Sorter interface

func (*FileSorter) NewReduceIterator

func (fs *FileSorter) NewReduceIterator(part int) (ReduceIterator, error)

Sorter interface

func (*FileSorter) ReduceParts

func (fs *FileSorter) ReduceParts() []int

Sorter interface

type Input

type Input interface {
	// PartCount returns the number partitions.
	PartCount() (int, error)
	// index range [0, PartCount())
	Iterator(index int) (sophie.IterateCloser, error)
}

Input represents a specified input source for a mr job.

type InputStruct

type InputStruct struct {
	PartCountF func() (int, error)
	IteratorF  func(index int) (sophie.IterateCloser, error)
}

A struct implementing the Input interface by funcs.

func (*InputStruct) Iterator

func (is *InputStruct) Iterator(index int) (sophie.IterateCloser, error)

Input interface

func (*InputStruct) PartCount

func (is *InputStruct) PartCount() (int, error)

Input interface

type MapOnlyJob

type MapOnlyJob struct {
	// The slice of Inputs
	Source []Input

	// The factory for OnlyMappers
	NewMapperF func(src, part int) OnlyMapper

	// The slice of Outputs
	Dest []Output
}

MapOnlyJob is a job with a mapping step only.

func (*MapOnlyJob) Run

func (job *MapOnlyJob) Run() error

Runs the job. If any of the mappers fail, one of the errors is returned.

type Mapper

type Mapper interface {
	// NewKey returns a new instance of the key Sophier object.
	NewKey() sophie.Sophier
	// NewVal returns a new instance of the value Sophier object.
	NewVal() sophie.Sophier
	// Map converts the input kv pair to what the Reducer expect and send to
	// the PartCollector.
	Map(key, val sophie.SophieWriter, c PartCollector) error
	// MapEnd is invoked after a partition of the Input is mapped.
	MapEnd(c PartCollector) error
}

The mapping stage in MrJob.

type MapperStruct

type MapperStruct struct {
	// Func for Mapper.NewKey
	NewKeyF func() sophie.Sophier
	// Func for Mapper.NewVal
	NewValF func() sophie.Sophier
	// Func for Mapper.Map
	MapF func(key, val sophie.SophieWriter, c PartCollector) error
	// Func for Mapper.MapEnd
	MapEndF func(c PartCollector) error
}

A MapperStruct implementing Mapper by funcs.

func (*MapperStruct) Map

func (ms *MapperStruct) Map(key, val sophie.SophieWriter,
	c PartCollector) error

Mapper interface

func (*MapperStruct) MapEnd

func (ms *MapperStruct) MapEnd(c PartCollector) error

Mapper interface

func (*MapperStruct) NewKey

func (ms *MapperStruct) NewKey() sophie.Sophier

Mapper interface

func (*MapperStruct) NewVal

func (ms *MapperStruct) NewVal() sophie.Sophier

Mapper interface

type MemSorters

type MemSorters struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

MemSorters is a Sorter that stores all kv pairs in memory.

func NewMemSorters

func NewMemSorters() *MemSorters

NewMemSorters creates a new *MemSorters.

func (*MemSorters) ClosePartCollectors

func (*MemSorters) ClosePartCollectors() error

Sorter interface

func (*MemSorters) CollectTo

func (ms *MemSorters) CollectTo(part int, key, val sophie.SophieWriter) error

PartCollector interface

func (*MemSorters) NewPartCollector

func (ms *MemSorters) NewPartCollector(int) (PartCollector, error)

Sorter interface

func (*MemSorters) NewReduceIterator

func (ms *MemSorters) NewReduceIterator(part int) (ReduceIterator, error)

Sorter interface

func (*MemSorters) ReduceParts

func (ms *MemSorters) ReduceParts() []int

Sorter interface

type MrJob

type MrJob struct {
	// The factory for Mappers
	NewMapperF func(src, part int) Mapper
	// The factory for Reducers
	NewReducerF func(part int) Reducer

	// The Sorter that sorts kv pairs mapped by Mappers and provides
	// SophierIterator for Reducers.
	Sorter Sorter

	// The source Inputs
	Source []Input
	// The destination Outputs
	Dest []Output
}

An MrJob contains a mapping step and a reducing step. In reducing step, kv pairs are sorted by keys, and values of a key are reduced using the Reducer.

func (*MrJob) Run

func (job *MrJob) Run() error

Runs the MrJob. If Sorter is not specified, MemSorters is used.

type OnlyMapper

type OnlyMapper interface {
	// NewKey returns a new instance of key for reading from Source
	NewKey() sophie.Sophier
	// NewVal returns a new instance of value for reading from Source
	NewVal() sophie.Sophier
	// Make a map action for a key/val pair, collecting results to c.
	// NOTE the key-value pairs will be reused on the next call to Map, so
	// make a deep copy if you want to save the contents.
	// If sophie.EOM is returned the mapping is stopped (as success).
	// If other non-nil error is returned, the job is aborted as failure.
	// @param c  the slice of Collectors. Same length as Source.
	Map(key, val sophie.SophieWriter, c []sophie.Collector) error
	// Make a map action at final stage, collecting results to c
	// @param c  the slice of Collectors. Same length as Source.
	MapEnd(c []sophie.Collector) error
}

OnlyMapper is an interface defining the map actions for MapOnlyJob

type OnlyMapperStruct

type OnlyMapperStruct struct {
	NewKeyF func() sophie.Sophier
	NewValF func() sophie.Sophier
	MapF    func(key, val sophie.SophieWriter, c []sophie.Collector) error
	MapEndF func(c []sophie.Collector) error
}

A struct implementing OnlyMapper with funcs.

func (*OnlyMapperStruct) Map

func (oms *OnlyMapperStruct) Map(key, val sophie.SophieWriter,
	c []sophie.Collector) error

OnlyMapper interface

func (*OnlyMapperStruct) MapEnd

func (oms *OnlyMapperStruct) MapEnd(c []sophie.Collector) error

OnlyMapper interface

func (*OnlyMapperStruct) NewKey

func (oms *OnlyMapperStruct) NewKey() sophie.Sophier

OnlyMapper interface

func (*OnlyMapperStruct) NewVal

func (oms *OnlyMapperStruct) NewVal() sophie.Sophier

OnlyMapper interface

type Output

type Output interface {
	// Collector generates a sophie.CollectCloser for collecting kv pairs.
	// index is an integer indicating the index to some partition.
	Collector(index int) (sophie.CollectCloser, error)
}

Output represents a specified output destination for a mr job.

type OutputStruct

type OutputStruct struct {
	CollectorF func(int) (sophie.CollectCloser, error)
}

OutputStruct is a struct whose pointer implements Output interface.

func (*OutputStruct) Collector

func (o *OutputStruct) Collector(i int) (sophie.CollectCloser, error)

type PartCollector

type PartCollector interface {
	CollectTo(part int, key, val sophie.SophieWriter) error
}

A collector that collects kv pairs to a specified part.

type ReduceIterator

type ReduceIterator interface {
	// Iterate calls Reducer.Reduce for each key.
	Iterate(c []sophie.Collector, r Reducer) error
}

ReduceIterator is an object for a Sorter to call Reducer.

type Reducer

type Reducer interface {
	// NewKey returns a new instance of the key Sophier object for reducing.
	NewKey() sophie.Sophier
	// NewVal returns a new instance of the value Sophier object for reducing.
	NewVal() sophie.Sophier
	// to get all values:
	//   for {
	//	 	val, err := nextVal()
	//   	if errorsp.Cause(err) == io.EOF {
	//   		break;
	//   	}
	//      if err != nil {
	//   		return err;
	//   	}
	//      ...
	//   }
	Reduce(key sophie.SophieWriter, nextVal SophierIterator,
		c []sophie.Collector) error
	// ReduceEnd is invoked after a partition of the reducing kv pairs is reduced.
	ReduceEnd(c []sophie.Collector) error
}

The reducing stage in MrJob.

type ReducerStruct

type ReducerStruct struct {
	NewKeyF func() sophie.Sophier
	NewValF func() sophie.Sophier
	ReduceF func(key sophie.SophieWriter, nextVal SophierIterator,
		c []sophie.Collector) error
	ReduceEndF func(c []sophie.Collector) error
}

A struct implementing Reducer interface by funcs.

func (*ReducerStruct) NewKey

func (rs *ReducerStruct) NewKey() sophie.Sophier

Reducer interface

func (*ReducerStruct) NewVal

func (rs *ReducerStruct) NewVal() sophie.Sophier

Reducer interface

func (*ReducerStruct) Reduce

func (rs *ReducerStruct) Reduce(key sophie.SophieWriter,
	nextVal SophierIterator, c []sophie.Collector) error

Reducer interface

func (*ReducerStruct) ReduceEnd

func (rs *ReducerStruct) ReduceEnd(c []sophie.Collector) error

Reducer interface

type SophierIterator

type SophierIterator func() (sophie.Sophier, error)

An iterator for fetching a list of Sophiers. If io.EOF is returned as the error, no further Sophiers are available. Typical usage:

var next SophierIterator
for {
    vl, err := next()
    if err == io.EOF {
        break
    }
    if err != nil {
        return err
    }
    ... consume vl ...
}

type Sorter

type Sorter interface {
	// NewPartCollector returns a PartCollector for receiving kv pairs from
	// Mappers.
	NewPartCollector(inPart int) (PartCollector, error)
	// ClosePartCollectors closes all PartCollectors opened. This should be
	// called when all kv pairs have been collected.
	ClosePartCollectors() error
	// Returns a slice of integers of all the partition indexes.
	ReduceParts() []int
	// NewReduceIterator creates and returns a ReduceIterator for a partition.
	NewReduceIterator(part int) (ReduceIterator, error)
}

A Sorter is responsible for receiving all kv pairs from Mappers, sort them and send to Reducers.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL