stream

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 22, 2020 License: MIT Imports: 5 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Stream

type Stream interface {

	// 设置并行度。并行度不是全局的概念,而是一个操作上的概念。
	// 比如并行度为k,数据量为n,那么在执行filter/map等操作时,会创建k个goroutine
	// 让每个goroutine上承担 n/k 的数据量(无法整除则将剩余数据并入最后一个goroutine)
	// streamer默认继承上一个streamer的并行度,如果没有上一个streamer,那么默认并行度为1。
	// 在某个操作上设置新的并行度,不会影响之前的操作的并行度。
	// 例如:
	// 		源数据较多,执行filter时可以设置较大的并行度,从而提高效率;
	// 		经过filter后的数据量已经不多了,那么可以在map上设置较小的并行度;
	// 		从而避免创建过多goroutine。
	// 上面说到并行度不是全局的概念,但可以通过某些操作实现全局的并行度设置。
	// 即可以在最初的streamer上设置全局并行度k,随后不再设置并行度,从而实现全局并行度k。
	Parallel(parallel int) Stream
	// 根据filter func过滤符合条件的elem
	Filter(filter func(elem interface{}) bool) Stream
	// 根据mapper func将stream中的elem对象转化成另一种对象
	Map(mapper func(elem interface{}) interface{}) Stream
	// 跳过前n条记录
	Offset(n int) Stream
	// 取前n条记录
	Limit(n int) Stream
	// 根据sorter的排序规则进行排序,sorter的结果为true则为降序,为false为升序
	Sorted(sorter func(elem1, elem2 interface{}) bool) Stream

	// 遍历所有结果,对每个结果执行希望的op func
	Foreach(op func(elem interface{}) error) error
	// 将结果读取出来,调用者根据stream中的元素类型,传入相应的slice pointer
	Scan(result interface{}) error
	// 根据getKey func获取key,并做聚合。聚合结果由result带出。
	GroupBy(getKey func(elem interface{}) interface{}, result interface{}) error
	// 获取结果中的第一个
	First(result interface{}) (bool, error)
	// 获取结果中的最后一个
	Last(result interface{}) (bool, error)
	// 获取结果中的第index个(从0开始计数)
	IndexAt(index int, result interface{}) (bool, error)
	// 获取元素数
	Count() int
}

Stream Stream

type Streamer

type Streamer struct {
	// contains filtered or unexported fields
}

Streamer Streamer 在Streamer上链式惰性操作,会形成一个链表的结构(通过lastStreamer连接) 在这个链表上的每一个节点(除了头节点持有了data slice),都不持有具体的数据。 即不保存数据本身,而是保存操作。

func NewStreamerWithData

func NewStreamerWithData(data interface{}) (*Streamer, error)

NewStreamerWithData 只接受slice类型

func (*Streamer) Count

func (streamer *Streamer) Count() (int, error)

Count 计数

func (*Streamer) Filter

func (streamer *Streamer) Filter(filter func(elem interface{}) bool) *Streamer

Filter 过滤规则,filter的参数elem是stream中的元素 若调用者在filter中进行转型断言,需要调用者自己保证stream中的元素可以被转型断言

func (*Streamer) First

func (streamer *Streamer) First(result interface{}) (exist bool, err error)

First 取第一个结果

func (*Streamer) Foreach

func (streamer *Streamer) Foreach(op func(elem interface{}) error) error

Foreach 遍历streamer中的每个元素

func (*Streamer) GroupBy

func (streamer *Streamer) GroupBy(getKey func(elem interface{}) interface{}, result interface{}) error

GroupBy 根据getKey函数获取key,并将group by结果作为一个result map带回

func (*Streamer) IndexAt

func (streamer *Streamer) IndexAt(index int, result interface{}) (bool, error)

IndexAt 取第index个结果(从0开始计数)

func (*Streamer) Last

func (streamer *Streamer) Last(result interface{}) (bool, error)

Last 取最后一个结果

func (*Streamer) Limit

func (streamer *Streamer) Limit(n int) *Streamer

Limit 取前n条记录,惰性操作,只在执行了终结操作时起作用

func (*Streamer) Map

func (streamer *Streamer) Map(mapper func(elem interface{}) interface{}) *Streamer

Map 转化规则,mapper的参数elem是stream中的元素,mapper返回值则会继续进入stream 若调用者在mapper中进行转型断言,需要调用者自己保证stream中的元素可以被转型断言

func (*Streamer) Offset

func (streamer *Streamer) Offset(n int) *Streamer

Offset 跳过前n条记录,惰性操作,只在执行了终结操作时起作用

func (*Streamer) Parallel

func (streamer *Streamer) Parallel(parallel int) *Streamer

Parallel 设置并行度

func (*Streamer) Scan

func (streamer *Streamer) Scan(result interface{}) error

Scan 将结果带出

func (*Streamer) Sorted

func (streamer *Streamer) Sorted(sorter func(elem1, elem2 interface{}) bool) *Streamer

Sorted 排序

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL