Documentation ¶
Index ¶
- func StreamMap[T any, R any](ctx context.Context, concurrency int, name string, startIndex int64, ...) chan *OutputItem[T, R]
- type InputItem
- type MapperFunc
- type OperationType
- type OutputItem
- type ParallelMapImpl
- func (m *ParallelMapImpl[T, R]) Close() error
- func (m *ParallelMapImpl[T, R]) ReadMapperResult(chOutput chan *OutputItem[T, R]) ResultsMap[T, R]
- func (m *ParallelMapImpl[T, R]) Start(inputs []T, mapper MapperFunc[T, R]) chan *OutputItem[T, R]
- func (m *ParallelMapImpl[T, R]) StartWithChan(startIndex int64, chData chan T, mapper MapperFunc[T, R]) chan *OutputItem[T, R]
- type ReducerFunc
- type ResultsMap
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type MapperFunc ¶
type OperationType ¶
type OperationType uint8
const (
	Continue OperationType = iota // Keep processing subsequent items, ignoring any error
	Stop                          // Stop processing immediately, e.g. a file-copy task that must halt as soon as the target disk is full
)
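As a rough illustration of how a caller might honor these two values, the sketch below runs a serial loop and stops as soon as a step asks for Stop. The OperationType declaration is a local stand-in mirroring the one documented here; copyFile, processAll, and errDiskFull are hypothetical names invented for this example and are not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// OperationType is a local stand-in that mirrors the documented declaration.
type OperationType uint8

const (
	Continue OperationType = iota // keep processing subsequent items
	Stop                          // stop processing immediately
)

// errDiskFull is a hypothetical error used only in this illustration.
var errDiskFull = errors.New("disk full")

// copyFile is a hypothetical step: it "copies" size bytes into the remaining
// free space and requests Stop once the space is exhausted.
func copyFile(size int, free *int) (OperationType, error) {
	if size > *free {
		return Stop, errDiskFull
	}
	*free -= size
	return Continue, nil
}

// processAll runs the steps in order and returns how many succeeded before
// a Stop was requested, together with the error that caused the Stop.
func processAll(sizes []int, free int) (int, error) {
	done := 0
	for _, s := range sizes {
		op, err := copyFile(s, &free)
		if op == Stop {
			return done, err
		}
		if err == nil {
			done++
		}
	}
	return done, nil
}

func main() {
	n, err := processAll([]int{10, 20, 50, 5}, 40)
	fmt.Println(n, err) // 2 disk full
}
```

The fourth item is never attempted: once the third step returns Stop, the loop exits with the two results gathered so far.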
func Reduce ¶
func Reduce[T any](ctx context.Context, identity T, inputs []T, accumulator ReducerFunc[T]) (T, OperationType, error)
func (OperationType) String ¶
func (op OperationType) String() string
type OutputItem ¶
type OutputItem[T any, R any] struct {
	Index  int64
	Item   T
	OpType OperationType
	Result R
	Err    error
}
func (*OutputItem[T, R]) String ¶
func (oi *OutputItem[T, R]) String() string
type ParallelMapImpl ¶
func NewParallelMapImpl ¶
func (*ParallelMapImpl[T, R]) Close ¶
func (m *ParallelMapImpl[T, R]) Close() error
func (*ParallelMapImpl[T, R]) ReadMapperResult ¶
func (m *ParallelMapImpl[T, R]) ReadMapperResult(chOutput chan *OutputItem[T, R]) ResultsMap[T, R]
func (*ParallelMapImpl[T, R]) Start ¶
func (m *ParallelMapImpl[T, R]) Start(inputs []T, mapper MapperFunc[T, R]) chan *OutputItem[T, R]
func (*ParallelMapImpl[T, R]) StartWithChan ¶
func (m *ParallelMapImpl[T, R]) StartWithChan(startIndex int64, chData chan T, mapper MapperFunc[T, R]) chan *OutputItem[T, R]
type ReducerFunc ¶
type ReducerFunc[T any] func(context.Context, T, T) (T, OperationType, error)
type ResultsMap ¶
type ResultsMap[T any, R any] map[int64]*OutputItem[T, R]
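ResultsMap holds the results of a Map operation. Because a Map operation may be interrupted early (Stop), and because the order of results is nondeterministic under asynchronous processing, the results are returned as a map from each input's position index (0-based) to its result. The map may contain fewer entries than there were inputs; if errors occurred, the failed results are included as well.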
func Map ¶
func Map[T any, R any](ctx context.Context, inputs []T, mapper MapperFunc[T, R]) ResultsMap[T, R]
Map is the serial version of the Map function: inputs are processed one at a time.
func ParallelMap ¶
func ParallelMap[T any, R any](ctx context.Context, inputs []T, concurrency int, name string, mapper MapperFunc[T, R]) ResultsMap[T, R]
ParallelMap is the concurrent version of Map. It starts 2 + concurrency goroutines to process the inputs concurrently, which behaves much like a goroutine pool.
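The page does not say what the two extra goroutines do. One common layout that adds up to 2 + concurrency is a feeder, a fixed set of workers, and a closer, sketched below. This is an assumption about the general pattern, not the package's actual implementation; parallelSquares, job, and result are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// job carries an input together with its 0-based position.
type job struct {
	index int64
	item  int
}

// result pairs an input position with its computed value.
type result struct {
	index int64
	value int
}

// parallelSquares is a hypothetical worker pool that uses
// 1 feeder + concurrency workers + 1 closer goroutine.
// It assumes concurrency >= 1.
func parallelSquares(inputs []int, concurrency int) map[int64]int {
	jobs := make(chan job)
	out := make(chan result)

	// Feeder: pushes every input into the jobs channel, then closes it.
	go func() {
		for i, v := range inputs {
			jobs <- job{index: int64(i), item: v}
		}
		close(jobs)
	}()

	// Workers: each one drains jobs until the channel is closed.
	var wg sync.WaitGroup
	for w := 0; w < concurrency; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				out <- result{index: j.index, value: j.item * j.item}
			}
		}()
	}

	// Closer: closes the output channel once all workers have finished.
	go func() {
		wg.Wait()
		close(out)
	}()

	// Results arrive in nondeterministic order, so key them by input index.
	results := make(map[int64]int, len(inputs))
	for r := range out {
		results[r.index] = r.value
	}
	return results
}

func main() {
	res := parallelSquares([]int{1, 2, 3, 4}, 2)
	fmt.Println(len(res), res[0], res[3]) // 4 1 16
}
```

Keying the output by input index is what makes the nondeterministic completion order harmless, which matches the reason given for ResultsMap being a map.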
func (*ResultsMap[T, R]) ConvertResult ¶
func (r *ResultsMap[T, R]) ConvertResult() ([]R, []error, OperationType)
ConvertResult converts the map[int64]*OutputItem[T, R] results into slices. If every item was processed successfully, the slices have the same length as the input slice.
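A conversion like this has to restore a stable order, since Go map iteration order is random. The sketch below shows one way it could be done by sorting the indexes first. convertSketch is a hypothetical function with local stand-in types; in particular, returning Stop when any item carries Stop is an assumption, since the page does not define the returned OperationType.

```go
package main

import (
	"fmt"
	"sort"
)

// Local stand-ins mirroring the documented declarations.
type OperationType uint8

const (
	Continue OperationType = iota
	Stop
)

type OutputItem[T any, R any] struct {
	Index  int64
	Item   T
	OpType OperationType
	Result R
	Err    error
}

type ResultsMap[T any, R any] map[int64]*OutputItem[T, R]

// convertSketch is a hypothetical conversion: it emits results and errors in
// ascending input-index order and reports Stop if any item requested it.
func convertSketch[T any, R any](rm ResultsMap[T, R]) ([]R, []error, OperationType) {
	// Collect and sort the indexes, because map iteration order is random.
	idx := make([]int64, 0, len(rm))
	for i := range rm {
		idx = append(idx, i)
	}
	sort.Slice(idx, func(a, b int) bool { return idx[a] < idx[b] })

	results := make([]R, 0, len(rm))
	errs := make([]error, 0, len(rm))
	op := Continue
	for _, i := range idx {
		item := rm[i]
		results = append(results, item.Result)
		errs = append(errs, item.Err)
		if item.OpType == Stop {
			op = Stop
		}
	}
	return results, errs, op
}

func main() {
	rm := ResultsMap[string, int]{
		1: &OutputItem[string, int]{Index: 1, Item: "bb", Result: 2},
		0: &OutputItem[string, int]{Index: 0, Item: "a", Result: 1},
	}
	results, errs, op := convertSketch(rm)
	fmt.Println(results, len(errs), op == Continue) // [1 2] 2 true
}
```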