flow

package
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 20, 2025 License: MIT Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ParseExecID added in v0.0.2

func ParseExecID(execID string) (flowName string, nodeName string, sessID string, err error)

Types

type Aggregator

type Aggregator func(map[string][]byte) ([]byte, error)

Aggregator definition for the data aggregator of nodes

type Definitor

type Definitor func(ctx context.Context, f *Flow) error

type ExecResult

type ExecResult struct {
	ID   string `json:"id"`
	Resp []byte `json:"resp"`
	Err  string `json:"err"`
}

type ExecutionOptions

type ExecutionOptions struct {
	// contains filtered or unexported fields
}

type Executor

type Executor struct {
	// id format: "flow_name:node_name:random_id"
	ID   string `json:"id"`
	Body []byte `json:"body"`
	// contains filtered or unexported fields
}

func (*Executor) Execute

func (e *Executor) Execute(ctx context.Context) error

type Flow

type Flow struct {
	Name string
	DAG  *dag.DAG
	// contains filtered or unexported fields
}

func New

func New(
	name string, stor store.Store, cli *asynq.Client,
	logger *slog.Logger, cfg *types.Config, insp *asynq.Inspector,
) (*Flow, error)

func (*Flow) Edge

func (f *Flow) Edge(src, dst string) error

func (*Flow) GetResult

func (f *Flow) GetResult(sessID string) (map[string]*ExecResult, error)

func (*Flow) IfNode added in v0.0.2

func (f *Flow) IfNode(
	name string, condFn IfCondFunc, trueFn, falseFn NodeFunc, opts ...Option,
) error

func (*Flow) Node

func (f *Flow) Node(name string, fn NodeFunc, opts ...Option) error

func (*Flow) Register

func (f *Flow) Register(mux *asynq.ServeMux)

func (*Flow) Submit

func (f *Flow) Submit(body []byte) (string, error)

func (*Flow) SwitchNode

func (f *Flow) SwitchNode(
	name string, condFn SwitchCondFunc,
	cases map[string]NodeFunc, opts ...Option,
) error

type ForEach

type ForEach func([]byte) map[string][]byte

ForEach definition for the foreach function

type Forwarder

type Forwarder func([]byte) []byte

Forwarder definition for the data forwarder of nodes

type FuncErrorHandler

type FuncErrorHandler func([]byte, error) error

type IfCondFunc added in v0.0.2

type IfCondFunc func([]byte) bool

type NodeFunc

type NodeFunc func([]byte, map[string][]string) ([]byte, error)

type Option

type Option func(*ExecutionOptions)

func WithAggregator

func WithAggregator(agg Aggregator) Option

func WithFailureHandler

func WithFailureHandler(fn FuncErrorHandler) Option

func WithFinalFailureHandler added in v0.0.2

func WithFinalFailureHandler(fn FuncErrorHandler) Option

type SwitchCondFunc

type SwitchCondFunc func([]byte) string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL