cronutil

Version: v1.3.7
Published: Mar 10, 2025 License: MIT Imports: 15 Imported by: 0

README

cronutil

This library provides a standard implementation for a distributed cron for use in PotatoBeans standard Go services and applications.

The distributed cron is used to schedule periodic actions across multiple services. It uses Redis to hold a global lock and the schedule for each action.

TODO: This library does not yet support standard cron notation. It does, however, provide schedulers with a distributed mutex for running actions periodically.

This module is available for anyone to use, including PotatoBeans' former clients. It is actively maintained by PotatoBeans.

Scheduler

The primary purpose of go-cronutil is to provide Scheduler and, alongside it, MutableScheduler. A scheduler runs an action periodically and is designed to work in multi-service and replicated environments. Because it uses Redis as its backend, services that use a scheduler can remain stateless.

The Scheduler works by polling, so it is not recommended for applications that must run actions at a precisely set time. It is suited, for example, to very long-running batch jobs that would otherwise require setup outside the application, such as Kubernetes or Nomad jobs. This way, all logic stays in the application, in the hands of its developers.

Scheduler works by repeatedly polling a distributed lock in Redis. The polling interval is set by the scheduler's pollingTime, and the interval between actions is set by actionPeriod. The polling time should be shorter than the action period: a 10-minute action period can use a 1-minute polling time, for example. All schedulers configured with the same scheduler ID and polling time will poll Redis at roughly the same time. Once the time at which the action is due has passed, the one service that wins the lock runs the action. This is analogous to a distributed, periodic sync.Once.

It depends on the github.com/redis/go-redis/v9 library and requires a running Redis instance.

Example

package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/potatobeansco/go-cronutil"
	"github.com/redis/go-redis/v9"
)

func main() {
	rc := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379"})

	// Run the action every hour, polling Redis every 10 minutes.
	s := cronutil.NewScheduler("myscheduler", rc, 1*time.Hour, 10*time.Minute, func(ctx context.Context) error {
		fmt.Println("hello world")
		return nil
	})

	// Start blocks, so call it in a goroutine (and wait for it with a
	// sync.WaitGroup if needed).
	go func() {
		if err := s.Start(); err != nil {
			fmt.Printf("unable to start: %s\n", err.Error())
			os.Exit(1)
		}
	}()

	// ...

	// Call Stop somewhere to avoid a memory leak.
	s.Stop()
}
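The suggestion to start the scheduler in a goroutine and wait for it with sync.WaitGroup could look roughly like the sketch below. The runner interface, fakeScheduler, and runInBackground are hypothetical names introduced for illustration. The sketch also assumes that Start returns once Stop has been called, which the documentation does not state explicitly.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runner captures the two methods used here. Scheduler and MutableScheduler
// both document Start() error and Stop() with these signatures.
type runner interface {
	Start() error
	Stop()
}

// fakeScheduler is a hypothetical stand-in so the sketch runs without Redis.
// Its Start blocks until Stop is called (an assumption about the real type).
type fakeScheduler struct {
	done chan struct{}
}

func (f *fakeScheduler) Start() error { <-f.done; return nil }
func (f *fakeScheduler) Stop()        { close(f.done) }

// runInBackground starts r in a goroutine and returns a function that stops
// it and waits for Start to return.
func runInBackground(r runner, onErr func(error)) (stop func()) {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := r.Start(); err != nil {
			onErr(err)
		}
	}()
	return func() {
		r.Stop()
		wg.Wait()
	}
}

func main() {
	s := &fakeScheduler{done: make(chan struct{})}
	stop := runInBackground(s, func(err error) { fmt.Println("unable to start:", err) })

	time.Sleep(10 * time.Millisecond) // application work would happen here
	stop()
	fmt.Println("scheduler stopped")
}
```

Returning a stop function keeps the WaitGroup private to the helper, so callers cannot forget to wait after stopping.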

MutableScheduler is a variant of Scheduler whose period setting is also stored in Redis and can be modified at runtime. A MutableScheduler can also be paused and unpaused. See the Scheduler and MutableScheduler documentation for details.

Logging and OpenTelemetry

Scheduler and MutableScheduler support OpenTelemetry logging and tracing. They try to obtain the default trace provider as usual. Consult the OpenTelemetry Go guide to set up OpenTelemetry in your application.

The schedulers have a Logger attribute that can be changed. It uses the PotatoBeans logutil.ContextLogger interface. logutil.OpenTelemetryLogger can be used to send logs to an OpenTelemetry collector or service. See the potatobeansco/go-logutil module for details.

Future Tasks

  • Enhancing documentation and examples
  • Support for high precision timer
  • Support for cron notation

Documentation

Overview

Package cronutil provides an extra-light implementation of a distributed cron scheduler. It guarantees that only one Scheduler executes a given action at a time, and that the action is executed periodically.

It relies on Redis to store the lock and the time schedule. It works by continuously polling Redis to check whether it is time to execute an action, and executes the action if so. After execution, the time stored in Redis is updated to the next execution time.

If, while the action is executing on one scheduler (call it scheduler A), another scheduler with the same id (call it scheduler B) requests to execute the action, scheduler B blocks until scheduler A has finished. When scheduler A completes its action, it updates the next execution time and then releases the lock; scheduler B then checks the execution time and determines that it is not yet time to execute. As a result, only scheduler A executes the action. The other schedulers execute it only when scheduler A no longer requests the lock and no longer updates the next execution time in Redis (e.g. if scheduler A is down).

See also documentation for NewScheduler.

Constants

This section is empty.

Variables

var ErrMutexLocked = errors.New("mutex is locked")

Functions

This section is empty.

Types

type MutableScheduler

type MutableScheduler struct {
	// A standard Redis client.
	Client redis.UniversalClient

	// Polling period to try to acquire the lock.
	PollingTime time.Duration

	// Timeout for the action.
	ActionTimeout time.Duration

	// Prefix can be configured to give a unique key to store execution time in Redis.
	// Execution time will be stored as Prefix:id key in the configured Redis database.
	Prefix string

	Logger logutil.ContextLogger
	// contains filtered or unexported fields
}

MutableScheduler works like Scheduler, but its period (initialized from initialPeriod) is shared across schedulers and can be modified at will. The scheduler can also be paused, which pauses all instances of the scheduler across all services. Because it stores the period in Redis together with the pause status and mutex, it must have a very short PollingTime. The default is 5 seconds, the shortest interval currently supported. Make sure the network can sustain Redis polling at this interval. Although PollingTime can be set to any value, we recommend leaving it at the default 5 seconds.

func NewMutableScheduler

func NewMutableScheduler(id string, client redis.UniversalClient, initialPeriod time.Duration, action func(context.Context) error) *MutableScheduler

NewMutableScheduler creates a new mutable scheduler. initialPeriod must be set, and to the same value on all schedulers. The value is used only at the beginning, when the first scheduler starts and initializes the period config in Redis. All other schedulers then follow the period config stored in Redis, until one of them calls a function to update the period (and period jitter).

func (*MutableScheduler) Err

func (s *MutableScheduler) Err() <-chan error

Err returns the error channel. Consume this channel so that errors do not accumulate in it.

func (*MutableScheduler) LockCtx

func (s *MutableScheduler) LockCtx(ctx context.Context) error

LockCtx manually locks the distributed mutex, effectively pausing this scheduler until the lock is released again. If an error is returned, the lock could not be acquired and the action will continue to run. While the lock is held, the poller is also stopped; it can be restarted with UnlockCtx.

func (*MutableScheduler) Pause

func (s *MutableScheduler) Pause() (err error)

Pause sets active status to false.

func (*MutableScheduler) Ping

func (s *MutableScheduler) Ping(ctx context.Context) error

Ping pings Redis to check the connection.

func (*MutableScheduler) Reset

func (s *MutableScheduler) Reset()

Reset manually resets the next execution schedule to current time + initialPeriod.

func (*MutableScheduler) Start

func (s *MutableScheduler) Start() error

Start starts the scheduler synchronously. Call it in a goroutine to start the Scheduler asynchronously. It returns an error immediately if the connection to Redis cannot be made. It does not execute its task immediately; it waits for Scheduler.PollingTime before executing the action in the next period.

If Start fails, the error channel will be closed.

func (*MutableScheduler) Stop

func (s *MutableScheduler) Stop()

Stop stops the scheduler and closes the error channel.

func (*MutableScheduler) Trigger

func (s *MutableScheduler) Trigger()

Trigger triggers the execution of action immediately, resetting the timer on action completion.

func (*MutableScheduler) UnlockCtx

func (s *MutableScheduler) UnlockCtx(ctx context.Context) error

UnlockCtx manually releases the distributed mutex.

func (*MutableScheduler) Unpause

func (s *MutableScheduler) Unpause() (err error)

Unpause sets the timer setting to active. On unpause, the timer is effectively recreated, so the next run time is set to the next period.

func (*MutableScheduler) UpdatePeriod

func (s *MutableScheduler) UpdatePeriod(period time.Duration, jitter float64) (err error)

UpdatePeriod updates period config, resetting the next run time.

type Scheduler

type Scheduler struct {
	// A standard Redis client.
	Client redis.UniversalClient

	// Polling period to try to acquire the lock.
	PollingTime time.Duration

	// Timeout for the action.
	ActionTimeout time.Duration

	// The period at which the action is executed.
	Period time.Duration

	// PeriodJitter defines how much jitter delay is added to the next action run time.
	// It must be >= 0 and defaults to 0 (no jitter).
	// With a jitter of 0.1, the period is multiplied by 0.1 and a random number of seconds is picked between 0 and
	// period * period jitter. The chosen seconds are then added to the next run time, preventing other schedulers
	// from starting at the same time.
	PeriodJitter float64

	// Prefix can be configured to give a unique key to store execution time in Redis.
	// Execution time will be stored as Prefix:id key in the configured Redis database.
	Prefix string

	Logger logutil.ContextLogger

	AlwaysLog bool
	// contains filtered or unexported fields
}

Scheduler schedules an action to be executed periodically (as determined by Period). Because the Scheduler works by polling, it cannot be used for quick jobs (less than a few seconds): even a very low polling time could generate too many lock requests and waste resources. For the same reason, the action is not guaranteed to execute at the exact period time; it will probably miss by a few milliseconds to a few seconds. It is designed for long-running jobs that are not executed too frequently. See also the package documentation.

func NewScheduler

func NewScheduler(id string, client redis.UniversalClient, period time.Duration, pollingTime time.Duration, action func(context.Context) error) *Scheduler

NewScheduler creates a new scheduler. See also the package documentation and the Scheduler type documentation. You should store and consume the error channel returned by Scheduler.Err().

All Schedulers that share the same action must use exactly the same period. Otherwise, scheduling becomes unpredictable because the next execution time in Redis becomes unpredictable. id is the id of the job and must match the id of the other schedulers that execute the same action. period is the duration between action executions.

pollingTime is the interval at which the Scheduler polls Redis for the lock and the execution time. It is typically a few seconds, and can be longer for long-running jobs that are not executed frequently. A pollingTime of 10 seconds means that the Scheduler asks Redis every 10 seconds whether it is time to execute the action.

The action function receives a context with a timeout. The timeout is determined by Scheduler.ActionTimeout (30 seconds by default), which can be changed.

period must be greater than pollingTime; NewScheduler panics if period <= pollingTime.

Schedulers that share an id may be given different pollingTime and action values. pollingTime can be varied, for example, to reduce the chance of multiple services asking for the same lock at the same time. However, because different Schedulers usually start at different times anyway, we suggest keeping pollingTime the same across Schedulers. action can differ for each scheduler, and there is no way to enforce a common action across multiple Schedulers; nevertheless, all Schedulers should execute the same kind of action.

The execution time is stored in Redis under the key "<prefix>:<id>" as UNIX epoch seconds. The prefix can be changed as you wish and defaults to defaultPrefix.

func (*Scheduler) Err

func (s *Scheduler) Err() <-chan error

Err returns the error channel. Consume this channel so that errors do not accumulate in it.

func (*Scheduler) LockCtx

func (s *Scheduler) LockCtx(ctx context.Context) error

LockCtx manually locks the distributed mutex, effectively pausing this scheduler until the lock is released again. If an error is returned, the lock could not be acquired and the action will continue to run. While the lock is held, the poller is also stopped; it can be restarted with UnlockCtx.

func (*Scheduler) Ping

func (s *Scheduler) Ping(ctx context.Context) error

Ping pings Redis to check the connection.

func (*Scheduler) Reset

func (s *Scheduler) Reset()

Reset manually resets the next execution schedule to current time + Period.

func (*Scheduler) Start

func (s *Scheduler) Start() error

Start starts the scheduler synchronously. Call it in a goroutine to start the Scheduler asynchronously. It returns an error immediately if the connection to Redis cannot be made. It does not execute its task immediately; it waits for Scheduler.PollingTime before executing the action in the next period.

If Start fails, the error channel will be closed.

func (*Scheduler) Stop

func (s *Scheduler) Stop()

Stop stops the scheduler and closes the error channel.

func (*Scheduler) Trigger

func (s *Scheduler) Trigger()

Trigger triggers the execution of action immediately, resetting the timer on action completion.

func (*Scheduler) UnlockCtx

func (s *Scheduler) UnlockCtx(ctx context.Context) error

UnlockCtx manually releases the distributed mutex.
