streamv2

package
v0.0.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 10, 2021 License: MIT Imports: 6 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Stream

type Stream interface {

	// 设置并行度。并行度不是全局的概念,而是一个操作上的概念。
	// 比如并行度为k,数据量为n,那么在执行filter/map等操作时,会创建k个goroutine
	// 让每个goroutine上承担 n/k 的数据量(无法整除则将剩余数据并入最后一个goroutine)
	// streamer默认继承上一个streamer的并行度,如果没有上一个streamer,那么默认并行度为1。
	// 在某个操作上设置新的并行度,不会影响之前的操作的并行度。
	// 例如:
	// 		源数据较多,执行filter时可以设置较大的并行度,从而提高效率;
	// 		经过filter后的数据量已经不多了,那么可以在map上设置较小的并行度;
	// 		从而避免创建过多goroutine。
	// 上面说到并行度不是全局的概念,但可以通过某些操作实现全局的并行度设置。
	// 即可以在最初的streamer上设置全局并行度k,随后不再设置并行度,从而实现全局并行度k。
	Parallel(parallel int) Stream
	// 根据filter func过滤符合条件的elem
	// filter参数应为 func (item T) bool,T为上游数据类型
	Filter(filter interface{}) Stream
	// 根据mapper func将stream中的elem对象转化成另一种对象
	// mapper参数应为 func (item T) O,T为上游数据类型,O为产出的新数据类型
	Map(mapper interface{}) Stream
	// 跳过前n条记录
	Offset(n int) Stream
	// 取前n条记录
	Limit(n int) Stream
	// 根据sorter的排序规则进行排序,sorter的结果为true则为降序,为false为升序
	// sorter参数应为 func (item1, item2 T) bool,T为上游数据类型
	Sorted(sorter interface{}) Stream

	// 遍历所有结果,对每个结果执行希望的op func
	// foreachOp参数应为 func (item T) err,T为上游数据类型,如果err != nil 会提前中止
	Foreach(foreachOp interface{}) error
	// 将结果读取出来,调用者根据stream中的元素类型,传入相应的slice pointer
	// result参数应为 []T类型,T为上游数据类型
	Scan(result interface{}) error
	// 根据getKey func获取key,并做聚合。聚合结果由result带出。
	// keyer参数应为 func (item T) K ,T为上游数据类型,K为 groupby key的类型
	// result参数应为map[K][]T
	GroupBy(keyer interface{}, result interface{}) error
	// 获取结果中的第一个
	// result参数应为T类型,T为上游数据类型
	First(result interface{}) (bool, error)
	// 获取结果中的最后一个
	// result参数应为T类型,T为上游数据类型
	Last(result interface{}) (bool, error)
	// 获取结果中的第index个(从0开始计数)
	// result参数应为T类型,T为上游数据类型
	IndexAt(index int, result interface{}) (bool, error)
	// 获取元素数
	Count() int

	/*
	 * 辅助方法
	 */
	// 返回stream过程中的err
	Error() error
}

Stream Stream

type Streamer

type Streamer struct {
	// contains filtered or unexported fields
}

Streamer Streamer 在Streamer上链式惰性操作,会形成一个链表的结构(通过lastStreamer连接) 在这个链表上的每一个节点(除了头节点持有了data slice),都不持有具体的数据。 即不保存数据本身,而是保存操作。

func NewStreamerWithData

func NewStreamerWithData(data interface{}) *Streamer

NewStreamerWithData 只接受slice类型

func (*Streamer) Count

func (streamer *Streamer) Count() (int, error)

Count 计数

func (*Streamer) Error

func (streamer *Streamer) Error() error

func (*Streamer) Filter

func (streamer *Streamer) Filter(filter interface{}) *Streamer

Filter 过滤规则,filter的参数elem是stream中的元素 若调用者在filter中进行转型断言,需要调用者自己保证stream中的元素可以被转型断言

func (*Streamer) First

func (streamer *Streamer) First(result interface{}) (exist bool, err error)

First 取第一个结果

func (*Streamer) Foreach

func (streamer *Streamer) Foreach(foreachOp interface{}) error

Foreach 遍历streamer中的每个元素

func (*Streamer) GroupBy

func (streamer *Streamer) GroupBy(keyer interface{}, result interface{}) error

GroupBy 根据getKey函数获取key,并将group by结果作为一个result map带回

func (*Streamer) IndexAt

func (streamer *Streamer) IndexAt(index int, result interface{}) (bool, error)

IndexAt 取第index个结果(从0开始计数)

func (*Streamer) Last

func (streamer *Streamer) Last(result interface{}) (bool, error)

Last 取最后一个结果

func (*Streamer) Limit

func (streamer *Streamer) Limit(n int) *Streamer

Limit 取前n条记录,惰性操作,只在执行了终结操作时起作用

func (*Streamer) Map

func (streamer *Streamer) Map(mapper interface{}) *Streamer

Map 转化规则,mapper的参数elem是stream中的元素,mapper返回值则会继续进入stream 若调用者在mapper中进行转型断言,需要调用者自己保证stream中的元素可以被转型断言

func (*Streamer) Offset

func (streamer *Streamer) Offset(n int) *Streamer

Offset 跳过前n条记录,惰性操作,只在执行了终结操作时起作用

func (*Streamer) Parallel

func (streamer *Streamer) Parallel(parallel int) *Streamer

Parallel 设置并行度

func (*Streamer) Scan

func (streamer *Streamer) Scan(result interface{}) error

Scan 将结果带出

func (*Streamer) Sorted

func (streamer *Streamer) Sorted(sorter interface{}) *Streamer

Sorted 排序

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL