Documentation
¶
Index ¶
- type Stream
- type Streamer
- func (streamer *Streamer) Count() (int, error)
- func (streamer *Streamer) Error() error
- func (streamer *Streamer) Filter(filter interface{}) *Streamer
- func (streamer *Streamer) First(result interface{}) (exist bool, err error)
- func (streamer *Streamer) Foreach(foreachOp interface{}) error
- func (streamer *Streamer) GroupBy(keyer interface{}, result interface{}) error
- func (streamer *Streamer) IndexAt(index int, result interface{}) (bool, error)
- func (streamer *Streamer) Last(result interface{}) (bool, error)
- func (streamer *Streamer) Limit(n int) *Streamer
- func (streamer *Streamer) Map(mapper interface{}) *Streamer
- func (streamer *Streamer) Offset(n int) *Streamer
- func (streamer *Streamer) Parallel(parallel int) *Streamer
- func (streamer *Streamer) Scan(result interface{}) error
- func (streamer *Streamer) Sorted(sorter interface{}) *Streamer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Stream ¶
type Stream interface {
// 设置并行度。并行度不是全局的概念,而是一个操作上的概念。
// 比如并行度为k,数据量为n,那么在执行filter/map等操作时,会创建k个goroutine
// 让每个goroutine上承担 n/k 的数据量(无法整除则将剩余数据并入最后一个goroutine)
// streamer默认继承上一个streamer的并行度,如果没有上一个streamer,那么默认并行度为1。
// 在某个操作上设置新的并行度,不会影响之前的操作的并行度。
// 例如:
// 源数据较多,执行filter时可以设置较大的并行度,从而提高效率;
// 经过filter后的数据量已经不多了,那么可以在map上设置较小的并行度;
// 从而避免创建过多goroutine。
// 上面说到并行度不是全局的概念,但可以通过某些操作实现全局的并行度设置。
// 即可以在最初的streamer上设置全局并行度k,随后不再设置并行度,从而实现全局并行度k。
Parallel(parallel int) Stream
// 根据filter func过滤符合条件的elem
// filter参数应为 func (item T) bool,T为上游数据类型
Filter(filter interface{}) Stream
// 根据mapper func将stream中的elem对象转化成另一种对象
// mapper参数应为 func (item T) O,T为上游数据类型,O为产出的新数据类型
Map(mapper interface{}) Stream
// 跳过前n条记录
Offset(n int) Stream
// 取前n条记录
Limit(n int) Stream
// 根据sorter的排序规则进行排序,sorter的结果为true则为降序,为false为升序
// sorter参数应为 func (item1, item2 T) bool,T为上游数据类型
Sorted(sorter interface{}) Stream
// 遍历所有结果,对每个结果执行希望的op func
// foreachOp参数应为 func (item T) err,T为上游数据类型,如果err != nil 会提前中止
Foreach(foreachOp interface{}) error
// 将结果读取出来,调用者根据stream中的元素类型,传入相应的slice pointer
// result参数应为 []T类型,T为上游数据类型
Scan(result interface{}) error
// 根据getKey func获取key,并做聚合。聚合结果由result带出。
// keyer参数应为 func (item T) K ,T为上游数据类型,K为 groupby key的类型
// result参数应为map[K][]T
GroupBy(keyer interface{}, result interface{}) error
// 获取结果中的第一个
// result参数应为T类型,T为上游数据类型
First(result interface{}) (bool, error)
// 获取结果中的最后一个
// result参数应为T类型,T为上游数据类型
Last(result interface{}) (bool, error)
// 获取结果中的第index个(从0开始计数)
// result参数应为T类型,T为上游数据类型
IndexAt(index int, result interface{}) (bool, error)
// 获取元素数
Count() int
/*
* 辅助方法
*/
// 返回stream过程中的err
Error() error
}
Stream Stream
type Streamer ¶
type Streamer struct {
// contains filtered or unexported fields
}
Streamer Streamer 在Streamer上链式惰性操作,会形成一个链表的结构(通过lastStreamer连接) 在这个链表上的每一个节点(除了头节点持有了data slice),都不持有具体的数据。 即不保存数据本身,而是保存操作。
func NewStreamerWithData ¶
func NewStreamerWithData(data interface{}) *Streamer
NewStreamerWithData 只接受slice类型
func (*Streamer) Filter ¶
Filter 过滤规则,filter的参数elem是stream中的元素 若调用者在filter中进行转型断言,需要调用者自己保证stream中的元素可以被转型断言
func (*Streamer) Map ¶
Map 转化规则,mapper的参数elem是stream中的元素,mapper返回值则会继续进入stream 若调用者在mapper中进行转型断言,需要调用者自己保证stream中的元素可以被转型断言
Click to show internal directories.
Click to hide internal directories.