simplemr

package
v0.1.8
Warning

This package is not in the latest version of its module.

Published: Mar 1, 2021 License: BSD-3-Clause Imports: 6 Imported by: 4

Documentation

Overview

Package simplemr provides a simple map reduce framework for use by command-line and other tools; as such, it can only be used from within a single process. It is specifically not intended to support large datasets, but mappers are run concurrently so that long-running tasks (e.g. external shell commands) run in parallel. The current implementation supports only a single reducer; however, future implementations are likely to run multiple reducers, so reducers should be written to remain correct if invoked concurrently.

Constants

This section is empty.

Variables

var ErrMRCancelled = errors.New("MR cancelled")

Functions

This section is empty.

Types

type Identity

type Identity struct{}

func (*Identity) Map

func (i *Identity) Map(mr *MR, key string, val interface{}) error

func (*Identity) Reduce

func (i *Identity) Reduce(mr *MR, key string, values []interface{}) error
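Identity is not documented on this page, but the MR example, in which records pass through it unchanged, suggests that its Map forwards each key and value to MapOut and its Reduce forwards each key and its values to ReduceOut. The sketch below models that assumed behavior using a hypothetical `collector` type in place of *MR; it is not the package's source.

```go
package main

import "fmt"

// collector is a hypothetical stand-in for *simplemr.MR that records
// what MapOut and ReduceOut receive.
type collector struct {
	mapped  map[string][]interface{}
	reduced map[string][]interface{}
}

func newCollector() *collector {
	return &collector{mapped: map[string][]interface{}{}, reduced: map[string][]interface{}{}}
}

func (c *collector) MapOut(key string, values ...interface{}) {
	c.mapped[key] = append(c.mapped[key], values...)
}

func (c *collector) ReduceOut(key string, values ...interface{}) {
	c.reduced[key] = append(c.reduced[key], values...)
}

// identity passes data through unchanged (assumed Identity behavior).
type identity struct{}

func (*identity) Map(c *collector, key string, val interface{}) error {
	c.MapOut(key, val)
	return nil
}

func (*identity) Reduce(c *collector, key string, values []interface{}) error {
	c.ReduceOut(key, values...)
	return nil
}

func main() {
	c, id := newCollector(), &identity{}
	_ = id.Map(c, "1", "hello")
	_ = id.Map(c, "1", "again")
	for k, vs := range c.mapped {
		_ = id.Reduce(c, k, vs)
	}
	fmt.Println(c.reduced["1"]) // [hello again]
}
```

A pass-through type like this can serve as the mapper or the reducer when only one phase needs custom logic.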

type MR

type MR struct {

	// The number of concurrent mappers to use. A value of 0 instructs
	// the implementation to use an appropriate number, such as the number
	// of available CPUs.
	NumMappers int
	// The time to wait for the map reduce to complete. A value of 0 implies
	// no timeout - i.e. an infinite wait.
	Timeout time.Duration
	// contains filtered or unexported fields
}

MR represents a map reduction.

Example
package main

import (
	"fmt"

	"v.io/x/lib/simplemr"
)

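func main() {
	in, out := make(chan *simplemr.Record, 2), make(chan *simplemr.Record, 2)
	mr := &simplemr.MR{}
	identity := &simplemr.Identity{}
	// Run is started in its own goroutine so that main can feed input and
	// consume output concurrently; its error is retrieved later via mr.Error.
	go mr.Run(in, out, identity, identity)
	in <- &simplemr.Record{Key: "1", Values: []interface{}{"hello\n"}}
	in <- &simplemr.Record{Key: "2", Values: []interface{}{"world\n"}}
	// Closing the input channel signals that there is no more input.
	close(in)
	// Run closes out once the reducer has finished, so draining it with
	// range guarantees that mr.Error is safe to read afterwards.
	for k := range out {
		fmt.Printf("%s: %s", k.Key, k.Values[0].(string))
	}
	if err := mr.Error(); err != nil {
		fmt.Printf("mr failed: %v\n", err)
	}
}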
Output:

1: hello
2: world

func (*MR) Cancel

func (mr *MR) Cancel()

Cancel closes the channel intended to be used for monitoring cancellation requests. If Cancel is called before any reducers have been run then no reducers will be run. It can only be called after mr.Run has been called, generally by a mapper or a reducer.

func (*MR) CancelCh

func (mr *MR) CancelCh() <-chan struct{}

CancelCh returns a channel that will be closed when the Cancel method is called. It should only be called by a mapper or reducer.

func (*MR) Error

func (mr *MR) Error() error

Error returns any error that was returned by the Run method. It is safe to read its value once the output channel passed to Run has been closed.
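The rule that Error is safe to read only after the output channel has been closed rests on Go's memory model: a write made before close(ch) is visible to any goroutine whose receive observes that close. The hypothetical `job` type below stores its error before closing its output channel, mirroring the documented contract; it is a sketch, not the package's implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// job is a hypothetical stand-in for an MR: err is written by the producing
// goroutine and must only be read after out has been closed.
type job struct {
	err error
}

func (j *job) run(out chan<- int, n int) {
	defer close(out) // deferred, so it runs after j.err has been set below
	for i := 0; i < n; i++ {
		out <- i
	}
	if n > 2 {
		j.err = errors.New("too many records")
	}
}

func main() {
	j, out := &job{}, make(chan int)
	go j.run(out, 3)
	sum := 0
	for v := range out { // the loop exits only once out has been closed
		sum += v
	}
	// Safe: the write to j.err precedes the close observed by the range loop.
	fmt.Println(sum, j.err) // 3 too many records
}
```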

func (*MR) IsCancelled

func (mr *MR) IsCancelled() bool

IsCancelled returns true if this MR has been cancelled.
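Cancellation built on closing a channel, as Cancel and CancelCh describe, pairs naturally with a non-blocking check for IsCancelled. The sketch below is an assumption about how such a trio could be written, not simplemr's code: `canceller` is hypothetical, and its use of sync.Once to make repeated Cancel calls safe (closing a channel twice panics) is not something this page promises for MR.Cancel.

```go
package main

import (
	"fmt"
	"sync"
)

// canceller is a hypothetical type modelling Cancel, CancelCh and IsCancelled.
type canceller struct {
	once sync.Once
	ch   chan struct{}
}

func newCanceller() *canceller { return &canceller{ch: make(chan struct{})} }

// Cancel closes the channel exactly once; later calls are no-ops.
func (c *canceller) Cancel() { c.once.Do(func() { close(c.ch) }) }

// CancelCh returns a channel that is closed when Cancel is called.
func (c *canceller) CancelCh() <-chan struct{} { return c.ch }

// IsCancelled polls the channel without blocking.
func (c *canceller) IsCancelled() bool {
	select {
	case <-c.ch:
		return true
	default:
		return false
	}
}

func main() {
	c := newCanceller()
	fmt.Println(c.IsCancelled()) // false
	c.Cancel()
	c.Cancel() // safe: sync.Once prevents a double close
	fmt.Println(c.IsCancelled()) // true
}
```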

func (*MR) MapOut

func (mr *MR) MapOut(key string, values ...interface{})

MapOut outputs the key and associated values for subsequent processing by a Reducer. It should only be called from a mapper.
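Because mappers run concurrently, whatever MapOut writes to must tolerate simultaneous calls, and values emitted under the same key must end up together for the reducer. A minimal sketch of such an intermediate store, assuming a mutex-protected map (the `intermediate` type is hypothetical, not simplemr's internal structure):

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// intermediate is a hypothetical store for mapper output.
type intermediate struct {
	mu   sync.Mutex
	data map[string][]interface{}
}

// MapOut appends values under key; the mutex makes it safe to call
// from many mapper goroutines at once.
func (s *intermediate) MapOut(key string, values ...interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[key] = append(s.data[key], values...)
}

func main() {
	s := &intermediate{data: map[string][]interface{}{}}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			s.MapOut("even", n*2)
			s.MapOut("odd", n*2+1)
		}(i)
	}
	wg.Wait()

	// Arrival order depends on scheduling, so sort before printing.
	for _, k := range []string{"even", "odd"} {
		vals := make([]int, 0, len(s.data[k]))
		for _, v := range s.data[k] {
			vals = append(vals, v.(int))
		}
		sort.Ints(vals)
		fmt.Println(k, vals) // even [0 2 4 6], then odd [1 3 5 7]
	}
}
```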

func (*MR) ReduceOut

func (mr *MR) ReduceOut(key string, values ...interface{})

ReduceOut outputs the key and associated values to the output channel passed to Run. It should only be called from a reducer.

func (*MR) Run

func (mr *MR) Run(input <-chan *Record, output chan<- *Record, mapper Mapper, reducer Reducer) error

Run runs the map reduction using the supplied mapper and reducer, reading from input and writing to output. The caller must close the input channel when there is no more input data. Run closes the output channel once the Reducer has processed all intermediate data. Run may only be called once per MR receiver.

type Mapper

type Mapper interface {
	// Map is called by the framework for every key, value pair read
	// from the specified input.
	Map(mr *MR, key string, value interface{}) error
}

Mapper is the interface that must be implemented by all mappers.

type Record

type Record struct {
	Key    string
	Values []interface{}
}

Record represents all input and output data.

type Reducer

type Reducer interface {
	// Reduce is called by the framework for every key and associated
	// values that are emitted by the Mappers.
	Reduce(mr *MR, key string, values []interface{}) error
}

Reducer is the interface that must be implemented by the reducer.
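As an illustration of implementing both interfaces, here is a word-count mapper and reducer. To keep the sketch runnable without the package, it uses a hypothetical `emitter` interface in place of *MR (exposing only MapOut and ReduceOut) and a hypothetical sequential `driver` in place of Run; only the method shapes of Map and Reduce mirror the documented interfaces.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// emitter is a hypothetical stand-in for *simplemr.MR.
type emitter interface {
	MapOut(key string, values ...interface{})
	ReduceOut(key string, values ...interface{})
}

// wordCount splits each line into words (Map) and sums the counts (Reduce).
type wordCount struct{}

func (wordCount) Map(e emitter, key string, value interface{}) error {
	line, ok := value.(string)
	if !ok {
		return fmt.Errorf("key %q: want string, got %T", key, value)
	}
	for _, w := range strings.Fields(line) {
		e.MapOut(w, 1)
	}
	return nil
}

func (wordCount) Reduce(e emitter, key string, values []interface{}) error {
	total := 0
	for _, v := range values {
		total += v.(int) // values are the ints emitted by Map
	}
	e.ReduceOut(key, total)
	return nil
}

// driver is a hypothetical sequential harness standing in for MR.Run.
type driver struct {
	inter map[string][]interface{}
	out   []string
}

func (d *driver) MapOut(key string, values ...interface{}) {
	d.inter[key] = append(d.inter[key], values...)
}

func (d *driver) ReduceOut(key string, values ...interface{}) {
	d.out = append(d.out, fmt.Sprintf("%s=%v", key, values[0]))
}

func main() {
	d := &driver{inter: map[string][]interface{}{}}
	wc := wordCount{}
	for i, line := range []string{"the cat", "the hat"} {
		if err := wc.Map(d, fmt.Sprint(i), line); err != nil {
			fmt.Println(err)
			return
		}
	}
	keys := make([]string, 0, len(d.inter))
	for k := range d.inter {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		if err := wc.Reduce(d, k, d.inter[k]); err != nil {
			fmt.Println(err)
			return
		}
	}
	fmt.Println(strings.Join(d.out, " ")) // cat=1 hat=1 the=2
}
```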
