Documentation ¶
Index ¶
- Constants
- Variables
- type BaseStep
- type BroadcastStep
- type ConditionalStep
- type ConstantStep
- type Dataflow
- type DataflowMarshaller
- type DataflowNoFn
- type DataflowRun
- type DataflowRunContextKeyType
- type DataflowRunID
- type DataflowRunState
- type DistributeStep
- type DoerStep
- type Executor
- type Flow
- type FlowContextKeyType
- type FlowID
- type FlowNoData
- type FlowQueue
- type FlowSplit
- type FlowSplitID
- type FlowSplitIndexType
- type FlowState
- type HTTPClientFactory
- type JoinStep
- type JoinerStep
- type Logger
- type RaceStep
- type SelectStep
- type SplitterStep
- type Step
- type StepContextKeyType
- type StepType
- type Storage
- type WebMethodStep
Constants ¶
const (
	RunStateNew         = DataflowRunState("New")
	RunStateActive      = DataflowRunState("Active")
	RunStateInterrupted = DataflowRunState("Interrupted")
	RunStateCompleted   = DataflowRunState("Completed")
	RunStateError       = DataflowRunState("Error")
)
These are the states a workflow run can be in
Variables ¶
var (
	FlowContextKey        = FlowContextKeyType{}
	DataflowRunContextKey = DataflowRunContextKeyType{}
	StepContextKey        = StepContextKeyType{}
)
These keys are used to store flow, workflow run, and step information in a context
Functions ¶
This section is empty.
Types ¶
type BaseStep ¶
type BaseStep struct {
	ID          string   `json:"id,omitempty"`
	Description string   `json:"description,omitempty"`
	Type        StepType `json:"type,omitempty"`
	Next        Step     `json:"-"`
	NextID      string   `json:"next,omitempty"`
}
BaseStep holds the basic step details. ID must be unique within a Dataflow. Type is used for serialization. Next points to the next step in the workflow (except for BroadcastStep, which forwards to multiple steps). AcceptContent indicates the content type the step requires, if any. ContentType indicates the content type of the output, if any. If KeepOutput is true, the outputs are copied to the workflow run so they are available when the workflow completes. If HandleErrorAs is not nil, errors do not stop the flow; instead, the given JSON is passed to the next step(s).
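The struct tags mean that the Next pointer is never serialized and only NextID appears in the JSON, under the key "next". A small sketch of that behavior, using a local copy of the struct with Step reduced to an empty interface (the real interface has methods); encode is a hypothetical helper:

```go
package main

import (
	"encoding/json"
	"fmt"
)

type StepType string

// Step is reduced to an empty interface for this sketch.
type Step interface{}

// BaseStep mirrors the documented fields and tags.
type BaseStep struct {
	ID          string   `json:"id,omitempty"`
	Description string   `json:"description,omitempty"`
	Type        StepType `json:"type,omitempty"`
	Next        Step     `json:"-"`
	NextID      string   `json:"next,omitempty"`
}

// encode marshals a step and returns the JSON text.
func encode(s BaseStep) string {
	b, err := json.Marshal(s)
	if err != nil {
		return "error: " + err.Error()
	}
	return string(b)
}

func main() {
	fetch := BaseStep{ID: "fetch", Type: "web-method", NextID: "pick"}
	fetch.Next = &BaseStep{ID: "pick"} // in-memory link, skipped by json:"-"
	fmt.Println(encode(fetch))         // {"id":"fetch","type":"web-method","next":"pick"}
}
```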
func (*BaseStep) PrepareMarshal ¶
func (s *BaseStep) PrepareMarshal()
PrepareMarshal implements Step for BaseStep
func (*BaseStep) ResolveIDs ¶
ResolveIDs implements Step for BaseStep
type BroadcastStep ¶
type BroadcastStep struct {
	BaseStep
	ForwardTo    []Step   `json:"-"`
	ForwardToIDs []string `json:"forwardTo,omitempty"`
}
BroadcastStep describes a step that takes its input and forwards to multiple steps
func (*BroadcastStep) PrepareMarshal ¶
func (s *BroadcastStep) PrepareMarshal()
PrepareMarshal sets the step type
func (*BroadcastStep) ResolveIDs ¶
func (s *BroadcastStep) ResolveIDs(stepMap map[string]Step) error
ResolveIDs resolves the ForwardToIDs
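Resolving IDs amounts to looking up each serialized step ID in the step map and collecting the matching steps. The following is only a sketch of that idea under stated assumptions, not the package's implementation: Step is reduced to a single method, and namedStep and resolve are hypothetical names.

```go
package main

import "fmt"

// Step is reduced to one method for this sketch.
type Step interface{ GetID() string }

// namedStep is a hypothetical step used only for illustration.
type namedStep struct{ id string }

func (n *namedStep) GetID() string { return n.id }

// resolve looks up every ID in stepMap, preserving order,
// and fails on the first ID that has no matching step.
func resolve(ids []string, stepMap map[string]Step) ([]Step, error) {
	out := make([]Step, 0, len(ids))
	for _, id := range ids {
		s, ok := stepMap[id]
		if !ok {
			return nil, fmt.Errorf("unknown step ID %q", id)
		}
		out = append(out, s)
	}
	return out, nil
}

func main() {
	stepMap := map[string]Step{
		"a": &namedStep{id: "a"},
		"b": &namedStep{id: "b"},
	}
	steps, err := resolve([]string{"b", "a"}, stepMap)
	fmt.Println(len(steps), err) // 2 <nil>
}
```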
type ConditionalStep ¶
ConditionalStep only forwards to the next step if the condition is satisfied. Input must be JSON.
func (*ConditionalStep) PrepareMarshal ¶
func (s *ConditionalStep) PrepareMarshal()
PrepareMarshal sets the step type
func (*ConditionalStep) Validate ¶
func (s *ConditionalStep) Validate() []error
Validate checks the condition is set and can be compiled
type ConstantStep ¶
type ConstantStep struct {
	BaseStep
	Value json.RawMessage `json:"value,omitempty"`
}
ConstantStep takes its value (JSON) and sends it to the Next step
func (*ConstantStep) PrepareMarshal ¶
func (s *ConstantStep) PrepareMarshal()
PrepareMarshal sets the step type
func (*ConstantStep) Validate ¶
func (s *ConstantStep) Validate() []error
Validate checks that the constant step has a value
type Dataflow ¶
type Dataflow struct {
	ID          string          `json:"id,omitempty"`
	Description string          `json:"description,omitempty"`
	Steps       []Step          `json:"-"`
	StartAt     Step            `json:"-"`
	StepMap     map[string]Step `json:"-"`
}
Dataflow defines a workflow
func (Dataflow) MarshalJSON ¶
MarshalJSON implements json.Marshaler for Dataflow
func (*Dataflow) UnmarshalJSON ¶
UnmarshalJSON implements json.Unmarshaler for Dataflow
type DataflowMarshaller ¶
type DataflowMarshaller struct {
	DataflowNoFn
	StepsJSON []json.RawMessage `json:"steps"`
	StartID   string            `json:"startAt,omitempty"`
}
DataflowMarshaller is used for marshaling a workflow into JSON
type DataflowNoFn ¶
type DataflowNoFn Dataflow
DataflowNoFn erases the marshaling functions to avoid recursion
type DataflowRun ¶
type DataflowRun struct {
	ID       DataflowRunID
	Dataflow *Dataflow
	State    DataflowRunState
}
DataflowRun describes a running workflow
type DataflowRunContextKeyType ¶
type DataflowRunContextKeyType struct{}
DataflowRunContextKeyType is used to store the workflow run ID in context
type DataflowRunID ¶
type DataflowRunID string
DataflowRunID identifies a workflow run (generated UUID)
type DataflowRunState ¶
type DataflowRunState string
DataflowRunState is the string type of workflow run states
type DistributeStep ¶
type DistributeStep struct {
	BaseStep
}
DistributeStep takes its input, which must be a JSON array, and sends each element of the array to the Next step
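Splitting a JSON array into its elements without decoding them can be done with json.RawMessage. This is a sketch of the idea only, not the package's code; splitArray is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// splitArray parses input as a JSON array and returns each element
// as raw JSON, leaving the elements themselves undecoded.
func splitArray(input []byte) ([]json.RawMessage, error) {
	var elems []json.RawMessage
	if err := json.Unmarshal(input, &elems); err != nil {
		return nil, fmt.Errorf("input is not a JSON array: %w", err)
	}
	return elems, nil
}

func main() {
	elems, err := splitArray([]byte(`[1,{"a":2},"x"]`))
	if err != nil {
		fmt.Println(err)
		return
	}
	for i, e := range elems {
		// Each element would become the input of one downstream flow.
		fmt.Println(i, string(e))
	}
}
```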
func (*DistributeStep) PrepareMarshal ¶
func (s *DistributeStep) PrepareMarshal()
PrepareMarshal sets the step type
type Executor ¶
type Executor interface {
	Start(ctx context.Context, workflow *Dataflow) (*DataflowRun, []error)
	Validate(ctx context.Context, workflow *Dataflow) []error
	Interrupt(ctx context.Context, run *DataflowRun)
	GetHTTPClientFactory() HTTPClientFactory
	GetLogger() Logger
	GetStorage() Storage
}
Executor is the interface implemented by the executing engine
func NewExecutor ¶
func NewExecutor(httpClientFactory HTTPClientFactory, logger Logger, storage Storage, flowQueue FlowQueue) Executor
NewExecutor creates an instance of the execution engine
type Flow ¶
type Flow struct {
	FlowNoData             // helps with serialization
	Data       interface{} // if State is Completed, this has the result
}
Flow represents an execution unit for a workflow. Dataflow runs start with one flow, set at the starting step. When a step completes, the flow transitions to the next step. Flows can split in Distribute and Broadcast steps. Flows can merge in the Join and Race steps.
type FlowContextKeyType ¶
type FlowContextKeyType struct{}
FlowContextKeyType is used to store the flow ID in context
type FlowNoData ¶
type FlowNoData struct {
	ID             FlowID        // UUID
	DataflowRunID  DataflowRunID // identifies the run instance
	PreviousStepID string
	NextStepID     string
	State          FlowState
	Message        string        // if State is Error, this has the explanation
	ContentType    string        // content type of data
	Splits         []FlowSplitID // identifies the splits that led to this flow
	SplitKey       string        // if the current split is from dictionary, the key
	SplitIndex     int           // if the current split is from array, the index
}
type FlowQueue ¶
type FlowQueue interface {
	SetDequeueCb(func(ctx context.Context, flow *Flow) error)
	Enqueue(ctx context.Context, flow *Flow) error
}
FlowQueue is the interface implemented by external queue service
type FlowSplit ¶
type FlowSplit struct {
	ID            FlowSplitID        // UUID
	DataflowRunID DataflowRunID      // identifies the run instance
	SplitStepID   string             // this would be a broadcast or distribute step
	ParentFlowID  FlowID             // the flow that was split
	IndexType     FlowSplitIndexType // the type of index used in the split
	FlowIDs       []FlowID           // lists the flows generated by the split
}
FlowSplit holds information about an instance of a split flow
type FlowSplitIndexType ¶
type FlowSplitIndexType string
FlowSplitIndexType represents the type of flow split index (key or numerical)
const (
	FlowSplitNumericalIndex FlowSplitIndexType = "Numerical"
	FlowSplitKeyIndex       FlowSplitIndexType = "Key"
)
These are the index types for a flow split
type FlowState ¶
type FlowState string
FlowState represents the state of a flow
const (
	FlowStateActive      FlowState = "Active"
	FlowStateError       FlowState = "Error"       // flow stopped due to error
	FlowStateCompleted   FlowState = "Completed"   // flow dead-ended
	FlowStateSplit       FlowState = "Split"       // there are child flows
	FlowStateInterrupted FlowState = "Interrupted" // e.g. from a conditional
)
These are the states a flow can be in, with respect to the previous step ID
type HTTPClientFactory ¶
type HTTPClientFactory interface {
	GetHTTPClient(ctx context.Context, disableSSLValidation bool) *http.Client
}
HTTPClientFactory is used to abstract HTTP client creation
type JoinStep ¶
type JoinStep struct {
	BaseStep
}
JoinStep waits until all the steps providing input to it complete, combines the inputs into a single result and forwards to the Next step. The combination is a JSON object whose keys are the IDs of the steps providing the input, and the values are the inputs. If the input value is not JSON then it is given as a base64-encoded string. If the input is from a step receiving a distribution, the value is an array.
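The combination rule described above can be sketched as follows: valid JSON inputs are embedded as-is, and anything else becomes a base64-encoded JSON string. This is an illustration, not the package's code; combine is a hypothetical helper, and the array case for distributed inputs is left out.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// combine builds a JSON object keyed by the ID of the step that
// produced each input.
func combine(inputs map[string][]byte) ([]byte, error) {
	out := make(map[string]json.RawMessage, len(inputs))
	for stepID, data := range inputs {
		if json.Valid(data) {
			out[stepID] = json.RawMessage(data) // already JSON, keep as-is
			continue
		}
		// Not JSON: wrap the base64 text in a JSON string.
		quoted, err := json.Marshal(base64.StdEncoding.EncodeToString(data))
		if err != nil {
			return nil, err
		}
		out[stepID] = quoted
	}
	return json.Marshal(out) // map keys are written in sorted order
}

func main() {
	joined, err := combine(map[string][]byte{
		"a": []byte(`{"x":1}`),
		"b": []byte("hi"),
	})
	fmt.Println(string(joined), err) // {"a":{"x":1},"b":"aGk="} <nil>
}
```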
func (*JoinStep) Join ¶
func (s *JoinStep) Join(ctx context.Context, exec Executor, flow *Flow) (joinedFlow *Flow, err error)
Join implements the JoinerStep interface for the join step
func (*JoinStep) PrepareMarshal ¶
func (s *JoinStep) PrepareMarshal()
PrepareMarshal sets the step type
type JoinerStep ¶
type JoinerStep interface {
	Join(ctx context.Context, exec Executor, flow *Flow) (joinedFlow *Flow, err error)
}
JoinerStep is implemented by steps that join flows
type Logger ¶
type Logger interface {
	Debugf(ctx context.Context, fmt string, params ...interface{})
	Infof(ctx context.Context, fmt string, params ...interface{})
	Warnf(ctx context.Context, fmt string, params ...interface{})
	Errorf(ctx context.Context, fmt string, params ...interface{})
}
Logger is passed to other services for pluggable logging
type RaceStep ¶
type RaceStep struct {
	BaseStep
}
RaceStep waits until it receives its first active flow input and forwards it to the Next step. Subsequent inputs are discarded
func (*RaceStep) Join ¶
func (s *RaceStep) Join(ctx context.Context, exec Executor, flow *Flow) (joinedFlow *Flow, err error)
Join implements the JoinerStep interface for the race step
func (*RaceStep) PrepareMarshal ¶
func (s *RaceStep) PrepareMarshal()
PrepareMarshal sets the step type
type SelectStep ¶
SelectStep expects the flow data to be parseable as JSON. It then uses the selector expression (see https://github.com/oliveagle/jsonpath) and returns the results of applying the expression to the flow data.
func (*SelectStep) PrepareMarshal ¶
func (s *SelectStep) PrepareMarshal()
PrepareMarshal sets the step type
func (*SelectStep) Validate ¶
func (s *SelectStep) Validate() (errList []error)
Validate checks the selector can be compiled
type SplitterStep ¶
type SplitterStep interface {
	Split(ctx context.Context, exec Executor, flow *Flow) (outflows []*Flow, split *FlowSplit, err error)
}
SplitterStep is implemented by steps that split flows
type Step ¶
type Step interface {
	GetID() string
	GetNextID() string
	PrepareMarshal()
	ResolveIDs(map[string]Step) error
	Validate() []error
}
Step is the interface implemented by all steps
func UnmarshalStep ¶
func UnmarshalStep(raw json.RawMessage) (Step, error)
UnmarshalStep returns a specialized step based on the raw JSON
type StepContextKeyType ¶
type StepContextKeyType struct{}
StepContextKeyType is used to store the step ID in context
type StepType ¶
type StepType string
StepType is an enum for the known step types
const (
	TypeWebMethod   StepType = "web-method"
	TypeDistribute  StepType = "distribute"
	TypeBroadcast   StepType = "broadcast"
	TypeSelect      StepType = "select"
	TypeConditional StepType = "conditional"
	TypeJoin        StepType = "join"
	TypeRace        StepType = "race"
	TypeConstant    StepType = "constant"
)
These are the known step types
type Storage ¶
type Storage interface {
	StoreDataflowRun(ctx context.Context, run *DataflowRun) error
	RetrieveDataflowRuns(ctx context.Context, keys []DataflowRunID) map[DataflowRunID]*DataflowRun
	DeleteDataflowRun(ctx context.Context, key DataflowRunID) error
	StoreFlow(ctx context.Context, flow *Flow) error
	RetrieveFlows(ctx context.Context, keys []FlowID) map[FlowID]*Flow
	DeleteFlow(ctx context.Context, key FlowID) error
	StoreFlowSplit(ctx context.Context, flowSplit *FlowSplit) error
	RetrieveFlowSplits(ctx context.Context, keys []FlowSplitID) map[FlowSplitID]*FlowSplit
	DeleteFlowSplit(ctx context.Context, key FlowSplitID) error
	Increment(ctx context.Context, key string, initialValue int64, increment int64) int64
	IncrementWithError(ctx context.Context, key string, increment int64, errIncrement int64) (count int64, errCount int64)
}
Storage is the interface implemented by external storage service
type WebMethodStep ¶
type WebMethodStep struct {
	BaseStep
	Method string `json:"method,omitempty"`
	URL    string `json:"url,omitempty"`
}
WebMethodStep describes a step that makes an HTTP request, and sends the response to the Next step
func (*WebMethodStep) PrepareMarshal ¶
func (s *WebMethodStep) PrepareMarshal()
PrepareMarshal sets the step type
func (*WebMethodStep) Validate ¶
func (s *WebMethodStep) Validate() []error
Validate checks the method and URL are OK