job

package
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 8, 2022 License: Apache-2.0 Imports: 29 Imported by: 0

Documentation

Index

Constants

View Source
const (
	JobsKeyPrefix   = "/juno/cronjob/job/"    // job prefix
	OnceKeyPrefix   = "/juno/cronjob/once/"   // job that run immediately
	LockKeyPrefix   = "/juno/cronjob/lock/"   // job lock (only for single-node mode job)
	ProcKeyPrefix   = "/juno/cronjob/proc/"   // running process
	ResultKeyPrefix = "/juno/cronjob/result/" // task result (logs and status)
)
View Source
const (
	TypeNormal = 0 // 运行各节点都能运行任务
	TypeAlone  = 1 // 同一时间只允许一个节点一个任务运行
)

Variables

This section is empty.

Functions

func GetCurrentDirectory

func GetCurrentDirectory() string

func GetIDFromKey

func GetIDFromKey(key string) string

从 etcd 的 key 中取 id

func NewEtcdTimeoutContext

func NewEtcdTimeoutContext(w *Worker) (context.Context, context.CancelFunc)

NewEtcdTimeoutContext return a new etcdTimeoutContext

Types

type Cmd

type Cmd struct {
	*Job
	*Timer
	// contains filtered or unexported fields
}

func (*Cmd) GetID

func (c *Cmd) GetID() string

func (*Cmd) Run

func (c *Cmd) Run() error

type Config

type Config struct {
	Enable bool

	EtcdConfigKey   string // jupiter.etcdv3.xxxxxx
	ReqTimeout      int    // 请求操作ETCD的超时时间,单位秒
	RequireLockTime int64  // 抢锁等待时间,单位秒

	HostName string
	AppIP    string
	// contains filtered or unexported fields
}

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig ...

func StdConfig

func StdConfig(key string) *Config

StdConfig returns standard configuration information

func (*Config) Build

func (c *Config) Build() *Worker

Build new a instance

type Cron

type Cron struct {
	*Worker
	*cron.Cron
	// contains filtered or unexported fields
}

Cron ...

func (*Cron) AddFunc

func (c *Cron) AddFunc(spec string, cmd func() error) (EntryID, error)

AddFunc ...

func (*Cron) AddJob

func (c *Cron) AddJob(spec string, cmd NamedJob) (EntryID, error)

AddJob ...

func (*Cron) Remove

func (c *Cron) Remove(id EntryID)

Remove an entry from being run in the future.

func (*Cron) Run

func (c *Cron) Run()

Run ...

func (*Cron) Schedule

func (c *Cron) Schedule(schedule Schedule, job NamedJob) EntryID

Schedule ...

func (*Cron) Stop

func (c *Cron) Stop() error

Stop ...

type CronJob

type CronJob = cron.Job

Job ...

type CronTaskStatus

type CronTaskStatus string
var (
	CronTaskStatusProcessing CronTaskStatus = "processing"
	CronTaskStatusSuccess    CronTaskStatus = "success"
	CronTaskStatusFailed     CronTaskStatus = "failed"
	CronTaskStatusTimeout    CronTaskStatus = "timeout"
)

type EntryID

type EntryID = cron.EntryID

EntryID ...

type FuncJob

type FuncJob func() error

FuncJob ...

func (FuncJob) Name

func (f FuncJob) Name() string

Name ...

func (FuncJob) Run

func (f FuncJob) Run() error

Run ...

type Job

type Job struct {
	ID      string   `json:"id"`
	Name    string   `json:"name"`
	Script  string   `json:"script"`
	Timers  []*Timer `json:"timers"`
	Enable  bool     `json:"enable"`  // 可手工控制的状态
	Timeout int64    `json:"timeout"` // 单位时间秒,任务执行时间超时设置,大于 0 时有效
	Env     string   `json:"env"`
	Zone    string   `json:"zone"`
	Nodes   []string `json:"nodes"`

	// 执行任务失败重试次数
	// 默认为 0,不重试
	RetryCount int `json:"retry_count"`

	// 执行任务失败重试时间间隔
	// 单位秒,如果不大于 0 则马上重试
	RetryInterval int `json:"retry_interval"`

	// 任务类型
	// 0: 普通任务,各节点均可运行
	// 1: 单机任务,同时只能单节点在线
	JobType int `json:"job_type"`

	// 用于访问etcd
	*Worker `json:"-"`
	// contains filtered or unexported fields
}

需要执行的 cron cmd 命令 注册到 /cronsun/cmd/<id>

func (*Job) Cmds

func (j *Job) Cmds() (cmds map[string]*Cmd)

func (*Job) Lock

func (j *Job) Lock() error

func (*Job) Run

func (j *Job) Run(taskOptions ...TaskOption) error

func (*Job) RunWithRecovery

func (j *Job) RunWithRecovery()

func (*Job) Unlock

func (j *Job) Unlock()

func (*Job) ValidRules

func (j *Job) ValidRules() error

type JobWrapper

type JobWrapper = cron.JobWrapper

JobWrapper ...

type Jobs

type Jobs map[string]*Job

type NamedJob

type NamedJob interface {
	Run() error
}

NamedJob ..

type OnceJob

type OnceJob struct {
	Job

	TaskID uint64 `json:"task_id"`
}

单次任务

func (*OnceJob) RunWithRecovery

func (o *OnceJob) RunWithRecovery(taskOptions ...TaskOption)

type Process

type Process struct {

	// parse from key path
	ID     string `json:"id"` // pid
	JobID  string `json:"jobId"`
	NodeID string `json:"nodeId"`
	TaskID uint64 `json:"task_id"`
	// parse from value
	ProcessVal
	// contains filtered or unexported fields
}

当前执行中的任务信息 key: /{etcd_prefix}/jobId/taskId/node/pid value: 开始执行时间 key 会自动过期,防止进程意外退出后没有清除相关 key,过期时间可配置

func GetProcFromKey

func GetProcFromKey(key string) (proc *Process, err error)

func (*Process) Key

func (p *Process) Key() string

key: /{etcd_prefix}/jobId/taskId/node/pid

func (*Process) Start

func (p *Process) Start(job *Job)

func (*Process) Stop

func (p *Process) Stop(job *Job)

func (*Process) Val

func (p *Process) Val() (string, error)

type ProcessVal

type ProcessVal struct {
	Time   time.Time `json:"time"`   // 开始执行时间
	Killed bool      `json:"killed"` // 是否强制杀死
}

type Schedule

type Schedule = cron.Schedule

Schedule ...

type Task

type Task struct {
	TaskID uint64
	// contains filtered or unexported fields
}

func NewTask

func NewTask(job *Job, ops ...TaskOption) *Task

func (*Task) Key

func (t *Task) Key() string

func (*Task) SetStatus

func (t *Task) SetStatus(status CronTaskStatus, logs string) error

func (*Task) Stop

func (t *Task) Stop()

type TaskOption

type TaskOption func(t *Task)

func WithTaskID

func WithTaskID(taskId uint64) TaskOption

type TaskResult

type TaskResult struct {
	TaskID     uint64         `json:"task_id"`
	Status     CronTaskStatus `json:"status"`
	Job        *Job           `json:"job"`
	Logs       string         `json:"logs"`
	RunOn      string         `json:"run_on"`
	ExecutedAt time.Time      `json:"executed_at"`
	FinishedAt *time.Time     `json:"finished_at"`
}

type Timer

type Timer struct {
	ID   string `json:"id"`
	Cron string `json:"timer"`

	Schedule Schedule `json:"-"`
}

func (*Timer) Valid

func (rule *Timer) Valid() error

验证 timer 字段

type Worker added in v0.1.2

type Worker struct {
	*Config
	*etcdv3.Client
	*Cron

	ID             string
	ImmediatelyRun bool // 是否立即执行
	// contains filtered or unexported fields
}

Node 执行 cron 命令服务的结构体

func NewWorker

func NewWorker(conf *Config) (w *Worker)

func (*Worker) CleanJobs added in v0.1.2

func (w *Worker) CleanJobs()

func (*Worker) GetJobContentFromKv added in v0.1.2

func (w *Worker) GetJobContentFromKv(key []byte, value []byte) (*Job, error)

func (*Worker) GetOnceJobFromKv added in v0.1.2

func (w *Worker) GetOnceJobFromKv(key []byte, value []byte) (*OnceJob, error)

func (*Worker) KillExecutingProc added in v0.1.2

func (w *Worker) KillExecutingProc(process *Process)

func (*Worker) Run added in v0.1.2

func (w *Worker) Run() error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL