Documentation ¶
Overview ¶
Package cronutil provides an extra-light implementation of a distributed cron scheduler. The Scheduler guarantees that only one Scheduler executes a given action at a time, and that the action is executed periodically.
It relies on Redis to store the lock and the execution schedule. Each scheduler continuously polls Redis to check whether it is time to execute the action, and executes it if so. After execution, the time stored in Redis is updated to the next execution time. If another scheduler with the same id (call it scheduler B) requests to execute the action while the original scheduler (call it scheduler A) is still executing it, B blocks until A's execution is over. When scheduler A completes its action, it updates the next execution time and then releases the lock; scheduler B then checks the execution time and determines that it is not yet time to execute. As a result, only scheduler A executes the action. The other schedulers execute their actions only when scheduler A no longer requests the lock and no longer updates the next execution time in Redis (e.g. if scheduler A is down).
See also documentation for NewScheduler.
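The polling protocol described above can be sketched without Redis. This is a minimal illustration, not the package's implementation: memStore, pollOnce, and simulate are hypothetical names, and a sync.Mutex plus a single time.Time stand in for the lock and the execution time that the package keeps in Redis.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// memStore is a hypothetical in-memory stand-in for Redis.
type memStore struct {
	lock    sync.Mutex // stands in for the distributed lock
	nextRun time.Time  // stands in for the stored next execution time
}

// pollOnce performs one poll: take the lock, run the action only if the
// stored time is due, store the next execution time, then release the lock.
// It reports whether the action ran.
func pollOnce(s *memStore, now time.Time, period time.Duration, action func()) bool {
	s.lock.Lock()         // blocks while another scheduler is executing
	defer s.lock.Unlock() // released only after the next time is stored
	if now.Before(s.nextRun) {
		return false // not yet time to execute
	}
	action()
	s.nextRun = now.Add(period)
	return true
}

// simulate lets two schedulers (A and B) poll the same store at two
// consecutive due times, A always polling first, and counts their runs.
func simulate() (runsA, runsB int) {
	start := time.Unix(0, 0)
	period := 10 * time.Second
	s := &memStore{nextRun: start.Add(period)}
	for tick := 1; tick <= 2; tick++ {
		now := start.Add(time.Duration(tick) * period)
		if pollOnce(s, now, period, func() {}) {
			runsA++
		}
		if pollOnce(s, now, period, func() {}) {
			runsB++
		}
	}
	return runsA, runsB
}

func main() {
	a, b := simulate()
	fmt.Printf("A ran %d times, B ran %d times\n", a, b) // A ran 2 times, B ran 0 times
}
```

Because A stores the next execution time before releasing the lock, B always finds that the action is not due when its turn comes.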
Index ¶
- Variables
- type MutableScheduler
- func (s *MutableScheduler) Err() <-chan error
- func (s *MutableScheduler) LockCtx(ctx context.Context) error
- func (s *MutableScheduler) Pause() (err error)
- func (s *MutableScheduler) Ping(ctx context.Context) error
- func (s *MutableScheduler) Reset()
- func (s *MutableScheduler) Start() error
- func (s *MutableScheduler) Stop()
- func (s *MutableScheduler) Trigger()
- func (s *MutableScheduler) UnlockCtx(ctx context.Context) error
- func (s *MutableScheduler) Unpause() (err error)
- func (s *MutableScheduler) UpdatePeriod(period time.Duration, jitter float64) (err error)
- type Scheduler
- func (s *Scheduler) Err() <-chan error
- func (s *Scheduler) LockCtx(ctx context.Context) error
- func (s *Scheduler) Ping(ctx context.Context) error
- func (s *Scheduler) Reset()
- func (s *Scheduler) Start() error
- func (s *Scheduler) Stop()
- func (s *Scheduler) Trigger()
- func (s *Scheduler) UnlockCtx(ctx context.Context) error
Constants ¶
This section is empty.
Variables ¶
var ErrMutexLocked = errors.New("mutex is locked")
Functions ¶
This section is empty.
Types ¶
type MutableScheduler ¶
type MutableScheduler struct {
	// A standard Redis client.
	Client redis.UniversalClient
	// Polling period to try to acquire the lock.
	PollingTime time.Duration
	// Timeout for the action.
	ActionTimeout time.Duration
	// Prefix can be configured to give a unique key to store execution time in Redis.
	// Execution time will be stored as Prefix:id key in the configured Redis database.
	Prefix string
	Logger logutil.ContextLogger
	// contains filtered or unexported fields
}
MutableScheduler works like Scheduler, but its period is shared across all schedulers and can be modified at will. It can also be paused, which pauses every instance of the scheduler across all services. Because it stores the period in Redis together with the pause status and the mutex, it must have a very short PollingTime. The default is 5 seconds, the lowest resolution currently supported. Make sure the network can sustain Redis polling at this interval. Although PollingTime can be set to any value, we recommend leaving it at the default of 5 seconds.
func NewMutableScheduler ¶
func NewMutableScheduler(id string, client redis.UniversalClient, initialPeriod time.Duration, action func(context.Context) error) *MutableScheduler
NewMutableScheduler creates a new mutable scheduler. initialPeriod must be set, and must be the same value for all schedulers. It is used only at the beginning, when the first scheduler starts and initializes the period config in Redis. All other schedulers then follow the period config stored in Redis until one of them calls UpdatePeriod to update the period (and period jitter).
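The "first scheduler initializes, everyone else follows" behavior can be pictured as a set-if-absent operation on shared state. The sketch below is only an illustration under that assumption: periodStore, initPeriod, and updatePeriod are hypothetical names, a map stands in for Redis, and the package may implement this differently.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// periodStore is a hypothetical in-memory stand-in for the period config
// that the package keeps in Redis.
type periodStore struct {
	mu      sync.Mutex
	periods map[string]time.Duration
}

func newPeriodStore() *periodStore {
	return &periodStore{periods: map[string]time.Duration{}}
}

// initPeriod stores initial only if no period exists yet for id, and
// returns the period that is in effect afterwards.
func (s *periodStore) initPeriod(id string, initial time.Duration) time.Duration {
	s.mu.Lock()
	defer s.mu.Unlock()
	if p, ok := s.periods[id]; ok {
		return p // an earlier scheduler already initialized the config
	}
	s.periods[id] = initial
	return initial
}

// updatePeriod overwrites the shared period for id.
func (s *periodStore) updatePeriod(id string, period time.Duration) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.periods[id] = period
}

func main() {
	s := newPeriodStore()
	fmt.Println(s.initPeriod("job", time.Minute))   // 1m0s: first scheduler initializes
	fmt.Println(s.initPeriod("job", 2*time.Minute)) // 1m0s: later value is ignored
	s.updatePeriod("job", 5*time.Minute)
	fmt.Println(s.initPeriod("job", time.Minute)) // 5m0s: everyone follows the update
}
```

This also shows why a mismatched initialPeriod is easy to miss: a later scheduler's value is silently ignored once the config exists.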
func (*MutableScheduler) Err ¶
func (s *MutableScheduler) Err() <-chan error
Err returns the error channel. This error channel should be consumed to avoid getting too many errors in the channel.
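One way to consume the error channel is a small loop that runs until the channel is closed. The sketch below uses a plain channel as a stand-in for the value returned by Err(); drainErrors and demo are hypothetical helper names, and the error messages are made up for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// drainErrors passes every error from errs to handle and returns the number
// of errors seen once the channel is closed.
func drainErrors(errs <-chan error, handle func(error)) int {
	n := 0
	for err := range errs {
		handle(err)
		n++
	}
	return n
}

// demo feeds two illustrative errors through a stand-in channel.
func demo() int {
	errs := make(chan error, 2) // stand-in for the channel returned by Err()
	errs <- errors.New("action failed")
	errs <- errors.New("redis unavailable")
	close(errs) // Stop is documented to close the error channel
	return drainErrors(errs, func(err error) {
		fmt.Println("scheduler error:", err)
	})
}

func main() {
	fmt.Println("errors handled:", demo()) // errors handled: 2
}
```

With a real scheduler, the loop would typically run in its own goroutine, started before Start is called, so that errors are handled as they arrive.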
func (*MutableScheduler) LockCtx ¶
func (s *MutableScheduler) LockCtx(ctx context.Context) error
LockCtx manually locks the distributed mutex, effectively pausing this scheduler until the lock is released again. If an error is returned, the lock could not be acquired and the action will continue to run. The poller is also stopped, but can be restarted with UnlockCtx.
func (*MutableScheduler) Pause ¶
func (s *MutableScheduler) Pause() (err error)
Pause sets active status to false.
func (*MutableScheduler) Ping ¶
func (s *MutableScheduler) Ping(ctx context.Context) error
Ping pings Redis to check the connection.
func (*MutableScheduler) Reset ¶
func (s *MutableScheduler) Reset()
Reset manually resets the next execution schedule to current time + initialPeriod.
func (*MutableScheduler) Start ¶
func (s *MutableScheduler) Start() error
Start starts the scheduler synchronously. To start the Scheduler asynchronously, call Start in a goroutine. It returns an error immediately if the connection to Redis cannot be made. It does not execute the action immediately. Instead, it waits for the Scheduler.PollingTime duration and executes the action in the next period.
If Start fails, the error channel will be closed.
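Running a synchronous Start in a goroutine might look like the sketch below. It assumes that Start blocks until Stop is called, which the text implies but does not state outright. The runner interface copies only the documented Start and Stop signatures; fakeScheduler, startAsync, and demo are hypothetical stand-ins so that the example runs without Redis.

```go
package main

import "fmt"

// runner lists the two documented methods this sketch relies on.
type runner interface {
	Start() error
	Stop()
}

// fakeScheduler is a hypothetical stand-in whose Start blocks until Stop.
type fakeScheduler struct{ quit chan struct{} }

func (f *fakeScheduler) Start() error { <-f.quit; return nil }
func (f *fakeScheduler) Stop()        { close(f.quit) }

// startAsync runs Start in a goroutine and delivers its result on the
// returned channel, so the caller can keep working and check it later.
func startAsync(r runner) <-chan error {
	done := make(chan error, 1)
	go func() { done <- r.Start() }()
	return done
}

// demo starts the fake scheduler in the background, stops it, and returns
// whatever Start returned.
func demo() error {
	s := &fakeScheduler{quit: make(chan struct{})}
	done := startAsync(s)
	s.Stop()
	return <-done
}

func main() {
	fmt.Println("Start returned:", demo()) // Start returned: <nil>
}
```

The buffered channel lets the goroutine finish even if the caller never reads the result.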
func (*MutableScheduler) Stop ¶
func (s *MutableScheduler) Stop()
Stop stops the scheduler and closes the error channel.
func (*MutableScheduler) Trigger ¶
func (s *MutableScheduler) Trigger()
Trigger triggers the execution of action immediately, resetting the timer on action completion.
func (*MutableScheduler) UnlockCtx ¶
func (s *MutableScheduler) UnlockCtx(ctx context.Context) error
UnlockCtx manually releases the distributed mutex.
func (*MutableScheduler) Unpause ¶
func (s *MutableScheduler) Unpause() (err error)
Unpause sets the timer setting to active. Unpausing effectively recreates the timer, so the next run time is set to the next period.
func (*MutableScheduler) UpdatePeriod ¶
func (s *MutableScheduler) UpdatePeriod(period time.Duration, jitter float64) (err error)
UpdatePeriod updates period config, resetting the next run time.
type Scheduler ¶
type Scheduler struct {
	// A standard Redis client.
	Client redis.UniversalClient
	// Polling period to try to acquire the lock.
	PollingTime time.Duration
	// Timeout for the action.
	ActionTimeout time.Duration
	// The period at which the action should be executed.
	Period time.Duration
	// PeriodJitter defines how much jitter delay is added to the next action run time.
	// It must be >= 0; the default is 0 (no jitter).
	// If jitter is set to 0.1, a random number of seconds is picked between 0 and
	// period * 0.1. The chosen delay is then added to the next run time, preventing
	// other schedulers from starting at the same time.
	PeriodJitter float64
	// Prefix can be configured to give a unique key to store execution time in Redis.
	// Execution time will be stored as Prefix:id key in the configured Redis database.
	Prefix string
	Logger logutil.ContextLogger
	AlwaysLog bool
	// contains filtered or unexported fields
}
Scheduler schedules an action to be executed periodically (as determined by Period). Because the Scheduler works by polling, it cannot be used for quick jobs (less than a few seconds). Even a very low polling time could result in too many lock requests, which wastes resources. For the same reason, the action is not guaranteed to execute at the exact period time; it will probably miss by a few milliseconds to a few seconds. The Scheduler is designed for long-running jobs that are not executed too frequently. See also the package documentation.
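The PeriodJitter rule from the struct comment can be illustrated with a short calculation. This is a sketch of the documented formula only: jitterDelay and nextRun are hypothetical names, and the package may round the delay to whole seconds, which this version does not do.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// jitterDelay returns the extra delay for one run: the fraction r
// (0 <= r < 1) of period * jitter. A jitter of 0 disables the delay.
func jitterDelay(period time.Duration, jitter, r float64) time.Duration {
	if jitter <= 0 {
		return 0
	}
	return time.Duration(float64(period) * jitter * r)
}

// nextRun adds the period and a random jitter delay to now.
func nextRun(now time.Time, period time.Duration, jitter float64) time.Time {
	return now.Add(period + jitterDelay(period, jitter, rand.Float64()))
}

func main() {
	now := time.Now()
	next := nextRun(now, time.Minute, 0.1)
	// With a 1 minute period and jitter 0.1, the gap is at least 1m0s and
	// less than 1m6s.
	fmt.Println(next.Sub(now))
}
```

Passing the random fraction as a parameter keeps jitterDelay deterministic, which makes the bound easy to check.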
func NewScheduler ¶
func NewScheduler(id string, client redis.UniversalClient, period time.Duration, pollingTime time.Duration, action func(context.Context) error) *Scheduler
NewScheduler creates a new scheduler. See also the package documentation and the Scheduler type documentation. You should store and consume the error channel returned by Scheduler.Err(). All Schedulers that share the same action must have exactly the same period parameter. Otherwise, scheduling becomes unpredictable because the next execution time in Redis is unpredictable.
id is the id of the job and must match the id of the other schedulers that execute the same action. period determines the duration between action executions. pollingTime determines how often the Scheduler polls Redis for the lock and the execution time. pollingTime is typically a few seconds, and can be longer for long-running jobs that are not executed frequently. A pollingTime of 10 seconds means that the Scheduler asks Redis every 10 seconds whether it is time to execute the given action.
The action function receives a context with a timeout. The timeout is determined by Scheduler.ActionTimeout (30 seconds by default), which can be changed.
period must be greater than pollingTime; NewScheduler panics if period <= pollingTime.
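The constraint between period and pollingTime amounts to a single comparison. The sketch below only illustrates the documented rule; validatePeriod, its panic message, and the panics helper are hypothetical and not part of the package.

```go
package main

import (
	"fmt"
	"time"
)

// validatePeriod mirrors the documented rule: period must be strictly
// greater than pollingTime. The name and message are hypothetical.
func validatePeriod(period, pollingTime time.Duration) {
	if period <= pollingTime {
		panic(fmt.Sprintf("period (%s) must be greater than pollingTime (%s)", period, pollingTime))
	}
}

// panics reports whether calling f panicked.
func panics(f func()) (did bool) {
	defer func() {
		if recover() != nil {
			did = true
		}
	}()
	f()
	return false
}

func main() {
	fmt.Println(panics(func() { validatePeriod(time.Minute, 10*time.Second) }))   // false
	fmt.Println(panics(func() { validatePeriod(5*time.Second, 5*time.Second) })) // true
}
```

Checking the two values before calling NewScheduler, for example when they come from configuration, avoids the panic at construction time.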
Schedulers may be given different pollingTime and action values. pollingTime can be varied, for example, to reduce the chance of multiple services asking for the same lock at the same time. But because the start times of different Schedulers usually differ anyway, we suggest keeping pollingTime the same across Schedulers. action can differ for each scheduler, and there is no way to enforce the same action across multiple Schedulers. However, all Schedulers should execute the same kind of action.
The execution time is stored in Redis under the key "<prefix>:<id>" as UNIX epoch seconds. The prefix can be changed as you wish and defaults to defaultPrefix.
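The key and value format described above can be sketched as follows. The helper names are hypothetical, and the "cron" prefix and "cleanup" id are made-up examples, since the value of defaultPrefix is not given here.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// executionKey builds the "<prefix>:<id>" key.
func executionKey(prefix, id string) string {
	return prefix + ":" + id
}

// encodeRunTime renders an execution time as UNIX epoch seconds.
func encodeRunTime(t time.Time) string {
	return strconv.FormatInt(t.Unix(), 10)
}

// decodeRunTime parses a stored value back into a time.
func decodeRunTime(v string) (time.Time, error) {
	sec, err := strconv.ParseInt(v, 10, 64)
	if err != nil {
		return time.Time{}, err
	}
	return time.Unix(sec, 0), nil
}

func main() {
	next := time.Unix(1700000000, 0)
	fmt.Println(executionKey("cron", "cleanup"), encodeRunTime(next)) // cron:cleanup 1700000000
}
```

Storing whole seconds means sub-second precision is lost, which fits the documented accuracy of a few milliseconds to seconds.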
func (*Scheduler) Err ¶
func (s *Scheduler) Err() <-chan error
Err returns the error channel. This error channel should be consumed to avoid getting too many errors in the channel.
func (*Scheduler) LockCtx ¶
func (s *Scheduler) LockCtx(ctx context.Context) error
LockCtx manually locks the distributed mutex, effectively pausing this scheduler until the lock is released again. If an error is returned, the lock could not be acquired and the action will continue to run. The poller is also stopped, but can be restarted with UnlockCtx.
func (*Scheduler) Reset ¶
func (s *Scheduler) Reset()
Reset manually resets the next execution schedule to current time + Period.
func (*Scheduler) Start ¶
func (s *Scheduler) Start() error
Start starts the scheduler synchronously. To start the Scheduler asynchronously, call Start in a goroutine. It returns an error immediately if the connection to Redis cannot be made. It does not execute the action immediately. Instead, it waits for the Scheduler.PollingTime duration and executes the action in the next period.
If Start fails, the error channel will be closed.
func (*Scheduler) Stop ¶
func (s *Scheduler) Stop()
Stop stops the scheduler and closes the error channel.