job

package
v3.6.1+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 11, 2018 License: LGPL-3.0 Imports: 29 Imported by: 18

README

JOB设计说明

节点执行master发送的job任务。job任务分为两类:

任务执行完成如果返回格式化数据,解析数据并写回master节点进行处理。

Documentation

Index

Constants

View Source
const (
	KindCommon   = iota
	KindAlone    // 任何时间段只允许单机执行
	KindInterval // 一个任务执行间隔内允许执行一次
)
View Source
const (
	DefaultJobGroup = "default"
)

Variables

This section is empty.

Functions

func CreateExecutionRecord

func CreateExecutionRecord(j *Job, t time.Time, rs string, success bool)

CreateExecutionRecord 创建存储记录

func CreateJobKey

func CreateJobKey(id string) string

CreateJobKey JobKey

func DeleteJob

func DeleteJob(hash string) error

DeleteJob delete job

func Exit

func Exit(i interface{})

func GetIDFromKey

func GetIDFromKey(key string) string

GetIDFromKey 从 etcd 的 key 中取 id

func GetJobs

func GetJobs(node *model.HostNode) (jobs map[string]*Job, err error)

GetJobs 获取当前节点jobs

func PutJob

func PutJob(j *Job) error

PutJob 添加获取更新job

func Reload

func Reload(i interface{})

func SSHClient

func SSHClient(hostport string, username string) (*ssh.Client, error)

func SSHClientTo

func SSHClientTo(hostport string, username, password string) (*ssh.Client, error)

func StartProc

func StartProc() error

维持 lease id 服务

func UnifiedLogin

func UnifiedLogin(login *model.Login) (*ssh.Client, error)

func WatchJobs

func WatchJobs() client.WatchChan

WatchJobs watch jobs

Types

type Cmd

type Cmd struct {
	*Job
	*Rule
}

Cmd 可执行任务

func (*Cmd) GetID

func (c *Cmd) GetID() string

GetID GetID

func (*Cmd) Run

func (c *Cmd) Run()

Run 执行

type ExecutionRecord

type ExecutionRecord struct {
	ID         string    `json:"id"`
	JobID      string    `json:"job_id"` // 任务 Id,索引
	TaskID     string    `json:"task_id"`
	User       string    `json:"user"`              // 执行此次任务的用户
	Name       string    `json:"name"`              // 任务名称
	Node       string    `json:"node"`              // 运行此次任务的节点 ip,索引
	Command    string    `json:"command,omitempty"` // 执行的命令,包括参数
	Output     string    `json:"output"`            // 任务输出的所有内容
	Success    bool      `json:"success"`           // 是否执行成功
	BeginTime  time.Time `json:"beginTime"`         // 任务开始执行时间,精确到毫秒,索引
	EndTime    time.Time `json:"endTime"`           // 任务执行完毕时间,精确到毫秒
	IsHandle   bool      `json:"is_handle"`         //是否已经处理
	HandleTime time.Time `json:"handle_time"`       //处理时间
}

ExecutionRecord 任务执行记录

func GetExecutionRecordByID

func GetExecutionRecordByID(id string) (l *ExecutionRecord, err error)

GetExecutionRecordByID 获取执行记录

func GetJobExecutionRecords

func GetJobExecutionRecords(JobID string) ([]*ExecutionRecord, error)

GetJobExecutionRecords 获取某个任务执行记录

func ParseExecutionRecord

func ParseExecutionRecord(body []byte) (e ExecutionRecord)

ParseExecutionRecord 解析

func (ExecutionRecord) CompleteHandle

func (e ExecutionRecord) CompleteHandle()

CompleteHandle 完成处理记录 master节点处理完成后调用

func (ExecutionRecord) IsHandleRight

func (e ExecutionRecord) IsHandleRight() bool

IsHandleRight 是否具有处理结果权限

func (ExecutionRecord) String

func (e ExecutionRecord) String() string

type Job

type Job struct {
	ID      string   `json:"id"`
	TaskID  string   `json:"taskID"`
	EventID string   `json:"event_id"`
	NodeID  string   `json:"node_id"`
	Hash    string   `json:"hash"`
	Name    string   `json:"name"`
	Command string   `json:"cmd"`
	Stdin   string   `json:"stdin"`
	Envs    []string `json:"envs"`
	User    string   `json:"user"`
	//rules 为nil 即当前任务是一次任务
	Rules   *Rule `json:"rule"`
	Pause   bool  `json:"pause"`   // 可手工控制的状态
	Timeout int64 `json:"timeout"` // 任务执行时间超时设置,大于 0 时有效
	// 执行任务失败重试次数
	// 默认为 0,不重试
	Retry int `json:"retry"`
	// 执行任务失败重试时间间隔
	// 单位秒,如果不大于 0 则马上重试
	Interval int `json:"interval"`
	// 任务类型
	// 0: 单次任务
	// 1: 循环任务
	Kind int `json:"kind"`
	// 平均执行时间,单位 ms
	AvgTime int64 `json:"avg_time"`

	// 控制同时执行任务数
	Count     *int64 `json:"-"`
	Scheduler *Scheduler
	RunStatus *RunStatus
	// contains filtered or unexported fields
}

Job 需要执行的任务

func CreateJobFromTask

func CreateJobFromTask(task *model.Task) (*Job, error)

CreateJobFromTask 从task创建job

func GetJob

func GetJob(id string) (job *Job, err error)

GetJob get job

func GetJobAndRev

func GetJobAndRev(id string) (job *Job, rev int64, err error)

GetJobAndRev get job

func GetJobFromKv

func GetJobFromKv(kv *mvccpb.KeyValue) (job *Job, err error)

GetJobFromKv Create job from etcd value

func (*Job) Avg

func (j *Job) Avg(t, et time.Time)

func (*Job) Check

func (j *Job) Check() error

Check

func (*Job) Cmds

func (j *Job) Cmds(node *model.HostNode) (cmds map[string]*Cmd)

Cmds 根据执行策略 创建 cmd

func (*Job) CountRunning

func (j *Job) CountRunning() (int64, error)

CountRunning 获取结点正在执行任务的数量

func (*Job) Decode

func (j *Job) Decode(data []byte) error

Decode Decode

func (*Job) Fail

func (j *Job) Fail(t time.Time, msg string)

func (Job) IsRunOn

func (j Job) IsRunOn(node *model.HostNode) bool

IsRunOn 是否在本节点执行

func (*Job) Key

func (j *Job) Key() string

Key Key

func (*Job) Notify

func (j *Job) Notify(t time.Time, msg string)

func (*Job) ResolveShell

func (j *Job) ResolveShell() error

ResolveShell ResolveShell

func (*Job) Run

func (j *Job) Run(nid string) bool

Run 执行任务

func (*Job) RunBuildInWithRecovery

func (j *Job) RunBuildInWithRecovery(nid string)

RunBuildInWithRecovery run build should delete

func (*Job) RunWithRecovery

func (j *Job) RunWithRecovery()

RunWithRecovery 执行任务,并捕获异常

func (*Job) ShortName

func (j *Job) ShortName() string

ShortName ShortName

func (*Job) String

func (j *Job) String() string

func (*Job) Success

func (j *Job) Success(t time.Time, out string)

Success 记录执行结果

func (*Job) Valid

func (j *Job) Valid() error

Valid 安全选项验证

func (*Job) ValidRules

func (j *Job) ValidRules() error

ValidRules ValidRules

type Process

type Process struct {
	ID     string    `json:"id"` // pid
	JobID  string    `json:"jobId"`
	Group  string    `json:"group"`
	NodeID string    `json:"nodeId"`
	Time   time.Time `json:"time"` // 开始执行时间
	// contains filtered or unexported fields
}

当前执行中的任务信息 key: /cronsun/proc/node/group/jobId/pid value: 开始执行时间 key 会自动过期,防止进程意外退出后没有清除相关 key,过期时间可配置

func GetProcFromKey

func GetProcFromKey(key string) (proc *Process, err error)

func (*Process) Key

func (p *Process) Key() string

func (*Process) Start

func (p *Process) Start()

func (*Process) Stop

func (p *Process) Stop()

func (*Process) Val

func (p *Process) Val() string

type Rule

type Rule struct {
	ID       string            `json:"id"`
	Mode     RuleMode          `json:"mode"` //once,
	Timer    string            `json:"timer"`
	Labels   map[string]string `json:"labels"`
	Schedule cron.Schedule     `json:"-"`
}

Rule 任务规则

func (*Rule) Valid

func (j *Rule) Valid() error

Valid 验证 timer 字段,创建Schedule

type RuleMode

type RuleMode string

RuleMode RuleMode

var Cycle RuleMode = "cycle"

Cycle 循环运行

var ManyOnce RuleMode = "manyonce"

ManyOnce 多次运行

var OnlyOnce RuleMode = "onlyonce"

OnlyOnce 只能一次

type RunStatus

type RunStatus struct {
	Status    string    `json:"status"`
	StartTime time.Time `json:"start_time"`
	EndTime   time.Time `json:"end_time"`
	RecordID  string    `json:"record_id"`
}

RunStatus job run status

type Scheduler

type Scheduler struct {
	NodeID          string    `json:"node_id"`
	SchedulerTime   time.Time `json:"scheduler_time"`
	CanRun          bool      `json:"can_run"`
	Message         string    `json:"message"`
	SchedulerStatus string    `json:"scheduler_status"`
}

Scheduler 调度信息

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL