etcdcron

v0.0.0-...-1864f74
Published: Apr 10, 2024 License: MIT Imports: 18 Imported by: 0

README

Go Etcd Cron

Work-in-progress

Goal

This package aims to implement a distributed, fault-tolerant cron in order to:

  • Run an identical process on several hosts
  • Have each of these processes instantiate a subset of the cron jobs
  • Allow concurrent instances to execute the same job
  • Ensure only one of these processes can trigger a job
  • Number of cron jobs can scale by increasing the number of hosts
  • Jobs are persisted and loaded from etcd
  • Jobs can have a TTL and be auto-deleted
  • Jobs can have a delayed start
  • Jobs can have a max run count
  • Cron can be used for multiple tenants, via namespacing
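
As a rough illustration of how each process could end up with a subset of the jobs, the sketch below assigns a job to a host by hashing the job name. This is an assumption made for illustration only: `ownsJob`, `hostID` and `numHosts` are hypothetical names, and the package's own partitioning (see WithPartitioning) may work differently.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// ownsJob reports whether the host with the given ID (0-based) is responsible
// for jobName when jobs are spread over numHosts hosts.
// Hypothetical helper: not part of this package's API.
func ownsJob(jobName string, hostID, numHosts uint32) bool {
	if numHosts == 0 {
		return false // no hosts, so nobody owns the job
	}
	h := fnv.New32a()
	h.Write([]byte(jobName)) // Write on a hash.Hash never returns an error
	return h.Sum32()%numHosts == hostID
}

func main() {
	const numHosts = 3
	for _, job := range []string{"job0", "job1", "job2", "job3"} {
		for host := uint32(0); host < numHosts; host++ {
			if ownsJob(job, host, numHosts) {
				fmt.Printf("%s is instantiated on host %d\n", job, host)
			}
		}
	}
}
```

With a scheme like this, adding hosts lowers the number of jobs each process has to track, which is the scaling property listed above.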

Getting started

By default, the library creates an etcd client on 127.0.0.1:2379. To use other endpoints, pass your own client with WithEtcdClient:

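import (
	"context"
	"log"
	"os"

	etcdcron "github.com/diagridio/go-etcd-cron"
	etcdclientv3 "go.etcd.io/etcd/client/v3"
	"google.golang.org/protobuf/types/known/anypb"
)

c, err := etcdclientv3.New(etcdclientv3.Config{
	Endpoints: []string{"etcd-host1:2379", "etcd-host2:2379"},
})
if err != nil {
	log.Fatal(err)
}
cron, err := etcdcron.New(
	etcdcron.WithEtcdClient(c),
	etcdcron.WithNamespace("my-example"), // multi-tenancy
	// The callback matches TriggerFunction: it receives a TriggerRequest
	// and returns a TriggerResult.
	etcdcron.WithTriggerFunc(func(ctx context.Context, req etcdcron.TriggerRequest) (etcdcron.TriggerResult, error) {
		log.Printf("Trigger from pid %d: %s %s\n", os.Getpid(), req.JobName, string(req.Payload.Value))
		return etcdcron.OK, nil
	}),
)
if err != nil {
	log.Fatal(err)
}
err = cron.AddJob(context.TODO(), etcdcron.Job{
	Name:   "job0",
	Rhythm: "*/2 * * * * *",
	Metadata: map[string]string{
		"type": "my-job-type",
	},
	Payload: &anypb.Any{Value: []byte("hello every 2s")},
})
if err != nil {
	log.Fatal(err)
}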

Error Handling

errorsHandler := func(ctx context.Context, job etcdcron.Job, err error) {
	// Handle an error that occurred while running 'job'.
}
etcdErrorsHandler := func(ctx context.Context, job etcdcron.Job, err error) {
	// Handle an error that occurred when 'job' was due to run
	// but the locking mechanism failed because of an etcd error.
}

cron, _ := etcdcron.New(
	etcdcron.WithErrorsHandler(errorsHandler),
	etcdcron.WithEtcdErrorsHandler(etcdErrorsHandler),
	etcdcron.WithTriggerFunc(func(ctx context.Context, req etcdcron.TriggerRequest) (etcdcron.TriggerResult, error) {
		log.Printf("Trigger from pid %d: %s %s\n", os.Getpid(), req.JobName, string(req.Payload.Value))
		return etcdcron.OK, nil
	}),
)

cron.AddJob(context.TODO(), etcdcron.Job{
	Name:    "job0",
	Rhythm:  "*/2 * * * * *",
	Payload: &anypb.Any{Value: []byte("hello every 2s")},
})

Tests

Pre-requisites to run the tests locally:

  • Run etcd locally via one of the options below:
    • Locally: Install etcd then run etcd --logger=zap
    • Docker: Running a single node etcd, for example:
      docker run -d -p 2379:2379 \
      -e ETCD_ADVERTISE_CLIENT_URLS=http://0.0.0.0:2379 \
      -e ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379 \
      --name etcd quay.io/coreos/etcd:v3.5.5
      
make test

OR

go test -timeout 300s --race ./...

History

This is a fork of https://github.com/Scalingo/go-etcd-cron, which had been based on https://github.com/robfig/cron.

This fork has different goals from Scalingo's go-etcd-cron library.

Documentation

Overview

Package etcdcron implements a cron spec parser and job runner.

Usage

Callers register a single callback function, and cron passes the context of each trigger to it: the job's name, metadata, and payload, carried in a TriggerRequest. Cron runs each invocation in its own goroutine.

c, err := etcdcron.New(
	etcdcron.WithTriggerFunc(func(ctx context.Context, req etcdcron.TriggerRequest) (etcdcron.TriggerResult, error) {
		log.Printf("Trigger from pid %d: %s %s\n", os.Getpid(), req.JobName, string(req.Payload.Value))
		return etcdcron.OK, nil
	}),
)
if err != nil {
	log.Fatal(err)
}

// Create the context first: it is needed by AddJob and Start.
ctx, cancel := context.WithCancel(context.Background())

c.AddJob(ctx, etcdcron.Job{
	Name:    "job-100",
	Rhythm:  "*/2 * * * * *",
	Payload: &anypb.Any{Value: []byte("Hello every 2s")},
})
c.AddJob(ctx, etcdcron.Job{
	Name:    "job-101",
	Rhythm:  "0 30 * * * *",
	Payload: &anypb.Any{Value: []byte("Every hour on the half hour")},
})
c.AddJob(ctx, etcdcron.Job{
	Name:    "job-102",
	Rhythm:  "@hourly",
	Payload: &anypb.Any{Value: []byte("Every hour")},
})
c.AddJob(ctx, etcdcron.Job{
	Name:    "job-103",
	Rhythm:  "@every 1h30m",
	Payload: &anypb.Any{Value: []byte("Every hour thirty")},
})

c.Start(ctx)
...
// Jobs are invoked in their own goroutine, asynchronously.
...
// Jobs may also be added to a running Cron
c.AddJob(ctx, etcdcron.Job{
	Name:    "job-104",
	Rhythm:  "@daily",
	Payload: &anypb.Any{Value: []byte("Every day")},
})
...
// Inspect the cron job entries' next and previous run times.
inspect(c.Entries())
...

// Stop the scheduler via context (it can cancel jobs already running).
cancel()
c.Wait()

CRON Expression Format

A cron expression represents a set of times, using 6 space-separated fields.

Field name   | Mandatory? | Allowed values  | Allowed special characters
----------   | ---------- | --------------  | --------------------------
Seconds      | Yes        | 0-59            | * / , -
Minutes      | Yes        | 0-59            | * / , -
Hours        | Yes        | 0-23            | * / , -
Day of month | Yes        | 1-31            | * / , - ?
Month        | Yes        | 1-12 or JAN-DEC | * / , -
Day of week  | Yes        | 0-6 or SUN-SAT  | * / , - ?

Note: Month and Day-of-week field values are case insensitive. "SUN", "Sun", and "sun" are equally accepted.
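
To make the field order concrete, here is a minimal sketch that splits an expression into the six fields of the table above. `splitSpec` and `fieldNames` are hypothetical names used for illustration; this is not the package's parser, and it does not validate the allowed values.

```go
package main

import (
	"fmt"
	"strings"
)

// fieldNames lists the six fields in the order they appear in an expression.
var fieldNames = []string{"seconds", "minutes", "hours", "day of month", "month", "day of week"}

// splitSpec splits a cron expression into its six fields, keyed by field name.
// Values are upper-cased so that "sun", "Sun" and "SUN" compare equal.
func splitSpec(spec string) (map[string]string, error) {
	parts := strings.Fields(spec)
	if len(parts) != len(fieldNames) {
		return nil, fmt.Errorf("expected %d fields, got %d in %q", len(fieldNames), len(parts), spec)
	}
	fields := make(map[string]string, len(parts))
	for i, name := range fieldNames {
		fields[name] = strings.ToUpper(parts[i])
	}
	return fields, nil
}

func main() {
	fields, err := splitSpec("0 30 9 ? * mon-fri")
	if err != nil {
		fmt.Println(err)
		return
	}
	for _, name := range fieldNames {
		fmt.Printf("%-12s %s\n", name, fields[name])
	}
}
```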

Special Characters

Asterisk ( * )

The asterisk indicates that the cron expression will match for all values of the field; e.g., using an asterisk in the 5th field (month) would indicate every month.

Slash ( / )

Slashes are used to describe increments of ranges. For example, 3-59/15 in the 2nd field (minutes) would indicate the 3rd minute of the hour and every 15 minutes thereafter. The form "*/..." is equivalent to the form "first-last/...", that is, an increment over the largest possible range of the field. The form "N/..." is accepted as meaning "N-MAX/...", that is, starting at N, use the increment until the end of that specific range. It does not wrap around.

Comma ( , )

Commas are used to separate items of a list. For example, using "MON,WED,FRI" in the 6th field (day of week) would mean Mondays, Wednesdays and Fridays.

Hyphen ( - )

Hyphens are used to define ranges. For example, 9-17 in the 3rd field (hours) would indicate every hour between 9am and 5pm inclusive.

Question mark ( ? )

Question mark may be used instead of '*' for leaving either day-of-month or day-of-week blank.

Predefined schedules

You may use one of several pre-defined schedules in place of a cron expression.

Entry                  | Description                                | Equivalent To
-----                  | -----------                                | -------------
@yearly (or @annually) | Run once a year, midnight, Jan. 1st        | 0 0 0 1 1 *
@monthly               | Run once a month, midnight, first of month | 0 0 0 1 * *
@weekly                | Run once a week, midnight on Sunday        | 0 0 0 * * 0
@daily (or @midnight)  | Run once a day, midnight                   | 0 0 0 * * *
@hourly                | Run once an hour, beginning of hour        | 0 0 * * * *
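
The table above amounts to a lookup from descriptor to six-field expression. A minimal sketch, with `predefined` and `expandPredefined` as hypothetical names (the package may resolve these descriptors differently internally):

```go
package main

import "fmt"

// predefined maps each descriptor to its equivalent six-field expression,
// copied from the table above.
var predefined = map[string]string{
	"@yearly":   "0 0 0 1 1 *",
	"@annually": "0 0 0 1 1 *",
	"@monthly":  "0 0 0 1 * *",
	"@weekly":   "0 0 0 * * 0",
	"@daily":    "0 0 0 * * *",
	"@midnight": "0 0 0 * * *",
	"@hourly":   "0 0 * * * *",
}

// expandPredefined returns the equivalent expression for a known descriptor
// and leaves any other spec unchanged.
func expandPredefined(spec string) string {
	if expr, ok := predefined[spec]; ok {
		return expr
	}
	return spec
}

func main() {
	for _, spec := range []string{"@hourly", "@midnight", "*/2 * * * * *"} {
		fmt.Printf("%-14s => %s\n", spec, expandPredefined(spec))
	}
}
```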

Intervals

You may also schedule a job to execute at fixed intervals. This is supported by formatting the cron spec like this:

@every <duration>

where "duration" is a string accepted by time.ParseDuration (http://golang.org/pkg/time/#ParseDuration).

For example, "@every 1h30m10s" would indicate a schedule that activates every 1 hour, 30 minutes, 10 seconds.

Note: The interval does not take the job runtime into account. For example, if a job takes 3 minutes to run, and it is scheduled to run every 5 minutes, it will have only 2 minutes of idle time between each run.

Time zones

All interpretation and scheduling is done in the machine's local time zone (as provided by the Go time package, http://www.golang.org/pkg/time).

Be aware that jobs scheduled during daylight-savings leap-ahead transitions will not be run!

Thread safety

Since the Cron service runs concurrently with the calling code, some amount of care must be taken to ensure proper synchronization.

All cron methods are designed to be correctly synchronized as long as the caller ensures that invocations have a clear happens-before ordering between them.

Implementation

Cron entries are stored in an array, sorted by their next activation time. Cron sleeps until the next job is due to be run.

Upon waking:

  • it runs each entry that is active on that second
  • it calculates the next run times for the jobs that were run
  • it re-sorts the array of entries by next activation time
  • it goes to sleep until the soonest job

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Cron

type Cron struct {
	// contains filtered or unexported fields
}

Cron keeps track of any number of entries, invoking the associated func as specified by the schedule. It may be started, stopped, and the entries may be inspected while running.

func New

func New(opts ...CronOpt) (*Cron, error)

New returns a new Cron job runner.

func (*Cron) AddJob

func (c *Cron) AddJob(ctx context.Context, job Job) error

AddJob adds a Job.

func (*Cron) DeleteJob

func (c *Cron) DeleteJob(ctx context.Context, jobName string) error

DeleteJob removes a job from the store; the job is eventually removed from cron as well.

func (*Cron) Entries

func (c *Cron) Entries() []*Entry

Entries returns a snapshot of the cron entries.

func (*Cron) GetJob

func (c *Cron) GetJob(jobName string) *Job

GetJob retrieves a job by name.

func (*Cron) Start

func (c *Cron) Start(ctx context.Context) error

Start starts the cron scheduler in its own goroutine.

func (*Cron) Wait

func (c *Cron) Wait()

Wait blocks until the cron has stopped after the context is cancelled.

type CronOpt

type CronOpt func(cron *Cron)

func WithErrorsHandler

func WithErrorsHandler(f func(context.Context, Job, error)) CronOpt

func WithEtcdClient

func WithEtcdClient(c *etcdclient.Client) CronOpt

func WithEtcdErrorsHandler

func WithEtcdErrorsHandler(f func(context.Context, Job, error)) CronOpt

func WithFuncCtx

func WithFuncCtx(f func(context.Context, Job) context.Context) CronOpt

func WithJobStore

func WithJobStore(s storage.JobStore) CronOpt

func WithLogConfig

func WithLogConfig(c *zap.Config) CronOpt

func WithNamespace

func WithNamespace(n string) CronOpt

func WithPartitioning

func WithPartitioning(p partitioning.Partitioner) CronOpt

func WithTriggerFunc

func WithTriggerFunc(f TriggerFunction) CronOpt

type Entry

type Entry struct {
	// The schedule on which this job should be run.
	Schedule rhythm.Schedule

	// The Job to run.
	Job Job

	// Optimization to avoid accessing Job's object.
	// This is OK because the job's name never changes for a given entry.
	JobName string
	// contains filtered or unexported fields
}

Entry consists of a schedule and the job to execute on that schedule.

type Job

type Job struct {
	// Name of the job
	Name string
	// Cron-formatted rhythm (e.g. 0,10,30 1-5 0 * * *)
	Rhythm string
	// Optional metadata that the client understands.
	Metadata map[string]string
	// Optional payload containing all the information for the trigger
	Payload *anypb.Any
	// Optional maximum number of times the job is triggered
	Repeats int32
	// Optional start time for the first trigger of the schedule
	StartTime time.Time
	// Optional time when the job must expire
	Expiration time.Time
	// Status to allow job clean up
	Status JobStatus
}

Job contains 3 mandatory options to define a job

type JobStatus

type JobStatus int
const (
	JobStatusActive JobStatus = iota
	JobStatusDeleted
)

Job Statuses

type TriggerFunction

type TriggerFunction func(ctx context.Context, req TriggerRequest) (TriggerResult, error)

type TriggerRequest

type TriggerRequest struct {
	JobName   string
	Timestamp time.Time
	Actual    time.Time
	Metadata  map[string]string
	Payload   *anypb.Any
}

type TriggerResult

type TriggerResult int
const (
	OK TriggerResult = iota
	Failure
	Delete
)

Directories

Path Synopsis
Package time contains utilities for working with times, dates, and durations.