flow

package
v3.7.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 21, 2021 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Overview

Package flow is an interface used for saga pattern microservice workflow

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrStepNotExists = errors.New("step not exists")
	ErrMissingClient = errors.New("client not set")
)
View Source
var (
	StatusString = map[Status]string{
		StatusPending: "StatusPending",
		StatusRunning: "StatusRunning",
		StatusFailure: "StatusFailure",
		StatusSuccess: "StatusSuccess",
		StatusAborted: "StatusAborted",
		StatusSuspend: "StatusSuspend",
	}
	StringStatus = map[string]Status{
		"StatusPending": StatusPending,
		"StatusRunning": StatusRunning,
		"StatusFailure": StatusFailure,
		"StatusSuccess": StatusSuccess,
		"StatusAborted": StatusAborted,
		"StatusSuspend": StatusSuspend,
	}
)

Functions

func NewContext added in v3.4.2

func NewContext(ctx context.Context, f Flow) context.Context

NewContext stores Flow to context

func RegisterStep added in v3.4.9

func RegisterStep(step Step)

Types

type ExecuteOption added in v3.4.2

type ExecuteOption func(*ExecuteOptions)

func ExecuteAsync added in v3.4.9

func ExecuteAsync(b bool) ExecuteOption

func ExecuteClient added in v3.4.2

func ExecuteClient(c client.Client) ExecuteOption

func ExecuteContext added in v3.4.2

func ExecuteContext(ctx context.Context) ExecuteOption

func ExecuteLogger added in v3.4.2

func ExecuteLogger(l logger.Logger) ExecuteOption

func ExecuteMeter added in v3.4.2

func ExecuteMeter(m meter.Meter) ExecuteOption

func ExecuteReverse added in v3.4.2

func ExecuteReverse(b bool) ExecuteOption

func ExecuteTimeout added in v3.4.2

func ExecuteTimeout(td time.Duration) ExecuteOption

func ExecuteTracer added in v3.4.2

func ExecuteTracer(t tracer.Tracer) ExecuteOption

type ExecuteOptions added in v3.4.2

type ExecuteOptions struct {
	// Client holds the client.Client
	Client client.Client
	// Tracer holds the tracer
	Tracer tracer.Tracer
	// Logger holds the logger
	Logger logger.Logger
	// Meter holds the meter
	Meter meter.Meter
	// Context can be used to abort execution or pass additional opts
	Context context.Context
	// Start step
	Start string
	// Reverse execution
	Reverse bool
	// Timeout for execution
	Timeout time.Duration
	// Async enables async execution
	Async bool
}

func NewExecuteOptions added in v3.4.2

func NewExecuteOptions(opts ...ExecuteOption) ExecuteOptions

type Flow added in v3.3.17

type Flow interface {
	// Options returns options
	Options() Options
	// Init initialize
	Init(...Option) error
	// WorkflowCreate creates new workflow with specific id and steps
	WorkflowCreate(ctx context.Context, id string, steps ...Step) (Workflow, error)
	// WorkflowSave saves workflow
	WorkflowSave(ctx context.Context, w Workflow) error
	// WorkflowLoad loads workflow with specific id
	WorkflowLoad(ctx context.Context, id string) (Workflow, error)
	// WorkflowList lists all workflows
	WorkflowList(ctx context.Context) ([]Workflow, error)
}

Flow the base interface to interact with workflows

func FromContext added in v3.4.2

func FromContext(ctx context.Context) (Flow, bool)

FromContext returns Flow from context

func NewFlow added in v3.4.2

func NewFlow(opts ...Option) Flow

type Message added in v3.4.9

type Message struct {
	Header metadata.Metadata
	Body   RawMessage
}

type Option added in v3.4.2

type Option func(*Options)

Option func

func Client added in v3.4.2

func Client(c client.Client) Option

Client to use for sync/async communication

func Context added in v3.4.2

func Context(ctx context.Context) Option

Context specifies a context for the service. Can be used to signal shutdown of the flow Can be used for extra option values.

func Logger added in v3.4.2

func Logger(l logger.Logger) Option

Logger sets the logger option

func Meter added in v3.4.2

func Meter(m meter.Meter) Option

Meter sets the meter option

func SetOption added in v3.4.2

func SetOption(k, v interface{}) Option

SetOption returns a function to setup a context with given value

func Store added in v3.4.2

func Store(s store.Store) Option

Store used for intermediate results

func Tracer added in v3.4.2

func Tracer(t tracer.Tracer) Option

Tracer mechanism for distributed tracking

type Options added in v3.4.2

type Options struct {
	// Context holds the external options and can be used for flow shutdown
	Context context.Context
	// Client holds the client.Client
	Client client.Client
	// Tracer holds the tracer
	Tracer tracer.Tracer
	// Logger holds the logger
	Logger logger.Logger
	// Meter holds the meter
	Meter meter.Meter
	// Store used for intermediate results
	Store store.Store
}

Options server struct

func NewOptions added in v3.4.2

func NewOptions(opts ...Option) Options

NewOptions returns new options struct with default or passed values

type RawMessage added in v3.4.9

type RawMessage []byte

RawMessage is a raw encoded JSON value. It implements Marshaler and Unmarshaler and can be used to delay decoding or precompute a encoding.

func (*RawMessage) MarshalJSON added in v3.4.9

func (m *RawMessage) MarshalJSON() ([]byte, error)

MarshalJSON returns m as the JSON encoding of m.

func (*RawMessage) UnmarshalJSON added in v3.4.9

func (m *RawMessage) UnmarshalJSON(data []byte) error

UnmarshalJSON sets *m to a copy of data.

type Status added in v3.4.9

type Status int
const (
	StatusPending Status = iota
	StatusRunning
	StatusFailure
	StatusSuccess
	StatusAborted
	StatusSuspend
)

func (Status) String added in v3.4.9

func (status Status) String() string

type Step

type Step interface {
	// ID returns step id
	ID() string
	// Endpoint returns rpc endpoint service_name.service_method or broker topic
	Endpoint() string
	// Execute step run
	Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (*Message, error)
	// Requires returns dependent steps
	Requires() []string
	// Options returns step options
	Options() StepOptions
	// Require add required steps
	Require(steps ...Step) error
	// String
	String() string
	// GetStatus returns step status
	GetStatus() Status
	// SetStatus sets the step status
	SetStatus(Status)
	// Request returns step request message
	Request() *Message
	// Response returns step response message
	Response() *Message
}

Step represents dedicated workflow step

func NewCallStep added in v3.4.2

func NewCallStep(service string, name string, method string, opts ...StepOption) Step

func NewPublishStep added in v3.4.2

func NewPublishStep(topic string, opts ...StepOption) Step

type StepOption added in v3.4.2

type StepOption func(*StepOptions)

func StepFallback added in v3.4.2

func StepFallback(step string) StepOption

func StepID added in v3.4.2

func StepID(id string) StepOption

func StepRequires added in v3.4.2

func StepRequires(steps ...string) StepOption

type StepOptions added in v3.4.2

type StepOptions struct {
	ID       string
	Context  context.Context
	Requires []string
	Fallback string
}

func NewStepOptions added in v3.4.2

func NewStepOptions(opts ...StepOption) StepOptions

type Workflow added in v3.3.17

type Workflow interface {
	// ID returns id of the workflow
	ID() string
	// Execute workflow with args, return execution id and error
	Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (string, error)
	// RemoveSteps remove steps from workflow
	RemoveSteps(steps ...Step) error
	// AppendSteps append steps to workflow
	AppendSteps(steps ...Step) error
	// Status returns workflow status
	Status() Status
	// Steps returns steps slice where parallel steps returned on the same level
	Steps() ([][]Step, error)
	// Suspend suspends execution
	Suspend(ctx context.Context, id string) error
	// Resume resumes execution
	Resume(ctx context.Context, id string) error
	// Abort abort execution
	Abort(ctx context.Context, id string) error
}

Workflow contains all steps to execute

type WorkflowOption added in v3.4.2

type WorkflowOption func(*WorkflowOptions)

WorflowOption signature

func WorkflowID added in v3.4.2

func WorkflowID(id string) WorkflowOption

WorkflowID set workflow id

type WorkflowOptions added in v3.4.2

type WorkflowOptions struct {
	ID      string
	Context context.Context
}

WorkflowOptions holds workflow options

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL