sched

package module
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 10, 2021 License: MIT Imports: 7 Imported by: 0

README



🕰 Sched

Go In-Process Scheduler with Cron Expression Support

Run Jobs on a schedule, supports fixed interval, timely, and cron-expression timers; Instrument your processes and expose metrics for each job.

Go Doc Go Version Go Report GitHub license

Introduction

A simple process manager that allows you to specify a Schedule that execute a Job based on a Timer. Schedule manage the state of this job allowing you to start/stop/restart in concurrent safe way. Schedule also instrument this Job and gather metrics and optionally expose them via uber-go/tally scope.

Install

go get github.com/sherifabdlnaby/sched
import "github.com/sherifabdlnaby/sched"

Requirements

Go 1.13 >=


Concepts

Job

Simply a func(){} implementation that is the schedule goal to run, and instrument.

Timer

An Object that Implements the type Timer interface{}. A Timer is responsible for providing a schedule with the next time the job should run and if there will be subsequent runs.

Packaged Implementations:

  1. Fixed :- Infinitely Fires a job at a Fixed Interval (time.Duration)
  2. Cron :- Infinitely Fires a job based on a Cron Expression, all Expressions supported by gorhill/cronexpr are supported.
  3. Once :- A Timer that run ONCE after an optional specific delay or at a specified time, schedule will stop after it fires.

You can Implement your own Timer for your specific scheduling needs by implementing

type Timer interface {
	// done indicated that there will be no more runs.
    Next() (next time.Time, done bool)
}

Schedule

A Schedule wraps a Job and fires it according to Timer.

	fixedTimer30second, _ := sched.NewFixed(30 * time.Second)

	job := func() {
		log.Println("Doing some work...")
		time.Sleep(1 * time.Second)
		log.Println("Finished Work.")
	}

	// Create Schedule
	schedule := sched.NewSchedule("every30s", fixedTimer30second, job)

	// Start
	schedule.Start()
Options

Additional Options can be passed to Schedule to change its behavior.

// Create Schedule
schedule := sched.NewSchedule("every30s", fixedTimer30second, job,
	sched.WithLogger(sched.DefaultLogger()),
	opt2,
	opt3,
	....,
)
Logger Option

WithLogger( logger Logger) -> Supply the Schedule the Logger it is going to use for logging.

  1. func DefaultLogger() Logger : Provide a Default Logging Interface to be used Instead of Implementing your own.
  2. func NopLogger() Logger : A nop Logger that will not output anything to stdout.
Metrics Option

WithMetrics( scope tally.Scope) -> Supply the Schedule with a metrics scope it can use to export metrics.

  1. Use any of uber-go/tally implementations (Prometheus, statsd, etc)

Use func WithConsoleMetrics(printEvery time.Duration) Option Implementation to Output Metrics to stdout (good for debugging)

Expected Runtime

WithExpectedRunTime(d time.Duration) -> Supply the Schedule with the expected duration for the job to run, schedule will output corresponding logs and metrics if job run exceeded expected.

Schedule(r)

Scheduler manage one or more Schedule creating them using common options, enforcing unique IDs, and supply methods to Start / Stop all schedule(s).


Exported Metrics

Metric Type Desc
up Gauge If the schedule is Running / Stopped
runs Counter Number of Runs Since Starting
runs_overlapping Counter Number of times more than one job was running together. (Overlapped)
run_actual_elapsed_time Time Elapsed Time between Starting and Ending of Job Execution
run_total_elapsed_time Time Total Elapsed Time between Creating the Job and Ending of Job Execution, This differ from Actual Elapsed time when Overlapping blocking is Implemented
run_errors Counter Count Number of Times a Job error'd(Panicked) during execution.
run_exceed_expected_time Counter Count Number of Times a Job Execution Time exceeded the Expected Time
In Prometheus Format
# HELP sched_run_actual_elapsed_time sched_run_actual_elapsed_time summary
# TYPE sched_run_actual_elapsed_time summary
sched_run_actual_elapsed_time{id="every5s",quantile="0.5"} 0.203843151
sched_run_actual_elapsed_time{id="every5s",quantile="0.75"} 1.104031623
sched_run_actual_elapsed_time{id="every5s",quantile="0.95"} 1.104031623
sched_run_actual_elapsed_time{id="every5s",quantile="0.99"} 1.104031623
sched_run_actual_elapsed_time{id="every5s",quantile="0.999"} 1.104031623
sched_run_actual_elapsed_time_sum{id="every5s"} 1.307874774
sched_run_actual_elapsed_time_count{id="every5s"} 2
# HELP sched_run_errors sched_run_errors counter
# TYPE sched_run_errors counter
sched_run_errors{id="every5s"} 0
# HELP sched_run_exceed_expected_time sched_run_exceed_expected_time counter
# TYPE sched_run_exceed_expected_time counter
sched_run_exceed_expected_time{id="every5s"} 0
# HELP sched_run_total_elapsed_time sched_run_total_elapsed_time summary
# TYPE sched_run_total_elapsed_time summary
sched_run_total_elapsed_time{id="every5s",quantile="0.5"} 0.203880714
sched_run_total_elapsed_time{id="every5s",quantile="0.75"} 1.104065614
sched_run_total_elapsed_time{id="every5s",quantile="0.95"} 1.104065614
sched_run_total_elapsed_time{id="every5s",quantile="0.99"} 1.104065614
sched_run_total_elapsed_time{id="every5s",quantile="0.999"} 1.104065614
sched_run_total_elapsed_time_sum{id="every5s"} 1.307946328
sched_run_total_elapsed_time_count{id="every5s"} 2
# HELP sched_runs sched_runs counter
# TYPE sched_runs counter
sched_runs{id="every5s"} 2
# HELP sched_runs_overlapping sched_runs_overlapping counter
# TYPE sched_runs_overlapping counter
sched_runs_overlapping{id="every5s"} 0
# HELP sched_up sched_up gauge
# TYPE sched_up gauge
sched_up{id="every5s"} 1

Examples

  1. schedule-console-metrics
  2. schedule-cron
  3. schedule-fixed
  4. schedule-four-mixed-timers
  5. schedule-once
  6. schedule-overlapping
  7. schedule-panic
  8. schedule-prom-metrics
  9. schedule-warn-expected
  10. scheduler
  11. scheduler-extra-opts

Inline Example

package main

import (
    "fmt"
    "github.com/sherifabdlnaby/sched"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"
)

func main() {

    cronTimer, err := sched.NewCron("* * * * *")
    if err != nil {
        panic(fmt.Sprintf("invalid cron expression: %s", err.Error()))
    }

    job := func() {
        log.Println("Doing some work...")
        time.Sleep(1 * time.Second)
        log.Println("Finished Work.")
    }

    // Create Schedule
    schedule := sched.NewSchedule("cron", cronTimer, job, sched.WithLogger(sched.DefaultLogger()))

    // Start Schedule
    schedule.Start()

    // Stop schedule after 5 Minutes
    time.AfterFunc(5*time.Minute, func() {
        schedule.Stop()
    })

    // Listen to CTRL + C
    signalChan := make(chan os.Signal, 1)
    signal.Notify(signalChan, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
    _ = <-signalChan

    // Stop before shutting down.
    schedule.Stop()

    return
}

Output for 3 minutes
2021-04-10T12:30: 13.132+0200    INFO    sched   sched/schedule.go: 96    Job Schedule Started    {"id": "cron"}
2021-04-10T12:30: 13.132+0200    INFO    sched   sched/schedule.go: 168   Job Next Run Scheduled  {"id": "cron", "After": "47s", "At": "2021-04-10T12:31:00+02:00"}
2021-04-10T12: 31: 00.000+0200    INFO    sched   sched/schedule.go: 168   Job Next Run Scheduled  {"id": "cron", "After": "1m0s", "At": "2021-04-10T12:32:00+02:00"}
2021-04-10T12: 31:00.000+0200    INFO    sched   sched/schedule.go: 193   Job Run Starting        {"id": "cron", "Instance": "8e1044ab-20b6-4acf-8a15-e06c0418522c"}
2021/04/10 12: 31: 00 Doing some work...
2021/04/10 12: 31: 01 Finished Work.
2021-04-10T12: 31: 01.001+0200    INFO    sched   sched/schedule.go: 208   Job Finished    {"id": "cron", "Instance": "8e1044ab-20b6-4acf-8a15-e06c0418522c", "Duration": "1.001s", "State": "FINISHED"}
2021-04-10T12:32: 00.002+0200    INFO    sched   sched/schedule.go: 168   Job Next Run Scheduled  {"id": "cron", "After": "1m0s", "At": "2021-04-10T12:33:00+02:00"}
2021-04-10T12: 32: 00.002+0200    INFO    sched   sched/schedule.go: 193   Job Run Starting        {"id": "cron", "Instance": "baae94eb-f818-4b34-a1f4-45b521a360a1"}
2021/04/10 12: 32: 00 Doing some work...
2021/04/10 12: 32: 01 Finished Work.
2021-04-10T12:32: 01.005+0200    INFO    sched   sched/schedule.go: 208   Job Finished    {"id": "cron", "Instance": "baae94eb-f818-4b34-a1f4-45b521a360a1", "Duration": "1.003s", "State": "FINISHED"}
2021-04-10T12: 33: 00.001+0200    INFO    sched   sched/schedule.go:168   Job Next Run Scheduled  {"id": "cron", "After": "1m0s", "At": "2021-04-10T12:34:00+02:00"}
2021-04-10T12:33: 00.001+0200    INFO    sched   sched/schedule.go: 193   Job Run Starting        {"id": "cron", "Instance": "71c8f0bf-3624-4a92-909c-b4149f3c62a3"}
2021/04/10 12: 33: 00 Doing some work...
2021/04/10 12: 33: 01 Finished Work.
2021-04-10T12: 33: 01.004+0200    INFO    sched   sched/schedule.go:208   Job Finished    {"id": "cron", "Instance": "71c8f0bf-3624-4a92-909c-b4149f3c62a3", "Duration": "1.003s", "State": "FINISHED"}


Output With CTRL+C
2021-04-10T12:28: 45.591+0200    INFO    sched   sched/schedule.go: 96    Job Schedule Started    {"id": "cron"}
2021-04-10T12:28: 45.592+0200    INFO    sched   sched/schedule.go: 168   Job Next Run Scheduled  {"id": "cron", "After": "14s", "At": "2021-04-10T12:29:00+02:00"}
2021-04-10T12: 29: 00.000+0200    INFO    sched   sched/schedule.go: 168   Job Next Run Scheduled  {"id": "cron", "After": "1m0s", "At": "2021-04-10T12:30:00+02:00"}
2021-04-10T12: 29:00.000+0200    INFO    sched   sched/schedule.go: 193   Job Run Starting        {"id": "cron", "Instance": "786540f1-594b-44a0-9a66-7181619e38a6"}
2021/04/10 12: 29: 00 Doing some work...
CTRL+C
2021-04-10T12: 29: 00.567+0200    INFO    sched   sched/schedule.go: 125   Stopping Schedule...    {"id": "cron"}
2021-04-10T12: 29: 00.567+0200    INFO    sched   sched/schedule.go: 130   Waiting active jobs to finish...        {"id": "cron"}
2021-04-10T12: 29: 00.567+0200    INFO    sched   sched/schedule.go: 171   Job Next Run Canceled   {"id": "cron", "At": "2021-04-10T12:30:00+02:00"}
2021/04/10 12: 29: 01 Finished Work.
2021-04-10T12: 29:01.000+0200    INFO    sched   sched/schedule.go: 208   Job Finished    {"id": "cron", "Instance": "786540f1-594b-44a0-9a66-7181619e38a6", "Duration": "1s", "State": "FINISHED"}
2021-04-10T12: 29: 01.000+0200    INFO    sched   sched/schedule.go: 133   Job Schedule Stopped {"id": "cron" }

Todo(s) and Enhancements

  • Control Logging Verbosity
  • Make Panic Recovery Optional
  • Make Job a func() error and allow retry(s), backoff, and collect errors and their metrics
  • Make Jobs context aware and support canceling Jobs Context.
  • Make allow Overlapping Optional and Configure How Overlapping is handled/denied.
  • Global Package-Level Metrics

License

MIT License Copyright (c) 2021 Sherif Abdel-Naby

Contribution

PR(s) are Open and Welcomed. ❤️

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Cron

type Cron struct {
	// contains filtered or unexported fields
}

Cron A Timer that fires at according to a cron expression. All expresion supported by `https://github.com/gorhill/cronexpr` are supported.

func NewCron

func NewCron(cronExpression string) (*Cron, error)

NewCron returns a Timer that fires at according to a cron expression. All expresion supported by `https://github.com/gorhill/cronexpr` are supported.

func (*Cron) Next

func (c *Cron) Next() (time.Time, bool)

Next Return Next fire time.

type Fixed

type Fixed struct {
	// contains filtered or unexported fields
}

Fixed A Timer that fires at a fixed duration intervals

func NewFixed

func NewFixed(duration time.Duration) (*Fixed, error)

NewFixed Returns Fixed Timer; A Timer that fires at a fixed duration intervals.

func (*Fixed) Next

func (f *Fixed) Next() (time.Time, bool)

Next Return Next fire time.

type Logger

type Logger interface {
	Debugw(msg string, keysAndValues ...interface{})
	Errorw(msg string, keysAndValues ...interface{})
	Fatalw(msg string, keysAndValues ...interface{})
	Infow(msg string, keysAndValues ...interface{})
	Panicw(msg string, keysAndValues ...interface{})
	Warnw(msg string, keysAndValues ...interface{})
	With(args ...interface{}) Logger
	Named(name string) Logger
	Sync() error
}

Logger Sched logging interface similar to uber-go/zap, while keeping the option to change the logging implementation It is a sub-interface of uber-go/zap SugaredLogger.

func DefaultLogger

func DefaultLogger() Logger

DefaultLogger Return Default Sched Logger based on Zap's sugared logger

func NopLogger

func NopLogger() Logger

NopLogger Return a No Op Logger that prints nothing.

type Once

type Once struct {
	// contains filtered or unexported fields
}

Once A timer that run ONCE after an optional specific delay.

func NewOnce

func NewOnce(d time.Duration) (*Once, error)

NewOnce Return a timer that trigger ONCE after `d` delay as soon as Timer is inquired for the next Run. Delay = 0 means the Timer return now(), aka as soon as time is inquired.

func NewOnceTime

func NewOnceTime(t time.Time) (*Once, error)

NewOnceTime Return a timer that trigger ONCE at `t` time.Time. If `t` is in the past at inquery time, timer will NOT run.

func (*Once) Next

func (o *Once) Next() (time.Time, bool)

Next Return Next Time OR a boolean indicating no more Next()(s)

type Option

type Option interface {
	// contains filtered or unexported methods
}

Option to customize schedule behavior, check the sched.With*() functions that implement Option interface for the available options

func WithConsoleMetrics

func WithConsoleMetrics(printEvery time.Duration) Option

WithConsoleMetrics a predefined console metrics reporter, uses the Logger interface of the schedule to print out metrics logs.

func WithExpectedRunTime

func WithExpectedRunTime(d time.Duration) Option

WithExpectedRunTime Use to indicate the expected Runtime ( Logs a warning and adds in metrics when it exceeds )

func WithLogger

func WithLogger(logger Logger) Option

WithLogger Use the supplied Logger as the logger.

func WithMetrics

func WithMetrics(metricsScope tally.Scope) Option

WithMetrics Supply a tally.Scope to expose schedule metrics with. Ex. uber-go/tally/prometheus scope to expose schedule metrics via Prometheus endpoint. Use WithConsoleMetrics() to supply a predefined metrics console reporter without the need to implement any special metrics reporter scope.

type Schedule

type Schedule struct {
	// contains filtered or unexported fields
}

Schedule A Schedule is an object that wraps a Job (func(){}) and runs it on a schedule according to the supplied Timer; With the the ability to expose metrics, and write logs to indicate job health, state, and stats.

func NewSchedule

func NewSchedule(id string, timer Timer, jobFunc func(), opts ...Option) *Schedule

NewSchedule Create a new schedule for` jobFunc func()` that will run according to `timer Timer` with the supplied []Options

func (*Schedule) Finish

func (s *Schedule) Finish()

Finish stops the scheduler and put it FINISHED state so that schedule cannot re-start again. Finish() is called automatically if Schedule Timer returned `done bool` == true. Method is **Blocking** and concurrent safe.

func (*Schedule) Start

func (s *Schedule) Start()

Start Start the scheduler. Method is concurrent safe. Calling Start() have the following effects according to the

scheduler state:
	1. NEW: Start the Schedule; running the defined Job on the first Timer's Next() time.
	2. STARTED: No Effect (and prints warning)
	3. STOPPED: Restart the schedule
	4. FINISHED: No Effect (and prints warning)

func (*Schedule) Stop

func (s *Schedule) Stop()

Stop stops the scheduler. Method is **Blocking** and concurrent safe. When called:

  1. Schedule will cancel all waiting scheduled jobs.
  2. Schedule will wait for all running jobs to finish. Calling Stop() has the following effects depending on the state of the schedule:
  3. NEW: No Effect
  4. STARTED: Stop Schedule
  5. STOPPED: No Effect
  6. FINISHED: No Effect

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler manage one or more Schedule creating them using common options, enforcing unique IDs, and supply methods to Start / Stop all schedule(s).

func NewScheduler

func NewScheduler(opts ...Option) *Scheduler

NewScheduler Creates new Scheduler, opt Options are applied to *every* schedule added and created by this scheduler.

func (*Scheduler) Add

func (s *Scheduler) Add(id string, timer Timer, job func(), extraOpts ...Option) error

Add Create a new schedule for` jobFunc func()` that will run according to `timer Timer` with the []Options of the Scheduler.

func (*Scheduler) Start

func (s *Scheduler) Start(id string) error

Start Start the Schedule with the given id. Return error if no Schedule with the given id exist.

func (*Scheduler) StartAll

func (s *Scheduler) StartAll()

StartAll Start All Schedules managed by the Scheduler

func (*Scheduler) Stop

func (s *Scheduler) Stop(id string) error

Stop Stop the Schedule with the given id. Return error if no Schedule with the given id exist.

func (*Scheduler) StopAll

func (s *Scheduler) StopAll()

StopAll Stops All Schedules managed by the Scheduler concurrently, but will block until ALL of them have stopped.

type State

type State int64

State Indicate the state of the Schedule

const (
	//NEW Schedule has just been created and hasn't started before
	NEW State = iota

	// STARTED Start Schedule has started and is running.
	STARTED

	// STOPPING Schedule is Stopping and is waiting for all active jobs to finish.
	STOPPING

	// STOPPED Schedule has stopped and no longer scheduling new Jobs.
	STOPPED

	// FINISHED Schedule has finished, and will not be able to start again.
	FINISHED
)

func (State) String

func (s State) String() string

type Timer

type Timer interface {
	Next() (next time.Time, done bool)
}

Timer is an Interface for a Timer object that is used by a Schedule to determine when to run the next run of a job. Timer need to implement the Next() method returning the time of the next Job run. Timer indicates that no jobs shall be scheduled anymore by returning done == true. The `next time.Time` returned with `done bool` == true IS IGNORED. Next() shall not return time in the past. Time in the past is reset to time.Now() at evaluation time in the scheduler.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL