slate

package module
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 11, 2023 License: MIT Imports: 5 Imported by: 1

README

slate

slate (verb) : to plan that something will happen at a particular time in the future

slate is an in memory non persistent scheduler written in Go that simplifies handling repeat function calls.

Go Report Card


slate allows you to

  • run a scheduled event n number of times
  • guarantees a scheduled event will not run before a predefined start time
  • or a predefined expiration
  • at every n unit of time (nano, micro, milli seconds, minutes...)
  • with automatic expirations

But slate also keeps track of these scheduled events running asynchronously in go routines so you can forcefully stop them at anytime.

You can skip straight to the code and see a working example/main.go or continue to read to see how to run a simple function Every second.

If you like this package or are using for your own needs, then *star** and let me know via https://twitter.com/kylehqcom

Install

First add the package with go get gitlab.com/kylehqcom/slate

import "gitlab.com/kylehqcom/slate"

func Simple() {
    // Do some repeated work.
}

e := slate.ScheduleEntry{
    Fn:    Simple,
    Every: time.Duration(time.Second),
}
s := slate.NewSchedule()

ID, err := s.Submit(e)
if err != nil {
     return err
}

// Too easy! Now your scheduled entry will invoke the function Simple() every second.

// We can easily remove an individual scheduled entry by calling remove
s.Drop(ID)

// Or remove all scheduled entries with a reset
s.Reset()

Schedules

Schedules hold and track all the submitted ScheduleEntry entries. Create a new schedule with an optional ScheduleConfig configuration.

// Schedule created with default config options
s := slate.NewSchedule()

// To add config options to your New Schedule, you have a couple of options.
// Either make new from the ScheduleConfig struct

c := slate.ScheduleConfig{
	DisallowDuplicateEntryID bool
	FlushEvery               time.Duration
	EntryRetention           time.Duration
}

// Or call New with ScheduleConfigOption's
c := NewScheduleConfig(opts ...ScheduleConfigOption)

s, err := slate.NewSchedule(c)

The optional FlushEvery and EntryRetention fields help define a periodic background process which takes care of cleaning up completed/expired ScheduleEntry's for you. You can also manually remove your ScheduleEntry with

// Remove a single schedule entry by the ScheduleEntry.ID
s.Drop(ID EntryID)

// Or remove all schedule entries on a schedule instance
s.Reset()

Both s.Drop(ID EntryID) and s.Reset() will first mark each ScheduleEntry as stopped so that the background processing will halt accordingly. The ScheduleEntry is then removed from the schedule itself.

Manager

If you have multiple schedules to keep track of, you can utilise the convenience Manager struct and funcs. Of particular importance is the m.Reset() func which will iterate over all Schedules bound to the manager and invoke s.Reset() on each.

func NewManager() *Manager

// Add will add a schedule to this manager.
func (m *Manager) Add(name string, s *Schedule)

// All will return all Schedules on this manager.
func (m *Manager) All() Schedules

// Drop will stop and remove all related jobs on the schedule
// before removing the schedule from this manager.
func (m *Manager) Drop(name string)

// Get will return by name a schedule instance from this manager.
func (m *Manager) Get(name string) (*Schedule, error)

// Reset will stop and remove all schedules along with their associated entries.
func (m *Manager) Reset()

ScheduleEntry

In slate, a ScheduleEntry defines all of the required values to schedule your function for execution.

ScheduleEntry struct {
    // Ensure that this entry does not run before the begins time.
    Begins time.Time

    // Cap the number of times this entry will run.
    Cap uint64

    // Run this entry **every...
    Every time.Duration

    // Ensure that this entry does not run after the expires time.
    Expires time.Time

    // function to be called on **every. Wrap in closure for funcs with args.
    Fn func()

    // The generated or user assigned ID for this entry
    ID EntryID
}

Note that when assigning the Begins and/or Expires fields, the time.Time given WILL NOT ensure that a job Begins or Expires exactly at the given time.Time. slate WILL guarantee however, that a ScheduleEntry will not run before or after the time.Time given. By default, the Fn associated with the ScheduleEntry will be called immediately on submission, and on Every tick duration thereafter. If you need to submit your schedule at a given time, then I suggest you check out the sister package of https://gitlab.com/kylehqcom/slater which leverages slate and allows you to SubmitAt a future time.

As an example to better explain how the start time is determined:

If you assign an Every duration of an hour, a Begins of 12:00pm and s.Submit(e ScheduleEntry) the entry at 10:50am, the first Every will occur immediately, skip, then again at roughly 11:50am and skip due to the Begins of 12:00pm. The next Every will call the supplied Fn value at roughly 12:50pm. An error will be returned on ScheduleEntry submission should the Begins be after the Expires, or the Every duration added to the Begins is greater than Expires. By default both Begins and Expires are zero valued and therefore ignored.

As slate calls on every a Go closure/anon func(), you are required to wrap your own methods with receivers. Example

// HasMany has x4 required params
func HasMany(x, y, z string, hit int64) {

// Wrap your funcs and bind to the ScheduleEntryand by calling
e := ScheduleEntry{
    Cap:   10,
    Fn:    func() {
               HasMany("www.kylehq.com", "slate", "is great", 24)
           },
    Every: time.Duration(time.Second),
}

ID, err := s.Submit(e)
EntryIDs

By default a unique EntryID is returned on s.Submit(e ScheduleEntry) enabling you to store and track your ScheduleEntry progress. You can create directly by calling slate.GenerateEntryID(). An EntryID is unique ONLY to each schedule. If you are creating numerous schedules (use the provided manager), it may be beneficial to provide your own universal ID. Such as a user_id.

For example, imagine you have a broadcast manager with x3 schedules bound. One for push notifications, one for sms notifications and another for emails. In such a scenario is would make sense to use a user_id as your EntryID for each schedule. This gives you the flexibility to manipulate an entire schedule or individual schedule entries pertaining to a specific user. Eg:

// Add your `ScheduleEntry` instance with your Generated Entry ID
userEntryID := slate.GenerateEntryID("your user_id here")
e := ScheduleEntry {
    ID: userEntryID,
}
s := NewSchedule()
s.Submit(e)
m := NewManager()
m.Add("sms", s)

// Remove all `ScheduleEntry` instances from  the "sms" schedule.
s, err := m.Get("sms")

// Call Reset() on the Schedule to drop all `ScheduleEntry` instances from the schedule
s.Reset()

// Call DropSchedule() on the Manager to Reset() AND drop the schedule instance from the manager
m.Drop("sms")


// To remove all ScheduleEntry instances that have your userEntryID
for _, s := range m.All() {
    s.Drop(userEntryID)
}

Common Questions

Will a runner run forever?

No! Internally slate has a safety check based on a uint64 value range. If the ScheduleEntry iterates to the uint64 limit, slate will break from the running process. If you want/need to keep processing over the uint64 limit, then simply check the EntryStats() of the ScheduleEntry confirming the State and the Err of ErrMaximumRun. You can then re-add the ScheduleEntry as required. But do you really need to execute you func 18446744073709551615 times?

TODO's

  • Profiling

Documentation

Overview

Package slate is an in memory non persistent scheduler that simplifies handling repeat method calls. slate allows you to

  • Run a job n number of times
  • At a predefined start time
  • At every n unit of time (nano, micro, milli seconds, minutes...)
  • With automatic expirations

slate also keeps track of these jobs running asynchronously in go routines so you can forcefully stop them at anytime.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrDurationZero occurs when adding a new job, but the every duration is zero.
	ErrDurationZero = NewSlateError("Task duration of zero is not allowed.")

	// ErrExpirationInPast occurs when submitting a new schedule entry with an expiration in the past
	ErrExpirationInPast = NewSlateError("Schedule entry submitted with a Expires value in the past.")

	// ErrInvalidTiming occurs when submitting a new schedule entry with an expiration less than the begins time plus the every duration
	ErrInvalidTiming = NewSlateError("Schedule entry submitted cannot run due to Begins, Expires and Every values.")

	// ErrFunctionParamMismatch occurs when the fn params passed do not match the fn params required.
	ErrFunctionParamMismatch = NewSlateError("Mismatch in params passed to assigned Fn.")

	// ErrInvalidFunction occurs when the fn param bound to a schedule entry is not a valid func.
	ErrInvalidFunction = NewSlateError("Fn assigned not of type func.")

	// ErrMaximumRun occurs when a running count breaches the uint64 limit.
	ErrMaximumRun = NewSlateError("The runners Every ticker has reached the maximum uint64 number of ticks.")

	// ErrOrphanedEvent occurs when a running event cannot find the schedule that should contain it.
	ErrOrphanedEvent = NewSlateError("The executing event is no longer assigned to it's schedule. Orphaned.")

	// ErrScheduleEntryDuplicateID occurs when submitting a new schedule entry with a id that already exists on the schedule.
	ErrScheduleEntryDuplicateID = NewSlateError("Schedule entry submitted with a duplicate ID.")

	// ErrScheduleEntryNotFound occurs when unable to find a job runner by id.
	ErrScheduleEntryNotFound = NewSlateError("Scheduled entry not found.")

	// ErrScheduleNotFound occurs when unable to find a schedule by name.
	ErrScheduleNotFound = NewSlateError("Schedule not found.")
)

Functions

func IsSlateError

func IsSlateError(err error) bool

IsSlateError will return true on slateError instance.

func NewSlateError

func NewSlateError(errorMessage string) error

NewSlateError will return a new SlateError instance.

Types

type EntryID

type EntryID string

EntryID is the identifier to reference a Schedule Entry

func GenerateEntryID

func GenerateEntryID(ID ...string) EntryID

GenerateEntryID will return a unique entry ID. You can always assign your own string ID to schedules. This is especially useful when using multiple schedules that pertain to a universal ID, such as a user_id.

func (EntryID) String

func (eid EntryID) String() string

EntryID implements the Stringer interface

type Manager

type Manager struct {
	Schedules Schedules
	sync.Mutex
}

Manager is a convenience struct to manage many Schedules

func NewManager

func NewManager() *Manager

NewManager will create a new schedule manager. Managers are simply a wrapper for multiple schedule to help callers organise their workflow. If you only require a single schedule, then call NewSchedule() and work with directly.

func (*Manager) Add

func (m *Manager) Add(name string, s *Schedule)

Add will add a schedule to this manager.

func (*Manager) All

func (m *Manager) All() Schedules

All will return all Schedules on this manager.

func (*Manager) Drop

func (m *Manager) Drop(name string)

Drop will stop and remove all related jobs on the schedule before removing the schedule from this manager.

func (*Manager) Get

func (m *Manager) Get(name string) (*Schedule, error)

Get will return by name a schedule instance from this manager.

func (*Manager) Reset

func (m *Manager) Reset()

Reset will stop and remove all Schedules along with their associated entries.

type Schedule

type Schedule struct {
	Config ScheduleConfig

	sync.Mutex
	// contains filtered or unexported fields
}

Schedule is the default struct that houses job runners.

func NewSchedule

func NewSchedule(cfg ...ScheduleConfig) *Schedule

NewSchedule will return a new Scheduler instance. If nil config is passed, the defaults are used.

func (*Schedule) Drop

func (s *Schedule) Drop(ID EntryID)

Drop will first stop any job if applicable and then remove the entry from this schedule.

func (*Schedule) EntryStats

func (s *Schedule) EntryStats(ID EntryID) ScheduleEntryStats

EntryStats will return a snapshot of stats for a scheduleEntry on this schedule instance. Note that this will lock the internal runner so be mindful of multiple calls

func (*Schedule) Reset

func (s *Schedule) Reset()

Reset will stop and remove ALL jobs from this schedule.

func (*Schedule) Stats

func (s *Schedule) Stats() ScheduleStats

Stats will return a snapshot of this schedules running events. Note that this will lock the schedule so be mindful of multiple calls

func (*Schedule) Submit

func (s *Schedule) Submit(e ScheduleEntry) (EntryID, error)

Submit will add & run a new entry to the current schedule.

type ScheduleConfig

type ScheduleConfig struct {
	DisallowDuplicateEntryID bool
	FlushEvery               time.Duration
	EntryRetention           time.Duration
}

ScheduleConfig contains all available config options for a schedule

func NewScheduleConfig

func NewScheduleConfig(opts ...ScheduleConfigOption) ScheduleConfig

NewScheduleConfig will return a Config for a schedule. Takes variadic ScheduleConfigOption

type ScheduleConfigOption

type ScheduleConfigOption func(*ScheduleConfig)

ScheduleConfigOption is a func used to bind schedule config options

func WithDisallowDuplicateEntryID

func WithDisallowDuplicateEntryID(disallow bool) ScheduleConfigOption

WithDisallowDuplicateEntryID will bind the config option to (dis)allow duplicate schedule entry IDs

func WithEntryRetention

func WithEntryRetention(retention time.Duration) ScheduleConfigOption

WithEntryRetention will bind the config option to retain entries on a schedule.

func WithFlushEvery

func WithFlushEvery(every time.Duration) ScheduleConfigOption

WithFlushEvery will bind the config option to flush completed/stopped schedule entries.

type ScheduleEntry

type ScheduleEntry struct {
	// Ensure that this entry does not run before the begins time.
	Begins time.Time

	// Cap the number of times this entry will run.
	Cap uint64

	// Run this entry **every...
	Every time.Duration

	// Ensure that this entry does not run after the expires time.
	Expires time.Time

	// function to be called on **every. Wrap in closure for funcs with args.
	Fn func()

	// The generated or user assigned ID for this entry
	ID EntryID
	// contains filtered or unexported fields
}

ScheduleEntry is the default struct to assign running parameters.

type ScheduleEntryStats

type ScheduleEntryStats struct {
	Completed    time.Time
	Err          error
	RunningCount uint64
	Started      time.Time
	State        string
	Stopped      time.Time
}

ScheduleEntryStats return stats about a Schedule Entry instance

type ScheduleStats

type ScheduleStats struct {
	Config struct {
		AllowDuplicateEntryID bool
		EntryRetention        time.Duration
		FlushEvery            time.Duration
	}
	EntryCount       int
	CapBreachedCount int
	ErrorCount       int
	ExpiredCount     int
	PendingCount     int
	RunningCount     int
	StoppedCount     int
}

ScheduleStats return stats about a Schedule instance

type Schedules

type Schedules map[string]*Schedule

Schedules is a map of Schedule instances on a Manager

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL