plugins

package
v0.8.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 16, 2023 | License: MIT | Imports: 12 | Imported by: 1

Documentation

Index

Constants

View Source
const (
	WorkflowOutputsKey = "outputs"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type BaseManager added in v0.3.0

type BaseManager[T Plugin] struct {
	Type string
	// contains filtered or unexported fields
}

func (*BaseManager[T]) Add added in v0.3.0

func (m *BaseManager[T]) Add(plugin T)

func (*BaseManager[T]) Execute added in v0.3.0

func (p *BaseManager[T]) Execute(ctx context.Context, name string, data *Message) (*Message, error)

func (*BaseManager[T]) Get added in v0.3.0

func (m *BaseManager[T]) Get(name string) (T, error)

func (*BaseManager[T]) Has added in v0.7.0

func (m *BaseManager[T]) Has(name string) bool

type BasePlugin

type BasePlugin struct {
	Name     string
	Executor Executor
}

BasePlugin is a base plugin that can be used to create a plugin from a function.

func (*BasePlugin) Execute

func (p *BasePlugin) Execute(ctx context.Context, data *Message) (*Message, error)

func (*BasePlugin) GetName

func (p *BasePlugin) GetName() string

type BasePluginManager added in v0.2.2

type BasePluginManager struct {
	Manager[Plugin]
}

func NewBasePluginManager added in v0.2.2

func NewBasePluginManager() *BasePluginManager

func (*BasePluginManager) AddOrchestrator added in v0.2.2

func (m *BasePluginManager) AddOrchestrator(plugin Plugin)

type BaseRetryPolicy added in v0.3.0

type BaseRetryPolicy struct {
	RetryCount uint64
	Errors     []string
	IsErrorRetryableFunc
}

func (*BaseRetryPolicy) GetRetryCount added in v0.3.0

func (r *BaseRetryPolicy) GetRetryCount() uint64

func (*BaseRetryPolicy) IsErrorRetryable added in v0.3.0

func (r *BaseRetryPolicy) IsErrorRetryable(err error) bool

type BaseRetryableExecutor added in v0.3.0

type BaseRetryableExecutor struct {
	Executor
	Policy RetryPolicy
}

func NewBaseRetryableExecutorFromFunc added in v0.3.0

func NewBaseRetryableExecutorFromFunc(executor Executor, retryCount uint64, IsErrorRetryable IsErrorRetryableFunc) *BaseRetryableExecutor

func (*BaseRetryableExecutor) Execute added in v0.3.0

func (e *BaseRetryableExecutor) Execute(ctx context.Context, data *Message) (*Message, error)

type BaseStepPlugin

type BaseStepPlugin struct {
	Name        string
	Type        StepType
	Check       Executor
	Main        Executor
	Continue    bool
	Return      bool
	RetryPolicy RetryPolicy
}

func (*BaseStepPlugin) Execute

func (p *BaseStepPlugin) Execute(ctx context.Context, data *Message) (*Message, error)

func (*BaseStepPlugin) GetName

func (p *BaseStepPlugin) GetName() string

func (*BaseStepPlugin) GetRetryPolicy added in v0.3.0

func (p *BaseStepPlugin) GetRetryPolicy() (RetryPolicy, bool)

func (*BaseStepPlugin) GetType added in v0.2.2

func (p *BaseStepPlugin) GetType() StepType

func (*BaseStepPlugin) ShouldContinue added in v0.2.2

func (p *BaseStepPlugin) ShouldContinue() bool

func (*BaseStepPlugin) ShouldExecute

func (p *BaseStepPlugin) ShouldExecute(data *Message) (bool, error)

func (*BaseStepPlugin) ShouldReturn added in v0.2.2

func (p *BaseStepPlugin) ShouldReturn() bool

type BaseWorkflowManager added in v0.3.0

type BaseWorkflowManager struct {
	Manager[WorkflowPlugin]
}

func NewBaseWorkflowManager added in v0.3.0

func NewBaseWorkflowManager() *BaseWorkflowManager

type BaseWorkflowPlugin added in v0.2.2

type BaseWorkflowPlugin struct {
	Name    string
	Version int
	Steps   []StepPlugin
}

func (*BaseWorkflowPlugin) Execute added in v0.2.2

func (p *BaseWorkflowPlugin) Execute(ctx context.Context, input *Message) (*Message, error)

func (*BaseWorkflowPlugin) ExecuteStep added in v0.2.2

func (p *BaseWorkflowPlugin) ExecuteStep(ctx context.Context, stepName string, data *Message) (*Message, error)

func (*BaseWorkflowPlugin) GetName added in v0.2.2

func (p *BaseWorkflowPlugin) GetName() string

func (*BaseWorkflowPlugin) GetStep added in v0.2.2

func (p *BaseWorkflowPlugin) GetStep(name string) (StepPlugin, error)

func (*BaseWorkflowPlugin) GetSteps added in v0.2.2

func (p *BaseWorkflowPlugin) GetSteps() []StepPlugin

func (*BaseWorkflowPlugin) GetVersion added in v0.5.0

func (p *BaseWorkflowPlugin) GetVersion() int

type BloblangPlugin

type BloblangPlugin struct {
	Name string
	// contains filtered or unexported fields
}

BloblangPlugin transforms the data using a Bloblang template.

func NewBloblangPlugin

func NewBloblangPlugin(name, template string) (*BloblangPlugin, error)

func (*BloblangPlugin) Execute

func (p *BloblangPlugin) Execute(_ context.Context, input *Message) (*Message, error)

func (*BloblangPlugin) GetName

func (p *BloblangPlugin) GetName() string

type ExecuteFunc added in v0.2.2

type ExecuteFunc func(context.Context, *Message) (*Message, error)

func (ExecuteFunc) Execute added in v0.2.2

func (f ExecuteFunc) Execute(ctx context.Context, data *Message) (*Message, error)

type ExecutionManager added in v0.5.0

type ExecutionManager interface {
	Execute(ctx context.Context, name string, data *Message) (*Message, error)
}

type ExecutionStatus added in v0.6.0

type ExecutionStatus struct {
	Status  ExecutionStatusName `json:"status" yaml:"status"`
	Message string              `json:"message" yaml:"message"`
	// This is used for workflow execution to store the last completed step index
	LastCompletedStepIndex int `json:"last_completed_step_index" yaml:"last_completed_step_index"`
}

func (*ExecutionStatus) GetStatus added in v0.6.0

func (s *ExecutionStatus) GetStatus() ExecutionStatusName

func (*ExecutionStatus) IsCompleted added in v0.6.0

func (s *ExecutionStatus) IsCompleted() bool

func (*ExecutionStatus) IsFailed added in v0.6.0

func (s *ExecutionStatus) IsFailed() bool

func (*ExecutionStatus) IsUnprocessed added in v0.6.0

func (s *ExecutionStatus) IsUnprocessed() bool

func (*ExecutionStatus) SetError added in v0.6.0

func (s *ExecutionStatus) SetError(err error)

type ExecutionStatusName added in v0.6.0

type ExecutionStatusName string
const (
	ExecutionStatusUnprocessed ExecutionStatusName = "unprocessed"
	ExecutionStatusCompleted   ExecutionStatusName = "completed"
	ExecutionStatusFailed      ExecutionStatusName = "failed"
)

type Executor added in v0.2.2

type Executor interface {
	Execute(context.Context, *Message) (*Message, error)
}

func NewBaseRetryableExecutor added in v0.3.0

func NewBaseRetryableExecutor(executor Executor, policy RetryPolicy) Executor

type ExpressionPlugin added in v0.8.0

type ExpressionPlugin struct {
	Name string
	// contains filtered or unexported fields
}

ExpressionPlugin transforms the data using an expr template.

Ref: github.com/antonmedv/expr

func NewExpressionPlugin added in v0.8.0

func NewExpressionPlugin(name, template string) (*ExpressionPlugin, error)

func (*ExpressionPlugin) Execute added in v0.8.0

func (p *ExpressionPlugin) Execute(_ context.Context, input *Message) (*Message, error)

func (*ExpressionPlugin) GetName added in v0.8.0

func (p *ExpressionPlugin) GetName() string

type IsErrorRetryableFunc added in v0.3.0

type IsErrorRetryableFunc func(error) bool
var AllErrorsRetryable IsErrorRetryableFunc = func(error) bool { return true }

type Manager added in v0.3.0

type Manager[T Plugin] interface {
	ExecutionManager
	Get(name string) (T, error)
	Has(name string) bool
	Add(plugin T)
}

func NewBaseManager added in v0.3.0

func NewBaseManager[T Plugin](typeVal string) Manager[T]

type Message added in v0.2.2

type Message struct {
	Data any `json:"data"`
	// This will be used in workflows to store the original input message
	Input    any             `json:"input"`
	Version  int             `json:"version"`
	Status   ExecutionStatus `json:"status"`
	Metadata map[string]any  `json:"metadata"`
}

func NewMessage added in v0.2.2

func NewMessage(data any) *Message

func NextPluginMessage added in v0.2.2

func NextPluginMessage(nextPlugin string) *Message

func (*Message) Clone added in v0.2.2

func (m *Message) Clone() *Message

func (*Message) GetBool added in v0.2.2

func (m *Message) GetBool() (bool, error)

func (*Message) GetMetadata added in v0.2.2

func (m *Message) GetMetadata(key string) (any, bool)

func (*Message) SetMetadata added in v0.2.2

func (m *Message) SetMetadata(key string, value any)

func (*Message) ToMap added in v0.2.2

func (m *Message) ToMap() map[string]interface{}

func (*Message) WithMetadata added in v0.5.0

func (m *Message) WithMetadata(key string, value any) *Message

type OrchestratorPlugin added in v0.2.1

type OrchestratorPlugin struct {
	// contains filtered or unexported fields
}

func NewOrchestratorPlugin added in v0.2.1

func NewOrchestratorPlugin(manager PluginManager, plugin Plugin) *OrchestratorPlugin

func (*OrchestratorPlugin) Execute added in v0.2.1

func (p *OrchestratorPlugin) Execute(ctx context.Context, data *Message) (*Message, error)

func (*OrchestratorPlugin) GetName added in v0.2.1

func (p *OrchestratorPlugin) GetName() string

type Plugin added in v0.2.2

type Plugin interface {
	Executor
	GetName() string
}

func NewBasePlugin

func NewBasePlugin(name string, executor Executor) Plugin

func NewBasePluginWithRetryPolicy added in v0.4.0

func NewBasePluginWithRetryPolicy(name string, executor Executor, policy RetryPolicy) Plugin

func NewBaseRetryablePlugin added in v0.4.0

func NewBaseRetryablePlugin(plugin Plugin, policy RetryPolicy) Plugin

func NewTransformPlugin

func NewTransformPlugin(name string, fn func(*Message) (*Message, error)) Plugin

type PluginManager

type PluginManager Manager[Plugin]

type RetryPolicy added in v0.3.0

type RetryPolicy interface {
	IsErrorRetryable(error) bool
	GetRetryCount() uint64
}

type RetryableExecutor added in v0.3.0

type RetryableExecutor interface {
	Executor
	GetRetryCount() int
	IsErrorRetryable(error) bool
}

type StepConfig added in v0.2.2

type StepConfig struct {
	Name       string           `json:"name" yaml:"name"`
	CheckBlobl string           `json:"check_blobl" yaml:"check_blobl"`
	CheckExpr  string           `json:"check_expr" yaml:"check_expr"`
	Return     bool             `json:"return" yaml:"return"`
	Continue   bool             `json:"continue" yaml:"continue"`
	Plugin     string           `json:"plugin" yaml:"plugin"`
	Bloblang   string           `json:"bloblang" yaml:"bloblang"`
	Expr       string           `json:"expr" yaml:"expr"`
	Retry      *BaseRetryPolicy `json:"retry" yaml:"retry"`
}

func (*StepConfig) GetType added in v0.2.2

func (c *StepConfig) GetType() StepType

type StepPlugin

type StepPlugin interface {
	Plugin
	GetType() StepType
	ShouldExecute(*Message) (bool, error)
	ShouldReturn() bool
	ShouldContinue() bool
	GetRetryPolicy() (RetryPolicy, bool)
}

func NewBaseStepPlugin added in v0.2.2

func NewBaseStepPlugin(pluginManager PluginManager, config StepConfig) (StepPlugin, error)

type StepType added in v0.2.2

type StepType string
const (
	BloblangStep   StepType = "bloblang"
	PluginStep     StepType = "plugin"
	ExpressionStep StepType = "expression"
	UnknownStep    StepType = "unknown"
)

type TransformFunc added in v0.2.2

type TransformFunc func(*Message) (*Message, error)

func (TransformFunc) Execute added in v0.2.2

func (f TransformFunc) Execute(_ context.Context, data *Message) (*Message, error)

type WorkflowConfig added in v0.2.2

type WorkflowConfig struct {
	Name    string       `json:"name" yaml:"name"`
	Version int          `json:"version" yaml:"version"`
	Steps   []StepConfig `json:"steps" yaml:"steps"`
}

func LoadWorkflowFile added in v0.4.0

func LoadWorkflowFile(workflowFile string) (*WorkflowConfig, error)

func (*WorkflowConfig) GetVersion added in v0.6.0

func (c *WorkflowConfig) GetVersion() int

type WorkflowManager added in v0.3.0

type WorkflowManager Manager[WorkflowPlugin]

type WorkflowPlugin

type WorkflowPlugin interface {
	Plugin
	GetVersion() int
	GetSteps() []StepPlugin
	GetStep(name string) (StepPlugin, error)
	ExecuteStep(ctx context.Context, stepName string, data *Message) (*Message, error)
}

func NewBaseWorkflowPlugin added in v0.2.2

func NewBaseWorkflowPlugin(pluginManager PluginManager, config WorkflowConfig) (WorkflowPlugin, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL