cronsun

Version: v0.3.1
Published: Mar 19, 2018 License: Apache-2.0 Imports: 28 Imported by: 0

README

cronsun

cronsun is a distributed cron-style job system. It is similar to crontab on a stand-alone *nix machine.

Purpose

The goal of this project is to make it much easier to manage jobs on large numbers of machines and to provide high availability. cronsun is different from Azkaban, Chronos, and Airflow.

Features

  • Easily manage jobs on multiple machines
  • Management panel
  • Mail service
  • Multi-language support
  • Simple authentication and account management (default administrator email and password: admin@admin.com/admin)

Status

cronsun has been tested in production for years on hundreds of servers. Although the current version has not been released as a stable version, we consider it fully usable in production environments. We encourage you to try it and see how it works for you; it is easy to use. We believe you will like this tool.

Architecture

                                                [web]
                                                  |
                                     --------------------------
           (add/del/update/exec jobs)|                        |(query job exec result)
                                   [etcd]                 [mongodb]
                                     |                        ^
                            --------------------              |
                            |        |         |              |
                         [node.1]  [node.2]  [node.n]         |
             (job exec fail)|        |         |              |
          [send mail]<-----------------------------------------(job exec result)

Security

cronsun supports security restrictions through the security.json config. When open is true, a job command may only run local files on the node whose names end in one of the listed extensions, and only as one of the listed users.

{
    "open": true,
    "#users": "allowed execution users",
    "users": [
        "www", "db"
    ],
    "#ext": "allowed execution file extensions",
    "ext": [
        ".cron.sh", ".cron.py"
    ]
}
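
To make the whitelist rule concrete, here is a minimal sketch of how such a config could be decoded and applied. It is not cronsun's implementation: the SecurityConfig type, ParseSecurity, and Allowed are hypothetical names, and the rule that the script is the first word of the command is an assumption made for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// SecurityConfig mirrors the fields of the security.json example above.
// The type name and the helpers below are hypothetical, not cronsun's API.
type SecurityConfig struct {
	Open  bool     `json:"open"`
	Users []string `json:"users"`
	Ext   []string `json:"ext"`
}

// ParseSecurity decodes a security.json document. Keys such as "#users"
// match no field, so encoding/json ignores them.
func ParseSecurity(data []byte) (*SecurityConfig, error) {
	var c SecurityConfig
	if err := json.Unmarshal(data, &c); err != nil {
		return nil, err
	}
	return &c, nil
}

// Allowed applies this sketch's rules: when Open is false everything
// passes; otherwise the user must be listed and the script (assumed to be
// the first word of cmd) must end in a listed extension.
func (c *SecurityConfig) Allowed(user, cmd string) bool {
	if !c.Open {
		return true
	}
	userOK := false
	for _, u := range c.Users {
		if u == user {
			userOK = true
			break
		}
	}
	if !userOK {
		return false
	}
	fields := strings.Fields(cmd)
	if len(fields) == 0 {
		return false
	}
	for _, e := range c.Ext {
		if strings.HasSuffix(fields[0], e) {
			return true
		}
	}
	return false
}

const sampleSecurity = `{
    "open": true,
    "#users": "allowed execution users",
    "users": ["www", "db"],
    "#ext": "allowed execution file extensions",
    "ext": [".cron.sh", ".cron.py"]
}`

func main() {
	c, err := ParseSecurity([]byte(sampleSecurity))
	if err != nil {
		panic(err)
	}
	fmt.Println(c.Allowed("www", "/data/backup.cron.sh --full"))
	fmt.Println(c.Allowed("root", "/data/backup.cron.sh"))
	fmt.Println(c.Allowed("db", "rm -rf /tmp/x"))
}
```

With the sample config, only the first call passes both checks; the second fails on the user and the third on the extension.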

Getting started

Setup / installation

Install from the latest binary release

Or build from source (branch feature/glide); requires Go >= 1.9 and glide

go get -u github.com/shunfei/cronsun
cd $GOPATH/src/github.com/shunfei/cronsun
git checkout feature/glide
glide update
sh build.sh

Run

  1. Install MongoDB
  2. Install etcd3
  3. Open and update the etcd (conf/etcd.json) and MongoDB (conf/db.json) configurations
  4. Start cronnode with ./cronnode -conf conf/base.json and cronweb with ./cronweb -conf conf/base.json
  5. Open http://127.0.0.1:7079 in a browser

Screenshot

[Screenshots: Brief, Exec result, Job, Node]

Credits

The cron package is based on robfig/cron.

Documentation

Constants

const (
	KindCommon   = iota
	KindAlone    // only one machine may execute the job at any given time
	KindInterval // the job may be executed once within one execution interval
)
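
As a small illustration of the values these constants take, the sketch below repeats the const block and adds a label helper. kindName is a hypothetical function introduced only for this example; it is not part of the package.

```go
package main

import "fmt"

// Same declaration shape as the const block above: iota numbers the
// constants 0, 1, 2 in declaration order.
const (
	KindCommon = iota
	KindAlone
	KindInterval
)

// kindName is a hypothetical helper (not part of cronsun) that maps a
// kind value to a readable label.
func kindName(kind int) string {
	switch kind {
	case KindCommon:
		return "common"
	case KindAlone:
		return "alone"
	case KindInterval:
		return "interval"
	default:
		return "unknown"
	}
}

func main() {
	for _, k := range []int{KindCommon, KindAlone, KindInterval, 7} {
		fmt.Printf("%d => %s\n", k, kindName(k))
	}
}
```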
const (
	Coll_JobLog       = "job_log"
	Coll_JobLatestLog = "job_latest_log"
	Coll_Stat         = "stat"
)
const (
	Coll_Account = "account"
)
const (
	Coll_Node = "node"
)
const (
	DefaultJobGroup = "default"
)
const VersionNumber = "0.3.1"

Variables

var (
	ErrNotFound        = errors.New("Record not found.")
	ErrValueMayChanged = errors.New("The value has been changed by others on this time.")

	ErrEmptyJobName        = errors.New("Name of job is empty.")
	ErrEmptyJobCommand     = errors.New("Command of job is empty.")
	ErrIllegalJobId        = errors.New("Invalid id that includes illegal characters such as '/'.")
	ErrIllegalJobGroupName = errors.New("Invalid job group name that includes illegal characters such as '/'.")

	ErrEmptyNodeGroupName = errors.New("Name of node group is empty.")
	ErrIllegalNodeGroupId = errors.New("Invalid node group id that includes illegal characters such as '/'.")

	ErrSecurityInvalidCmd  = errors.New("Security error: the suffix of script file is not on the whitelist.")
	ErrSecurityInvalidUser = errors.New("Security error: the user is not on the whitelist.")
	ErrNilRule             = errors.New("invalid job rule, empty timer.")
)
var (
	Version = fmt.Sprintf("v%s (build %s)", VersionNumber, runtime.Version())
)

Functions

func BanAccount added in v0.2.1

func BanAccount(email string) error

func CreateAccount added in v0.2.1

func CreateAccount(u *Account) error

func CreateJobLog

func CreateJobLog(j *Job, t time.Time, rs string, success bool)

func DeleteGroupById

func DeleteGroupById(id string) (*client.DeleteResponse, error)

func DeleteJob

func DeleteJob(group, id string) (resp *client.DeleteResponse, err error)

func EnsureAccountIndex added in v0.2.1

func EnsureAccountIndex() error

func Exit

func Exit(i interface{})

func GetDb

func GetDb() *db.Mdb

func GetGroups

func GetGroups(nid string) (groups map[string]*Group, err error)

GetGroups returns the groups that contain nid. If nid is empty, it returns all groups.

func GetIDFromKey

func GetIDFromKey(key string) string

GetIDFromKey extracts the id from an etcd key.
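
One plausible reading of this behaviour, given that keys elsewhere on this page end in /<id>, is that the id is the last path segment. The sketch below works under that assumption; idFromKey is a hypothetical stand-in and the sample keys are made up.

```go
package main

import (
	"fmt"
	"strings"
)

// idFromKey returns everything after the last "/" in key, or "" when the
// key contains no "/". This is an assumed behaviour for illustration, not
// a copy of GetIDFromKey.
func idFromKey(key string) string {
	i := strings.LastIndex(key, "/")
	if i < 0 {
		return ""
	}
	return key[i+1:]
}

func main() {
	fmt.Println(idFromKey("/cronsun/node/node-1"))
	fmt.Println(idFromKey("/cronsun/group/web-servers"))
}
```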

func GetJobLatestLogListByJobIds

func GetJobLatestLogListByJobIds(jobIds []string) (m map[string]*JobLatestLog, err error)

func GetJobs

func GetJobs() (jobs map[string]*Job, err error)

func GroupKey

func GroupKey(id string) string

func ISNodeAlive added in v0.2.2

func ISNodeAlive(id string) (bool, error)

func Init

func Init(baseConfFile string, watchConfiFile bool) (err error)

func IsValidAsKeyPath

func IsValidAsKeyPath(s string) bool

func JobKey

func JobKey(group, id string) string

func NewEtcdTimeoutContext added in v0.3.1

func NewEtcdTimeoutContext(c *Client) (context.Context, context.CancelFunc)

NewEtcdTimeoutContext returns a new etcdTimeoutContext.

func NextID

func NextID() string

func PutOnce

func PutOnce(group, jobID, nodeID string) error

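PutOnce requests immediate execution of a job by registering it at /cronsun/once/group/<jobID>. To run the job on a single node, the value is that node's NodeID; if every node the job is assigned to should run it, the value is the empty string "".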
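
The key and value convention described above can be sketched as follows. The helper names onceKey and shouldRunOnce are hypothetical, and the fixed /cronsun/once/ prefix is taken from the comment; a real deployment may configure a different prefix.

```go
package main

import "fmt"

// oncePrefix is the prefix named in the comment above; treat it as an
// assumption, since the actual prefix may be configurable.
const oncePrefix = "/cronsun/once/"

// onceKey builds the key under which an immediate run is requested.
func onceKey(group, jobID string) string {
	return oncePrefix + group + "/" + jobID
}

// shouldRunOnce is a hypothetical node-side check: an empty value targets
// every node that carries the job, otherwise only the named node.
func shouldRunOnce(value, nodeID string) bool {
	return value == "" || value == nodeID
}

func main() {
	fmt.Println(onceKey("default", "job42"))
	fmt.Println(shouldRunOnce("", "node-1"))
	fmt.Println(shouldRunOnce("node-2", "node-1"))
}
```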

func Reload

func Reload(i interface{})

func RemoveNode added in v0.2.2

func RemoveNode(query interface{}) error

func StartNoticer

func StartNoticer(n Noticer)

func StartProc

func StartProc() error

StartProc runs the service that keeps the lease id alive.

func UpdateAccount added in v0.2.1

func UpdateAccount(query bson.M, change bson.M) error

func WatchGroups

func WatchGroups() client.WatchChan

func WatchJobs

func WatchJobs() client.WatchChan

func WatchNode

func WatchNode() client.WatchChan

func WatchOnce

func WatchOnce() client.WatchChan

Types

type Account added in v0.2.1

type Account struct {
	ID       bson.ObjectId `bson:"_id" json:"id"`
	Role     Role          `bson:"role" json:"role"`
	Email    string        `bson:"email" json:"email"`
	Password string        `bson:"password" json:"password"`
	Salt     string        `bson:"salt" json:"salt"`
	Status   UserStatus    `bson:"status" json:"status"`
	Session  string        `bson:"session" json:"-"`
	// If true, role and status are unchangeable; email and password can be changed only by the account itself.
	Unchangeable bool      `bson:"unchangeable" json:"-"`
	CreateTime   time.Time `bson:"createTime" json:"createTime"`
}

func GetAccountByEmail added in v0.2.1

func GetAccountByEmail(email string) (u *Account, err error)

func GetAccounts added in v0.2.1

func GetAccounts(query bson.M) (list []Account, err error)

type Client

type Client struct {
	*client.Client
	// contains filtered or unexported fields
}
var (
	DefalutClient *Client
)

func NewClient

func NewClient(cfg *conf.Conf) (c *Client, err error)

func (*Client) DelLock

func (c *Client) DelLock(key string) error

func (*Client) Delete

func (c *Client) Delete(key string, opts ...client.OpOption) (*client.DeleteResponse, error)

func (*Client) Get

func (c *Client) Get(key string, opts ...client.OpOption) (*client.GetResponse, error)

func (*Client) GetLock

func (c *Client) GetLock(key string, id client.LeaseID) (bool, error)

func (*Client) Grant

func (c *Client) Grant(ttl int64) (*client.LeaseGrantResponse, error)

func (*Client) KeepAliveOnce

func (c *Client) KeepAliveOnce(id client.LeaseID) (*client.LeaseKeepAliveResponse, error)

func (*Client) Put

func (c *Client) Put(key, val string, opts ...client.OpOption) (*client.PutResponse, error)

func (*Client) PutWithModRev

func (c *Client) PutWithModRev(key, val string, rev int64) (*client.PutResponse, error)

func (*Client) Revoke added in v0.3.1

func (*Client) Watch

func (c *Client) Watch(key string, opts ...client.OpOption) client.WatchChan

type Cmd

type Cmd struct {
	*Job
	*JobRule
}

func (*Cmd) GetID

func (c *Cmd) GetID() string

func (*Cmd) Run

func (c *Cmd) Run()

type Group

type Group struct {
	ID   string `json:"id"`
	Name string `json:"name"`

	NodeIDs []string `json:"nids"`
}

Group is a grouping of nodes, registered at /cronsun/group/<id>.

func GetGroupById

func GetGroupById(gid string) (g *Group, err error)

func GetGroupFromKv

func GetGroupFromKv(key, value []byte) (g *Group, err error)

func GetNodeGroups

func GetNodeGroups() (list []*Group, err error)

func (*Group) Check

func (g *Group) Check() error

func (*Group) Included

func (g *Group) Included(nid string) bool

func (*Group) Key

func (g *Group) Key() string

func (*Group) Put

func (g *Group) Put(modRev int64) (*client.PutResponse, error)

type HttpAPI

type HttpAPI struct{}

func (*HttpAPI) Send

func (h *HttpAPI) Send(msg *Message)

func (*HttpAPI) Serve

func (h *HttpAPI) Serve()

type Job

type Job struct {
	ID      string     `json:"id"`
	Name    string     `json:"name"`
	Group   string     `json:"group"`
	Command string     `json:"cmd"`
	User    string     `json:"user"`
	Rules   []*JobRule `json:"rules"`
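	Pause   bool       `json:"pause"`   // state that can be toggled manually
	Timeout int64      `json:"timeout"` // execution timeout; effective only when greater than 0
	// Number of instances of the job allowed to run at the same time on a single node.
	// Enabled for jobs whose interval between two executions is longer than the job's execution time.
	Parallels int64 `json:"parallels"`
	// Number of retries after a failed execution.
	// Defaults to 0, meaning no retry.
	Retry int `json:"retry"`
	// Interval between retries after a failed execution,
	// in seconds; if not greater than 0, retry immediately.
	Interval int `json:"interval"`
	// Job kind.
	// 0: common job
	// 1: single-machine job
	// For a single-machine job, Parallels is set to 1 when the node loads the job.
	Kind int `json:"kind"`
	// Average execution time, in ms.
	AvgTime int64 `json:"avg_time"`
	// Send a notification when execution fails.
	FailNotify bool `json:"fail_notify"`
	// Notification recipient addresses.
	To []string `json:"to"`
	// Log expiration time set individually for this job.
	LogExpiration int `json:"log_expiration"`

	// Controls the number of jobs executing at the same time.
	Count *int64 `json:"-"`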
	// contains filtered or unexported fields
}

Job is a cron cmd to be executed, registered at /cronsun/cmd/groupName/<id>.
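
The JSON tags on Job and JobRule determine the shape of a stored job document. The sketch below decodes a made-up document into trimmed mirror structs (jobDoc and ruleDoc are hypothetical names, and only some fields are copied) to show how the tags map. The timer string and all other values are illustrative assumptions.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ruleDoc and jobDoc are trimmed, hypothetical mirrors of JobRule and Job
// that reuse the JSON tags shown in the type definitions on this page.
type ruleDoc struct {
	ID             string   `json:"id"`
	Timer          string   `json:"timer"`
	GroupIDs       []string `json:"gids"`
	NodeIDs        []string `json:"nids"`
	ExcludeNodeIDs []string `json:"exclude_nids"`
}

type jobDoc struct {
	ID         string     `json:"id"`
	Name       string     `json:"name"`
	Group      string     `json:"group"`
	Command    string     `json:"cmd"`
	User       string     `json:"user"`
	Rules      []*ruleDoc `json:"rules"`
	Pause      bool       `json:"pause"`
	Timeout    int64      `json:"timeout"`
	Retry      int        `json:"retry"`
	Interval   int        `json:"interval"`
	Kind       int        `json:"kind"`
	FailNotify bool       `json:"fail_notify"`
	To         []string   `json:"to"`
}

// sampleJob is an invented document; every value in it is illustrative.
const sampleJob = `{
  "id": "a1b2c3",
  "name": "nightly backup",
  "group": "default",
  "cmd": "/data/backup.cron.sh --full",
  "user": "www",
  "rules": [
    {"id": "r1", "timer": "0 0 2 * * *", "gids": ["g1"], "nids": [], "exclude_nids": ["n3"]}
  ],
  "pause": false,
  "timeout": 600,
  "retry": 2,
  "interval": 30,
  "kind": 1,
  "fail_notify": true,
  "to": ["ops@example.com"]
}`

// decodeJob parses a job document into the mirror struct.
func decodeJob(data []byte) (*jobDoc, error) {
	var j jobDoc
	if err := json.Unmarshal(data, &j); err != nil {
		return nil, err
	}
	return &j, nil
}

func main() {
	j, err := decodeJob([]byte(sampleJob))
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s runs %q as %s, %d rule(s), retry=%d\n",
		j.Name, j.Command, j.User, len(j.Rules), j.Retry)
}
```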

func GetJob

func GetJob(group, id string) (job *Job, err error)

Note: this function doesn't check the job.

func GetJobAndRev

func GetJobAndRev(group, id string) (job *Job, rev int64, err error)

func GetJobFromKv

func GetJobFromKv(key, value []byte) (job *Job, err error)

func (*Job) Avg

func (j *Job) Avg(t, et time.Time)

func (*Job) Check

func (j *Job) Check() error

func (*Job) Cmds

func (j *Job) Cmds(nid string, gs map[string]*Group) (cmds map[string]*Cmd)

func (*Job) CountRunning

func (j *Job) CountRunning() (int64, error)

CountRunning returns the number of instances of the job currently running on the node.

func (*Job) Fail

func (j *Job) Fail(t time.Time, msg string)

func (*Job) Init added in v0.1.2

func (j *Job) Init(nodeID, hostname, ip string)

func (Job) IsRunOn

func (j Job) IsRunOn(nid string, gs map[string]*Group) bool

func (*Job) Key

func (j *Job) Key() string

func (*Job) Notify

func (j *Job) Notify(t time.Time, msg string)

func (*Job) Run

func (j *Job) Run() bool

Run executes the job.
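
The Retry and Interval fields of Job describe a retry policy: a number of retries after failure, and a wait in seconds between them, with immediate retry when the interval is not greater than 0. The sketch below shows one way such a policy could be interpreted. runWithRetry is a hypothetical helper and says nothing about how Run itself is implemented.

```go
package main

import (
	"fmt"
	"time"
)

// runWithRetry calls fn once and then up to retry more times while it
// keeps failing, sleeping for interval between attempts when interval is
// positive. It returns the number of attempts made and whether any
// attempt succeeded. Hypothetical helper, not cronsun code.
func runWithRetry(retry int, interval time.Duration, fn func() bool) (attempts int, ok bool) {
	if retry < 0 {
		retry = 0
	}
	for i := 0; i <= retry; i++ {
		attempts++
		if fn() {
			return attempts, true
		}
		if i < retry && interval > 0 {
			time.Sleep(interval)
		}
	}
	return attempts, false
}

func main() {
	calls := 0
	// The fake job fails twice and succeeds on its third call.
	attempts, ok := runWithRetry(2, 10*time.Millisecond, func() bool {
		calls++
		return calls == 3
	})
	fmt.Println(attempts, ok)
}
```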

func (*Job) RunWithRecovery

func (j *Job) RunWithRecovery()

func (*Job) ShortName

func (j *Job) ShortName() string

func (*Job) String

func (j *Job) String() string

func (*Job) Success

func (j *Job) Success(t time.Time, out string)

Success writes the execution result to MongoDB.

func (*Job) Valid

func (j *Job) Valid() error

Valid validates the job against the security options.

func (*Job) ValidRules

func (j *Job) ValidRules() error

type JobLatestLog

type JobLatestLog struct {
	JobLog   `bson:",inline"`
	RefLogId string `bson:"refLogId,omitempty" json:"refLogId"`
}

func GetJobLatestLogList

func GetJobLatestLogList(query bson.M, page, size int, sort string) (list []*JobLatestLog, total int, err error)

type JobLog

type JobLog struct {
	Id        bson.ObjectId `bson:"_id,omitempty" json:"id"`
	JobId     string        `bson:"jobId" json:"jobId"`               // job id, indexed
	JobGroup  string        `bson:"jobGroup" json:"jobGroup"`         // job group, used together with Id for navigation
	User      string        `bson:"user" json:"user"`                 // user that executed this run
	Name      string        `bson:"name" json:"name"`                 // job name
	Node      string        `bson:"node" json:"node"`                 // id of the node that ran this job, indexed
	Hostname  string        `bson:"hostname" json:"hostname"`         // hostname of the node that ran this job, indexed
	IP        string        `bson:"ip" json:"ip"`                     // IP of the node host that ran this job, indexed
	Command   string        `bson:"command" json:"command,omitempty"` // executed command, including arguments
	Output    string        `bson:"output" json:"output,omitempty"`   // all output produced by the job
	Success   bool          `bson:"success" json:"success"`           // whether the execution succeeded
	BeginTime time.Time     `bson:"beginTime" json:"beginTime"`       // time the job started, millisecond precision, indexed
	EndTime   time.Time     `bson:"endTime" json:"endTime"`           // time the job finished, millisecond precision
	Cleanup   time.Time     `bson:"cleanup,omitempty" json:"-"`       // log cleanup time marker
}

JobLog is a record of a job execution.

func GetJobLogById

func GetJobLogById(id bson.ObjectId) (l *JobLog, err error)

func GetJobLogList

func GetJobLogList(query bson.M, page, size int, sort string) (list []*JobLog, total int, err error)

type JobRule

type JobRule struct {
	ID             string   `json:"id"`
	Timer          string   `json:"timer"`
	GroupIDs       []string `json:"gids"`
	NodeIDs        []string `json:"nids"`
	ExcludeNodeIDs []string `json:"exclude_nids"`

	Schedule cron.Schedule `json:"-"`
}

func (*JobRule) Valid

func (rule *JobRule) Valid() error

Valid validates the timer field.

type Mail

type Mail struct {
	// contains filtered or unexported fields
}

func NewMail

func NewMail(timeout time.Duration) (m *Mail, err error)

func (*Mail) Send

func (m *Mail) Send(msg *Message)

func (*Mail) Serve

func (m *Mail) Serve()

type Message

type Message struct {
	Subject string
	Body    string
	To      []string
}

type Node

type Node struct {
	ID       string `bson:"_id" json:"id"`  // machine id
	PID      string `bson:"pid" json:"pid"` // process pid
	IP       string `bson:"ip" json:"ip"`   // node ip
	Hostname string `bson:"hostname" json:"hostname"`

	Version  string    `bson:"version" json:"version"`
	UpTime   time.Time `bson:"up" json:"up"`     // start time
	DownTime time.Time `bson:"down" json:"down"` // time of the last shutdown

	Alived    bool `bson:"alived" json:"alived"` // whether the node is available
	Connected bool `bson:"-" json:"connected"`   // valid only when Alived is true; indicates whether the heartbeat is healthy
}

Node is a process that executes cron cmds, registered at /cronsun/node/<id>.

func GetNodes

func GetNodes() (nodes []*Node, err error)

func GetNodesBy

func GetNodesBy(query interface{}) (nodes []*Node, err error)

func GetNodesByID added in v0.3.1

func GetNodesByID(id string) (node *Node, err error)

func (*Node) Del

func (n *Node) Del() (*client.DeleteResponse, error)

func (*Node) Down

func (n *Node) Down()

Down removes the node's liveness information from MongoDB after the node instance stops.

func (*Node) Exist

func (n *Node) Exist() (pid int, err error)

Exist reports whether the node is already registered in etcd. If it is, the process pid is returned; otherwise -1 is returned.

func (*Node) On

func (n *Node) On()

On records the node's liveness information in MongoDB after the node instance starts.

func (*Node) Put

func (n *Node) Put(opts ...client.OpOption) (*client.PutResponse, error)

func (*Node) String

func (n *Node) String() string

type Noticer

type Noticer interface {
	Serve()
	Send(*Message)
}
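
Any type with Serve and Send methods satisfies Noticer, as HttpAPI and Mail do. The sketch below redeclares Message and Noticer locally so that it compiles on its own, and adds a hypothetical in-memory implementation (memNoticer) that queues messages on a channel. Its Close and Sent methods are extras for the demonstration and are not part of the interface.

```go
package main

import (
	"fmt"
	"sync"
)

// Message and Noticer are redeclared here with the same shape as the
// types on this page so the sketch is self-contained.
type Message struct {
	Subject string
	Body    string
	To      []string
}

type Noticer interface {
	Serve()
	Send(*Message)
}

// memNoticer is a hypothetical Noticer that records messages in memory
// instead of delivering them.
type memNoticer struct {
	queue chan *Message
	done  chan struct{}
	mu    sync.Mutex
	sent  []string
}

func newMemNoticer(buffer int) *memNoticer {
	return &memNoticer{queue: make(chan *Message, buffer), done: make(chan struct{})}
}

// Send enqueues a message for the Serve loop.
func (n *memNoticer) Send(msg *Message) { n.queue <- msg }

// Serve drains the queue until it is closed, then signals completion.
func (n *memNoticer) Serve() {
	for msg := range n.queue {
		n.mu.Lock()
		n.sent = append(n.sent, fmt.Sprintf("%s -> %v", msg.Subject, msg.To))
		n.mu.Unlock()
	}
	close(n.done)
}

// Close stops accepting messages and waits for Serve to finish.
func (n *memNoticer) Close() {
	close(n.queue)
	<-n.done
}

// Sent returns a copy of the recorded deliveries.
func (n *memNoticer) Sent() []string {
	n.mu.Lock()
	defer n.mu.Unlock()
	return append([]string(nil), n.sent...)
}

// Compile-time check that memNoticer satisfies Noticer.
var _ Noticer = (*memNoticer)(nil)

func main() {
	n := newMemNoticer(4)
	go n.Serve()
	n.Send(&Message{Subject: "job failed", Body: "exit status 1", To: []string{"ops@example.com"}})
	n.Close()
	fmt.Println(n.Sent())
}
```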

type Process

type Process struct {
	ID     string    `json:"id"` // pid
	JobID  string    `json:"jobId"`
	Group  string    `json:"group"`
	NodeID string    `json:"nodeId"`
	Time   time.Time `json:"time"` // time execution started
	// contains filtered or unexported fields
}

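Process holds information about a job that is currently executing. key: /cronsun/proc/node/group/jobId/pid, value: the execution start time. The key expires automatically, so it does not linger if the process exits unexpectedly without removing it; the expiration time is configurable.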
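
The key layout above can be split back into its parts. The following sketch assumes the fixed prefix /cronsun/proc/ from the comment and exactly four segments after it; procKey and parseProcKey are hypothetical names and do not reproduce GetProcFromKey.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// procPrefix is the prefix named in the comment above; the actual prefix
// may be configurable, so treat it as an assumption.
const procPrefix = "/cronsun/proc/"

// procKey holds the four segments of a process key.
type procKey struct {
	NodeID string
	Group  string
	JobID  string
	PID    string
}

// parseProcKey splits a key of the form prefix + node/group/jobId/pid.
func parseProcKey(key string) (procKey, error) {
	if !strings.HasPrefix(key, procPrefix) {
		return procKey{}, errors.New("key does not start with " + procPrefix)
	}
	parts := strings.Split(strings.TrimPrefix(key, procPrefix), "/")
	if len(parts) != 4 {
		return procKey{}, fmt.Errorf("expected 4 segments after prefix, got %d", len(parts))
	}
	for _, p := range parts {
		if p == "" {
			return procKey{}, errors.New("empty segment in key")
		}
	}
	return procKey{NodeID: parts[0], Group: parts[1], JobID: parts[2], PID: parts[3]}, nil
}

func main() {
	p, err := parseProcKey("/cronsun/proc/node-1/default/job42/3120")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", p)
}
```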

func GetProcFromKey

func GetProcFromKey(key string) (proc *Process, err error)

func (*Process) Key

func (p *Process) Key() string

func (*Process) Start

func (p *Process) Start()

func (*Process) Stop

func (p *Process) Stop()

func (*Process) Val

func (p *Process) Val() string

type Role added in v0.2.1

type Role int
const (
	Administrator Role = 1
	Developer     Role = 2
)

func (Role) Defined added in v0.2.1

func (r Role) Defined() bool

func (Role) String added in v0.2.1

func (r Role) String() string
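
The page gives no descriptions for Defined and String. A minimal sketch of what such methods commonly look like for an enumerated type follows. The method bodies and the returned labels are assumptions for illustration, not the package's actual behaviour.

```go
package main

import "fmt"

// Role and its constants are redeclared with the values shown above so
// the sketch compiles on its own.
type Role int

const (
	Administrator Role = 1
	Developer     Role = 2
)

// Defined reports whether r is one of the declared roles (assumed
// semantics).
func (r Role) Defined() bool {
	return r == Administrator || r == Developer
}

// String returns a label for r; the exact strings are assumptions.
func (r Role) String() string {
	switch r {
	case Administrator:
		return "Administrator"
	case Developer:
		return "Developer"
	default:
		return "Undefined"
	}
}

func main() {
	for _, r := range []Role{Administrator, Developer, Role(9)} {
		fmt.Println(r, r.Defined())
	}
}
```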

type StatExecuted

type StatExecuted struct {
	Total     int64  `bson:"total" json:"total"`
	Successed int64  `bson:"successed" json:"successed"`
	Failed    int64  `bson:"failed" json:"failed"`
	Date      string `bson:"date" json:"date"`
}

func JobLogDailyStat added in v0.2.3

func JobLogDailyStat(begin, end time.Time) (ls []*StatExecuted, err error)

func JobLogStat

func JobLogStat() (s *StatExecuted, err error)

type UserStatus added in v0.2.1

type UserStatus int
const (
	UserBanned  UserStatus = -1
	UserActived UserStatus = 1
)

func (UserStatus) Defined added in v0.2.1

func (s UserStatus) Defined() bool

Directories

Path Synopsis
bin
node
The node service: started on each machine that needs to run cron jobs, it replaces cron in executing those jobs.
web
db
mid
cron
This library implements a cron spec parser and runner.
Loads a JSON configuration file with support for extension fields, e.g. { "Debug": true, "Log": "@extend:./log.json" }
web
