Documentation
¶
Index ¶
- Constants
- Variables
- type Aggregator
- type BlankOperation
- type Condition
- type Context
- func (context *Context) Del(key string) error
- func (context *Context) Get(key string) (interface{}, error)
- func (context *Context) GetBool(key string) bool
- func (context *Context) GetBytes(key string) []byte
- func (context *Context) GetInt(key string) int
- func (context *Context) GetNode() string
- func (context *Context) GetRequestId() string
- func (context *Context) GetString(key string) string
- func (context *Context) Set(key string, data interface{}) error
- type Dag
- func (this *Dag) AddEdge(from, to string) error
- func (this *Dag) AddVertex(id string, operations []Operation) *Node
- func (this *Dag) Append(dag *Dag) error
- func (dag *Dag) GetDefinition() (*DagExporter, error)
- func (dag *Dag) GetDefinitionJson() ([]byte, error)
- func (this *Dag) GetEndNode() *Node
- func (this *Dag) GetInitialNode() *Node
- func (this *Dag) GetNode(id string) *Node
- func (this *Dag) GetNodes(dynamicOption string) []string
- func (this *Dag) GetParentNode() *Node
- func (this *Dag) HasBranch() bool
- func (this *Dag) HasEdge() bool
- func (this *Dag) IsExecutionFlow() bool
- func (this *Dag) Validate() error
- type DagExporter
- type DataStore
- type EventHandler
- type ForEach
- type Forwarder
- type Logger
- type Node
- func (this *Node) AddAggregator(aggregator Aggregator)
- func (this *Node) AddCondition(condition Condition)
- func (this *Node) AddConditionalDag(condition string, dag *Dag)
- func (this *Node) AddForEach(foreach ForEach)
- func (this *Node) AddForEachDag(subDag *Dag) error
- func (this *Node) AddForwarder(children string, forwarder Forwarder)
- func (this *Node) AddOperation(operation Operation)
- func (this *Node) AddSubAggregator(aggregator Aggregator)
- func (this *Node) AddSubDag(subDag *Dag) error
- func (this *Node) Children() []*Node
- func (this *Node) Dependency() []*Node
- func (this *Node) Dynamic() bool
- func (this *Node) DynamicIndegree() int
- func (this *Node) GetAggregator() Aggregator
- func (this *Node) GetAllConditionalDags() map[string]*Dag
- func (this *Node) GetCondition() Condition
- func (this *Node) GetConditionalDag(condition string) *Dag
- func (this *Node) GetForEach() ForEach
- func (this *Node) GetForwarder(children string) Forwarder
- func (this *Node) GetSubAggregator() Aggregator
- func (this *Node) GetUniqueId() string
- func (this *Node) Indegree() int
- func (this *Node) Operations() []Operation
- func (this *Node) Outdegree() int
- func (this *Node) ParentDag() *Dag
- func (this *Node) SubDag() *Dag
- type NodeExporter
- type Operation
- type OperationExporter
- type Pipeline
- func (pipeline *Pipeline) ApplyState(state string)
- func (pipeline *Pipeline) CountNodes() int
- func (pipeline *Pipeline) GetAllNodesUniqueId() []string
- func (pipeline *Pipeline) GetCurrentNodeDag() (*Node, *Dag)
- func (pipeline *Pipeline) GetInitialNodeId() string
- func (pipeline *Pipeline) GetNodeExecutionUniqueId(node *Node) string
- func (pipeline *Pipeline) GetState() string
- func (pipeline *Pipeline) SetDag(dag *Dag)
- func (pipeline *Pipeline) UpdatePipelineExecutionPosition(depthAdjustment int, vertex string)
- type PipelineErrorHandler
- type PipelineHandler
- type StateStore
Constants ¶
const ( // StateSuccess denotes success state StateSuccess = "success" // StateFailure denotes failure state StateFailure = "failure" // StateOngoing denotes ongoing state StateOngoing = "ongoing" )
const ( DEPTH_INCREMENT = 1 DEPTH_DECREMENT = -1 DEPTH_SAME = 0 )
Variables ¶
var ( // ERR_NO_VERTEX ERR_NO_VERTEX = fmt.Errorf("dag has no vertex set") // ERR_CYCLIC denotes that dag has a cycle ERR_CYCLIC = fmt.Errorf("dag has cyclic dependency") // ERR_DUPLICATE_EDGE denotes that a dag edge is duplicate ERR_DUPLICATE_EDGE = fmt.Errorf("edge redefined") // ERR_DUPLICATE_VERTEX denotes that a dag edge is duplicate ERR_DUPLICATE_VERTEX = fmt.Errorf("vertex redefined") // ERR_MULTIPLE_START denotes that a dag has more than one start point ERR_MULTIPLE_START = fmt.Errorf("only one start vertex is allowed") // ERR_RECURSIVE_DEP denotes that dag has a recursive dependecy ERR_RECURSIVE_DEP = fmt.Errorf("dag has recursive dependency") // Default forwarder DefaultForwarder = func(data []byte) []byte { return data } )
Functions ¶
This section is empty.
Types ¶
type Aggregator ¶
Aggregator definition for the data aggregator of nodes
type BlankOperation ¶
type BlankOperation struct {
}
func (*BlankOperation) Encode ¶
func (ops *BlankOperation) Encode() []byte
func (*BlankOperation) Execute ¶
func (ops *BlankOperation) Execute(data []byte, option map[string]interface{}) ([]byte, error)
func (*BlankOperation) GetId ¶
func (ops *BlankOperation) GetId() string
func (*BlankOperation) GetProperties ¶
func (ops *BlankOperation) GetProperties() map[string][]string
type Context ¶
type Context struct {
Query url.Values // provides request Query
State string // state of the request
Name string // name of the faas-flow
NodeInput map[string][]byte // stores inputs form each node
// contains filtered or unexported fields
}
Context execution context and execution state
func CreateContext ¶
CreateContext create request context (used by template)
func (*Context) GetRequestId ¶
GetRequestId returns the request id
type Dag ¶
type Dag struct {
Id string
// contains filtered or unexported fields
}
Dag The whole dag
func (*Dag) AddEdge ¶
AddEdge add a directed edge as (from)->(to) If vertex doesn't exists creates them
func (*Dag) Append ¶
Append appends another dag into an existing dag Its a way to define and reuse subdags append causes disconnected dag which must be linked with edge in order to execute
func (*Dag) GetDefinition ¶
func (dag *Dag) GetDefinition() (*DagExporter, error)
GetDefinition represent DAG definition with exporter
func (*Dag) GetDefinitionJson ¶
GetDefinitionJson generate DAG definition as a json
func (*Dag) GetInitialNode ¶
GetInitialNode gets the initial node
func (*Dag) GetParentNode ¶
GetParentNode returns parent node for a subdag
func (*Dag) IsExecutionFlow ¶
IsExecutionFlow check if a dag doesn't use intermediate data
type DagExporter ¶
type DagExporter struct {
Id string `json:"id"`
StartNode string `json:"start-node"`
EndNode string `json:"end-node"`
HasBranch bool `json:"has-branch"`
HasEdge bool `json:"has-edge"`
ExecutionOnlyDag bool `json:"exec-only-dag"`
Nodes map[string]*NodeExporter `json:"nodes"`
IsValid bool `json:"is-valid"`
ValidationError string `json:"validation-error,omitempty"`
}
type DataStore ¶
type DataStore interface {
// Configure the DaraStore with flow name and request ID
Configure(flowName string, requestId string)
// Initialize the DataStore (called only once in a request span)
Init() error
// Set store a value for key, in failure returns error
Set(key string, value []byte) error
// Get retrieves a value by key, if failure returns error
Get(key string) ([]byte, error)
// Del deletes a value by a key
Del(key string) error
// Cleanup all the resources in DataStore
Cleanup() error
//Copy a DataSoure
CopyStore() (DataStore, error)
}
DataStore for Storing Data
type EventHandler ¶
type EventHandler interface {
// Configure the EventHandler with flow name and request ID
Configure(flowName string, requestId string)
// Initialize an EventHandler (called only once in a request span)
Init() error
//copy Store
Copy() (EventHandler, error)
// ReportRequestStart report a start of request
ReportRequestStart(requestId string)
// ReportRequestEnd reports an end of request
ReportRequestEnd(requestId string)
// ReportRequestFailure reports a failure of a request with error
ReportRequestFailure(requestId string, err error)
// ReportExecutionForward report that an execution is forwarded
ReportExecutionForward(nodeId string, requestId string)
// ReportExecutionContinuation report that an execution is being continued
ReportExecutionContinuation(requestId string)
// ReportNodeStart report a start of a Node execution
ReportNodeStart(nodeId string, requestId string)
// ReportNodeStart report an end of a node execution
ReportNodeEnd(nodeId string, requestId string)
// ReportNodeFailure report a Node execution failure with error
ReportNodeFailure(nodeId string, requestId string, err error)
// ReportOperationStart reports start of an operation
ReportOperationStart(operationId string, nodeId string, requestId string)
// ReportOperationEnd reports an end of an operation
ReportOperationEnd(operationId string, nodeId string, requestId string)
// ReportOperationFailure reports failure of an operation with error
ReportOperationFailure(operationId string, nodeId string, requestId string, err error)
// Flush flush the reports
Flush()
}
EventHandler handle flow events
type Logger ¶
type Logger interface {
// Configure configure a logger with flowname and requestID
Configure(flowName string, requestId string)
// Init initialize a logger
Init() error
// Log logs a flow log
Log(str string)
}
Logger logs the flow logs
type Node ¶
type Node struct {
Id string // The id of the vertex
// contains filtered or unexported fields
}
Node The vertex
func (*Node) AddAggregator ¶
func (this *Node) AddAggregator(aggregator Aggregator)
AddAggregator add a aggregator to a node
func (*Node) AddCondition ¶
AddCondition add a condition to a node
func (*Node) AddConditionalDag ¶
AddConditionalDag adds conditional dag to node
func (*Node) AddForEach ¶
AddForEach add a aggregator to a node
func (*Node) AddForEachDag ¶
AddForEachDag adds a foreach subdag to the node
func (*Node) AddForwarder ¶
AddForwarder adds a forwarder for a specific children
func (*Node) AddOperation ¶
AddOperation adds an operation
func (*Node) AddSubAggregator ¶
func (this *Node) AddSubAggregator(aggregator Aggregator)
AddSubAggregator add a foreach aggregator to a node
func (*Node) Dependency ¶
Dependency get all dependency node for a node
func (*Node) DynamicIndegree ¶
DynamicIndegree returns the no of dynamic input in a node
func (*Node) GetAggregator ¶
func (this *Node) GetAggregator() Aggregator
GetAggregator get a aggregator from a node
func (*Node) GetAllConditionalDags ¶
GetAllConditionalDags get all the subdags for all conditions
func (*Node) GetCondition ¶
GetCondition get the condition function
func (*Node) GetConditionalDag ¶
GetConditionalDag get the sundag for a specific condition
func (*Node) GetForEach ¶
GetForEach get the foreach function
func (*Node) GetForwarder ¶
GetForwarder gets a forwarder for a children
func (*Node) GetSubAggregator ¶
func (this *Node) GetSubAggregator() Aggregator
GetSubAggregator gets the subaggregator for condition and foreach
func (*Node) GetUniqueId ¶
GetUniqueId returns a unique ID of the node
func (*Node) Operations ¶
Value provides the ordered list of functions for a node
type NodeExporter ¶
type NodeExporter struct {
Id string `json:"id"`
Index int `json:"node-index"`
UniqueId string `json:"unique-id"` // required to fetch intermediate data and state
IsDynamic bool `json:"is-dynamic"`
IsCondition bool `json:"is-condition"`
IsForeach bool `json:"is-foreach"`
HasAggregator bool `json:"has-aggregator"`
HasSubAggregator bool `json:"has-sub-aggregator"`
HasSubDag bool `json:"has-subdag"`
InDegree int `json:"in-degree"`
OutDegree int `json:"out-degree"`
SubDag *DagExporter `json:"sub-dag,omitempty"`
ForeachDag *DagExporter `json:"foreach-dag,omitempty"`
ConditionalDags map[string]*DagExporter `json:"conditional-dags,omitempty"`
DynamicExecOnly bool `json:"dynamic-exec-only"`
Operations []*OperationExporter `json:"operations,omitempty"`
Children []string `json:"childrens,omitempty"`
ChildrenExecOnly map[string]bool `json:"child-exec-only"`
}
type OperationExporter ¶
type Pipeline ¶
type Pipeline struct {
Dag *Dag `json:"-"` // Dag that will be executed
ExecutionPosition map[string]string `json:"pipeline-execution-position"` // Denotes the node that is executing now
ExecutionDepth int `json:"pipeline-execution-depth"` // Denotes the depth of subgraph its executing
CurrentDynamicOption map[string]string `json:"pipeline-dynamic-option"` // Denotes the current dynamic option mapped against the dynamic Node UQ id
FailureHandler PipelineErrorHandler `json:"-"`
Finally PipelineHandler `json:"-"`
}
func (*Pipeline) ApplyState ¶
ApplyState apply a state to a pipeline by from encoded JSON pipeline
func (*Pipeline) CountNodes ¶
CountNodes counts the no of node added in the Pipeline Dag. It doesn't count subdags node
func (*Pipeline) GetAllNodesUniqueId ¶
GetAllNodesId returns a recursive list of all nodes that belongs to the pipeline
func (*Pipeline) GetCurrentNodeDag ¶
GetCurrentNodeDag returns the current node and current dag based on execution position
func (*Pipeline) GetInitialNodeId ¶
GetInitialNodeId Get the very first node of the pipeline
func (*Pipeline) GetNodeExecutionUniqueId ¶
GetNodeExecutionUniqueId provide a ID that is unique in an execution
func (*Pipeline) UpdatePipelineExecutionPosition ¶
UpdatePipelineExecutionPosition updates pipeline execution position specified depthAdjustment and vertex denotes how the ExecutionPosition must be altered
type PipelineErrorHandler ¶
PipelineErrorHandler the error handler OnFailure() registration on pipeline
type PipelineHandler ¶
type PipelineHandler func(string)
PipelineHandler definition for the Finally() registration on pipeline
type StateStore ¶
type StateStore interface {
// Configure the StateStore with flow name and request ID
Configure(flowName string, requestId string)
// Initialize the StateStore (called only once in a request span)
Init() error
// Set a value (override existing, or create one)
Set(key string, value string) error
// Get a value
Get(key string) (string, error)
// Increase the value of key with a given increment
Incr(key string, value int64) (int64, error)
// Compare and Update a value
Update(key string, oldValue string, newValue string) error
// Cleanup all the resources in StateStore (called only once in a request span)
Cleanup() error
//copy Store
CopyStore() (StateStore, error)
}
StateStore for saving execution state