sdk

package module
v1.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 6, 2021 License: MIT Imports: 3 Imported by: 2

README

SDK - The core module of faas-flow project

Build GoDoc

Documentation

Index

Constants

View Source
const (
	// StateSuccess denotes success state
	StateSuccess = "success"
	// StateFailure denotes failure state
	StateFailure = "failure"
	// StateOngoing denotes ongoing state
	StateOngoing = "ongoing"
)
View Source
const (
	DEPTH_INCREMENT = 1
	DEPTH_DECREMENT = -1
	DEPTH_SAME      = 0
)

Variables

View Source
var (
	// ERR_NO_VERTEX
	ERR_NO_VERTEX = fmt.Errorf("dag has no vertex set")
	// ERR_CYCLIC denotes that dag has a cycle
	ERR_CYCLIC = fmt.Errorf("dag has cyclic dependency")
	// ERR_DUPLICATE_EDGE denotes that a dag edge is duplicate
	ERR_DUPLICATE_EDGE = fmt.Errorf("edge redefined")
	// ERR_DUPLICATE_VERTEX denotes that a dag edge is duplicate
	ERR_DUPLICATE_VERTEX = fmt.Errorf("vertex redefined")
	// ERR_MULTIPLE_START denotes that a dag has more than one start point
	ERR_MULTIPLE_START = fmt.Errorf("only one start vertex is allowed")
	// ERR_RECURSIVE_DEP denotes that dag has a recursive dependecy
	ERR_RECURSIVE_DEP = fmt.Errorf("dag has recursive dependency")
	// Default forwarder
	DefaultForwarder = func(data []byte) []byte { return data }
)

Functions

func GetPipelineDefinition

func GetPipelineDefinition(pipeline *Pipeline) string

GetPipelineDefinition generate pipeline DAG defintion as a json

Types

type Aggregator

type Aggregator func(map[string][]byte) ([]byte, error)

Aggregator definition for the data aggregator of nodes

type BlankOperation

type BlankOperation struct {
}

func (*BlankOperation) Encode

func (ops *BlankOperation) Encode() []byte

func (*BlankOperation) Execute

func (ops *BlankOperation) Execute(data []byte, option map[string]interface{}) ([]byte, error)

func (*BlankOperation) GetId

func (ops *BlankOperation) GetId() string

func (*BlankOperation) GetProperties

func (ops *BlankOperation) GetProperties() map[string][]string

type Condition

type Condition func([]byte) []string

Condition definition for the condition function

type Context

type Context struct {
	Query url.Values // provides request Query
	State string     // state of the request
	Name  string     // name of the faas-flow

	NodeInput map[string][]byte // stores inputs form each node
	// contains filtered or unexported fields
}

Context execution context and execution state

func CreateContext

func CreateContext(id string, node string, name string,
	dstore DataStore) *Context

CreateContext create request context (used by template)

func (*Context) Del

func (context *Context) Del(key string) error

Del deletes a value from the context using DataStore

func (*Context) Get

func (context *Context) Get(key string) (interface{}, error)

Get retrieve a value from the context using DataStore

func (*Context) GetBool

func (context *Context) GetBool(key string) bool

GetBool retrieve a boolean value from the context using DataStore

func (*Context) GetBytes

func (context *Context) GetBytes(key string) []byte

GetBytes retrieve a byte array from the context using DataStore

func (*Context) GetInt

func (context *Context) GetInt(key string) int

GetInt retrieve a integer value from the context using DataStore

func (*Context) GetNode

func (context *Context) GetNode() string

GetPhase return the node no

func (*Context) GetRequestId

func (context *Context) GetRequestId() string

GetRequestId returns the request id

func (*Context) GetString

func (context *Context) GetString(key string) string

GetString retrieve a string value from the context using DataStore

func (*Context) Set

func (context *Context) Set(key string, data interface{}) error

Set put a value in the context using DataStore

type Dag

type Dag struct {
	Id string
	// contains filtered or unexported fields
}

Dag The whole dag

func NewDag

func NewDag() *Dag

NewDag Creates a Dag

func (*Dag) AddEdge

func (this *Dag) AddEdge(from, to string) error

AddEdge add a directed edge as (from)->(to) If vertex doesn't exists creates them

func (*Dag) AddVertex

func (this *Dag) AddVertex(id string, operations []Operation) *Node

AddVertex create a vertex with id and operations

func (*Dag) Append

func (this *Dag) Append(dag *Dag) error

Append appends another dag into an existing dag Its a way to define and reuse subdags append causes disconnected dag which must be linked with edge in order to execute

func (*Dag) GetEndNode

func (this *Dag) GetEndNode() *Node

GetEndNode gets the end node

func (*Dag) GetInitialNode

func (this *Dag) GetInitialNode() *Node

GetInitialNode gets the initial node

func (*Dag) GetNode

func (this *Dag) GetNode(id string) *Node

GetNode get a node by Id

func (*Dag) GetNodes

func (this *Dag) GetNodes(dynamicOption string) []string

GetNodes returns a list of nodes (including subdags) belong to the dag

func (*Dag) GetParentNode

func (this *Dag) GetParentNode() *Node

GetParentNode returns parent node for a subdag

func (*Dag) HasBranch

func (this *Dag) HasBranch() bool

HasBranch check if dag or its subdags has branch

func (*Dag) HasEdge

func (this *Dag) HasEdge() bool

HasEdge check if dag or its subdags has edge

func (*Dag) IsExecutionFlow

func (this *Dag) IsExecutionFlow() bool

IsExecutionFlow check if a dag doesn't use intermediate data

func (*Dag) Validate

func (this *Dag) Validate() error

Validate validates a dag and all subdag as per faas-flow dag requirments A validated graph has only one initialNode and one EndNode set if a graph has more than one endnode, a seperate endnode gets added

type DagExporter

type DagExporter struct {
	Id               string                   `json:"id"`
	StartNode        string                   `json:"start-node"`
	EndNode          string                   `json:"end-node"`
	HasBranch        bool                     `json:"has-branch"`
	HasEdge          bool                     `json:"has-edge"`
	ExecutionOnlyDag bool                     `json:"exec-only-dag"`
	Nodes            map[string]*NodeExporter `json:"nodes"`

	IsValid         bool   `json:"is-valid"`
	ValidationError string `json:"validation-error,omitempty"`
}

type DataStore

type DataStore interface {
	// Configure the DaraStore with flow name and request ID
	Configure(flowName string, requestId string)
	// Initialize the DataStore (called only once in a request span)
	Init() error
	// Set store a value for key, in failure returns error
	Set(key string, value []byte) error
	// Get retrieves a value by key, if failure returns error
	Get(key string) ([]byte, error)
	// Del deletes a value by a key
	Del(key string) error
	// Cleanup all the resources in DataStore
	Cleanup() error
}

DataStore for Storing Data

type EventHandler

type EventHandler interface {
	// Configure the EventHandler with flow name and request ID
	Configure(flowName string, requestId string)
	// Initialize an EventHandler (called only once in a request span)
	Init() error
	// ReportRequestStart report a start of request
	ReportRequestStart(requestId string)
	// ReportRequestEnd reports an end of request
	ReportRequestEnd(requestId string)
	// ReportRequestFailure reports a failure of a request with error
	ReportRequestFailure(requestId string, err error)
	// ReportExecutionForward report that an execution is forwarded
	ReportExecutionForward(nodeId string, requestId string)
	// ReportExecutionContinuation report that an execution is being continued
	ReportExecutionContinuation(requestId string)
	// ReportNodeStart report a start of a Node execution
	ReportNodeStart(nodeId string, requestId string)
	// ReportNodeStart report an end of a node execution
	ReportNodeEnd(nodeId string, requestId string)
	// ReportNodeFailure report a Node execution failure with error
	ReportNodeFailure(nodeId string, requestId string, err error)
	// ReportOperationStart reports start of an operation
	ReportOperationStart(operationId string, nodeId string, requestId string)
	// ReportOperationEnd reports an end of an operation
	ReportOperationEnd(operationId string, nodeId string, requestId string)
	// ReportOperationFailure reports failure of an operation with error
	ReportOperationFailure(operationId string, nodeId string, requestId string, err error)
	// Flush flush the reports
	Flush()
}

EventHandler handle flow events

type ForEach

type ForEach func([]byte) map[string][]byte

ForEach definition for the foreach function

type Forwarder

type Forwarder func([]byte) []byte

Forwarder definition for the data forwarder of nodes

type Logger

type Logger interface {
	// Configure configure a logger with flowname and requestID
	Configure(flowName string, requestId string)
	// Init initialize a logger
	Init() error
	// Log logs a flow log
	Log(str string)
}

Logger logs the flow logs

type Node

type Node struct {
	Id string // The id of the vertex
	// contains filtered or unexported fields
}

Node The vertex

func (*Node) AddAggregator

func (this *Node) AddAggregator(aggregator Aggregator)

AddAggregator add a aggregator to a node

func (*Node) AddCondition

func (this *Node) AddCondition(condition Condition)

AddCondition add a condition to a node

func (*Node) AddConditionalDag

func (this *Node) AddConditionalDag(condition string, dag *Dag)

AddConditionalDag adds conditional dag to node

func (*Node) AddForEach

func (this *Node) AddForEach(foreach ForEach)

AddForEach add a aggregator to a node

func (*Node) AddForEachDag

func (this *Node) AddForEachDag(subDag *Dag) error

AddForEachDag adds a foreach subdag to the node

func (*Node) AddForwarder

func (this *Node) AddForwarder(children string, forwarder Forwarder)

AddForwarder adds a forwarder for a specific children

func (*Node) AddOperation

func (this *Node) AddOperation(operation Operation)

AddOperation adds an operation

func (*Node) AddSubAggregator

func (this *Node) AddSubAggregator(aggregator Aggregator)

AddSubAggregator add a foreach aggregator to a node

func (*Node) AddSubDag

func (this *Node) AddSubDag(subDag *Dag) error

AddSubDag adds a subdag to the node

func (*Node) Children

func (this *Node) Children() []*Node

Children get all children node for a node

func (*Node) Dependency

func (this *Node) Dependency() []*Node

Dependency get all dependency node for a node

func (*Node) Dynamic

func (this *Node) Dynamic() bool

Dynamic checks if the node is dynamic

func (*Node) DynamicIndegree

func (this *Node) DynamicIndegree() int

DynamicIndegree returns the no of dynamic input in a node

func (*Node) GetAggregator

func (this *Node) GetAggregator() Aggregator

GetAggregator get a aggregator from a node

func (*Node) GetAllConditionalDags

func (this *Node) GetAllConditionalDags() map[string]*Dag

GetAllConditionalDags get all the subdags for all conditions

func (*Node) GetCondition

func (this *Node) GetCondition() Condition

GetCondition get the condition function

func (*Node) GetConditionalDag

func (this *Node) GetConditionalDag(condition string) *Dag

GetConditionalDag get the sundag for a specific condition

func (*Node) GetForEach

func (this *Node) GetForEach() ForEach

GetForEach get the foreach function

func (*Node) GetForwarder

func (this *Node) GetForwarder(children string) Forwarder

GetForwarder gets a forwarder for a children

func (*Node) GetSubAggregator

func (this *Node) GetSubAggregator() Aggregator

GetSubAggregator gets the subaggregator for condition and foreach

func (*Node) GetUniqueId

func (this *Node) GetUniqueId() string

GetUniqueId returns a unique ID of the node

func (*Node) Indegree

func (this *Node) Indegree() int

Indegree returns the no of input in a node

func (*Node) Operations

func (this *Node) Operations() []Operation

Value provides the ordered list of functions for a node

func (*Node) Outdegree

func (this *Node) Outdegree() int

Outdegree returns the no of output in a node

func (*Node) ParentDag

func (this *Node) ParentDag() *Dag

ParentDag returns the parent dag of the node

func (*Node) SubDag

func (this *Node) SubDag() *Dag

SubDag returns the subdag added in a node

type NodeExporter

type NodeExporter struct {
	Id       string `json:"id"`
	Index    int    `json:"node-index"`
	UniqueId string `json:"unique-id"` // required to fetch intermediate data and state

	IsDynamic        bool `json:"is-dynamic"`
	IsCondition      bool `json:"is-condition"`
	IsForeach        bool `json:"is-foreach"`
	HasAggregator    bool `json:"has-aggregator"`
	HasSubAggregator bool `json:"has-sub-aggregator"`
	HasSubDag        bool `json:"has-subdag"`
	InDegree         int  `json:"in-degree"`
	OutDegree        int  `json:"out-degree"`

	SubDag          *DagExporter            `json:"sub-dag,omitempty"`
	ForeachDag      *DagExporter            `json:"foreach-dag,omitempty"`
	ConditionalDags map[string]*DagExporter `json:"conditional-dags,omitempty"`
	DynamicExecOnly bool                    `json:"dynamic-exec-only"`
	Operations      []*OperationExporter    `json:"operations,omitempty"`

	Children         []string        `json:"childrens,omitempty"`
	ChildrenExecOnly map[string]bool `json:"child-exec-only"`
}

type Operation

type Operation interface {
	GetId() string
	Encode() []byte
	GetProperties() map[string][]string
	// Execute executes an operation, executor can pass configuration
	Execute([]byte, map[string]interface{}) ([]byte, error)
}

type OperationExporter

type OperationExporter struct {
	Name       string              `json:"name"`
	Properties map[string][]string `json:"properties"`
}

type Pipeline

type Pipeline struct {
	Dag *Dag `json:"-"` // Dag that will be executed

	ExecutionPosition map[string]string `json:"pipeline-execution-position"` // Denotes the node that is executing now
	ExecutionDepth    int               `json:"pipeline-execution-depth"`    // Denotes the depth of subgraph its executing

	CurrentDynamicOption map[string]string `json:"pipeline-dynamic-option"` // Denotes the current dynamic option mapped against the dynamic Node UQ id

	FailureHandler PipelineErrorHandler `json:"-"`
	Finally        PipelineHandler      `json:"-"`
}

func CreatePipeline

func CreatePipeline() *Pipeline

CreatePipeline creates a faasflow pipeline

func (*Pipeline) ApplyState

func (pipeline *Pipeline) ApplyState(state string)

ApplyState apply a state to a pipeline by from encoded JSON pipeline

func (*Pipeline) CountNodes

func (pipeline *Pipeline) CountNodes() int

CountNodes counts the no of node added in the Pipeline Dag. It doesn't count subdags node

func (*Pipeline) GetAllNodesUniqueId

func (pipeline *Pipeline) GetAllNodesUniqueId() []string

GetAllNodesId returns a recursive list of all nodes that belongs to the pipeline

func (*Pipeline) GetCurrentNodeDag

func (pipeline *Pipeline) GetCurrentNodeDag() (*Node, *Dag)

GetCurrentNodeDag returns the current node and current dag based on execution position

func (*Pipeline) GetInitialNodeId

func (pipeline *Pipeline) GetInitialNodeId() string

GetInitialNodeId Get the very first node of the pipeline

func (*Pipeline) GetNodeExecutionUniqueId

func (pipeline *Pipeline) GetNodeExecutionUniqueId(node *Node) string

GetNodeExecutionUniqueId provide a ID that is unique in an execution

func (*Pipeline) GetState

func (pipeline *Pipeline) GetState() string

GetState get a state of a pipeline by encoding in JSON

func (*Pipeline) SetDag

func (pipeline *Pipeline) SetDag(dag *Dag)

SetDag overrides the default dag

func (*Pipeline) UpdatePipelineExecutionPosition

func (pipeline *Pipeline) UpdatePipelineExecutionPosition(depthAdjustment int, vertex string)

UpdatePipelineExecutionPosition updates pipeline execution position specified depthAdjustment and vertex denotes how the ExecutionPosition must be altered

type PipelineErrorHandler

type PipelineErrorHandler func(error) ([]byte, error)

PipelineErrorHandler the error handler OnFailure() registration on pipeline

type PipelineHandler

type PipelineHandler func(string)

PipelineHandler definition for the Finally() registration on pipeline

type StateStore

type StateStore interface {
	// Configure the StateStore with flow name and request ID
	Configure(flowName string, requestId string)
	// Initialize the StateStore (called only once in a request span)
	Init() error
	// Set a value (override existing, or create one)
	Set(key string, value string) error
	// Get a value
	Get(key string) (string, error)
	// Compare and Update a value
	Update(key string, oldValue string, newValue string) error
	// Cleanup all the resources in StateStore (called only once in a request span)
	Cleanup() error
}

StateStore for saving execution state

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL