pipeline

package
v0.0.0-...-678c32f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 11, 2020 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetAllRegisteredJoints

func GetAllRegisteredJoints() map[string]interface{}

func RegisterPipeJoint

func RegisterPipeJoint(joint Processor)

func RegisterPipeJointWithName

func RegisterPipeJointWithName(jointName string, joint Processor)

Types

type Context

type Context struct {
	Parameters

	SequenceID   int64       `json:"sequence"`
	IsSimulate   bool        `json:"is_simulate"`
	IgnoreBroken bool        `json:"ignore_broken"`
	Payload      interface{} `json:"-"`

	PipelineID string
	// contains filtered or unexported fields
}

func UnMarshall

func UnMarshall(b []byte) Context

func (*Context) End

func (context *Context) End(msg interface{})

End breaks all pipelines, but the end phase is not included in the break.

func (*Context) Exit

func (context *Context) Exit(msg interface{})

Exit tells the pipeline to exit.

func (*Context) IsEnd

func (context *Context) IsEnd() bool

IsEnd indicates whether the pipeline process has ended; ended means no more processes will be executed.

func (*Context) IsExit

func (context *Context) IsExit() bool

IsExit means all pipelines will be broken and control jumps outside; even the end phase will not be executed.

func (*Context) Marshall

func (context *Context) Marshall() []byte

type ParaKey

type ParaKey string

type Parameters

type Parameters struct {
	Data map[string]interface{} `json:"data,omitempty"`
	// contains filtered or unexported fields
}

func (*Parameters) Get

func (para *Parameters) Get(key ParaKey) interface{}

func (*Parameters) GetArray

func (para *Parameters) GetArray(key ParaKey) ([]interface{}, bool)

GetArray returns an array whose items are of type interface{}.

func (*Parameters) GetBool

func (para *Parameters) GetBool(key ParaKey, defaultV bool) bool

func (*Parameters) GetBytes

func (para *Parameters) GetBytes(key ParaKey) ([]byte, bool)

func (*Parameters) GetInt

func (para *Parameters) GetInt(key ParaKey, defaultV int) (int, bool)

func (*Parameters) GetInt64

func (para *Parameters) GetInt64(key ParaKey, defaultV int64) (int64, bool)

func (*Parameters) GetInt64OrDefault

func (para *Parameters) GetInt64OrDefault(key ParaKey, defaultV int64) int64

func (*Parameters) GetIntOrDefault

func (para *Parameters) GetIntOrDefault(key ParaKey, defaultV int) int

func (*Parameters) GetMap

func (para *Parameters) GetMap(key ParaKey) (map[string]interface{}, bool)

func (*Parameters) GetOrDefault

func (para *Parameters) GetOrDefault(key ParaKey, val interface{}) interface{}

func (*Parameters) GetString

func (para *Parameters) GetString(key ParaKey) (string, bool)

func (*Parameters) GetStringArray

func (para *Parameters) GetStringArray(key ParaKey) ([]string, bool)

func (*Parameters) GetStringMap

func (para *Parameters) GetStringMap(key ParaKey) (result map[string]string, ok bool)

func (*Parameters) GetStringOrDefault

func (para *Parameters) GetStringOrDefault(key ParaKey, val string) string

func (*Parameters) GetTime

func (para *Parameters) GetTime(key ParaKey) (time.Time, bool)

func (*Parameters) Has

func (para *Parameters) Has(key ParaKey) bool

func (*Parameters) MustGet

func (para *Parameters) MustGet(key ParaKey) interface{}

func (*Parameters) MustGetArray

func (para *Parameters) MustGetArray(key ParaKey) []interface{}

func (*Parameters) MustGetBytes

func (para *Parameters) MustGetBytes(key ParaKey) []byte

func (*Parameters) MustGetInt

func (para *Parameters) MustGetInt(key ParaKey) int

MustGetInt returns 0 if the key was not found.

func (*Parameters) MustGetInt64

func (para *Parameters) MustGetInt64(key ParaKey) int64

func (*Parameters) MustGetMap

func (para *Parameters) MustGetMap(key ParaKey) map[string]interface{}

func (*Parameters) MustGetString

func (para *Parameters) MustGetString(key ParaKey) string

func (*Parameters) MustGetStringArray

func (para *Parameters) MustGetStringArray(key ParaKey) []string

func (*Parameters) MustGetTime

func (para *Parameters) MustGetTime(key ParaKey) time.Time

func (*Parameters) Set

func (para *Parameters) Set(key ParaKey, value interface{})

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

func NewPipeline

func NewPipeline(name string) *Pipeline

func NewPipelineFromConfig

func NewPipelineFromConfig(name string, config *PipelineConfig, context *Context) *Pipeline

func (*Pipeline) Context

func (pipe *Pipeline) Context(s *Context) *Pipeline

func (*Pipeline) CurrentProcessor

func (pipe *Pipeline) CurrentProcessor() string

func (*Pipeline) End

func (pipe *Pipeline) End(s Processor) *Pipeline

func (*Pipeline) Error

func (pipe *Pipeline) Error(s Processor) *Pipeline

func (*Pipeline) GetContext

func (pipe *Pipeline) GetContext() *Context

func (*Pipeline) GetID

func (pipe *Pipeline) GetID() string

func (*Pipeline) Join

func (pipe *Pipeline) Join(s Processor) *Pipeline

func (*Pipeline) Pause

func (pipe *Pipeline) Pause() *Context

func (*Pipeline) Resume

func (pipe *Pipeline) Resume() *Context

func (*Pipeline) Run

func (pipe *Pipeline) Run() *Context

func (*Pipeline) Start

func (pipe *Pipeline) Start(s Processor) *Pipeline

type PipelineConfig

type PipelineConfig struct {
	ID             string             `gorm:"not null;unique;primary_key" json:"id,omitempty" index:"id"`
	Name           string             `json:"name,omitempty" config:"name"`
	StartProcessor *ProcessorConfig   `json:"start,omitempty" config:"start"`
	Processors     []*ProcessorConfig `json:"process,omitempty" config:"process"`
	EndProcessor   *ProcessorConfig   `json:"end,omitempty" config:"end"`
	ErrorProcessor *ProcessorConfig   `json:"error,omitempty" config:"error"`
	Created        time.Time          `json:"created,omitempty"`
	Updated        time.Time          `json:"updated,omitempty"`
	Tags           []string           `json:"tags,omitempty" config:"tags"`
}

PipelineConfig is the config for each pipeline; a pipeline may have more than one processor.

func GetStaticPipelineConfig

func GetStaticPipelineConfig(pipelineID string) PipelineConfig

type Processor

type Processor interface {
	Name() string
	Process(s *Context) error
}

func GetJointInstance

func GetJointInstance(cfg *ProcessorConfig) Processor

type ProcessorConfig

type ProcessorConfig struct {
	Name       string                 `json:"joint" config:"joint"`                     //the joint name
	Parameters map[string]interface{} `json:"parameters,omitempty" config:"parameters"` //kv parameters for this joint
	Enabled    bool                   `json:"enabled" config:"enabled"`
}

ProcessorConfig is the config for each joint.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL