flow

package module
v0.14.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 8, 2024 License: MIT Imports: 37 Imported by: 0

README

flow

实例代码

查看 examples

生成 echo 代码

cd $GOPATH/src
protoc -I. --gogo_out=:. --flow_out=:. github.com/vine-io/flow/examples/pb/hello.proto

启动 etcd

etcd

启动 server 和 client

go build -o _output/server examples/server/server.go
go build -o _output/client examples/client/client.go

_output/server
_output/client

下载 protoc-gen-flow

bash -c tool/install.sh

Documentation

Index

Constants

View Source
const (
	BpmnVisit       = "visit"
	BpmnLeave       = "leave"
	BpmnActiveStart = "activeStart"
	BpmnActiveEnd   = "activeEnd"
	BpmnComplete    = "complete"
	BpmnError       = "error"
	BpmnCancel      = "cancel"
)

Variables

View Source
var (
	Root         = "/vine.io/flow"
	WorkflowPath = path.Join(Root, "wf")
	ErrAborted   = errors.New("be aborted")
)

Functions

func EchoToAPI

func EchoToAPI(echo Echo) *api.Echo

EchoToAPI returns a new instance *api.Echo based on the specified Echo interface implementation.

func EntityToAPI

func EntityToAPI(entity Entity) *api.Entity

EntityToAPI returns a new instance *api.Entity based on the specified Entity interface implementation.

func ExtractFields

func ExtractFields(t any) []string

func GetTypePkgName

func GetTypePkgName(p reflect.Type) string

GetTypePkgName returns the package path and kind for object based on reflect.Type.

func HashName added in v0.11.4

func HashName(text string) string

func InjectTypeFields

func InjectTypeFields(t any, items map[string]string, entity string) error

func IsCancelErr added in v0.10.0

func IsCancelErr(err error) bool

func IsRetriedErr

func IsRetriedErr(err error) (int32, bool)

IsRetriedErr returns boolean value if the error is specified code.

func IsShadowErr added in v0.10.0

func IsShadowErr(err error) bool

func Load

func Load(tps ...any)

func NewOptions

func NewOptions(opts ...Option) *api.WorkflowOption

func OliveEscape added in v0.13.2

func OliveEscape(text string) string

func OliveUnEscape added in v0.13.2

func OliveUnEscape(text string) string

func ProvideWithName added in v0.9.1

func ProvideWithName(name string, value any) error

func Provides added in v0.9.1

func Provides(values ...any) error

func StepToAPI

func StepToAPI(step Step) *api.Step

StepToAPI returns a new instance *api.Step based on the specified Step interface implementation.

func StepToWorkStep

func StepToWorkStep(step Step, worker string) *api.WorkflowStep

StepToWorkStep returns a new instance *api.WorkflowStep based on the specified Step interface implementation.

Types

type CallPack

type CallPack struct {
	// contains filtered or unexported fields
}

func NewCall

func NewCall(ctx context.Context, chunk *api.PipeCallRequest) *CallPack

func (*CallPack) Apply added in v0.9.9

func (p *CallPack) Apply(data []byte)

func (*CallPack) ApplyErr added in v0.9.9

func (p *CallPack) ApplyErr(err error)

func (*CallPack) Destroy

func (p *CallPack) Destroy()

type CellStep added in v0.8.2

type CellStep struct {
	EmptyStep
}

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(cfg ClientConfig, attrs map[string]string) (*Client, error)

func (*Client) AbortWorkflowInstance added in v0.10.0

func (c *Client) AbortWorkflowInstance(ctx context.Context, wid string) error

func (*Client) Call

func (c *Client) Call(ctx context.Context, client, name string, data []byte, opts ...vclient.CallOption) ([]byte, error)

func (*Client) CommitInteractive added in v0.10.0

func (c *Client) CommitInteractive(ctx context.Context, pid, sid string, properties map[string]string, opts ...vclient.CallOption) error

func (*Client) ExecuteWorkflowInstance added in v0.10.0

func (c *Client) ExecuteWorkflowInstance(ctx context.Context, id, name, definitions string, dataObjects, properties map[string]string, watch bool) (WorkflowWatcher, error)

func (*Client) GetWorker added in v0.12.2

func (c *Client) GetWorker(ctx context.Context, id string) (*api.Worker, error)

func (*Client) HandleServiceErr added in v0.14.0

func (c *Client) HandleServiceErr(ctx context.Context, req *api.ErrHandleRequest, opts ...vclient.CallOption) error

func (*Client) Id

func (c *Client) Id() string

func (*Client) InspectWorkflowInstance added in v0.10.0

func (c *Client) InspectWorkflowInstance(ctx context.Context, wid string) (*api.Workflow, error)

func (*Client) ListInteractive added in v0.10.0

func (c *Client) ListInteractive(ctx context.Context, pid string, opts ...vclient.CallOption) ([]*api.Interactive, error)

func (*Client) ListRegistry added in v0.2.0

func (c *Client) ListRegistry(ctx context.Context) ([]*api.Entity, []*api.Echo, []*api.Step, error)

func (*Client) ListWorkFlowInstance added in v0.10.0

func (c *Client) ListWorkFlowInstance(ctx context.Context) ([]*api.WorkflowSnapshot, error)

func (*Client) ListWorker added in v0.2.0

func (c *Client) ListWorker(ctx context.Context) ([]*api.Worker, error)

func (*Client) NewSession

func (c *Client) NewSession() (*PipeSession, error)

func (*Client) PauseWorkflowInstance added in v0.10.0

func (c *Client) PauseWorkflowInstance(ctx context.Context, wid string) error

func (*Client) ResumeWorkflowInstance added in v0.10.0

func (c *Client) ResumeWorkflowInstance(ctx context.Context, wid string) error

func (*Client) Step

func (c *Client) Step(ctx context.Context, target string, step Step, items map[string]string, opts ...vclient.CallOption) (map[string]string, error)

func (*Client) WatchWorkflowInstance added in v0.10.0

func (c *Client) WatchWorkflowInstance(ctx context.Context, wid string, opts ...vclient.CallOption) (WorkflowWatcher, error)

func (*Client) WorkHook added in v0.12.2

func (c *Client) WorkHook(ctx context.Context) (WorkHookWatcher, error)

type ClientConfig

type ClientConfig struct {
	// contains filtered or unexported fields
}

func NewConfig

func NewConfig(name, id, address string) ClientConfig

func (*ClientConfig) WithConn added in v0.2.2

func (c *ClientConfig) WithConn(conn vclient.Client) *ClientConfig

func (*ClientConfig) WithHeartbeat added in v0.2.2

func (c *ClientConfig) WithHeartbeat(t time.Duration) *ClientConfig

func (*ClientConfig) WithStore added in v0.4.3

func (c *ClientConfig) WithStore(store *ClientStore) *ClientConfig

func (*ClientConfig) WithTimeout added in v0.2.2

func (c *ClientConfig) WithTimeout(t time.Duration) *ClientConfig

type ClientPipe

type ClientPipe struct {
	Id string
	// contains filtered or unexported fields
}

func NewPipe

func NewPipe(ctx context.Context, id string, pr *Peer, stream PipeStream) *ClientPipe

func (*ClientPipe) Call

func (p *ClientPipe) Call(pack *CallPack) (<-chan []byte, <-chan error)

func (*ClientPipe) Close

func (p *ClientPipe) Close()

func (*ClientPipe) Start

func (p *ClientPipe) Start()

func (*ClientPipe) Step

func (p *ClientPipe) Step(pack *StepPack) (<-chan map[string]string, <-chan error)

type ClientStore

type ClientStore struct {
	// contains filtered or unexported fields
}

func NewClientStore

func NewClientStore() *ClientStore

func (*ClientStore) GetEcho

func (s *ClientStore) GetEcho(name string) (Echo, bool)

func (*ClientStore) GetEntity

func (s *ClientStore) GetEntity(kind string) (Entity, bool)

func (*ClientStore) GetStep

func (s *ClientStore) GetStep(name string) (Step, bool)

func (*ClientStore) Load added in v0.4.3

func (s *ClientStore) Load(ts ...any)

func (*ClientStore) PopulateEcho added in v0.9.1

func (s *ClientStore) PopulateEcho(name string) (Echo, error)

func (*ClientStore) PopulateStep added in v0.9.1

func (s *ClientStore) PopulateStep(name string) (Step, error)

func (*ClientStore) ProvideWithName added in v0.9.1

func (s *ClientStore) ProvideWithName(name string, value any) error

func (*ClientStore) Provides added in v0.9.1

func (s *ClientStore) Provides(values ...any) error

type Echo

type Echo interface {
	// Owner 所属 Entity 信息
	Owner() reflect.Type

	Call(ctx context.Context, data []byte) ([]byte, error)
	// Desc Echo 描述信息
	Desc() string
}

Echo 描述一个具体的请求

type EchoSet

type EchoSet struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewEchoSet

func NewEchoSet() *EchoSet

func (*EchoSet) Add

func (s *EchoSet) Add(echo *api.Echo)

func (*EchoSet) Contains

func (s *EchoSet) Contains(name string) bool

func (*EchoSet) Del

func (s *EchoSet) Del(echo *api.Echo)

func (*EchoSet) Get

func (s *EchoSet) Get(name string) (*api.Echo, bool)

func (*EchoSet) List added in v0.2.0

func (s *EchoSet) List() []*api.Echo

type Empty

type Empty struct{}

func (*Empty) Desc added in v0.9.2

func (e *Empty) Desc() string

func (*Empty) GetEID added in v0.13.2

func (e *Empty) GetEID() string

func (*Empty) Marshal

func (e *Empty) Marshal() ([]byte, error)

func (*Empty) OwnerReferences

func (e *Empty) OwnerReferences() []*api.OwnerReference

func (*Empty) Unmarshal

func (e *Empty) Unmarshal(data []byte) error

type EmptyEcho

type EmptyEcho struct{}

func (*EmptyEcho) Call

func (e *EmptyEcho) Call(ctx context.Context, data []byte) ([]byte, error)

func (*EmptyEcho) Desc added in v0.9.2

func (e *EmptyEcho) Desc() string

func (*EmptyEcho) Owner added in v0.6.0

func (e *EmptyEcho) Owner() reflect.Type

type EmptyStep

type EmptyStep struct{}

func (*EmptyStep) Cancel

func (s *EmptyStep) Cancel(ctx *PipeSessionCtx) error

func (*EmptyStep) Commit

func (s *EmptyStep) Commit(ctx *PipeSessionCtx) (out map[string]any, err error)

func (*EmptyStep) Desc added in v0.9.2

func (s *EmptyStep) Desc() string

func (*EmptyStep) Owner added in v0.6.0

func (s *EmptyStep) Owner() reflect.Type

func (*EmptyStep) Prepare

func (s *EmptyStep) Prepare(ctx *PipeSessionCtx) error

func (*EmptyStep) Rollback

func (s *EmptyStep) Rollback(ctx *PipeSessionCtx) error

type Entity

type Entity interface {
	GetEID() string
	// OwnerReferences Entity 之间的依赖信息
	OwnerReferences() []*api.OwnerReference
	// Marshal Entity 序列化
	Marshal() ([]byte, error)
	// Unmarshal Entity 反序列化
	Unmarshal(data []byte) error
	// Desc Entity 说明
	Desc() string
}

Entity 描述工作流中的具体资源,是工作流中的执行单元

type EntitySet

type EntitySet struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewEntitySet

func NewEntitySet() *EntitySet

func (*EntitySet) Add

func (s *EntitySet) Add(entity *api.Entity)

func (*EntitySet) Contains

func (s *EntitySet) Contains(name string) bool

func (*EntitySet) Del

func (s *EntitySet) Del(entity *api.Entity)

func (*EntitySet) Get

func (s *EntitySet) Get(kind string) (*api.Entity, bool)

func (*EntitySet) List added in v0.2.0

func (s *EntitySet) List() []*api.Entity

type IStepBuilder added in v0.13.5

type IStepBuilder interface {
	Id() string
	Entity() Entity
	Build() (any, error)
}

type Logger added in v0.12.7

type Logger struct {
	LoggerOptions
}

func NewLogger added in v0.12.7

func NewLogger(opts ...LoggerOption) *Logger

func (*Logger) Debug added in v0.12.7

func (lg *Logger) Debug(text string)

func (*Logger) Debugf added in v0.12.7

func (lg *Logger) Debugf(format string, args ...any)

func (*Logger) Error added in v0.12.7

func (lg *Logger) Error(text string)

func (*Logger) Errorf added in v0.12.7

func (lg *Logger) Errorf(format string, args ...any)

func (*Logger) Info added in v0.12.7

func (lg *Logger) Info(text string)

func (*Logger) Infof added in v0.12.7

func (lg *Logger) Infof(format string, args ...any)

func (*Logger) Trace added in v0.12.7

func (lg *Logger) Trace(text string)

func (*Logger) Tracef added in v0.12.7

func (lg *Logger) Tracef(format string, args ...any)

func (*Logger) Warn added in v0.12.7

func (lg *Logger) Warn(text string)

func (*Logger) Warnf added in v0.12.7

func (lg *Logger) Warnf(format string, args ...any)

type LoggerOption added in v0.12.7

type LoggerOption func(options *LoggerOptions)

func WithLoggerContext added in v0.12.7

func WithLoggerContext(ctx *PipeSessionCtx) LoggerOption

type LoggerOptions added in v0.12.7

type LoggerOptions struct {
	// contains filtered or unexported fields
}

func NewLoggerOptions added in v0.12.7

func NewLoggerOptions(opts ...LoggerOption) LoggerOptions

type MemberPipeHandler

type MemberPipeHandler func(*api.PipeResponse) (*api.PipeRequest, error)

type MemberPipeStream

type MemberPipeStream struct {
	// contains filtered or unexported fields
}

func NewMemberPipeStream

func NewMemberPipeStream(ctx context.Context, id string, handler MemberPipeHandler) *MemberPipeStream

func (*MemberPipeStream) Close

func (s *MemberPipeStream) Close() error

func (*MemberPipeStream) Context

func (s *MemberPipeStream) Context() context.Context

func (*MemberPipeStream) Recv

func (s *MemberPipeStream) Recv() (*api.PipeRequest, error)

func (*MemberPipeStream) Send

func (s *MemberPipeStream) Send(req *api.PipeResponse) error

type Option

type Option func(option *api.WorkflowOption)

Option represents a configuration option for Workflow struct. It provides a way to modify the fields of the Workflow struct a flexible ways.

func WithId

func WithId(id string) Option

WithId sets the Wid field of *api.WorkflowOption to the specified value.

func WithMaxRetry

func WithMaxRetry(retry int32) Option

WithMaxRetry sets the MaxRetries field of *api.WorkflowOption to the specified value.

func WithName

func WithName(name string) Option

WithName sets the Name field of *api.WorkflowOption to the specified value.

type Peer

type Peer struct {
	Server string
	Client string
}

type PipeSession

type PipeSession struct {
	// contains filtered or unexported fields
}

func NewPipeSession

func NewPipeSession(c *Client) (*PipeSession, error)

func (*PipeSession) Close

func (s *PipeSession) Close()

func (*PipeSession) IsConnected

func (s *PipeSession) IsConnected() bool

type PipeSessionCtx

type PipeSessionCtx struct {
	context.Context
	// contains filtered or unexported fields
}

func NewSessionCtx

func NewSessionCtx(ctx context.Context, wid, instanceId, sid, step string, revision api.Revision, c *Client) *PipeSessionCtx

func (*PipeSessionCtx) Call

func (c *PipeSessionCtx) Call(ctx context.Context, target, echo string, data []byte, opts ...vclient.CallOption) ([]byte, error)

func (*PipeSessionCtx) Get

func (c *PipeSessionCtx) Get(ctx context.Context, key string, opts ...vclient.CallOption) ([]byte, error)

func (*PipeSessionCtx) Log added in v0.12.7

func (c *PipeSessionCtx) Log(opts ...LoggerOption) *Logger

func (*PipeSessionCtx) Put

func (c *PipeSessionCtx) Put(ctx context.Context, key string, data any, opts ...vclient.CallOption) error

func (*PipeSessionCtx) Revision

func (c *PipeSessionCtx) Revision() *api.Revision

func (*PipeSessionCtx) WorkflowID

func (c *PipeSessionCtx) WorkflowID() string

type PipeSet

type PipeSet struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewPipeSet

func NewPipeSet() *PipeSet

func (*PipeSet) Add

func (ps *PipeSet) Add(p *ClientPipe)

func (*PipeSet) Del

func (ps *PipeSet) Del(p *ClientPipe)

func (*PipeSet) Get

func (ps *PipeSet) Get(id string) (*ClientPipe, bool)

type PipeStream

type PipeStream interface {
	Context() context.Context
	Send(*api.PipeResponse) error
	Recv() (*api.PipeRequest, error)
	Close() error
}

type RpcServer

type RpcServer struct {
	// contains filtered or unexported fields
}

func NewRPCServer

func NewRPCServer(s vserver.Server, scheduler *Scheduler) (*RpcServer, error)

func (*RpcServer) AbortWorkflowInstance added in v0.10.0

func (*RpcServer) Call

func (rs *RpcServer) Call(ctx context.Context, req *api.CallRequest, rsp *api.CallResponse) error

func (*RpcServer) CommitInteractive added in v0.10.0

func (rs *RpcServer) CommitInteractive(ctx context.Context, req *api.CommitInteractiveRequest, rsp *api.CommitInteractiveResponse) (err error)

func (*RpcServer) ExecuteWorkflowInstance added in v0.13.2

func (*RpcServer) GetWorker added in v0.12.2

func (rs *RpcServer) GetWorker(ctx context.Context, req *api.GetWorkerRequest, rsp *api.GetWorkerResponse) error

func (*RpcServer) HandleServiceErr added in v0.14.0

func (rs *RpcServer) HandleServiceErr(ctx context.Context, req *api.HandleServiceErrRequest, rsp *api.HandleServiceErrResponse) (err error)

func (*RpcServer) Id

func (rs *RpcServer) Id() string

func (*RpcServer) InspectWorkflowInstance added in v0.10.0

func (*RpcServer) ListInteractive added in v0.10.0

func (rs *RpcServer) ListInteractive(ctx context.Context, req *api.ListInteractiveRequest, rsp *api.ListInteractiveResponse) (err error)

func (*RpcServer) ListRegistry added in v0.2.0

func (rs *RpcServer) ListRegistry(ctx context.Context, req *api.ListRegistryRequest, rsp *api.ListRegistryResponse) error

func (*RpcServer) ListWorker added in v0.2.0

func (rs *RpcServer) ListWorker(ctx context.Context, req *api.ListWorkerRequest, rsp *api.ListWorkerResponse) error

func (*RpcServer) ListWorkflowInstance added in v0.10.0

func (*RpcServer) PauseWorkflowInstance added in v0.10.0

func (*RpcServer) Pipe

func (rs *RpcServer) Pipe(ctx context.Context, stream api.FlowRpc_PipeStream) error

func (*RpcServer) Register

func (rs *RpcServer) Register(ctx context.Context, req *api.RegisterRequest, rsp *api.RegisterResponse) error

func (*RpcServer) ResumeWorkflowInstance added in v0.10.0

func (*RpcServer) Step

func (rs *RpcServer) Step(ctx context.Context, req *api.StepRequest, rsp *api.StepResponse) error

func (*RpcServer) StepGet

func (rs *RpcServer) StepGet(ctx context.Context, req *api.StepGetRequest, rsp *api.StepGetResponse) error

func (*RpcServer) StepPut

func (rs *RpcServer) StepPut(ctx context.Context, req *api.StepPutRequest, rsp *api.StepPutResponse) error

func (*RpcServer) StepTrace

func (rs *RpcServer) StepTrace(ctx context.Context, req *api.StepTraceRequest, rsp *api.StepTraceResponse) error

func (*RpcServer) Stop

func (rs *RpcServer) Stop() error

func (*RpcServer) WatchWorkflowInstance added in v0.10.0

func (*RpcServer) WorkHook added in v0.12.2

func (rs *RpcServer) WorkHook(ctx context.Context, req *api.WorkHookRequest, stream api.FlowRpc_WorkHookStream) error

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler(name string, storage *clientv3.Client, size int) (*Scheduler, error)

func (*Scheduler) CommitInteractive added in v0.10.0

func (s *Scheduler) CommitInteractive(ctx context.Context, pid, sid string, properties map[string]string) error

func (*Scheduler) ExecuteWorkflowInstance added in v0.10.0

func (s *Scheduler) ExecuteWorkflowInstance(id, name, definitionsText string, dataObjects, properties map[string]string, ps *PipeSet) error

func (*Scheduler) GetRegistry added in v0.2.0

func (s *Scheduler) GetRegistry() (entities []*api.Entity, echoes []*api.Echo, steps []*api.Step)

func (*Scheduler) GetWorker added in v0.12.2

func (s *Scheduler) GetWorker(ctx context.Context, id string) (*api.Worker, error)

func (*Scheduler) GetWorkers added in v0.2.0

func (s *Scheduler) GetWorkers(ctx context.Context) ([]*api.Worker, error)

func (*Scheduler) GetWorkflowInstance added in v0.10.0

func (s *Scheduler) GetWorkflowInstance(wid string) (*Workflow, bool)

func (*Scheduler) GetWorkflowInstances added in v0.10.0

func (s *Scheduler) GetWorkflowInstances() []*api.WorkflowSnapshot

func (*Scheduler) HandleServiceErr added in v0.14.0

func (s *Scheduler) HandleServiceErr(ctx context.Context, req api.ErrHandleRequest) error

func (*Scheduler) InspectWorkflowInstance added in v0.10.0

func (s *Scheduler) InspectWorkflowInstance(ctx context.Context, wid string) (*api.Workflow, error)

func (*Scheduler) IsClosed

func (s *Scheduler) IsClosed() bool

func (*Scheduler) ListInteractive added in v0.10.0

func (s *Scheduler) ListInteractive(ctx context.Context, pid string) ([]*api.Interactive, error)

func (*Scheduler) Register

func (s *Scheduler) Register(worker *api.Worker, entities []*api.Entity, echoes []*api.Echo, steps []*api.Step) error

func (*Scheduler) RemoveWorkflowInstance added in v0.10.2

func (s *Scheduler) RemoveWorkflowInstance(wid string)

func (*Scheduler) SetWorkflowInstance added in v0.10.2

func (s *Scheduler) SetWorkflowInstance(wf *Workflow)

func (*Scheduler) StepGet

func (s *Scheduler) StepGet(ctx context.Context, wid, key string) ([]byte, error)

func (*Scheduler) StepPut

func (s *Scheduler) StepPut(ctx context.Context, wid, key, value string) error

func (*Scheduler) StepTrace

func (s *Scheduler) StepTrace(ctx context.Context, traceLog *api.TraceLog) error

func (*Scheduler) Stop

func (s *Scheduler) Stop(wait bool)

func (*Scheduler) WatchWorkflowInstance added in v0.10.0

func (s *Scheduler) WatchWorkflowInstance(ctx context.Context, wid string) (<-chan *api.WorkflowWatchResult, error)

type Step

type Step interface {
	// Owner Step 所属 Entity
	Owner() reflect.Type

	Prepare(ctx *PipeSessionCtx) error

	Commit(ctx *PipeSessionCtx) (map[string]any, error)

	Rollback(ctx *PipeSessionCtx) error

	Cancel(ctx *PipeSessionCtx) error
	// Desc Step 描述信息
	Desc() string
}

Step 表示具有原子性的复杂操作

type StepPack

type StepPack struct {
	// contains filtered or unexported fields
}

func NewStep

func NewStep(ctx context.Context, chunk *api.PipeStepRequest) *StepPack

func (*StepPack) Apply added in v0.9.9

func (p *StepPack) Apply(data map[string]string)

func (*StepPack) ApplyErr added in v0.9.9

func (p *StepPack) ApplyErr(err error)

func (*StepPack) Destroy

func (p *StepPack) Destroy()

type StepSet

type StepSet struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewStepSet

func NewStepSet() *StepSet

func (*StepSet) Add

func (s *StepSet) Add(step *api.Step)

func (*StepSet) Contains

func (s *StepSet) Contains(name string) bool

func (*StepSet) Del

func (s *StepSet) Del(step *api.Step)

func (*StepSet) Get

func (s *StepSet) Get(name string) (*api.Step, bool)

func (*StepSet) List added in v0.2.0

func (s *StepSet) List() []*api.Step

type SubWorkflowBuilder added in v0.13.5

type SubWorkflowBuilder struct {
	// contains filtered or unexported fields
}

SubWorkflowBuilder the builder pattern is used to separate the construction of a complex object of its representation so that the same construction sub process can create different representations.

func NewSubWorkflowBuilder added in v0.13.5

func NewSubWorkflowBuilder(entity Entity, opts ...Option) *SubWorkflowBuilder

NewSubWorkflowBuilder returns a new instance of the SubWorkflowBuilder struct. The builder can be used to construct a Workflow struct with specific options.

func (*SubWorkflowBuilder) Build added in v0.13.5

func (b *SubWorkflowBuilder) Build() (any, error)

func (*SubWorkflowBuilder) Entity added in v0.13.5

func (b *SubWorkflowBuilder) Entity() Entity

func (*SubWorkflowBuilder) Id added in v0.13.5

func (b *SubWorkflowBuilder) Id() string

func (*SubWorkflowBuilder) Item added in v0.13.5

func (b *SubWorkflowBuilder) Item(key string, value any) *SubWorkflowBuilder

Item adds a key-value pairs to the Workflow struct.

func (*SubWorkflowBuilder) Items added in v0.13.5

func (b *SubWorkflowBuilder) Items(items map[string]any) *SubWorkflowBuilder

Items adds a map of key-value pairs to the Workflow struct.

func (*SubWorkflowBuilder) Step added in v0.13.5

Step adds Step interface implementations to the Workflow struct.

func (*SubWorkflowBuilder) Steps added in v0.13.5

Steps adds a slice of Step interface implementations to the Workflow struct.

type Tag

type Tag struct {
	Name string
	Kind TagKind
}

type TagKind added in v0.7.0

type TagKind int
const (
	TagKindCtx TagKind = iota + 1
	TagKindEntity
)

type TestStep added in v0.6.0

type TestStep struct {
	E *Empty `flow:"entity"`
	B int32  `flow:"ctx:b"`
	C string `flow:"ctx:c"`
	// contains filtered or unexported fields
}

func (*TestStep) Cancel added in v0.6.0

func (s *TestStep) Cancel(ctx *PipeSessionCtx) error

func (*TestStep) Commit added in v0.6.0

func (s *TestStep) Commit(ctx *PipeSessionCtx) (map[string]any, error)

func (*TestStep) Desc added in v0.9.2

func (s *TestStep) Desc() string

func (*TestStep) Owner added in v0.6.0

func (s *TestStep) Owner() reflect.Type

func (*TestStep) Prepare added in v0.6.0

func (s *TestStep) Prepare(ctx *PipeSessionCtx) error

func (*TestStep) Rollback added in v0.6.0

func (s *TestStep) Rollback(ctx *PipeSessionCtx) error

type WorkHookWatcher added in v0.12.2

type WorkHookWatcher interface {
	Next() (*api.WorkHookResult, error)
}

type WorkerSub added in v0.12.2

type WorkerSub struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewWorkSub added in v0.12.2

func NewWorkSub() *WorkerSub

func (*WorkerSub) Add added in v0.12.2

func (ws *WorkerSub) Add(id string, stream api.FlowRpc_WorkHookStream)

func (*WorkerSub) Del added in v0.12.2

func (ws *WorkerSub) Del(id string)

func (*WorkerSub) Pub added in v0.12.2

func (ws *WorkerSub) Pub(result *api.WorkHookResult)

type Workflow

type Workflow struct {
	// protects for api.Workflow and snapshot
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewWorkflow

func NewWorkflow(id, instanceId, name string, dataObjects, items map[string]string, storage *clientv3.Client, ps *PipeSet) *Workflow

func (*Workflow) Abort

func (w *Workflow) Abort()

func (*Workflow) Cancel

func (w *Workflow) Cancel()

func (*Workflow) CommitInteractive added in v0.10.0

func (w *Workflow) CommitInteractive(it *api.Interactive)

func (*Workflow) Context

func (w *Workflow) Context() context.Context

func (*Workflow) Destroy added in v0.10.0

func (w *Workflow) Destroy()

func (*Workflow) Execute

func (w *Workflow) Execute()

func (*Workflow) Handle added in v0.10.0

func (w *Workflow) Handle(step *api.WorkflowStep, action api.StepAction, items map[string]string) (map[string]string, error)

func (*Workflow) ID

func (w *Workflow) ID() string

func (*Workflow) Init

func (w *Workflow) Init() (err error)

func (*Workflow) Inspect

func (w *Workflow) Inspect(ctx context.Context) (*api.Workflow, error)

func (*Workflow) InstanceId added in v0.12.6

func (w *Workflow) InstanceId() string

func (*Workflow) InteractiveHandle added in v0.10.0

func (w *Workflow) InteractiveHandle(ctx context.Context, step *api.WorkflowStep, it *api.Interactive) error

func (*Workflow) IsAbort

func (w *Workflow) IsAbort() bool

func (*Workflow) IsStop

func (w *Workflow) IsStop() bool

func (*Workflow) NewSnapshot

func (w *Workflow) NewSnapshot() *api.WorkflowSnapshot

func (*Workflow) NewWatcher

func (w *Workflow) NewWatcher(ctx context.Context) (<-chan *api.WorkflowWatchResult, error)

func (*Workflow) Pause

func (w *Workflow) Pause() bool

func (*Workflow) Resume

func (w *Workflow) Resume() bool

type WorkflowBuilder

type WorkflowBuilder struct {
	// contains filtered or unexported fields
}

WorkflowBuilder the builder pattern is used to separate the construction of a complex object of its representation so that the same construction process can create different representations.

func NewWorkFlowBuilder added in v0.13.2

func NewWorkFlowBuilder(opts ...Option) *WorkflowBuilder

NewWorkFlowBuilder returns a new instance of the WorkflowBuilder struct. The builder can be used to construct a Workflow struct with specific options.

func (*WorkflowBuilder) Item added in v0.9.5

func (b *WorkflowBuilder) Item(key string, value any) *WorkflowBuilder

Item adds a key-value pairs to the Workflow struct.

func (*WorkflowBuilder) Items

func (b *WorkflowBuilder) Items(items map[string]any) *WorkflowBuilder

Items adds a map of key-value pairs to the Workflow struct.

func (*WorkflowBuilder) SetEntity added in v0.13.8

func (b *WorkflowBuilder) SetEntity(entity Entity) *WorkflowBuilder

func (*WorkflowBuilder) Step added in v0.9.7

Step adds Step interface implementations to the Workflow struct.

func (*WorkflowBuilder) Steps

func (b *WorkflowBuilder) Steps(sbs ...IStepBuilder) *WorkflowBuilder

Steps adds a slice of Step interface implementations to the Workflow struct.

func (*WorkflowBuilder) ToProcessDefinitions added in v0.13.2

func (b *WorkflowBuilder) ToProcessDefinitions() (*schema.Definitions, map[string]string, map[string]string, error)

func (*WorkflowBuilder) ToSubProcessDefinitions added in v0.13.2

func (b *WorkflowBuilder) ToSubProcessDefinitions() (*schema.Definitions, map[string]string, map[string]string, error)

type WorkflowStepBuilder added in v0.7.0

type WorkflowStepBuilder struct {
	// contains filtered or unexported fields
}

WorkflowStepBuilder the builder pattern is used to separate the construction of a complex object of its representation so that the same construction process can create different representations.

func NewStepBuilder added in v0.7.0

func NewStepBuilder(step Step, worker string, entity Entity) *WorkflowStepBuilder

func (*WorkflowStepBuilder) Build added in v0.7.0

func (b *WorkflowStepBuilder) Build() (any, error)

func (*WorkflowStepBuilder) Entity added in v0.13.5

func (b *WorkflowStepBuilder) Entity() Entity

func (*WorkflowStepBuilder) Id added in v0.13.5

func (b *WorkflowStepBuilder) Id() string

func (*WorkflowStepBuilder) SetId added in v0.13.5

type WorkflowWatcher

type WorkflowWatcher interface {
	Next() (*api.WorkflowWatchResult, error)
}

Directories

Path Synopsis
cmd
protoc-gen-flow
protoc-gen-vine is a plugin for the Google protocol buffer compiler to generate Go code.
protoc-gen-vine is a plugin for the Google protocol buffer compiler to generate Go code.
examples
pb

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL