Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DefaultProcessor ¶ added in v1.0.1
type DefaultProcessor struct {
// contains filtered or unexported fields
}
func (*DefaultProcessor) Proccess ¶ added in v1.0.1
func (p *DefaultProcessor) Proccess(inTaskChan InTaskChan, outTaskChan OutTaskChan, ctx context.Context) (cancelAllProcess bool)
type Flow ¶ added in v0.0.4
type Flow interface {
// AddNodeProcessors 添加流处理节点。
// outChanSize 表示该节点的输出通道大小。
// processors 为处理逻辑,每个Processor占用单独的goroutine,此方法开放性好。
AddNodeProcessors(outChanSize int, processors ...Processor) Node
// AddNodeTaskHandlers 添加流处理节点。
// outChanSize 表示该节点的输出通道大小。
// taskHandlers 为处理逻辑,每个TaskHandler占用单独的goroutine,当需要追踪任务执行轨迹时,可以使用该方法。
AddNodeTaskHandlers(outChanSize int, taskHandlers ...TaskHandler) Node
// Start 启动流处理引擎。
Start()
// Await 等待流处理引擎执行完成。
Await()
// Result 等待执行结果。该方法会阻塞,直到有结果输出。当ok为true时,结果有效。
// 当任务的执行结果有多个时,可循环调用该方法获取结果。
Result() (result interface{}, ok bool)
}
Flow 表示流处理引擎。
type InTaskChan ¶ added in v0.0.3
type InTaskChan <-chan Task
type Node ¶ added in v1.0.0
type Node interface {
Done() <-chan struct{}
// Cancel cancels the task execution,the submitted tasks will be executed.
Cancel()
// SetProcessorSelector
SetProcessorSelector(processorSelector ProcessorSelector)
SubmitTask(task Task) error
// contains filtered or unexported methods
}
Node defines a node to process tasks
type OutTaskChan ¶ added in v0.0.3
type OutTaskChan chan<- Task
type Processor ¶
type Processor interface {
Proccess(inTasks InTaskChan, outTask OutTaskChan, ctx context.Context) (cancelAllProcess bool)
}
Processor processes task from the previous FlowNode, and place the result into outTask chan which will be proccessed by the next FlowNode. A node includes multiple processors which run concurrently.
func NewTaskHandlerProcessors ¶ added in v1.0.1
func NewTaskHandlerProcessors( taskAt func(task TraceableTask, nodeId int), onNewTaskCreated func(task TraceableTask, nodeId int), onTaskFinished func(task TraceableTask, nodeId int, err error), taskhandlers ...TaskHandler) []Processor
type ProcessorSelector ¶ added in v1.2.2
type ProcessorSelector interface {
// DefineProcessorInTaskChan 为每个processor定义一个输入chan,返回值为输入chan的id
DefineProcessorInTaskChan() (processorInTaskChanIndexes []int)
// SelectInTaskChanIndex 将task分配给某个chan。
SelectInTaskChanIndex(task Task) (inTaskChanIndex int)
}
ProcessorSelector 根据任务属性,决定将任务分发给哪个processor。
针对某个node,将输入TaskChan分解成多个TaskChan,作为node内processor的输入。node内的每个processor,都对应一个输入TaskChan。多个node可以使用同一个输入TaskChan。 应用场景: 1、当task之间存在依赖关系,即:task1执行完成之后才能执行task2,此时需要将task1和task2分配到同一个processor
type TaskHandler ¶ added in v1.0.1
type TaskHandler interface {
// Handle 处理任务。如果返回的err不为nil,则会中断流处理。
// dispatch 用于转发任务至下一个节点。
Handle(inTask Task, dispatch func(outTask Task) error) (err error)
// OnCompleted 当前节点的任务处理完毕时,回调该方法。如果返回的err不为nil,则会中断流处理。
OnCompleted(dispatch func(outTask Task) error) (err error)
}
TaskHandler 定义任务处理逻辑。
func NewSortTaskHandler ¶ added in v1.2.2
func NewSortTaskHandler(taskStartId uint64, maxBlocking int) TaskHandler
NewSortTaskHandler 对任务按照taskId进行排序,taskId越小,越先被转发给下一个节点。 taskStartId:第一个任务的id maxBlocking:队列的最大长度。如果超过最大长度,则会转发队列中taskId最小的任务。
type TaskHandlerAdapter ¶ added in v1.1.0
type TaskHandlerAdapter struct {
}
func (TaskHandlerAdapter) OnCompleted ¶ added in v1.1.0
func (h TaskHandlerAdapter) OnCompleted(dispatch func(outTask Task) error) (err error)
type TraceableFlow ¶ added in v1.2.2
type TraceableFlow interface {
Flow
// taskAt 用于跟踪任务。当TraceableTask流转到某个节点时,回调该方法。
SetTaskAt(taskAt func(task TraceableTask, nodeId int))
// onNewTaskCreated 当新的TraceableTask被创建时,回调该方法。
SetOnNewTaskCreated(onNewTaskCreated func(task TraceableTask, nodeId int))
// onTaskFinished 用于跟踪任务。当TraceableTask结束时,回调该方法。
SetOnTaskFinished(onTaskFinished func(task TraceableTask, nodeId int, err error))
}
type TraceableTask ¶ added in v1.1.0
type TraceableTask interface {
// TaskId 用于获取TaskId,便于追踪。
TaskId() uint64
// Inner 用于获取底层的Task。
Inner() Task
}
TraceableTask 表示可追踪的task。
func ToTraceableTask ¶ added in v1.1.0
func ToTraceableTask(taskId uint64, task Task) TraceableTask
ToTraceableTask 将Task转为TraceableTask。
Click to show internal directories.
Click to hide internal directories.
