entity

package
v0.0.0-...-aaf3491 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 20, 2023 License: MIT Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
// Command name values. They are declared as untyped string constants rather
// than as CommandName.
// NOTE(review): presumably these are the values stored in Command.Name by
// DagInstance.Retry and DagInstance.Cancel — confirm against the implementation.
const (
	CommandNameRetry  = "retry"
	CommandNameCancel = "cancel"
)

Variables

View Source
// StoreMarshal and StoreUnmarshal are package-level function variables with
// marshal/unmarshal-style signatures. No default value is visible here.
// NOTE(review): presumably they must be assigned before the Scan/Value
// methods of this package are used — confirm.
var (
	StoreMarshal   func(interface{}) ([]byte, error)
	StoreUnmarshal func([]byte, interface{}) error
)

Functions

func CtxWithRunningTaskIns

func CtxWithRunningTaskIns(ctx context.Context, task *TaskInstance) context.Context

Types

type ActiveAction

// ActiveAction names what happens to an action when all conditions are met.
type ActiveAction string
const (
	// ActiveActionSkip skips the action when all conditions are met; otherwise the action is executed.
	ActiveActionSkip ActiveAction = "skip"
	// ActiveActionBlock blocks the action when all conditions are met; otherwise the action is executed.
	ActiveActionBlock ActiveAction = "block"
)

type BaseInfo

// BaseInfo holds an ID plus created/updated fields. It is embedded by Dag,
// DagInstance and TaskInstance.
// ID is serialized as "id" in YAML/JSON and as "_id" in BSON.
// NOTE(review): CreatedAt/UpdatedAt look like timestamps; the unit is not
// visible here — confirm.
type BaseInfo struct {
	ID        string `yaml:"id" json:"id" bson:"_id"`
	CreatedAt int64  `yaml:"createdAt" json:"createdAt" bson:"createdAt"`
	UpdatedAt int64  `yaml:"updatedAt" json:"updatedAt" bson:"updatedAt"`
}

BaseInfo

func (*BaseInfo) GetBaseInfo

func (b *BaseInfo) GetBaseInfo() *BaseInfo

GetBaseInfo getter

func (*BaseInfo) Initial

func (b *BaseInfo) Initial()

Initial base info

func (*BaseInfo) Update

func (b *BaseInfo) Update()

Update

type BaseInfoGetter

// BaseInfoGetter is implemented by types that can return a *BaseInfo.
type BaseInfoGetter interface {
	// GetBaseInfo returns the BaseInfo of the implementing value.
	GetBaseInfo() *BaseInfo
}

BaseInfoGetter

type Check

// Check pairs a list of task conditions (Conditions) with the ActiveAction
// (Act) that applies when the conditions are met.
type Check struct {
	Conditions []TaskCondition `yaml:"conditions,omitempty" json:"conditions,omitempty"  bson:"conditions,omitempty"`
	Act        ActiveAction    `yaml:"act,omitempty" json:"act,omitempty"  bson:"act,omitempty"`
}

Check

func (*Check) IsMeet

func (c *Check) IsMeet(dagIns *DagInstance) bool

IsMeet reports whether the check is met

func (*Check) Scan

func (d *Check) Scan(value interface{}) error

Implements the sql.Scanner interface; Scan scans value into Check

func (Check) Value

func (d Check) Value() (driver.Value, error)

Implements the driver.Valuer interface; Value returns the Check value

type Command

// Command carries a command name and the IDs of the task instances it targets.
// The fields have no struct tags, so encoders use the Go field names.
type Command struct {
	Name             CommandName
	TargetTaskInsIDs []string
}

Command

func (*Command) Scan

func (d *Command) Scan(value interface{}) error

Implements the sql.Scanner interface; Scan scans value into Command

func (Command) Value

func (d Command) Value() (driver.Value, error)

Implements the driver.Valuer interface; Value returns the JSON value

type CommandName

type CommandName string

CommandName

type CtxKey

// CtxKey is the string type used for the context keys declared by this package.
type CtxKey string
const (
	// NOTE(review): presumably the key used by CtxWithRunningTaskIns and
	// CtxRunningTaskIns — confirm against the implementation.
	CtxKeyRunningTaskIns CtxKey = "running-task"
)

type Dag

// Dag embeds BaseInfo and declares a name, description, cron string,
// variables, status and task list. All non-embedded fields use omitempty in
// their YAML, JSON and BSON tags.
type Dag struct {
	BaseInfo `yaml:",inline" json:",inline" bson:"inline"`
	Name     string    `yaml:"name,omitempty" json:"name,omitempty" bson:"name,omitempty"`
	Desc     string    `yaml:"desc,omitempty" json:"desc,omitempty" bson:"desc,omitempty"`
	Cron     string    `yaml:"cron,omitempty" json:"cron,omitempty" bson:"cron,omitempty"`
	Vars     DagVars   `yaml:"vars,omitempty" json:"vars,omitempty" bson:"vars,omitempty"`
	Status   DagStatus `yaml:"status,omitempty" json:"status,omitempty" bson:"status,omitempty"`
	Tasks    Tasks     `yaml:"tasks,omitempty" json:"tasks,omitempty" bson:"tasks,omitempty"`
}

Dag

func NewDag

func NewDag() *Dag

NewDag new a dag

func (*Dag) Run

func (d *Dag) Run(trigger Trigger, specVars map[string]string) (*DagInstance, error)

Run builds a new DagInstance; you still need to save it to the Store yourself

type DagInstance

// DagInstance embeds BaseInfo and records the ID of a dag, the trigger,
// a worker name, variables, share data, status, a reason string and an
// optional pending command (Cmd).
// Unlike Dag, the fields carry no YAML tags; DagID additionally maps to the
// gorm column "dag_id".
type DagInstance struct {
	BaseInfo  `bson:"inline"`
	DagID     string            `json:"dagId,omitempty" bson:"dagId,omitempty" gorm:"column:dag_id"`
	Trigger   Trigger           `json:"trigger,omitempty" bson:"trigger,omitempty"`
	Worker    string            `json:"worker,omitempty" bson:"worker,omitempty"`
	Vars      DagInstanceVars   `json:"vars,omitempty" bson:"vars,omitempty"`
	ShareData *ShareData        `json:"shareData,omitempty" bson:"shareData,omitempty"`
	Status    DagInstanceStatus `json:"status,omitempty" bson:"status,omitempty"`
	Reason    string            `json:"reason,omitempty" bson:"reason,omitempty"`
	Cmd       *Command          `json:"cmd,omitempty" bson:"cmd,omitempty"`
}

DagInstance

func (*DagInstance) Block

func (dagIns *DagInstance) Block(reason string)

Block the dag instance

func (*DagInstance) CanModifyStatus

func (dagIns *DagInstance) CanModifyStatus() bool

CanModifyStatus indicates whether the dag instance's status can be modified

func (*DagInstance) Cancel

func (dagIns *DagInstance) Cancel(taskInsIds []string) error

Cancel a task. It only sets a command; the command will be executed by the Parser

func (*DagInstance) Fail

func (dagIns *DagInstance) Fail(reason string)

Fail the dag instance

func (*DagInstance) Retry

func (dagIns *DagInstance) Retry(taskInsIds []string) error

Retry a task. It only sets a command; the command will be executed by the Parser

func (*DagInstance) Run

func (dagIns *DagInstance) Run()

Run the dag instance

func (*DagInstance) Success

func (dagIns *DagInstance) Success()

Success the dag instance

func (*DagInstance) VarsGetter

func (dagIns *DagInstance) VarsGetter() utils.KeyValueGetter

VarsGetter

func (*DagInstance) VarsIterator

func (dagIns *DagInstance) VarsIterator() utils.KeyValueIterator

VarsIterator

type DagInstanceHookFunc

type DagInstanceHookFunc func(dagIns *DagInstance)

type DagInstanceLifecycleHook

// DagInstanceLifecycleHook groups DagInstanceHookFunc callbacks named after
// the dag instance transitions: run, success, fail, block and retry.
// NOTE(review): presumably each callback is invoked before the matching
// DagInstance method changes state, and nil callbacks are skipped — confirm.
type DagInstanceLifecycleHook struct {
	BeforeRun     DagInstanceHookFunc
	BeforeSuccess DagInstanceHookFunc
	BeforeFail    DagInstanceHookFunc
	BeforeBlock   DagInstanceHookFunc
	BeforeRetry   DagInstanceHookFunc
}

DagInstanceLifecycleHook

// HookDagInstance is a package-level DagInstanceLifecycleHook value.
// NOTE(review): presumably callers assign its fields to register hooks — confirm.
var (
	HookDagInstance DagInstanceLifecycleHook
)

type DagInstanceStatus

type DagInstanceStatus string

DagInstanceStatus

// Values of DagInstanceStatus.
const (
	DagInstanceStatusInit      DagInstanceStatus = "init"
	DagInstanceStatusScheduled DagInstanceStatus = "scheduled"
	DagInstanceStatusRunning   DagInstanceStatus = "running"
	DagInstanceStatusBlocked   DagInstanceStatus = "blocked"
	DagInstanceStatusFailed    DagInstanceStatus = "failed"
	DagInstanceStatusSuccess   DagInstanceStatus = "success"
)

type DagInstanceVar

// DagInstanceVar holds the string value of a single dag instance variable.
type DagInstanceVar struct {
	Value string `json:"value,omitempty" bson:"value,omitempty"`
}

DagInstanceVar

type DagInstanceVars

type DagInstanceVars map[string]DagInstanceVar

DagInstanceVars

func (DagInstanceVars) Render

func (vars DagInstanceVars) Render(p map[string]interface{}) (map[string]interface{}, error)

Render variables

func (*DagInstanceVars) Scan

func (d *DagInstanceVars) Scan(value interface{}) error

Implements the sql.Scanner interface; Scan scans value into DagInstanceVars

func (DagInstanceVars) Value

func (d DagInstanceVars) Value() (driver.Value, error)

Implements the driver.Valuer interface; Value returns the JSON value

type DagStatus

type DagStatus string

DagStatus

// Values of DagStatus.
const (
	DagStatusNormal  DagStatus = "normal"
	DagStatusStopped DagStatus = "stopped"
)

type DagVar

// DagVar declares a dag variable by its description and default value.
type DagVar struct {
	Desc         string `yaml:"desc,omitempty" json:"desc,omitempty" bson:"desc,omitempty"`
	DefaultValue string `yaml:"defaultValue,omitempty" json:"defaultValue,omitempty" bson:"defaultValue,omitempty"`
}

DagVar

type DagVars

type DagVars map[string]DagVar

func (*DagVars) Scan

func (d *DagVars) Scan(value interface{}) error

Implements the sql.Scanner interface; Scan scans value into DagVars

func (DagVars) Value

func (d DagVars) Value() (driver.Value, error)

Implements the driver.Valuer interface; Value returns the JSON value

type DependOns

type DependOns []string

func (*DependOns) Scan

func (d *DependOns) Scan(value interface{}) error

Implements the sql.Scanner interface; Scan scans value into DependOns

func (DependOns) Value

func (d DependOns) Value() (driver.Value, error)

Implements the driver.Valuer interface; Value returns the DependOns value

type MockBaseInfoGetter

// MockBaseInfoGetter is a mock of BaseInfoGetter; it embeds mock.Mock.
type MockBaseInfoGetter struct {
	mock.Mock
}

MockBaseInfoGetter is an autogenerated mock type for the BaseInfoGetter type

func (*MockBaseInfoGetter) GetBaseInfo

func (_m *MockBaseInfoGetter) GetBaseInfo() *BaseInfo

GetBaseInfo provides a mock function with given fields:

type Operator

// Operator is the string type of TaskCondition.Op.
// NOTE(review): presumably "in"/"not-in" test membership in
// TaskCondition.Values — confirm against TaskCondition.IsMeet.
type Operator string
const (
	OperatorIn    Operator = "in"
	OperatorNotIn Operator = "not-in"
)

type PreChecks

type PreChecks map[string]*Check

func (*PreChecks) Scan

func (d *PreChecks) Scan(value interface{}) error

Implements the sql.Scanner interface; Scan scans value into PreChecks

func (PreChecks) Value

func (d PreChecks) Value() (driver.Value, error)

Implements the driver.Valuer interface; Value returns the PreChecks value

type ShareData

// ShareData holds a string dictionary (Dict) and a Save callback that
// receives the ShareData itself.
// NOTE(review): Save is presumably the persistence hook called after writes —
// confirm against the implementation of Set.
type ShareData struct {
	Dict map[string]string
	Save func(data *ShareData) error
	// contains filtered or unexported fields
}

ShareData can be read and written by all tasks and will be persisted. If you want high performance within a single task only, you can use ExecuteContext's Context instead.

func (*ShareData) Get

func (d *ShareData) Get(key string) (string, bool)

Get value from share data, it is thread-safe.

func (*ShareData) MarshalBSON

func (d *ShareData) MarshalBSON() ([]byte, error)

MarshalBSON used by mongo

func (*ShareData) MarshalJSON

func (d *ShareData) MarshalJSON() ([]byte, error)

MarshalJSON used by json

func (*ShareData) Scan

func (d *ShareData) Scan(value interface{}) error

Implements the sql.Scanner interface; Scan scans value into ShareData

func (*ShareData) Set

func (d *ShareData) Set(key string, val string)

Set value to share data, it is thread-safe.

func (*ShareData) UnmarshalBSON

func (d *ShareData) UnmarshalBSON(data []byte) error

UnmarshalBSON used by mongo

func (*ShareData) UnmarshalJSON

func (d *ShareData) UnmarshalJSON(data []byte) error

UnmarshalJSON used by json

func (*ShareData) Value

func (d *ShareData) Value() (driver.Value, error)

Implements the driver.Valuer interface; Value returns the JSON value

type SpecifiedVar

// SpecifiedVar is a name/value pair of strings.
type SpecifiedVar struct {
	Name  string
	Value string
}

SpecifiedVar

type Task

// Task is a task definition: an ID, a name, the IDs it depends on, the action
// name, a timeout in seconds, action parameters and pre-checks.
// NOTE(review): PreChecks is serialized under the key "preCheck" here, while
// TaskInstance.PreChecks uses "preChecks" — confirm whether the difference is
// intentional.
type Task struct {
	ID          string                 `yaml:"id,omitempty" json:"id,omitempty"  bson:"id,omitempty"`
	Name        string                 `yaml:"name,omitempty" json:"name,omitempty"  bson:"name,omitempty"`
	DependOn    []string               `yaml:"dependOn,omitempty" json:"dependOn,omitempty"  bson:"dependOn,omitempty"`
	ActionName  string                 `yaml:"actionName,omitempty" json:"actionName,omitempty"  bson:"actionName,omitempty"`
	TimeoutSecs int                    `yaml:"timeoutSecs,omitempty" json:"timeoutSecs,omitempty"  bson:"timeoutSecs,omitempty"`
	Params      map[string]interface{} `yaml:"params,omitempty" json:"params,omitempty"  bson:"params,omitempty"`
	PreChecks   PreChecks              `yaml:"preCheck,omitempty" json:"preCheck,omitempty"  bson:"preCheck,omitempty"`
}

Task

func (*Task) GetDepend

func (t *Task) GetDepend() []string

GetDepend

func (*Task) GetGraphID

func (t *Task) GetGraphID() string

GetGraphID

func (*Task) GetID

func (t *Task) GetID() string

GetID

func (*Task) GetStatus

func (t *Task) GetStatus() TaskInstanceStatus

GetStatus

func (*Task) Scan

func (d *Task) Scan(value interface{}) error

Implements the sql.Scanner interface; Scan scans value into Task

func (Task) Value

func (d Task) Value() (driver.Value, error)

Implements the driver.Valuer interface; Value returns the Task value

type TaskCondition

// TaskCondition describes one condition: a source, a key, a list of values
// and an operator.
// NOTE(review): presumably the value found under Key in Source is tested
// against Values using Op — confirm against IsMeet.
type TaskCondition struct {
	Source TaskConditionSource `yaml:"source,omitempty" json:"source,omitempty"  bson:"source,omitempty"`
	Key    string              `yaml:"key,omitempty" json:"key,omitempty"  bson:"key,omitempty"`
	Values []string            `yaml:"values,omitempty" json:"values,omitempty"  bson:"values,omitempty"`
	Op     Operator            `yaml:"op,omitempty" json:"op,omitempty"  bson:"op,omitempty"`
}

TaskCondition

func (*TaskCondition) IsMeet

func (c *TaskCondition) IsMeet(dagIns *DagInstance) bool

IsMeet reports whether the condition is met

type TaskConditionSource

// TaskConditionSource is the string type of TaskCondition.Source.
// NOTE(review): the values presumably select the dag instance's Vars or its
// ShareData as the lookup source — confirm against BuildKvGetter.
type TaskConditionSource string
const (
	TaskConditionSourceVars      TaskConditionSource = "vars"
	TaskConditionSourceShareData TaskConditionSource = "share-data"
)

func (TaskConditionSource) BuildKvGetter

func (t TaskConditionSource) BuildKvGetter(dagIns *DagInstance) utils.KeyValueGetter

BuildKvGetter

type TaskInstance

// TaskInstance embeds BaseInfo and carries the task's ID, the owning dag
// instance ID, name, dependencies, action name, timeout, parameters, traces,
// status, reason and pre-checks.
// Patch, Context and RelatedDagInstance are excluded from JSON, BSON and gorm
// by their "-" tags.
type TaskInstance struct {
	BaseInfo `bson:"inline"`
	// TaskID is the task's ID; it should be unique within a dag instance.
	TaskID      string             `json:"taskId,omitempty" bson:"taskId,omitempty"`
	DagInsID    string             `json:"dagInsId,omitempty" bson:"dagInsId,omitempty"`
	Name        string             `json:"name,omitempty" bson:"name,omitempty"`
	DependOn    DependOns          `json:"dependOn,omitempty" bson:"dependOn,omitempty"`
	ActionName  string             `json:"actionName,omitempty" bson:"actionName,omitempty"`
	TimeoutSecs int                `json:"timeoutSecs" bson:"timeoutSecs"`
	Params      datatypes.JSONMap  `json:"params,omitempty" bson:"params,omitempty"`
	Traces      TraceInfos         `json:"traces,omitempty" bson:"traces,omitempty"`
	Status      TaskInstanceStatus `json:"status,omitempty" bson:"status,omitempty"`
	Reason      string             `json:"reason,omitempty" bson:"reason,omitempty"`
	PreChecks   PreChecks          `json:"preChecks,omitempty"  bson:"preChecks,omitempty"`

	// Patch is used to save changes.
	Patch              func(*TaskInstance) error `json:"-" bson:"-" gorm:"-"`
	Context            run.ExecuteContext        `json:"-" bson:"-" gorm:"-"`
	RelatedDagInstance *DagInstance              `json:"-" bson:"-" gorm:"-"`
	// contains filtered or unexported fields
}

TaskInstance

func CtxRunningTaskIns

func CtxRunningTaskIns(ctx context.Context) (*TaskInstance, bool)

func NewTaskInstance

func NewTaskInstance(dagInsId string, t Task) *TaskInstance

NewTaskInstance

func (*TaskInstance) DoPreCheck

func (t *TaskInstance) DoPreCheck(dagIns *DagInstance) (isActive bool, err error)

DoPreCheck

func (*TaskInstance) GetDepend

func (t *TaskInstance) GetDepend() []string

GetDepend

func (*TaskInstance) GetGraphID

func (t *TaskInstance) GetGraphID() string

GetGraphID

func (*TaskInstance) GetID

func (t *TaskInstance) GetID() string

GetID

func (*TaskInstance) GetStatus

func (t *TaskInstance) GetStatus() TaskInstanceStatus

GetStatus

func (*TaskInstance) InitialDep

func (t *TaskInstance) InitialDep(ctx run.ExecuteContext, patch func(*TaskInstance) error, dagIns *DagInstance)

InitialDep

func (*TaskInstance) Run

func (t *TaskInstance) Run(params interface{}, act run.Action) (err error)

Run action

func (*TaskInstance) SetStatus

func (t *TaskInstance) SetStatus(s TaskInstanceStatus) error

SetStatus will persist task instance

func (*TaskInstance) Trace

func (t *TaskInstance) Trace(msg string, ops ...run.TraceOp)

Trace info

type TaskInstanceStatus

type TaskInstanceStatus string

TaskInstanceStatus

// Values of TaskInstanceStatus.
const (
	TaskInstanceStatusInit     TaskInstanceStatus = "init"
	TaskInstanceStatusCanceled TaskInstanceStatus = "canceled"
	TaskInstanceStatusRunning  TaskInstanceStatus = "running"
	TaskInstanceStatusEnding   TaskInstanceStatus = "ending"
	TaskInstanceStatusFailed   TaskInstanceStatus = "failed"
	TaskInstanceStatusRetrying TaskInstanceStatus = "retrying"
	TaskInstanceStatusSuccess  TaskInstanceStatus = "success"
	TaskInstanceStatusBlocked  TaskInstanceStatus = "blocked"
	TaskInstanceStatusSkipped  TaskInstanceStatus = "skipped"
)

type Tasks

type Tasks []Task

func (*Tasks) Scan

func (d *Tasks) Scan(value interface{}) error

Implements the sql.Scanner interface; Scan scans value into Tasks

func (Tasks) Value

func (d Tasks) Value() (driver.Value, error)

Implements the driver.Valuer interface; Value returns the Tasks value

type TraceInfo

// TraceInfo is a single trace entry: an int64 time and a message.
// NOTE(review): the unit of Time is not visible here — confirm.
type TraceInfo struct {
	Time    int64  `json:"time,omitempty" bson:"time,omitempty"`
	Message string `json:"message,omitempty" bson:"message,omitempty"`
}

TraceInfo

type TraceInfos

type TraceInfos []TraceInfo

func (*TraceInfos) Scan

func (d *TraceInfos) Scan(value interface{}) error

Implements the sql.Scanner interface; Scan scans value into TraceInfos

func (TraceInfos) Value

func (d TraceInfos) Value() (driver.Value, error)

Implements the driver.Valuer interface; Value returns the TraceInfos value

type Trigger

type Trigger string

Trigger

// Values of Trigger.
const (
	TriggerManually Trigger = "manually"
	TriggerCron     Trigger = "cron"
)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL