emr

package
v0.0.0-...-4f40efb Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 21, 2015 License: Apache-2.0 Imports: 32 Imported by: 0

Documentation

Overview

Package emr contains code for running streaming jobs on Elastic MapReduce.

Index

Constants

View Source
const TICKER = 60 * time.Second
View Source
const VARS_PREFIX = "EMR_VARS_"

Variables

This section is empty.

Functions

func AlphaNumFilter

func AlphaNumFilter(s string) string

func IdentityMap

func IdentityMap(ctx MapContext)

func IdentityReduce

func IdentityReduce(ctx ReduceContext)

func IntegerSumReduce

func IntegerSumReduce(ctx ReduceContext)

func LapackToolChecker

func LapackToolChecker(path string, t tool.Interface) error

func List

func List(ss3 s3.Interface, output *StepLocation, ch chan s3.ListedObject)

List randomizes the order of each listing batch.

func LoadLines

func LoadLines(ss3 s3.Interface, output *StepLocation, f func(string, *KeyValue))

func LoadLines2

func LoadLines2(ss3 s3.Interface, output *StepLocation, threads int, decider UrlDeciderFunc, f func(string, *KeyValue))

func LoadLines3

func LoadLines3(ss3 s3.Interface, output *StepLocation, threads int, proc FileProcessor)

LoadLines3 enables transactional processing of files.

func NullChecker

func NullChecker(path string, t tool.Interface) error

func ReassembleLine

func ReassembleLine(kv KeyValue) string

func SlurpLines

func SlurpLines(r io.Reader, f func(string)) error

SlurpLines reads all lines from r; it returns an error on failure, or nil on EOF.

func StreamUrl

func StreamUrl(u string, max int, backoff time.Duration) (io.ReadCloser, error)

func UrlHasExtension

func UrlHasExtension(u, ext string) bool

Types

type Context

type Context struct {
	Vars     map[string]string
	Filename string
}

type Count

type Count struct {
	Group, Counter string
	Amount         int
}

type Error

type Error struct {
	Error error
}

type FileKeyValue

type FileKeyValue struct {
	Filename string
	Item     *KeyValue
}

type FileProcessor

type FileProcessor interface {

	// should return a function to process the key-values from the file, or nil if no processing is needed
	ForFile(url string, size int) KeyValueProcessor

	// indicates the given file was successfully processed
	Success(url string)

	// called upon failure processing a file; should return a processor if we want to retry
	Failure(url string, size int, err error) KeyValueProcessor
}

Each KeyValueProcessor is called from just a single thread.

type FloatSumReducer

type FloatSumReducer struct {
	Format string
}

func (FloatSumReducer) Reduce

func (f FloatSumReducer) Reduce(ctx ReduceContext)

type Flow

type Flow struct {
	IsSpot             bool
	Auth               aws.Auth
	Steps              []Step
	Instances          int
	MasterInstanceType string
	MasterSpotPrice    float64 `json:",omitempty"`
	SlaveInstanceType  string
	SlaveSpotPrice     float64 `json:",omitempty"`
	ScriptBucket       string
	LogBucket          string
	KeepAlive          bool
	KeyName            string
	AvailabilityZone   string
}

type FlowListener

type FlowListener func(id, state string)

type FlowsResponse

type FlowsResponse struct {
	State     string       `xml:"DescribeJobFlowsResult>JobFlows>member>ExecutionStatusDetail>State"`
	MasterDNS string       `xml:"DescribeJobFlowsResult>JobFlows>member>Instances>MasterPublicDnsName"`
	Steps     []StepMember `xml:"DescribeJobFlowsResult>JobFlows>member>Steps>member"`
}

func FetchFlow

func FetchFlow(a aws.Auth, flow string) *FlowsResponse

func (*FlowsResponse) GetStep

func (f *FlowsResponse) GetStep(name string) *StepMember

type IdentityMapperTool

type IdentityMapperTool struct {
	Id      string
	Taglist []string
}

func (*IdentityMapperTool) Description

func (m *IdentityMapperTool) Description() string

func (*IdentityMapperTool) MarshalJSON

func (t *IdentityMapperTool) MarshalJSON() ([]byte, error)

func (*IdentityMapperTool) Name

func (m *IdentityMapperTool) Name() string

func (*IdentityMapperTool) Run

func (m *IdentityMapperTool) Run(args []string)

func (*IdentityMapperTool) String

func (m *IdentityMapperTool) String() string

func (*IdentityMapperTool) Tags

func (t *IdentityMapperTool) Tags() []string

type IdentityReducerTool

type IdentityReducerTool struct {
	Id      string
	Taglist []string
}

func (*IdentityReducerTool) Description

func (m *IdentityReducerTool) Description() string

func (*IdentityReducerTool) MarshalJSON

func (t *IdentityReducerTool) MarshalJSON() ([]byte, error)

func (*IdentityReducerTool) Name

func (m *IdentityReducerTool) Name() string

func (*IdentityReducerTool) Run

func (m *IdentityReducerTool) Run(args []string)

func (*IdentityReducerTool) String

func (m *IdentityReducerTool) String() string

func (*IdentityReducerTool) Tags

func (t *IdentityReducerTool) Tags() []string

type KeyValue

type KeyValue struct {
	Key, Value string
}

func ParseLine

func ParseLine(line string) KeyValue

func ParseLineSep

func ParseLineSep(line, sep string) KeyValue

func (KeyValue) String

func (kv KeyValue) String() string

type KeyValueProcessor

type KeyValueProcessor func(*KeyValue)

type MapContext

type MapContext struct {
	Input <-chan KeyValue
	Output
	Context
	Error
}

type MapTool

type MapTool struct {
	// contains filtered or unexported fields
}

func NewHiddenMapTool

func NewHiddenMapTool(m Mapper, name, description string) *MapTool

func NewMapTool

func NewMapTool(m Mapper, name, description string) *MapTool

func (*MapTool) Description

func (m *MapTool) Description() string

func (*MapTool) MarshalJSON

func (t *MapTool) MarshalJSON() ([]byte, error)

func (*MapTool) Name

func (m *MapTool) Name() string

func (*MapTool) Run

func (m *MapTool) Run(args []string)

func (*MapTool) String

func (m *MapTool) String() string

func (*MapTool) Tags

func (t *MapTool) Tags() []string

type Mapper

type Mapper func(ctx MapContext)

type MonFlow

type MonFlow struct {
	Auth map[string]aws.Auth
	FlowListener
}

func NewMonFlow

func NewMonFlow(a map[string]aws.Auth, l FlowListener) *MonFlow

func (*MonFlow) Name

func (m *MonFlow) Name() string

func (*MonFlow) Run

func (m *MonFlow) Run(args []string)

type Output

type Output struct {
	Collector chan<- KeyValue
	Counters  chan<- Count
}

Mappers and reducers must not close these channels.

type ReduceContext

type ReduceContext struct {
	Input <-chan ReduceJob
	Output
	Context
}

type ReduceJob

type ReduceJob struct {
	Key    string
	Values <-chan string
}

type ReduceTool

type ReduceTool struct {
	// contains filtered or unexported fields
}

func NewHiddenReduceTool

func NewHiddenReduceTool(r Reducer, name, description string) *ReduceTool

func NewReduceTool

func NewReduceTool(r Reducer, name, description string) *ReduceTool

func (*ReduceTool) Description

func (m *ReduceTool) Description() string

func (*ReduceTool) MarshalJSON

func (t *ReduceTool) MarshalJSON() ([]byte, error)

func (*ReduceTool) Name

func (m *ReduceTool) Name() string

func (*ReduceTool) Run

func (m *ReduceTool) Run(args []string)

func (*ReduceTool) String

func (m *ReduceTool) String() string

func (*ReduceTool) Tags

func (t *ReduceTool) Tags() []string

type Reducer

type Reducer func(ctx ReduceContext)

type RunFlowResponse

type RunFlowResponse struct {
	FlowId string `xml:"RunJobFlowResult>JobFlowId"`
}

func ParseEmrResponse

func ParseEmrResponse(r io.Reader) (*RunFlowResponse, error)

func Run

func Run(flow Flow) (*RunFlowResponse, error)

type ShowFlow

type ShowFlow struct {
	Auth map[string]aws.Auth
}

The Auth map should have a "default" key.

func NewShowFlow

func NewShowFlow(a aws.Auth) *ShowFlow

func (*ShowFlow) Description

func (m *ShowFlow) Description() string

func (*ShowFlow) Name

func (m *ShowFlow) Name() string

func (*ShowFlow) Run

func (m *ShowFlow) Run(args []string)

type Slurper

type Slurper struct {
}

func (*Slurper) Name

func (*Slurper) Name() string

func (*Slurper) Run

func (t *Slurper) Run(args []string)

type Step

type Step struct {
	Name               string
	Inputs             []string
	Output             string
	Reducers           int           `json:",omitempty"`
	Timeout            time.Duration `json:",omitempty"`
	Mapper, Reducer    Streaming
	Compress           bool              `json:",omitempty"`
	CompressMapOutput  bool              `json:",omitempty"`
	SortSecondKeyField bool              `json:",omitempty"`
	ToolChecker        ToolChecker       `json:",omitempty"`
	Vars               map[string]string `json:",omitempty"`

	// additional args on streaming command
	Args []string `json:",omitempty"`

	// this is a big one: determines whether input files are lists of URLs or not
	IndirectMapJob bool `json:",omitempty"`
}

type StepLocation

type StepLocation struct {
	Bucket string
	Prefix string
}

type StepMember

type StepMember struct {
	Name string   `xml:"StepConfig>Name"`
	Args []string `xml:"StepConfig>HadoopJarStep>Args>member"`
}

func (*StepMember) ExtractVars

func (f *StepMember) ExtractVars() map[string]string

func (*StepMember) Input

func (s *StepMember) Input() *StepLocation

func (*StepMember) Output

func (s *StepMember) Output() *StepLocation

type Streaming

type Streaming interface {
	tool.Interface
	// contains filtered or unexported methods
}

The Streaming interface is basically just tool.Interface, but with a private method to make sure that nobody outside of this package can implement it; i.e., this package controls which kinds of tools can be mappers and reducers.

type ToolChecker

type ToolChecker func(path string, t tool.Interface) error

A ToolChecker returns a non-nil error if the tool somehow doesn't check out.

func (*ToolChecker) MarshalJSON

func (t *ToolChecker) MarshalJSON() ([]byte, error)

type UrlDeciderFunc

type UrlDeciderFunc func(url string) bool

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL