flow

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 5, 2021 License: Apache-2.0 Imports: 7 Imported by: 0

README

Flow

流处理

Documentation

Index

Constants

View Source
const DefaultAntsPoolSize = 10000

DefaultAntsPoolSize The default capacity of the default goroutine pool

Variables

View Source
var (
	Error = errors.New("error")
)

Functions

This section is empty.

Types

type ChanContext added in v0.0.5

type ChanContext chan *Context

ChanContext Data stream channel

type Context added in v0.0.5

type Context struct {
	// contains filtered or unexported fields
}

Context 流处理上下文

func (*Context) Data added in v0.0.6

func (c *Context) Data() interface{}

Data Return data

func (*Context) Err added in v0.0.5

func (c *Context) Err() error

Err 返回错误信息

func (*Context) FlowId added in v0.0.5

func (c *Context) FlowId() string

FlowId 返回流处理ID

func (*Context) GetCache added in v0.0.6

func (c *Context) GetCache(key string) (value interface{})

GetCache 返回对应 key 的缓存值

func (*Context) SetCache added in v0.0.6

func (c *Context) SetCache(key string, value interface{})

SetCache 设置缓存

func (*Context) SetData added in v0.0.6

func (c *Context) SetData(data interface{})

SetData Change the data

func (*Context) SetErr added in v0.0.5

func (c *Context) SetErr(err error)

SetErr 设置错误信息 只能被设置一次非nil错误信息

func (*Context) String added in v0.0.5

func (c *Context) String() string

type Flow

type Flow struct {
	// contains filtered or unexported fields
}

Flow Implements a flow

func NewFlow

func NewFlow(buff int) *Flow

NewFlow Create a new stream processing

func (*Flow) Feed

func (f *Flow) Feed(data interface{}) string

Feed Feed stream processing data

func (*Flow) Run

func (f *Flow) Run(resultFunc Func, options ...Option)

Run Establish a stream processing channel

func (*Flow) To

func (f *Flow) To(funcNode Func) Node

To Data flows into the function flow node

func (*Flow) ToNode

func (f *Flow) ToNode(node Node) Node

ToNode Data flows into the flow node

func (*Flow) Wait

func (f *Flow) Wait()

Wait Wait for all streams to end

type Func

type Func func(ctx *Context)

Func 节点处理函数

type FuncNode

type FuncNode struct {
	// contains filtered or unexported fields
}

FuncNode 函数流节点

func (*FuncNode) Next

func (f *FuncNode) Next() Node

Next 下一个流节点

func (*FuncNode) Run

func (f *FuncNode) Run(in *Context)

Run 执行流节点函数

func (*FuncNode) To

func (f *FuncNode) To(funcNode Func) Node

To 数据流入函数流节点

func (*FuncNode) ToNode

func (f *FuncNode) ToNode(node Node) Node

ToNode 数据流入流节点

type Node

type Node interface {
	Next() Node      // 子计算流
	Run(in *Context) //
	To(funcNode Func) Node
	ToNode(node Node) Node
}

Node 实现该接口的是计算流

func NewFuncNode

func NewFuncNode(funcNode Func) Node

NewFuncNode 新建一个函数流节点

type NodeData added in v0.1.0

type NodeData struct {
	// contains filtered or unexported fields
}

NodeData Node data

type Option added in v0.1.0

type Option func(options *Options)

func WithDisablePool added in v0.1.0

func WithDisablePool() Option

WithDisablePool return a Option with pool closed

func WithEnablePool added in v0.1.0

func WithEnablePool(enable bool) Option

WithEnablePool return a Option

func WithOption added in v0.1.0

func WithOption(options *Options) Option

WithOption return a Option interface

func WithPoolSize added in v0.1.0

func WithPoolSize(size int) Option

WithPoolSize return a Option that set pool size

type Options added in v0.1.0

type Options struct {
	// contains filtered or unexported fields
}

Options Options

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL