mr

package
v1.2.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 22, 2022 License: Apache-2.0 Imports: 6 Imported by: 0

README

MapReduce

案例:

package main

import (
	"fmt"
	"time"

	"github.com/learninto/gopkg/mr"
)

func main() {
	// 抢票结果
	var resp string

	// 抢票:携程
	primary := func(cancel func(error)) {
		time.Sleep(time.Second * 2) //睡 2 秒
		resp += "通过 携程 抢到票了"
		cancel(nil)
	}

	//抢票:12306
	secondary := func(cancel func(error)) {
		time.Sleep(time.Millisecond * 1500) // 睡 1500 毫秒
		resp += "通过 12306 抢到票了"
		cancel(nil)
	}

	// 放入 MapReduce 同时开抢
	_ = mr.MapReduceVoid(func(source chan<- interface{}) {
		source <- primary
		source <- secondary
	}, func(item interface{}, writer mr.Writer, cancel func(error)) {
		fn := item.(func(func(error)))
		fn(cancel)
	}, func(pipe <-chan interface{}, cancel func(error)) {
		for item := range pipe {
			resp, _ = item.(string)
		}
	})

	// 输出结果:通过 12306 抢到票了 ----
	fmt.Println(resp, "----")
}

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrCancelWithNil  = errors.New("mapreduce cancelled with nil")
	ErrReduceNoOutput = errors.New("reduce not writing value")
)

Functions

func Finish

func Finish(fns ...func() error) error

func FinishVoid

func FinishVoid(fns ...func())

func Map

func Map(generate GenerateFunc, mapper MapFunc, opts ...Option) chan interface{}

func MapReduce

func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc, opts ...Option) (interface{}, error)

func MapReduceVoid

func MapReduceVoid(generator GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error

func MapReduceWithSource

func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,
	opts ...Option) (interface{}, error)

func MapVoid

func MapVoid(generate GenerateFunc, mapper VoidMapFunc, opts ...Option)

func Recover

func Recover(cleanups ...func())

func RunSafe

func RunSafe(fn func())

Types

type GenerateFunc

type GenerateFunc func(source chan<- interface{})

type MapFunc

type MapFunc func(item interface{}, writer Writer)

type MapperFunc

type MapperFunc func(item interface{}, writer Writer, cancel func(error))

type Option

type Option func(opts *mapReduceOptions)

func WithWorkers

func WithWorkers(workers int) Option

type ReducerFunc

type ReducerFunc func(pipe <-chan interface{}, writer Writer, cancel func(error))

type VoidMapFunc

type VoidMapFunc func(item interface{})

type VoidReducerFunc

type VoidReducerFunc func(pipe <-chan interface{}, cancel func(error))

type Writer

type Writer interface {
	Write(v interface{})
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL