package module
Version: v0.0.0-...-391d0c8 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Jul 23, 2021 License: Apache-2.0 Imports: 26 Imported by: 0


Go Mapreduce for Appengine

This is a feature-complete (though not performance or reliability complete) implementation of map/reduce for the go language on the appengine platform. It make heavy use of task queues to provide load scaling, and a sample application is available from

This is an BETA release of this package. While it is being used for production use it certainly still has shortcomings and bugs.



Package mapreduce provides a mapreduce pipeline for Google's appengine environment. For a sample application, see

Package mapreduce provides a mapreduce pipeline for Google's appengine environment



View Source
const (
	TaskStatusPending = TaskStatus("pending")
	TaskStatusRunning = TaskStatus("running")
	TaskStatusDone    = TaskStatus("done")
	TaskStatusFailed  = TaskStatus("failed")
View Source
const (
	StageFormation = JobStage("forming")
	StageMapping   = JobStage("map")
	StageReducing  = JobStage("reduce")
	StageDone      = JobStage("done")
	StageFailed    = JobStage("failed")
View Source
const (
	TaskTypeMap    = TaskType("map")
	TaskTypeReduce = TaskType("reduce")

TaskTypes defines the type of task, map or reduce

View Source
const JobEntity = "MapReduceJob"

Datastore entity kinds for jobs and tasks

View Source
const TaskEntity = "MapReduceTask"


This section is empty.


func ConsoleHandler

func ConsoleHandler(w http.ResponseWriter, r *http.Request)

func GetJobTaskResults

func GetJobTaskResults(ds appwrap.Datastore, job JobInfo) ([]interface{}, error)

func MapReduceHandler

func MapReduceHandler(baseUrl string, pipeline MapReducePipeline,
	getContext func(r *http.Request) context.Context) http.Handler

MapReduceHandler returns an http.Handler which is responsible for all of the urls pertaining to the mapreduce job. The baseUrl acts as the name for the type of job being run.

func ReduceFunc

func ReduceFunc(c context.Context, mr MapReducePipeline, writer SingleOutputWriter, shardNames []string,
	separateReduceItems bool, statusFunc StatusUpdateFunc, log appwrap.Logging) error

func RemoveJob

func RemoveJob(ds appwrap.Datastore, jobId int64) error


type FatalError

type FatalError struct{ Err error }

FatalError wraps an error. If Map or Reduce returns a FatalError the task will not be retried

func (FatalError) Error

func (fe FatalError) Error() string

type FileLineInputReader

type FileLineInputReader struct {
	Paths []string

func (FileLineInputReader) ReaderFromName

func (m FileLineInputReader) ReaderFromName(c context.Context, path string) (SingleInputReader, error)

func (FileLineInputReader) ReaderNames

func (m FileLineInputReader) ReaderNames() ([]string, error)

type IgnoreTaskStatusChange

type IgnoreTaskStatusChange struct{}

IgnoreTaskStatusChange is an implementation of TaskStatusChange which ignores the call

func (*IgnoreTaskStatusChange) Status

func (e *IgnoreTaskStatusChange) Status(jobId int64, task JobTask)

type InputReader

type InputReader interface {
	// ReaderNames() returns a list of reader instance names;
	ReaderNames() ([]string, error)

	// ReaderFromName() creates the SingleInputReader for the given name
	ReaderFromName(c context.Context, name string) (SingleInputReader, error)

InputReader is responsible for providing unique names for each of the input sources for a job, and creating individual SingleInputReader objects from those unique names. The number of unique names for the inputs determines the number of map tasks

type Int64KeyHandler

type Int64KeyHandler struct{}

Int64KeyHandler provides a KeyHandler for int64 keys. A hash is used for computing the shards to distribute evenly. We encode things are strings for readability.

func (Int64KeyHandler) Equal

func (s Int64KeyHandler) Equal(a, b interface{}) bool

func (Int64KeyHandler) KeyDump

func (s Int64KeyHandler) KeyDump(a interface{}) []byte

func (Int64KeyHandler) KeyLoad

func (s Int64KeyHandler) KeyLoad(a []byte) (interface{}, error)

func (Int64KeyHandler) Less

func (s Int64KeyHandler) Less(a, b interface{}) bool

func (Int64KeyHandler) SetShardParameters

func (s Int64KeyHandler) SetShardParameters(jsonParameters string)

func (Int64KeyHandler) Shard

func (s Int64KeyHandler) Shard(strInt interface{}, shardCount int) int

type Int64ValueHandler

type Int64ValueHandler struct{}

Int64ValueHandler provides a ValueHandler for int values

func (Int64ValueHandler) ValueDump

func (j Int64ValueHandler) ValueDump(a interface{}) ([]byte, error)

func (Int64ValueHandler) ValueLoad

func (j Int64ValueHandler) ValueLoad(val []byte) (interface{}, error)

type IntermediateStorage

type IntermediateStorage interface {
	CreateIntermediate(c context.Context, handler KeyValueHandler) (SingleIntermediateStorageWriter, error)
	Iterator(c context.Context, name string, handler KeyValueHandler) (IntermediateStorageIterator, error)
	RemoveIntermediate(c context.Context, name string) error

IntermediateStorage defines how intermediare results are saved and read. If keys need to be serialized KeyValueHandler.Load and KeyValueHandler.Save must be used.

type IntermediateStorageIterator

type IntermediateStorageIterator interface {
	// Returns mapped data item, a bool saying if it's valid, and an error if one occurred
	// probably cause use error = EOF instead, but we don't
	Next() (MappedData, bool, error)
	Close() error

func NewReaderIterator

func NewReaderIterator(reader io.ReadCloser, handler KeyValueHandler) IntermediateStorageIterator

type JobInfo

type JobInfo struct {
	UrlPrefix           string
	Stage               JobStage
	UpdatedAt           time.Time
	StartTime           time.Time
	TaskCount           int      `datastore:"TasksRunning,noindex"`
	FirstTaskId         int64    `datastore:",noindex"` // 0 here means to use task keys like "%d.%d" (Id, taskNum)
	RetryCount          int      `datastore:",noindex"`
	SeparateReduceItems bool     `datastore:",noindex"`
	OnCompleteUrl       string   `datastore:",noindex"`
	WriterNames         []string `datastore:",noindex"`
	JsonParameters      string   `datastore:",noindex"`

	// filled in by getJob
	Id int64 `datastore:"-"`

JobInfo is the entity stored in the datastore defining the MapReduce Job

func GetJob

func GetJob(ds appwrap.Datastore, jobId int64) (JobInfo, error)

type JobStage

type JobStage string

type JobTask

type JobTask struct {
	Status              TaskStatus `datastore:,noindex`
	Job                 *appwrap.DatastoreKey
	Done                *appwrap.DatastoreKey // nil if the task isn't done, job if it is
	Info                string                `datastore:,"noindex"`
	StartTime           time.Time             `datastore:,"noindex"`
	UpdatedAt           time.Time             `datastore:,"noindex"`
	Type                TaskType              `datastore:,"noindex"`
	Retries             int                   `datastore:,"noindex"`
	SeparateReduceItems bool
	// this is named intermediate storage sources, and only used for reduce tasks
	ReadFrom []byte `datastore:",noindex"`
	Url      string `datastore:",noindex"`
	Result   string `datastore:",noindex"`

JobTask is the entity stored in the datastore defining a single MapReduce task. They have JobInfo entities as their parents.

func GetJobTasks

func GetJobTasks(ds appwrap.Datastore, job JobInfo) ([]JobTask, error)

type KeyHandler

type KeyHandler interface {
	// Less returns a< b
	Less(a, b interface{}) bool

	// Equals returns a == b
	Equal(a, b interface{}) bool

	// KeyDump converts a key into a byte array
	KeyDump(a interface{}) []byte

	// KeyDump converts a byte array into a key
	KeyLoad([]byte) (interface{}, error)

	// Shard returns the shard number a key belongs to, given the total number of shards
	// which are being used for the job
	Shard(a interface{}, shardCount int) int

	// Provides the (probably json) parameters for the job; may be useful for sharding strategy
	SetShardParameters(jsonParameters string)

KeyHandler must be implemented for each key type to enable shuffling and storing of map keys

type KeyValueHandler

type KeyValueHandler interface {

type LineOutputWriter

type LineOutputWriter struct {
	// contains filtered or unexported fields

func NewLineOutputWriter

func NewLineOutputWriter(w io.WriteCloser, handler KeyValueHandler) LineOutputWriter

func (LineOutputWriter) Close

func (LineOutputWriter) Write

func (o LineOutputWriter) Write(data interface{}) error

func (LineOutputWriter) WriteMappedData

func (o LineOutputWriter) WriteMappedData(item MappedData) error

type MapReduceJob

type MapReduceJob struct {
	Inputs  InputReader
	Outputs OutputWriter

	// UrlPrefix is the base url path used for mapreduce jobs posted into
	// task queues, and must match the baseUrl passed into MapReduceHandler()
	UrlPrefix string

	// OnCompleteUrl is the url to post to when a job is completed. The full url will include
	// multiple query parameters, including status=(done|error) and id=(jobId). If
	// an error occurred the error parameter will also be displayed. If this is empty, no
	// complete notification is given; it is assumed the caller will poll for results.
	OnCompleteUrl string

	// RetryCount is the number of times individual map/reduce tasks should be retried. Tasks that
	// return errors which are of type FatalError are not retried (defaults to 3, 1
	// means it will never retry).
	RetryCount int

	// SeparateReduceItems means that instead of collapsing all rows with the same key into
	// one call to the reduce function, each row is passed individually (though wrapped in
	// an array of length one to keep the reduce function signature the same)
	SeparateReduceItems bool

	// JobParameters is passed to map and reduce job. They are assumed to be json encoded, though
	// absolutely no effort is made to enforce that.
	JobParameters string

MapReduceJob defines a complete map reduce job, which is the pipeline and the parameters the job needs. The types for Inputs and Outputs must match the types for the InputReader and OutputWriter in the pipeline.

type MapReducePipeline

type MapReducePipeline interface {
	// The basic pipeline of read, map, shuffle, reduce, save

	// Serialization and sorting primatives for keys and values


MapReducePipeline defines the complete pipeline for a map reduce job (but not the job itself). No per-job information is available for the pipeline functions other than what gets passed in via the various interfaces.

type MappedData

type MappedData struct {
	Key   interface{}
	Value interface{}

MappedData items are key/value pairs returned from the Map stage. The items are rearranged by the shuffle, and (Key, []Value) pairs are passed into the shuffle. KeyHandler interfaces provide the operations on MappedData items which are needed by the pipeline, and ValueHandler interfaces provide serialization operatons for the values.

type Mapper

type Mapper interface {
	Map(item interface{}, statusUpdate StatusUpdateFunc) ([]MappedData, error)

	// Called once with the job parameters for each mapper task
	SetMapParameters(jsonParameters string)

	// Called when the map is complete. Return is same as for Map()
	// to the output writer
	MapComplete(statusUpdate StatusUpdateFunc) ([]MappedData, error)

Mapper defines a map function; it is passed an item from the input and returns a list of mapped items.

type NilOutputWriter

type NilOutputWriter struct {
	Count int

NilOutputWriter collects output and throws it away. Useful for reduce tasks which only have side affects

func (NilOutputWriter) WriterFromName

func (n NilOutputWriter) WriterFromName(c context.Context, name string) (SingleOutputWriter, error)

func (NilOutputWriter) WriterNames

func (n NilOutputWriter) WriterNames(c context.Context) ([]string, error)

type OutputWriter

type OutputWriter interface {
	WriterNames(c context.Context) ([]string, error)
	WriterFromName(c context.Context, name string) (SingleOutputWriter, error)

type ReaderIterator

type ReaderIterator struct {
	// contains filtered or unexported fields

func (*ReaderIterator) Close

func (r *ReaderIterator) Close() error

func (*ReaderIterator) Next

func (r *ReaderIterator) Next() (MappedData, bool, error)

type Reducer

type Reducer interface {
	Reduce(key interface{}, values []interface{}, statusUpdate StatusUpdateFunc) (result interface{}, err error)

	// Called once with the job parameters for each mapper task
	SetReduceParameters(jsonParameters string)

	// Called when the reduce is complete. Each item in the results array will be passed separately
	// to the output writer
	ReduceComplete(statusUpdate StatusUpdateFunc) ([]interface{}, error)

Reducer defines the reduce function; it is called once for each key and is given a list of all of the values for that key.

type SingleInputReader

type SingleInputReader interface {
	Next() (interface{}, error)
	Close() error

func NewSingleLineInputReader

func NewSingleLineInputReader(r io.ReadCloser) SingleInputReader

type SingleIntermediateStorageWriter

type SingleIntermediateStorageWriter interface {
	WriteMappedData(data MappedData) error
	Close(c context.Context) error
	ToName() string

this overlaps a great deal with SingleOutputWriter; they often share an implementation

type SingleLineReader

type SingleLineReader struct {
	// contains filtered or unexported fields

func (SingleLineReader) Close

func (ir SingleLineReader) Close() (err error)

func (SingleLineReader) Next

func (ir SingleLineReader) Next() (interface{}, error)

type SingleOutputWriter

type SingleOutputWriter interface {
	Write(data interface{}) error
	Close(c context.Context) error
	ToName() string

type StatusUpdateFunc

type StatusUpdateFunc func(format string, paramList ...interface{})

StatusUpdateFunc functions are passed into Map and Reduce handlers to allow those handlers to post arbitrary status messages which are stored in the datastore

type StringKeyHandler

type StringKeyHandler struct{}

StringKeyHandler provides a KeyHandler for string keys

func (StringKeyHandler) Equal

func (s StringKeyHandler) Equal(a, b interface{}) bool

func (StringKeyHandler) KeyDump

func (s StringKeyHandler) KeyDump(a interface{}) []byte

func (StringKeyHandler) KeyLoad

func (s StringKeyHandler) KeyLoad(a []byte) (interface{}, error)

func (StringKeyHandler) Less

func (s StringKeyHandler) Less(a, b interface{}) bool

func (StringKeyHandler) SetShardParameters

func (s StringKeyHandler) SetShardParameters(jsonParameters string)

func (StringKeyHandler) Shard

func (s StringKeyHandler) Shard(strInt interface{}, shardCount int) int

type StringValueHandler

type StringValueHandler struct{}

StringValueHandler provides a ValueHandler for string values

func (StringValueHandler) ValueDump

func (j StringValueHandler) ValueDump(a interface{}) ([]byte, error)

func (StringValueHandler) ValueLoad

func (j StringValueHandler) ValueLoad(val []byte) (interface{}, error)

type TaskInterface

type TaskInterface interface {
	PostTask(c context.Context, fullUrl string, jsonParameters string, log appwrap.Logging) error
	PostStatus(c context.Context, fullUrl string, log appwrap.Logging) error

TaskInterface defines how the map and reduce tasks and controlled, and how they report their status.

type TaskStatus

type TaskStatus string

type TaskStatusChange

type TaskStatusChange interface {
	Status(jobId int64, task JobTask)

TaskStatusChange allows the map reduce framework to notify tasks when their status has changed to RUNNING or DONE. Handy for callbacks. Always called after SetMapParameters() and SetReduceParameters()

type TaskType

type TaskType string

type ValueHandler

type ValueHandler interface {
	ValueDump(a interface{}) ([]byte, error)
	ValueLoad([]byte) (interface{}, error)

ValueHandler converts values from a map step to []byte and back again

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
t or T : Toggle theme light dark auto
y or Y : Canonical URL