Documentation ¶
Index ¶
- Constants
- type BaseManager
- type BasePlugin
- type BasePluginManager
- type BaseRetryPolicy
- type BaseRetryableExecutor
- type BaseStepPlugin
- func (p *BaseStepPlugin) Execute(ctx context.Context, data *Message) (*Message, error)
- func (p *BaseStepPlugin) GetName() string
- func (p *BaseStepPlugin) GetRetryPolicy() (RetryPolicy, bool)
- func (p *BaseStepPlugin) GetType() StepType
- func (p *BaseStepPlugin) ShouldContinue() bool
- func (p *BaseStepPlugin) ShouldExecute(data *Message) (bool, error)
- func (p *BaseStepPlugin) ShouldReturn() bool
- type BaseWorkflowManager
- type BaseWorkflowPlugin
- func (p *BaseWorkflowPlugin) Execute(ctx context.Context, input *Message) (*Message, error)
- func (p *BaseWorkflowPlugin) ExecuteStep(ctx context.Context, stepName string, data *Message) (*Message, error)
- func (p *BaseWorkflowPlugin) GetName() string
- func (p *BaseWorkflowPlugin) GetStep(name string) (StepPlugin, error)
- func (p *BaseWorkflowPlugin) GetSteps() []StepPlugin
- func (p *BaseWorkflowPlugin) GetVersion() int
- type BloblangPlugin
- type ExecuteFunc
- type ExecutionManager
- type ExecutionStatus
- type ExecutionStatusName
- type Executor
- type ExpressionPlugin
- type IsErrorRetryableFunc
- type Manager
- type Message
- type OrchestratorPlugin
- type Plugin
- func NewBasePlugin(name string, executor Executor) Plugin
- func NewBasePluginWithRetryPolicy(name string, executor Executor, policy RetryPolicy) Plugin
- func NewBaseRetryablePlugin(plugin Plugin, policy RetryPolicy) Plugin
- func NewTransformPlugin(name string, fn func(*Message) (*Message, error)) Plugin
- type PluginManager
- type RetryPolicy
- type RetryableExecutor
- type StepConfig
- type StepPlugin
- type StepType
- type TransformFunc
- type WorkflowConfig
- type WorkflowManager
- type WorkflowPlugin
Constants ¶
View Source
const (
WorkflowOutputsKey = "outputs"
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BaseManager ¶ added in v0.3.0
func (*BaseManager[T]) Add ¶ added in v0.3.0
func (m *BaseManager[T]) Add(plugin T)
func (*BaseManager[T]) Get ¶ added in v0.3.0
func (m *BaseManager[T]) Get(name string) (T, error)
func (*BaseManager[T]) Has ¶ added in v0.7.0
func (m *BaseManager[T]) Has(name string) bool
type BasePlugin ¶
BasePlugin is a base plugin that can be used to create a plugin from a function.
func (*BasePlugin) GetName ¶
func (p *BasePlugin) GetName() string
type BasePluginManager ¶ added in v0.2.2
func NewBasePluginManager ¶ added in v0.2.2
func NewBasePluginManager() *BasePluginManager
func (*BasePluginManager) AddOrchestrator ¶ added in v0.2.2
func (m *BasePluginManager) AddOrchestrator(plugin Plugin)
type BaseRetryPolicy ¶ added in v0.3.0
type BaseRetryPolicy struct { RetryCount uint64 Errors []string IsErrorRetryableFunc }
func (*BaseRetryPolicy) GetRetryCount ¶ added in v0.3.0
func (r *BaseRetryPolicy) GetRetryCount() uint64
func (*BaseRetryPolicy) IsErrorRetryable ¶ added in v0.3.0
func (r *BaseRetryPolicy) IsErrorRetryable(err error) bool
type BaseRetryableExecutor ¶ added in v0.3.0
type BaseRetryableExecutor struct { Executor Policy RetryPolicy }
func NewBaseRetryableExecutorFromFunc ¶ added in v0.3.0
func NewBaseRetryableExecutorFromFunc(executor Executor, retryCount uint64, IsErrorRetryable IsErrorRetryableFunc) *BaseRetryableExecutor
type BaseStepPlugin ¶
type BaseStepPlugin struct { Name string Type StepType Check Executor Main Executor Continue bool Return bool RetryPolicy RetryPolicy }
func (*BaseStepPlugin) GetName ¶
func (p *BaseStepPlugin) GetName() string
func (*BaseStepPlugin) GetRetryPolicy ¶ added in v0.3.0
func (p *BaseStepPlugin) GetRetryPolicy() (RetryPolicy, bool)
func (*BaseStepPlugin) GetType ¶ added in v0.2.2
func (p *BaseStepPlugin) GetType() StepType
func (*BaseStepPlugin) ShouldContinue ¶ added in v0.2.2
func (p *BaseStepPlugin) ShouldContinue() bool
func (*BaseStepPlugin) ShouldExecute ¶
func (p *BaseStepPlugin) ShouldExecute(data *Message) (bool, error)
func (*BaseStepPlugin) ShouldReturn ¶ added in v0.2.2
func (p *BaseStepPlugin) ShouldReturn() bool
type BaseWorkflowManager ¶ added in v0.3.0
type BaseWorkflowManager struct { Manager[WorkflowPlugin] }
func NewBaseWorkflowManager ¶ added in v0.3.0
func NewBaseWorkflowManager() *BaseWorkflowManager
type BaseWorkflowPlugin ¶ added in v0.2.2
type BaseWorkflowPlugin struct { Name string Version int Steps []StepPlugin }
func (*BaseWorkflowPlugin) ExecuteStep ¶ added in v0.2.2
func (*BaseWorkflowPlugin) GetName ¶ added in v0.2.2
func (p *BaseWorkflowPlugin) GetName() string
func (*BaseWorkflowPlugin) GetStep ¶ added in v0.2.2
func (p *BaseWorkflowPlugin) GetStep(name string) (StepPlugin, error)
func (*BaseWorkflowPlugin) GetSteps ¶ added in v0.2.2
func (p *BaseWorkflowPlugin) GetSteps() []StepPlugin
func (*BaseWorkflowPlugin) GetVersion ¶ added in v0.5.0
func (p *BaseWorkflowPlugin) GetVersion() int
type BloblangPlugin ¶
type BloblangPlugin struct { Name string // contains filtered or unexported fields }
BloblangPlugin transforms the data using a Bloblang template.
func NewBloblangPlugin ¶
func NewBloblangPlugin(name, template string) (*BloblangPlugin, error)
func (*BloblangPlugin) GetName ¶
func (p *BloblangPlugin) GetName() string
type ExecuteFunc ¶ added in v0.2.2
type ExecutionManager ¶ added in v0.5.0
type ExecutionStatus ¶ added in v0.6.0
type ExecutionStatus struct { Status ExecutionStatusName `json:"status" yaml:"status"` Message string `json:"message" yaml:"message"` // This is used for workflow execution to store the last completed step index LastCompletedStepIndex int `json:"last_completed_step_index" yaml:"last_completed_step_index"` }
func (*ExecutionStatus) GetStatus ¶ added in v0.6.0
func (s *ExecutionStatus) GetStatus() ExecutionStatusName
func (*ExecutionStatus) IsCompleted ¶ added in v0.6.0
func (s *ExecutionStatus) IsCompleted() bool
func (*ExecutionStatus) IsFailed ¶ added in v0.6.0
func (s *ExecutionStatus) IsFailed() bool
func (*ExecutionStatus) IsUnprocessed ¶ added in v0.6.0
func (s *ExecutionStatus) IsUnprocessed() bool
func (*ExecutionStatus) SetError ¶ added in v0.6.0
func (s *ExecutionStatus) SetError(err error)
type ExecutionStatusName ¶ added in v0.6.0
type ExecutionStatusName string
const ( ExecutionStatusUnprocessed ExecutionStatusName = "unprocessed" ExecutionStatusCompleted ExecutionStatusName = "completed" ExecutionStatusFailed ExecutionStatusName = "failed" )
type Executor ¶ added in v0.2.2
func NewBaseRetryableExecutor ¶ added in v0.3.0
func NewBaseRetryableExecutor(executor Executor, policy RetryPolicy) Executor
type ExpressionPlugin ¶ added in v0.8.0
type ExpressionPlugin struct { Name string // contains filtered or unexported fields }
ExpressionPlugin transforms the data using an expr template.
Ref: github.com/antonmedv/expr
func NewExpressionPlugin ¶ added in v0.8.0
func NewExpressionPlugin(name, template string) (*ExpressionPlugin, error)
func (*ExpressionPlugin) GetName ¶ added in v0.8.0
func (p *ExpressionPlugin) GetName() string
type IsErrorRetryableFunc ¶ added in v0.3.0
var AllErrorsRetryable IsErrorRetryableFunc = func(error) bool { return true }
type Manager ¶ added in v0.3.0
type Manager[T Plugin] interface { ExecutionManager Get(name string) (T, error) Has(name string) bool Add(plugin T) }
func NewBaseManager ¶ added in v0.3.0
type Message ¶ added in v0.2.2
type Message struct { Data any `json:"data"` // This will be used in workflows to store the original input message Input any `json:"input"` Version int `json:"version"` Status ExecutionStatus `json:"status"` Metadata map[string]any `json:"metadata"` }
func NewMessage ¶ added in v0.2.2
func NextPluginMessage ¶ added in v0.2.2
func (*Message) SetMetadata ¶ added in v0.2.2
type OrchestratorPlugin ¶ added in v0.2.1
type OrchestratorPlugin struct {
// contains filtered or unexported fields
}
func NewOrchestratorPlugin ¶ added in v0.2.1
func NewOrchestratorPlugin(manager PluginManager, plugin Plugin) *OrchestratorPlugin
func (*OrchestratorPlugin) GetName ¶ added in v0.2.1
func (p *OrchestratorPlugin) GetName() string
type Plugin ¶ added in v0.2.2
func NewBasePlugin ¶
func NewBasePluginWithRetryPolicy ¶ added in v0.4.0
func NewBasePluginWithRetryPolicy(name string, executor Executor, policy RetryPolicy) Plugin
func NewBaseRetryablePlugin ¶ added in v0.4.0
func NewBaseRetryablePlugin(plugin Plugin, policy RetryPolicy) Plugin
type PluginManager ¶
type RetryPolicy ¶ added in v0.3.0
type RetryableExecutor ¶ added in v0.3.0
type StepConfig ¶ added in v0.2.2
type StepConfig struct { Name string `json:"name" yaml:"name"` CheckBlobl string `json:"check_blobl" yaml:"check_blobl"` CheckExpr string `json:"check_expr" yaml:"check_expr"` Return bool `json:"return" yaml:"return"` Continue bool `json:"continue" yaml:"continue"` Plugin string `json:"plugin" yaml:"plugin"` Bloblang string `json:"bloblang" yaml:"bloblang"` Expr string `json:"expr" yaml:"expr"` Retry *BaseRetryPolicy `json:"retry" yaml:"retry"` }
func (*StepConfig) GetType ¶ added in v0.2.2
func (c *StepConfig) GetType() StepType
type StepPlugin ¶
type StepPlugin interface { Plugin GetType() StepType ShouldExecute(*Message) (bool, error) ShouldReturn() bool ShouldContinue() bool GetRetryPolicy() (RetryPolicy, bool) }
func NewBaseStepPlugin ¶ added in v0.2.2
func NewBaseStepPlugin(pluginManager PluginManager, config StepConfig) (StepPlugin, error)
type TransformFunc ¶ added in v0.2.2
type WorkflowConfig ¶ added in v0.2.2
type WorkflowConfig struct { Name string `json:"name" yaml:"name"` Version int `json:"version" yaml:"version"` Steps []StepConfig `json:"steps" yaml:"steps"` }
func LoadWorkflowFile ¶ added in v0.4.0
func LoadWorkflowFile(workflowFile string) (*WorkflowConfig, error)
func (*WorkflowConfig) GetVersion ¶ added in v0.6.0
func (c *WorkflowConfig) GetVersion() int
type WorkflowManager ¶ added in v0.3.0
type WorkflowManager Manager[WorkflowPlugin]
type WorkflowPlugin ¶
type WorkflowPlugin interface { Plugin GetVersion() int GetSteps() []StepPlugin GetStep(name string) (StepPlugin, error) ExecuteStep(ctx context.Context, stepName string, data *Message) (*Message, error) }
func NewBaseWorkflowPlugin ¶ added in v0.2.2
func NewBaseWorkflowPlugin(pluginManager PluginManager, config WorkflowConfig) (WorkflowPlugin, error)
Click to show internal directories.
Click to hide internal directories.