Versions in this module Expand all Collapse all v0 v0.1.3 Sep 15, 2023 v0.1.2 Sep 15, 2023 Changes in this version + const DefFailedReason + const ReasonParentCancel + const ReasonSuccessAfterCanceled + var ActionMap = map[string]run.Action + var CommSync = func() CommandOptSetter + var CommSyncInterval = func(duration time.Duration) CommandOptSetter + var CommSyncTimeout = func(duration time.Duration) CommandOptSetter + func GetCanRunAction(param string) run.Action + func RegisterCanRunAction(actionInter interface{}) error + func SetCommander(c Commander) + func SetExecutor(e Executor) + func SetKeeper(e Keeper) + func SetParser(e Parser) + func SetStore(e Store) + type Closer interface + Close func() + type CommandOptSetter func(opt *CommandOption) + type CommandOption struct + type Commander interface + CancelTask func(taskInsIds []string, ops ...CommandOptSetter) error + RetryDagIns func(dagInsId string, ops ...CommandOptSetter) error + RetryTask func(taskInsIds []string, ops ...CommandOptSetter) error + RunDag func(dagId string, specVar map[string]string) (*entity.DagInstance, error) + func GetCommander() Commander + type DefCommander struct + func (c *DefCommander) CancelTask(taskInsIds []string, ops ...CommandOptSetter) error + func (c *DefCommander) RetryDagIns(dagInsId string, ops ...CommandOptSetter) error + func (c *DefCommander) RetryTask(taskInsIds []string, ops ...CommandOptSetter) error + func (c *DefCommander) RunDag(dagId string, specVars map[string]string) (*entity.DagInstance, error) + type DefDispatcher struct + func NewDefDispatcher() *DefDispatcher + func (d *DefDispatcher) Close() + func (d *DefDispatcher) Do() error + func (d *DefDispatcher) Init() + func (d *DefDispatcher) WatchInitDags() + type DefExecutor struct + func NewDefExecutor(timeout time.Duration, workers int) *DefExecutor + func (e *DefExecutor) CancelTaskIns(taskInsIds []string) error + func (e *DefExecutor) Close() + func (e *DefExecutor) Init() + func (e *DefExecutor) Push(dagIns *entity.DagInstance, taskIns *entity.TaskInstance) + type DefParser struct + func NewDefParser(workerNumber int, taskTimeout time.Duration) *DefParser + func (p *DefParser) Close() + func (p *DefParser) EntryTaskIns(taskIns *entity.TaskInstance) + func (p *DefParser) Init() + func (p *DefParser) InitialDagIns(dagIns *entity.DagInstance) + type DefWatchDog struct + func NewDefWatchDog(dagScheduledTimeout time.Duration) *DefWatchDog + func (wd *DefWatchDog) Close() + func (wd *DefWatchDog) Init() + type DistributedMutex interface + Lock func(ctx context.Context, ops ...LockOptionOp) error + Unlock func(ctx context.Context) error + type Executor interface + CancelTaskIns func(taskInsIds []string) error + Push func(dagIns *entity.DagInstance, taskIns *entity.TaskInstance) + func GetExecutor() Executor + type Keeper interface + AliveNodes func() ([]string, error) + IsAlive func(workerKey string) (bool, error) + IsLeader func() bool + NewMutex func(key string) DistributedMutex + WorkerKey func() string + WorkerNumber func() int + func GetKeeper() Keeper + type ListDagInput struct + type ListDagInstanceInput struct + DagID string + HasCmd bool + Limit int64 + Offset int64 + Status []entity.DagInstanceStatus + UpdatedEnd int64 + Worker string + type ListTaskInstanceInput struct + DagInsID string + Expired bool + IDs []string + SelectField []string + Status []entity.TaskInstanceStatus + type LockOption struct + ReentrantIdentity string + SpinInterval time.Duration + TTL time.Duration + func NewLockOption(ops []LockOptionOp) *LockOption + type LockOptionOp func(option *LockOption) + func LockTTL(d time.Duration) LockOptionOp + func Reentrant(identity string) LockOptionOp + type MockCloser struct + func (_m *MockCloser) Close() + type MockExecutor struct + func (_m *MockExecutor) CancelTaskIns(taskInsId []string) error + func (_m *MockExecutor) Push(data *entity.DagInstance, taskIns *entity.TaskInstance) + type MockKeeper struct + func (_m *MockKeeper) AliveNodes() ([]string, error) + func (_m *MockKeeper) Close() + func (_m *MockKeeper) IsAlive(workerKey string) (bool, error) + func (_m *MockKeeper) IsLeader() bool + func (_m *MockKeeper) NewMutex(key string) DistributedMutex + func (_m *MockKeeper) WorkerKey() string + func (_m *MockKeeper) WorkerNumber() int + type MockParser struct + func (_m *MockParser) EntryTaskIns(taskIns *entity.TaskInstance) + func (_m *MockParser) InitialDagIns(dagIns *entity.DagInstance) + type MockStore struct + func (_m *MockStore) BatchCreatTaskIns(taskIns []*entity.TaskInstance) error + func (_m *MockStore) BatchUpdateDagIns(dagIns []*entity.DagInstance) error + func (_m *MockStore) BatchUpdateTaskIns(taskIns []*entity.TaskInstance) error + func (_m *MockStore) Close() + func (_m *MockStore) CreateDag(dag *entity.Dag) error + func (_m *MockStore) CreateDagIns(dagIns *entity.DagInstance) error + func (_m *MockStore) GetDag(dagId string) (*entity.Dag, error) + func (_m *MockStore) GetDagInstance(dagInsId string) (*entity.DagInstance, error) + func (_m *MockStore) GetTaskIns(taskIns string) (*entity.TaskInstance, error) + func (_m *MockStore) ListDagInstance(input *ListDagInstanceInput) ([]*entity.DagInstance, error) + func (_m *MockStore) ListTaskInstance(input *ListTaskInstanceInput) ([]*entity.TaskInstance, error) + func (_m *MockStore) Marshal(obj interface{}) ([]byte, error) + func (_m *MockStore) PatchDagIns(dagIns *entity.DagInstance, mustsPatchFields ...string) error + func (_m *MockStore) PatchTaskIns(taskIns *entity.TaskInstance) error + func (_m *MockStore) Unmarshal(bytes []byte, ptr interface{}) error + func (_m *MockStore) UpdateDag(dag *entity.Dag) error + func (_m *MockStore) UpdateDagIns(dagIns *entity.DagInstance) error + func (_m *MockStore) UpdateTaskIns(taskIns *entity.TaskInstance) error + type MockTaskInfoGetter struct + Depend []string + ID string + Status entity.TaskInstanceStatus + func (_m *MockTaskInfoGetter) GetDepend() []string + func (_m *MockTaskInfoGetter) GetGraphID() string + func (_m *MockTaskInfoGetter) GetID() string + func (_m *MockTaskInfoGetter) GetStatus() entity.TaskInstanceStatus + type Parser interface + EntryTaskIns func(taskIns *entity.TaskInstance) + InitialDagIns func(dagIns *entity.DagInstance) + func GetParser() Parser + type Store interface + BatchCreatTaskIns func(taskIns []*entity.TaskInstance) error + BatchUpdateDagIns func(dagIns []*entity.DagInstance) error + BatchUpdateTaskIns func(taskIns []*entity.TaskInstance) error + CreateDag func(dag *entity.Dag) error + CreateDagIns func(dagIns *entity.DagInstance) error + GetDag func(dagId string) (*entity.Dag, error) + GetDagInstance func(dagInsId string) (*entity.DagInstance, error) + GetTaskIns func(taskIns string) (*entity.TaskInstance, error) + ListDagInstance func(input *ListDagInstanceInput) ([]*entity.DagInstance, error) + ListTaskInstance func(input *ListTaskInstanceInput) ([]*entity.TaskInstance, error) + Marshal func(obj interface{}) ([]byte, error) + PatchDagIns func(dagIns *entity.DagInstance, mustsPatchFields ...string) error + PatchTaskIns func(taskIns *entity.TaskInstance) error + Unmarshal func(bytes []byte, ptr interface{}) error + UpdateDag func(dagIns *entity.Dag) error + UpdateDagIns func(dagIns *entity.DagInstance) error + UpdateTaskIns func(taskIns *entity.TaskInstance) error + func GetStore() Store + type TaskInfoGetter interface + GetDepend func() []string + GetGraphID func() string + GetID func() string + GetStatus func() entity.TaskInstanceStatus + func MapMockTasksToGetter(taskIns []*MockTaskInfoGetter) (ret []TaskInfoGetter) + func MapTaskInsToGetter(taskIns []*entity.TaskInstance) (ret []TaskInfoGetter) + func MapTasksToGetter(taskIns []entity.Task) (ret []TaskInfoGetter) + type TaskNode struct + Status entity.TaskInstanceStatus + TaskInsID string + func BuildRootNode(tasks []TaskInfoGetter) (*TaskNode, error) + func MustBuildRootNode(tasks []TaskInfoGetter) *TaskNode + func NewTaskNodeFromGetter(instance TaskInfoGetter) *TaskNode + func (t *TaskNode) AppendChild(task *TaskNode) + func (t *TaskNode) AppendParent(task *TaskNode) + func (t *TaskNode) CanBeExecuted() bool + func (t *TaskNode) CanExecuteChild() bool + func (t *TaskNode) ComputeStatus() (status TreeStatus, srcTaskInsId string) + func (t *TaskNode) Executable() bool + func (t *TaskNode) GetExecutableTaskIds() (executables []string) + func (t *TaskNode) GetNextTaskIds(completedOrRetryTask *entity.TaskInstance) (executable []string, find bool) + func (t *TaskNode) HasCycle() (cycleStart *TaskNode) + type TaskTree struct + DagIns *entity.DagInstance + Root *TaskNode + type TreeStatus string + const TreeStatusBlocked + const TreeStatusFailed + const TreeStatusRunning + const TreeStatusSuccess