jobsd

package module
v1.1.0 (Latest)
Warning

This package is not in the latest version of its module.

Published: Apr 22, 2021 License: Apache-2.0 Imports: 23 Imported by: 0

README

JobsD

A distributed, reliable, database-backed job execution framework.


Download it
go get -u github.com/simpleframeworks/jobsd

How does it work? (in short)

  • A db table is a queue.
  • Every "job run" has an associated db row.
  • Instances in a cluster compete to acquire and run a job (without locking).
  • A worker pool runs jobs.
  • A new recurring scheduled "job run" is created after a "job run" completes.
  • The db queue and the local JobsD instance queue are periodically synchronized.
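One way to picture "compete to acquire and run a job (without locking)" is an optimistic claim: every instance tries to mark a run as started only if nobody has done so yet, and the instance whose conditional update succeeds owns the run. This is an assumption about the approach, not code from JobsD. The sketch uses an in-memory table in which a mutex stands in for the database's atomic conditional UPDATE; `runRow`, `runTable`, `claim` and the SQL in the comment are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// runRow is a hypothetical, trimmed-down stand-in for a "job run" row.
type runRow struct {
	ID           int64
	RunStartedBy int64 // 0 stands in for NULL: no instance has claimed the run yet
}

// runTable stands in for the database table. The mutex models the atomicity of
// one conditional statement (hypothetical table/column names), for example:
//   UPDATE runs SET run_started_by = ? WHERE id = ? AND run_started_by IS NULL
// where the caller then checks whether exactly one row was affected.
type runTable struct {
	mu   sync.Mutex
	rows map[int64]*runRow
}

// claim reports whether instanceID won the run.
func (t *runTable) claim(runID, instanceID int64) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	row, ok := t.rows[runID]
	if !ok || row.RunStartedBy != 0 {
		return false // unknown run, or already claimed by another instance
	}
	row.RunStartedBy = instanceID
	return true
}

func main() {
	tbl := &runTable{rows: map[int64]*runRow{1: {ID: 1}}}

	var wg sync.WaitGroup
	var mu sync.Mutex
	winners := 0
	// Five "instances" race for the same run.
	for instance := int64(1); instance <= 5; instance++ {
		wg.Add(1)
		go func(id int64) {
			defer wg.Done()
			if tbl.claim(1, id) {
				mu.Lock()
				winners++
				mu.Unlock()
			}
		}(instance)
	}
	wg.Wait()
	fmt.Println("instances that won run 1:", winners) // always 1
}
```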

Quick Example

Announce the time every minute on the minute.


jd := jobsd.New(db) // Create a JobsD service instance

// Register a job that announces the time
jd.RegisterJob("Announce", func(name string) error {
  fmt.Printf("Hi %s! The date/time is %s.\n", name, time.Now().Format(time.RFC1123))
  return nil
})

// Register a schedule that tells JobsD when to trigger next: the start of the next minute
jd.RegisterSchedule("OnTheMin", func(now time.Time) time.Time {
  return time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), now.Minute()+1, 0, 0, now.Location())
})

// Bring up the JobsD service (jobs and schedules must be registered before this)
if err := jd.Up(); err != nil {
  log.Fatal(err)
}

// Create and schedule the job "Announce" to run "OnTheMin"
if _, err := jd.CreateRun("Announce", "Simon").Schedule("OnTheMin").Run(); err != nil {
  log.Fatal(err)
}

<-time.After(2 * time.Minute) // A real service should wait for an OS kill signal here instead

// Shut down the JobsD service instance: wait for running jobs to complete, record stats, and tidy up
if err := jd.Down(); err != nil {
  log.Fatal(err)
}

A runnable example can be found in the examples folder. To run it, execute go run main.go from that directory.

Basic Usage

Creating jobs

The characteristics of a job are as follows:

  • Jobs are just funcs
  • Jobs must return an error
  • Jobs can take any number of parameters
  • Across a cluster, all jobs should have the same names and the same implementations.
    • Not every job needs to be implemented on every instance in the cluster (this makes it possible to roll out new code and new jobs)
  • All jobs need to be registered before the instance's Up() func is called
  • The first argument can optionally be of type jobsd.RunInfo
    • RunInfo contains, amongst other things, a Cancel channel for graceful shutdown / timeout
jobFunc1 := func() error {
  //DO SOME STUFF
  return nil
}

jobFunc2 := func(name string, age int) error {
  //DO SOME STUFF
  return nil
}

jd.RegisterJob("job1", jobFunc1)
jd.RegisterJob("job2", jobFunc2)
Creating Schedules

A schedule is a simple function that takes in the current time and returns the next scheduled time.

  • Schedules must be registered before the Up() func is called
afterASecond := func(now time.Time) time.Time {
  return now.Add(time.Second)
}

onTheMin := func(now time.Time) time.Time {
  return time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), now.Minute()+1, 0, 0, now.Location())
}

onTheHour := func(now time.Time) time.Time {
  return time.Date(now.Year(), now.Month(), now.Day(), now.Hour()+1, 0, 0, 0, now.Location())
}

jd.RegisterSchedule("afterASecond", afterASecond)
jd.RegisterSchedule("onTheMin", onTheMin)
jd.RegisterSchedule("onTheHour", onTheHour)

Running jobs

A job run is an instance of a job to be executed. A job must be registered before a run of it can be created with jd.CreateRun("job1", args...).

jobFunc := func(txt string) error {
  fmt.Printf("Hello %s!", txt)
  return nil
}
scheduleFunc := func(now time.Time) time.Time {
  return now.Add(time.Second)
}

jd := jobsd.New(db)

jd.RegisterJob("job1", jobFunc)

jd.RegisterSchedule("schedule1", scheduleFunc)

jd.Up()

jd.CreateRun("job1", "World A").Run() // Run job1 once immediately
jd.CreateRun("job1", "World B").RunAfter(time.Second) // Run job1 once after one second

jd.CreateRun("job1", "World C").Schedule("schedule1").Limit(2).Run() // Run job1 on schedule1 (every second), at most twice
jd.CreateRun("job1", "World D").Schedule("schedule1").Limit(2).RunAfter(time.Second) // After one second schedule job1 to run twice

// Runs only one "GlobalUniqueJob1" job at a time, across a cluster of JobsD instances
jd.CreateRun("job1", "World E").Unique("GlobalUniqueJob1").Run() 

// Runs and schedules only one "GlobalUniqueJob2" job at a time, across a cluster of JobsD instances. Runs only twice.
jd.CreateRun("job1", "World F").Schedule("schedule1").Limit(2).Unique("GlobalUniqueJob2").Run() 

<-time.After(10 * time.Second)
jd.Down() // Make sure to shutdown to cleanup and record stats
Getting the job run state:

id, err := jd.CreateRun("job1", "World A").Run()
checkError(err)

runState := jd.GetRunState(id) // Get the run state of the job.

spew.Dump(runState.OriginID)
spew.Dump(runState.Name)
spew.Dump(runState.Job)
spew.Dump(runState.Schedule)
spew.Dump(runState.RunSuccessCount)
spew.Dump(runState.RunStartedAt)
spew.Dump(runState.RunStartedBy)
spew.Dump(runState.RunCompletedAt)
spew.Dump(runState.RunCompletedError)
spew.Dump(runState.RetriesOnErrorCount)
spew.Dump(runState.RetriesOnTimeoutCount)
spew.Dump(runState.CreatedAt)
spew.Dump(runState.CreatedBy)

err = runState.Refresh() // Refreshes the run state.
checkError(err)

Advanced Usage


jd := jobsd.New(db)

jd.WorkerNum(10) // Set the number of workers to run the jobs

jd.PollInterval(10*time.Second) // The time between checks for new jobs across the cluster

jd.PollLimit(100) // The number of jobs to retrieve across the cluster at once

Error handling

A job func must return an error. If it returns a non-nil error, the job run can be retried. The number of retries can be set as an instance default, per job, or per job run.

Instance defaults for error retries

jd.RetriesErrorLimit(3) // How many times to retry a job when an error is returned (-1 = unlimited)

Error retries can be set on the Job

jd.RegisterJob("job1", jobFunc).RetriesErrorLimit(2) // -1 = unlimited

Error retries can be set on the Job Run

jd.CreateRun("job1", "World A").RetriesErrorLimit(2).Run() // -1 = unlimited
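Retry limits can be set at three levels. A natural reading, assumed here rather than confirmed by the docs, is that the most specific setting wins (job run, then job, then instance default) and that -1 means unlimited. `effectiveLimit` and `canRetry` are hypothetical helpers; nil stands for "not set at this level", matching how the Run and RunInfo types store these limits as nullable values.

```go
package main

import "fmt"

// effectiveLimit returns the first limit that is set, from most to least
// specific (assumed precedence: run, then job, then instance default).
func effectiveLimit(run, job *int, instanceDefault int) int {
	if run != nil {
		return *run
	}
	if job != nil {
		return *job
	}
	return instanceDefault
}

// canRetry reports whether another retry is allowed after `retries` retries
// have already happened; a limit of -1 means unlimited.
func canRetry(retries, limit int) bool {
	return limit == -1 || retries < limit
}

func intPtr(v int) *int { return &v }

func main() {
	limit := effectiveLimit(intPtr(2), intPtr(5), 3) // the run-level setting wins
	for retries := 0; retries <= 3; retries++ {
		fmt.Printf("retries so far=%d, retry again=%v\n", retries, canRetry(retries, limit))
	}
	fmt.Println(canRetry(1000, effectiveLimit(nil, intPtr(-1), 3))) // unlimited at job level
}
```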
Timeouts

IMPORTANT: Timeouts do not kill running jobs; a timed-out job keeps running. To cancel a running job on timeout, make jobsd.RunInfo the first parameter of your job func and watch the jobsd.RunInfo.Cancel channel (see the example below).

Instance defaults for timeouts

jd.RunTimeout(30*time.Minute) // How long a job run may take before it times out and is retried (0 disables timeouts)

jd.RetriesTimeoutLimit(3) // How many times to retry a job when it times out (-1 = unlimited)

jd.TimeoutCheck(10*time.Second) // The time between checks for jobs that have timed out (or crashed) on other nodes in the cluster

Timeouts can be set on the Job

jd.RegisterJob("job1", jobFunc).RunTimeout(10*time.Minute).RetriesTimeoutLimit(2) 
// RunTimeout set to 0 disables timeouts
// RetriesTimeoutLimit set to -1 = unlimited

Timeouts can be set on a Job Run

jd.CreateRun("job1", "World A").RunTimeout(1*time.Minute).RetriesTimeoutLimit(5).Run() 
// RunTimeout set to 0 disables timeouts
// RetriesTimeoutLimit set to -1 = unlimited
Canceling a job on timeout / shutdown

Create a job like the following:

jobFunc := func(info jobsd.RunInfo) error {
  select {
  case <-time.After(10 * time.Minute): // Simulated work that outlasts the run timeout
    fmt.Println("Did some work")
  case <-info.Cancel: // Signalled on timeout or shutdown
    fmt.Println("Job canceled")
  }
  return nil
}

jd.RegisterJob("CancelableJob", jobFunc)
Database

PostgreSQL, SQLite, and MySQL are supported out of the box.

Using PostgreSQL
host := os.Getenv("JOBSD_PG_HOST")
port := os.Getenv("JOBSD_PG_PORT")
dbname := os.Getenv("JOBSD_PG_DB")
user := os.Getenv("JOBSD_PG_USER")
password := os.Getenv("JOBSD_PG_PASSWORD")
dsn := fmt.Sprintf("host=%s user=%s password=%s dbname=%s port=%s sslmode=disable", host, user, password, dbname, port)

db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{})
checkError(err)

jd := jobsd.New(db)
Using MySQL
host := os.Getenv("JOBSD_MY_HOST")
port := os.Getenv("JOBSD_MY_PORT")
dbname := os.Getenv("JOBSD_MY_DB")
user := os.Getenv("JOBSD_MY_USER")
password := os.Getenv("JOBSD_MY_PASSWORD")
dsn := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", user, password, host, port, dbname)

db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{
  Logger: logc.NewGormLogger(logger),
})
checkError(err)

jd := jobsd.New(db)
Using SQLite
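db, err := gorm.Open(sqlite.Open("file::memory:"), &gorm.Config{
  Logger: logc.NewGormLogger(logger),
})
checkError(err)

sqlDB, err := db.DB()
checkError(err)

// SQLite does not handle concurrent connections well, and every new connection
// to "file::memory:" opens a separate, empty database, so use a single connection
sqlDB.SetMaxIdleConns(1)
sqlDB.SetMaxOpenConns(1)

jd := jobsd.New(db)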
Disable Auto Migrations

Auto migration creates the DB tables and schema that JobsD requires. It runs when JobsD starts, i.e. when Up() is called. Since it is only needed the first time JobsD runs against a database, it can be disabled afterwards with the following method:

jd.AutoMigration(false)

// Register Jobs and Schedules etc...

jd.Up()
Logging

A custom logger can be supplied. It must implement the logc.Logger interface:

jd := jobsd.New(db)
jd.Logger(logger)

Documentation


Constants

This section is empty.

Variables

var (
	//ErrJobFuncNotFunc Job Func not a func
	ErrJobFuncNotFunc = errors.New("jobFunc is not a func")
	// ErrJobFuncNoErrRtn Job Func does not return an error
	ErrJobFuncNoErrRtn = errors.New("jobFunc needs to return an error")
	// ErrJobFuncArgsMismatch Calling Job Func args are mismatched
	ErrJobFuncArgsMismatch = errors.New("jobFunc calling args mismatch")
)
var ErrRunKill = errors.New("job run killed")

ErrRunKill is returned if a job run was killed.

var ErrRunTimeout = errors.New("job run timed out")

ErrRunTimeout is returned if a job run times out.

Functions

This section is empty.

Types

type Instance

type Instance struct {
	ID                    int64 `gorm:"primaryKey"`
	Workers               int
	AutoMigrate           bool
	SupportedJobs         string
	SupportedSchedules    string
	PollInterval          time.Duration // When to poll the DB for Runs
	PollLimit             int           // How many Runs to get during polling
	TimeoutCheck          time.Duration // Time between checking job runs for timeout or error
	RunTimeout            sql.NullInt64 // Default job retry timeout
	RetriesOnTimeoutLimit sql.NullInt64 // Default number of retries for a job after timeout
	RetriesOnErrorLimit   sql.NullInt64 // Default number of retries for a job after error
	RunsStarted           int           // Job runs started
	RunsRescheduled       int           // Job runs rescheduled after finishing
	RunsTimedOut          int           // Job runs timed out
	RunsTimedOutRes       int           // Job runs resurrected after time out
	RunsErrors            int           // Job runs that have returned an error
	LastSeenAt            sql.NullTime  // Last time instance was alive
	ShutdownAt            sql.NullTime
	CreatedAt             time.Time
}

Instance is the DB record of a JobsD instance, holding its settings and run statistics.

func (Instance) TableName added in v1.0.1

func (Instance) TableName() string

TableName specifies the db table name

type JobArgs added in v1.1.0

type JobArgs []interface{}

JobArgs holds job func parameters used to run a job. It can be serialized for DB storage

func (JobArgs) GormDataType added in v1.1.0

func (p JobArgs) GormDataType() string

GormDataType returns the gorm data type used to store JobArgs.

func (*JobArgs) Scan added in v1.1.0

func (p *JobArgs) Scan(value interface{}) error

Scan decodes a DB value into JobArgs, implementing the sql.Scanner interface.

func (JobArgs) Value added in v1.1.0

func (p JobArgs) Value() (driver.Value, error)

Value returns the serialized params for DB storage, implementing the driver.Valuer interface.
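JobArgs implements driver.Valuer and (through Scan) sql.Scanner, so a run's params can be stored in a single column. Since RegisterJob requires params to be gob serializable, the sketch below assumes gob encoding into a byte slice. It illustrates the two interfaces and is not the JobsD code; `jobArgs` is a local stand-in type.

```go
package main

import (
	"bytes"
	"database/sql/driver"
	"encoding/gob"
	"fmt"
)

// jobArgs is a local stand-in for jobsd.JobArgs.
type jobArgs []interface{}

// Value implements driver.Valuer by gob-encoding the params into bytes.
func (p jobArgs) Value() (driver.Value, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(p); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// Scan implements sql.Scanner by decoding bytes (or a string) back into params.
func (p *jobArgs) Scan(value interface{}) error {
	var data []byte
	switch v := value.(type) {
	case []byte:
		data = v
	case string:
		data = []byte(v)
	default:
		return fmt.Errorf("jobArgs: unsupported type %T", value)
	}
	return gob.NewDecoder(bytes.NewReader(data)).Decode(p)
}

func main() {
	stored, err := jobArgs{"Simon", 42}.Value()
	if err != nil {
		panic(err)
	}
	var loaded jobArgs
	if err := loaded.Scan(stored); err != nil {
		panic(err)
	}
	fmt.Println(loaded...) // Simon 42
}
```

Built-in types such as string, int and bool can travel inside interface values without extra setup; custom structs passed as job params would need gob.Register.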

type JobContainer

type JobContainer struct {
	// contains filtered or unexported fields
}

JobContainer is returned by RegisterJob and holds per-job settings (run timeout and retry limits).

func (*JobContainer) RetriesErrorLimit added in v1.1.0

func (j *JobContainer) RetriesErrorLimit(limit int) *JobContainer

RetriesErrorLimit sets how many times a job run is retried after it returns an error. Setting it to -1 removes the limit.

func (*JobContainer) RetriesTimeoutLimit added in v1.1.0

func (j *JobContainer) RetriesTimeoutLimit(limit int) *JobContainer

RetriesTimeoutLimit sets how many times a job run is retried after timing out. Setting it to -1 removes the limit.

func (*JobContainer) RunTimeout added in v1.1.0

func (j *JobContainer) RunTimeout(timeout time.Duration) *JobContainer

RunTimeout sets the default timeout of a job run. Setting it to 0 disables timeouts.

type JobFunc

type JobFunc struct {
	// contains filtered or unexported fields
}

JobFunc .

func NewJobFunc

func NewJobFunc(theFunc interface{}) JobFunc

NewJobFunc .

type JobsD

type JobsD struct {
	// contains filtered or unexported fields
}

JobsD is the job execution service; create one with New.

Example
jobName := "ExampleJobsD" // Must be unique otherwise tests may collide

jd := testSetup(logrus.ErrorLevel)

wait := make(chan struct{})
job1Func := func(txt string) error {
	fmt.Printf("Hello %s!", txt)
	wait <- struct{}{}
	return nil
}
jd.RegisterJob(jobName, job1Func)

schedule1Func := func(now time.Time) time.Time {
	return now.Add(500 * time.Millisecond)
}
jd.RegisterSchedule("schedule1", schedule1Func)

err0 := jd.Up()
testPanicErr(err0)

_, err1 := jd.CreateRun(jobName, "World").Schedule("schedule1").Limit(1).Run()
testPanicErr(err1)

<-wait
err2 := jd.Down()
testPanicErr(err2)

testTeardown(jd)
Output:

Hello World!

func New

func New(db *gorm.DB) *JobsD

New creates a JobsD service instance that uses the given gorm DB.

func (*JobsD) AutoMigration added in v1.1.0

func (j *JobsD) AutoMigration(run bool) *JobsD

AutoMigration turns on or off auto-migration

func (*JobsD) CreateRun

func (j *JobsD) CreateRun(job string, jobParams ...interface{}) *RunOnceCreator

CreateRun creates a run of the named job with the given job params.

func (*JobsD) Down

func (j *JobsD) Down() error

Down shuts down the JobsD service instance.

func (*JobsD) GetDB added in v1.1.0

func (j *JobsD) GetDB() *gorm.DB

GetDB returns the DB currently in use.

func (*JobsD) GetInstance added in v1.1.0

func (j *JobsD) GetInstance() Instance

GetInstance returns the instance record

func (*JobsD) GetLogger added in v1.1.0

func (j *JobsD) GetLogger() logc.Logger

GetLogger returns the logger currently in use.

func (*JobsD) GetRunState added in v1.1.0

func (j *JobsD) GetRunState(id int64) *RunState

GetRunState retrieves the current state of the job run

func (*JobsD) Logger

func (j *JobsD) Logger(logger logc.Logger) *JobsD

Logger sets the logger (any logc.Logger implementation).

func (*JobsD) PollInterval added in v1.1.0

func (j *JobsD) PollInterval(pollInt time.Duration) *JobsD

PollInterval sets the time between getting new Runs from the DB and cluster

func (*JobsD) PollLimit added in v1.1.0

func (j *JobsD) PollLimit(limit int) *JobsD

PollLimit sets the number of upcoming Runs to retrieve from the DB at a time

func (*JobsD) RegisterJob

func (j *JobsD) RegisterJob(name string, jobFunc interface{}) *JobContainer

RegisterJob registers a job to be run when required. The name parameter should not contain a comma. The jobFunc parameter can be any func that returns an error. All jobFunc params must be gob serializable.

func (*JobsD) RegisterSchedule

func (j *JobsD) RegisterSchedule(name string, scheduleFunc ScheduleFunc)

RegisterSchedule adds a schedule. The name parameter should not contain a comma.

func (*JobsD) RetriesErrorLimit added in v1.1.0

func (j *JobsD) RetriesErrorLimit(limit int) *JobsD

RetriesErrorLimit sets the default number of times a job run is retried after it returns an error. Setting it to -1 removes the limit.

func (*JobsD) RetriesTimeoutLimit added in v1.1.0

func (j *JobsD) RetriesTimeoutLimit(limit int) *JobsD

RetriesTimeoutLimit sets the default number of times a job run is retried after timing out. Setting it to -1 removes the limit.

func (*JobsD) RunTimeout added in v1.1.0

func (j *JobsD) RunTimeout(timeout time.Duration) *JobsD

RunTimeout sets the default timeout of a job run. Setting it to 0 disables timeouts.

func (*JobsD) TimeoutCheck added in v1.1.0

func (j *JobsD) TimeoutCheck(interval time.Duration) *JobsD

TimeoutCheck sets the time between checks for job runs that have timed out.

func (*JobsD) Up

func (j *JobsD) Up() error

Up starts up the JobsD service instance

func (*JobsD) WorkerNum

func (j *JobsD) WorkerNum(workers int) *JobsD

WorkerNum sets the number of workers to process jobs

type Run added in v1.1.0

type Run struct {
	ID                    int64 `gorm:"primaryKey"`
	OriginID              int64 `gorm:"index"`
	Name                  string
	NameActive            sql.NullString `gorm:"unique"`
	Job                   string
	JobArgs               JobArgs
	Delay                 time.Duration
	RunAt                 time.Time
	RunTotalCount         int
	RunSuccessCount       int
	RunSuccessLimit       sql.NullInt64
	RunStartedAt          sql.NullTime `gorm:"index"`
	RunStartedBy          sql.NullInt64
	RunCompletedAt        sql.NullTime `gorm:"index"`
	RunCompletedError     sql.NullString
	RunTimeout            sql.NullInt64
	RunTimeoutAt          sql.NullTime `gorm:"index"`
	RetriesOnErrorCount   int
	RetriesOnErrorLimit   sql.NullInt64
	RetriesOnTimeoutCount int
	RetriesOnTimeoutLimit sql.NullInt64
	Schedule              sql.NullString
	CreatedAt             time.Time `gorm:"index"`
	CreatedBy             int64
}

Run is a database representation of a job run

func (Run) TableName added in v1.1.0

func (Run) TableName() string

TableName specifies the db table name

type RunInfo added in v1.1.0

type RunInfo struct {
	ID                    int64
	OriginID              int64
	Name                  string
	Job                   string
	JobArgs               JobArgs
	RunAt                 time.Time
	RunTotalCount         int
	RunSuccessCount       int
	RunSuccessLimit       *int
	RunStartedAt          time.Time
	RunStartedBy          int64
	RunTimeout            time.Duration
	RunTimeoutAt          *time.Time
	RetriesOnErrorCount   int
	RetriesOnErrorLimit   *int
	RetriesOnTimeoutCount int
	RetriesOnTimeoutLimit *int
	Schedule              *string
	CreatedAt             time.Time
	CreatedBy             int64
	Cancel                <-chan struct{}
}

RunInfo exposes information and functions to a running job

type RunOnceCreator

type RunOnceCreator struct {
	// contains filtered or unexported fields
}

RunOnceCreator creates a job run that runs only once

func (*RunOnceCreator) RetriesErrorLimit added in v1.1.0

func (r *RunOnceCreator) RetriesErrorLimit(limit int) *RunOnceCreator

RetriesErrorLimit sets how many times this job run is retried after it returns an error. Setting it to -1 removes the limit.

func (*RunOnceCreator) RetriesTimeoutLimit added in v1.1.0

func (r *RunOnceCreator) RetriesTimeoutLimit(limit int) *RunOnceCreator

RetriesTimeoutLimit sets how many times this job run is retried after timing out. Setting it to -1 removes the limit.

func (*RunOnceCreator) Run

func (r *RunOnceCreator) Run() (int64, error)

Run runs the job and returns the job run ID and an error.

func (*RunOnceCreator) RunAfter

func (r *RunOnceCreator) RunAfter(delay time.Duration) (int64, error)

RunAfter runs the job after the given delay and returns the job run ID and an error.

func (*RunOnceCreator) RunTimeout added in v1.1.0

func (r *RunOnceCreator) RunTimeout(timeout time.Duration) *RunOnceCreator

RunTimeout sets the timeout of this job run. Setting it to 0 disables timeouts.

func (*RunOnceCreator) Schedule

func (r *RunOnceCreator) Schedule(schedule string) *RunScheduleCreator

Schedule makes the job run follow the named schedule and returns a RunScheduleCreator.

func (*RunOnceCreator) Unique

func (r *RunOnceCreator) Unique(name string) *RunOnceCreator

Unique gives the run a unique name across the cluster, i.e. only one job run with a given unique name can be running or scheduled at the same time.

type RunRes added in v1.1.0

type RunRes int

RunRes is the result of trying to run the job

const (
	// RunResLockLost It could not lock the job to run it
	RunResLockLost RunRes = iota
	// RunResTO It ran and timed out
	RunResTO
	// RunResError It ran and returned an error
	RunResError
	// RunResSuccess It ran successfully
	RunResSuccess
)

type RunScheduleCreator

type RunScheduleCreator struct {
	// contains filtered or unexported fields
}

RunScheduleCreator creates a job run that runs according to a schedule.

func (*RunScheduleCreator) Limit

func (r *RunScheduleCreator) Limit(limit int) *RunScheduleCreator

Limit sets how many times the job can successfully run

func (*RunScheduleCreator) RetriesErrorLimit added in v1.1.0

func (r *RunScheduleCreator) RetriesErrorLimit(limit int) *RunScheduleCreator

RetriesErrorLimit sets how many times this job run is retried after it returns an error. Setting it to -1 removes the limit.

func (*RunScheduleCreator) RetriesTimeoutLimit added in v1.1.0

func (r *RunScheduleCreator) RetriesTimeoutLimit(limit int) *RunScheduleCreator

RetriesTimeoutLimit sets how many times this job run is retried after timing out. Setting it to -1 removes the limit.

func (*RunScheduleCreator) Run

func (r *RunScheduleCreator) Run() (int64, error)

Run runs the job according to the schedule and returns the job run ID and an error.

func (*RunScheduleCreator) RunAfter

func (r *RunScheduleCreator) RunAfter(delay time.Duration) (int64, error)

RunAfter starts the schedule after the specified delay and returns the job run ID and an error.

func (*RunScheduleCreator) RunTimeout added in v1.1.0

func (r *RunScheduleCreator) RunTimeout(timeout time.Duration) *RunScheduleCreator

RunTimeout sets the timeout of this job run. Setting it to 0 disables timeouts.

func (*RunScheduleCreator) Unique

func (r *RunScheduleCreator) Unique(name string) *RunScheduleCreator

Unique gives the run a unique name across the cluster, i.e. only one job run with a given unique name can be running or scheduled at the same time.

type RunState added in v1.1.0

type RunState struct {
	OriginID              int64
	Name                  string
	Job                   string
	Schedule              *string
	RunSuccessCount       int
	RunStartedAt          *time.Time
	RunStartedBy          *int64
	RunCompletedAt        *time.Time
	RunCompletedError     *string
	RetriesOnErrorCount   int
	RetriesOnTimeoutCount int
	CreatedAt             time.Time
	CreatedBy             int64
	// contains filtered or unexported fields
}

RunState is a snapshot of a job run's latest state.

func (*RunState) Refresh added in v1.1.0

func (j *RunState) Refresh() error

Refresh gets and loads the latest data into RunState

type Runnable added in v1.1.0

type Runnable struct {
	// contains filtered or unexported fields
}

Runnable represents a single runnable job run

type RunnableQueue added in v1.1.0

type RunnableQueue struct {
	// contains filtered or unexported fields
}

RunnableQueue is a priority queue of jobs to run

func NewRunnableQueue added in v1.1.0

func NewRunnableQueue() *RunnableQueue

NewRunnableQueue .

func (*RunnableQueue) Len added in v1.1.0

func (q *RunnableQueue) Len() int

Len .

func (*RunnableQueue) Peek added in v1.1.0

func (q *RunnableQueue) Peek() *Runnable

Peek .

func (*RunnableQueue) Pop added in v1.1.0

func (q *RunnableQueue) Pop() *Runnable

Pop .

func (*RunnableQueue) Push added in v1.1.0

func (q *RunnableQueue) Push(j *Runnable) bool

Push .

type ScheduleFunc

type ScheduleFunc func(now time.Time) time.Time

ScheduleFunc takes the current time and returns the next scheduled time.

Directories

Path Synopsis
examples
