Documentation ¶
Index ¶
- type Stream
- type Streamer
- func (streamer *Streamer) Count() (int, error)
- func (streamer *Streamer) Filter(filter func(elem interface{}) bool) *Streamer
- func (streamer *Streamer) First(result interface{}) (exist bool, err error)
- func (streamer *Streamer) Foreach(op func(elem interface{}) error) error
- func (streamer *Streamer) GroupBy(getKey func(elem interface{}) interface{}, result interface{}) error
- func (streamer *Streamer) IndexAt(index int, result interface{}) (bool, error)
- func (streamer *Streamer) Last(result interface{}) (bool, error)
- func (streamer *Streamer) Limit(n int) *Streamer
- func (streamer *Streamer) Map(mapper func(elem interface{}) interface{}) *Streamer
- func (streamer *Streamer) Offset(n int) *Streamer
- func (streamer *Streamer) Parallel(parallel int) *Streamer
- func (streamer *Streamer) Scan(result interface{}) error
- func (streamer *Streamer) Sorted(sorter func(elem1, elem2 interface{}) bool) *Streamer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Stream ¶
type Stream interface { // 设置并行度。并行度不是全局的概念,而是一个操作上的概念。 // 比如并行度为k,数据量为n,那么在执行filter/map等操作时,会创建k个goroutine // 让每个goroutine上承担 n/k 的数据量(无法整除则将剩余数据并入最后一个goroutine) // streamer默认继承上一个streamer的并行度,如果没有上一个streamer,那么默认并行度为1。 // 在某个操作上设置新的并行度,不会影响之前的操作的并行度。 // 例如: // 源数据较多,执行filter时可以设置较大的并行度,从而提高效率; // 经过filter后的数据量已经不多了,那么可以在map上设置较小的并行度; // 从而避免创建过多goroutine。 // 上面说到并行度不是全局的概念,但可以通过某些操作实现全局的并行度设置。 // 即可以在最初的streamer上设置全局并行度k,随后不再设置并行度,从而实现全局并行度k。 Parallel(parallel int) Stream // 根据filter func过滤符合条件的elem Filter(filter func(elem interface{}) bool) Stream // 根据mapper func将stream中的elem对象转化成另一种对象 Map(mapper func(elem interface{}) interface{}) Stream // 跳过前n条记录 Offset(n int) Stream // 取前n条记录 Limit(n int) Stream // 根据sorter的排序规则进行排序,sorter的结果为true则为降序,为false为升序 Sorted(sorter func(elem1, elem2 interface{}) bool) Stream // 遍历所有结果,对每个结果执行希望的op func Foreach(op func(elem interface{}) error) error // 将结果读取出来,调用者根据stream中的元素类型,传入相应的slice pointer Scan(result interface{}) error // 根据getKey func获取key,并做聚合。聚合结果由result带出。 GroupBy(getKey func(elem interface{}) interface{}, result interface{}) error // 获取结果中的第一个 First(result interface{}) (bool, error) // 获取结果中的最后一个 Last(result interface{}) (bool, error) // 获取结果中的第index个(从0开始计数) IndexAt(index int, result interface{}) (bool, error) // 获取元素数 Count() int }
Stream Stream
type Streamer ¶
type Streamer struct {
// contains filtered or unexported fields
}
Streamer Streamer 在Streamer上链式惰性操作,会形成一个链表的结构(通过lastStreamer连接) 在这个链表上的每一个节点(除了头节点持有了data slice),都不持有具体的数据。 即不保存数据本身,而是保存操作。
func NewStreamerWithData ¶
NewStreamerWithData 只接受slice类型
func (*Streamer) Filter ¶
Filter 过滤规则,filter的参数elem是stream中的元素 若调用者在filter中进行转型断言,需要调用者自己保证stream中的元素可以被转型断言
func (*Streamer) GroupBy ¶
func (streamer *Streamer) GroupBy(getKey func(elem interface{}) interface{}, result interface{}) error
GroupBy 根据getKey函数获取key,并将group by结果作为一个result map带回
func (*Streamer) Map ¶
Map 转化规则,mapper的参数elem是stream中的元素,mapper返回值则会继续进入stream 若调用者在mapper中进行转型断言,需要调用者自己保证stream中的元素可以被转型断言
Click to show internal directories.
Click to hide internal directories.