scheduler

v0.1.2
Published: May 1, 2025 | License: MIT | Imports: 8 | Imported by: 2

README

scheduler

import "github.com/reactivego/scheduler"

Package scheduler provides a concurrent and a serial task scheduler with support for task cancellation.

The concurrent scheduler is exported as a global public variable with the name Goroutine. This scheduler can be used directly. Alternatively, you can create a new concurrent scheduler by calling NewConcurrentScheduler().

A serial scheduler can be instantiated by calling either New() or NewSerialScheduler().

Examples

Concurrent

The concurrent Goroutine scheduler dispatches tasks asynchronously and runs them concurrently with previously scheduled tasks. Nested tasks dispatched inside ScheduleRecursive by calling the function again() are added to a serial queue and run in the order in which they were dispatched.

Code:

func Example_concurrent() {
	concurrent := scheduler.Goroutine

	i := 0
	concurrent.ScheduleRecursive(func(again func()) {
		fmt.Println(i)
		i++
		if i < 5 {
			again()
		}
	})

	// Wait for the goroutine to finish.
	concurrent.Wait()
	fmt.Println("tasks =", concurrent.Count())
}

Output:

0
1
2
3
4
tasks = 0

Serial

The serial scheduler will dispatch tasks asynchronously by adding them to a serial queue and running them when the Wait method is called.

Code:

func Example_serial() {
	serial := scheduler.New()

	// Asynchronous & serial
	serial.Schedule(func() {
		fmt.Println("> outer")

		// Asynchronous & Serial
		serial.Schedule(func() {
			fmt.Println("> inner")

			// Asynchronous & Serial
			serial.Schedule(func() {
				fmt.Println("leaf")
			})

			fmt.Println("< inner")
		})

		fmt.Println("< outer")
	})

	fmt.Println("BEFORE WAIT")

	serial.Wait()

	fmt.Printf("AFTER WAIT (tasks = %d)\n", serial.Count())
}

Output:

BEFORE WAIT
> outer
< outer
> inner
< inner
leaf
AFTER WAIT (tasks = 0)

Task Cancellation

You can cancel a scheduled task as shown in the example below:

Code:

func Example_cancel() {
	const ms = time.Millisecond

	concurrent := scheduler.Goroutine

	concurrent.ScheduleFuture(10*ms, func() {
		// do nothing....
	})

	running := concurrent.ScheduleFutureRecursive(10*ms, func(again func(due time.Duration)) {
		// do nothing....
		again(10 * ms)
	})
	running.Cancel()

	concurrent.Wait()
	fmt.Println("tasks =", concurrent.Count())
}

Output:

tasks = 0

Loop Scheduling

The ScheduleLoop method provides an easy way to implement loop-like behavior:

Code:

func ExampleNew_scheduleLoop() {
	serial := scheduler.New()

	serial.ScheduleLoop(1, func(index int, again func(next int)) {
		fmt.Println(index)
		if index < 3 {
			again(index + 1)
		}
	})

	fmt.Println("BEFORE")
	serial.Wait()
	fmt.Println("AFTER")
	fmt.Println("tasks =", serial.Count())
}

Output:

BEFORE
1
2
3
AFTER
tasks = 0

Interfaces

Scheduler

Scheduler defines an interface for task execution management. Task scheduling happens asynchronously without blocking the caller. Implementation may execute tasks sequentially or concurrently.

type Scheduler interface {
	// Now returns the current time according to the scheduler.
	Now() time.Time

	// Since returns the time elapsed since t. It is shorthand for Now().Sub(t).
	Since(t time.Time) time.Duration

	// Schedule dispatches a task to the scheduler.
	Schedule(task func()) Runner

	// ScheduleRecursive dispatches a task to the scheduler. Use the again
	// function to schedule another iteration of a repeating algorithm on
	// the scheduler.
	ScheduleRecursive(task func(again func())) Runner

	// ScheduleLoop dispatches a task to the scheduler. Use the again
	// function to schedule another iteration of a repeating algorithm on
	// the scheduler. The current loop index is passed to the task. The loop
	// index starts at the value passed in the from argument. The task is
	// expected to pass the next loop index to the again function.
	ScheduleLoop(from int, task func(index int, again func(next int))) Runner

	// ScheduleFuture dispatches a task to the scheduler to be executed later.
	// The due time specifies when the task should be executed.
	ScheduleFuture(due time.Duration, task func()) Runner

	// ScheduleFutureRecursive dispatches a task to the scheduler to be
	// executed later. Use the again function to schedule another iteration of a
	// repeating algorithm on the scheduler. The due time specifies when the
	// task should be executed.
	ScheduleFutureRecursive(due time.Duration, task func(again func(due time.Duration))) Runner

	// Wait will return when there are no more tasks running.
	Wait()

	// Gosched gives the scheduler an opportunity to run another task.
	Gosched()

	// IsConcurrent returns true for a scheduler that runs tasks concurrently.
	// When using a concurrent scheduler, tasks will need to use synchronization
	// primitives like mutexes to properly guard against race conditions when
	// accessing shared data.
	IsConcurrent() bool

	// Count returns the number of currently active tasks.
	Count() int

	// String representation when printed.
	String() string
}

SerialScheduler

SerialScheduler is a Scheduler that schedules tasks to run sequentially. Tasks scheduled on this scheduler never access shared data at the same time.

type SerialScheduler interface {
	Serial()
	Scheduler
}

ConcurrentScheduler

ConcurrentScheduler is a Scheduler that schedules tasks concurrently. Tasks will need to use synchronization primitives like mutexes to properly guard against race conditions when accessing shared data.

type ConcurrentScheduler interface {
	Concurrent()
	Scheduler
}

Runner

Runner is an interface to a running task. It can be used to cancel the running task by calling its Cancel() method.

type Runner interface {
	// Cancel the running task.
	Cancel()
}

Documentation

Overview

Package scheduler provides a concurrent and a serial task scheduler with support for task cancellation.

Example (Cancel)

Demonstrates how to cancel a scheduled task. The example schedules a future task and a recursive future task, then cancels the recursive task. After waiting for all tasks to complete, it verifies that no tasks remain in the scheduler queue.

package main

import (
	"fmt"
	"time"

	"github.com/reactivego/scheduler"
)

func main() {
	const ms = time.Millisecond

	concurrent := scheduler.Goroutine

	concurrent.ScheduleFuture(10*ms, func() {
		// do nothing....
	})

	running := concurrent.ScheduleFutureRecursive(10*ms, func(again func(due time.Duration)) {
		// do nothing....
		again(10 * ms)
	})
	running.Cancel()

	concurrent.Wait()
	fmt.Println("tasks =", concurrent.Count())
}
Output:

tasks = 0

Example (Concurrent)

The concurrent Goroutine scheduler will dispatch a task asynchronously and run it concurrently with previously scheduled tasks. Nested tasks dispatched inside ScheduleRecursive by calling the function again() will be asynchronous and serial.

package main

import (
	"fmt"

	"github.com/reactivego/scheduler"
)

func main() {
	concurrent := scheduler.Goroutine

	i := 0
	concurrent.ScheduleRecursive(func(again func()) {
		fmt.Println(i)
		i++
		if i < 5 {
			again()
		}
	})

	// Wait for the goroutine to finish.
	concurrent.Wait()
	fmt.Println("tasks =", concurrent.Count())
}
Output:

0
1
2
3
4
tasks = 0

Example (Serial)

The serial scheduler will dispatch tasks asynchronously by adding them to a serial queue and running them when the Wait method is called.

package main

import (
	"fmt"

	"github.com/reactivego/scheduler"
)

func main() {
	serial := scheduler.New()

	// Asynchronous & serial
	serial.Schedule(func() {
		fmt.Println("> outer")

		// Asynchronous & Serial
		serial.Schedule(func() {
			fmt.Println("> inner")

			// Asynchronous & Serial
			serial.Schedule(func() {
				fmt.Println("leaf")
			})

			fmt.Println("< inner")
		})

		fmt.Println("< outer")
	})

	fmt.Println("BEFORE WAIT")

	serial.Wait()

	fmt.Printf("AFTER WAIT (tasks = %d)\n", serial.Count())
}
Output:

BEFORE WAIT
> outer
< outer
> inner
< inner
leaf
AFTER WAIT (tasks = 0)

Constants

const UnrecognizedGID = Error("unrecognized gid")

Variables

var Goroutine = ConcurrentScheduler(&goroutine{})

Goroutine is a concurrent scheduler. Schedule methods dispatch tasks asynchronously, running them concurrently with previously scheduled tasks. It is safe to call the Goroutine scheduling methods from multiple concurrently running goroutines. Nested tasks dispatched by calling the function again(), for example inside ScheduleRecursive, are added to a serial queue and run in the order in which they were dispatched.

Functions

func Gid

func Gid() uint64

Gid returns the numerical part of the goroutine id as a uint64. For example, for "goroutine 18446744073709551615" it returns uint64(18446744073709551615). If the id cannot be determined, the function panics with either UnrecognizedGID or the parsing error. The function works by getting a stack trace of the current goroutine, extracting the goroutine ID prefix, and parsing it into an integer. A call takes on the order of 10 microseconds.

Types

type ConcurrentScheduler added in v0.0.5

type ConcurrentScheduler interface {
	Concurrent()
	Scheduler
}

ConcurrentScheduler is a Scheduler that schedules tasks concurrently. Tasks will need to use synchronization primitives like mutexes to properly guard against race conditions when accessing shared data.

func NewConcurrentScheduler added in v0.1.0

func NewConcurrentScheduler() ConcurrentScheduler

type Error added in v0.0.8

type Error string

func (Error) Error added in v0.0.8

func (e Error) Error() string

type Runner

type Runner interface {
	// Cancel the running task.
	Cancel()
}

Runner is an interface to a running task. It can be used to cancel the running task by calling its Cancel() method.

type Scheduler

type Scheduler interface {
	// Now returns the current time according to the scheduler.
	Now() time.Time

	// Since returns the time elapsed since t. It is shorthand for Now().Sub(t).
	Since(t time.Time) time.Duration

	// Schedule dispatches a task to the scheduler.
	Schedule(task func()) Runner

	// ScheduleRecursive dispatches a task to the scheduler. Use the again
	// function to schedule another iteration of a repeating algorithm on
	// the scheduler.
	ScheduleRecursive(task func(again func())) Runner

	// ScheduleLoop dispatches a task to the scheduler. Use the again
	// function to schedule another iteration of a repeating algorithm on
	// the scheduler. The current loop index is passed to the task. The loop
	// index starts at the value passed in the from argument. The task is
	// expected to pass the next loop index to the again function.
	ScheduleLoop(from int, task func(index int, again func(next int))) Runner

	// ScheduleFuture dispatches a task to the scheduler to be executed later.
	// The due time specifies when the task should be executed.
	ScheduleFuture(due time.Duration, task func()) Runner

	// ScheduleFutureRecursive dispatches a task to the scheduler to be
	// executed later. Use the again function to schedule another iteration of a
	// repeating algorithm on the scheduler. The due time specifies when the
	// task should be executed.
	ScheduleFutureRecursive(due time.Duration, task func(again func(due time.Duration))) Runner

	// Wait will return when there are no more tasks running.
	Wait()

	// Gosched gives the scheduler an opportunity to run another task.
	Gosched()

	// IsConcurrent returns true for a scheduler that runs tasks concurrently.
	// When using a concurrent scheduler, tasks will need to use synchronization
	// primitives like mutexes to properly guard against race conditions when
	// accessing shared data.
	IsConcurrent() bool

	// Count returns the number of currently active tasks.
	Count() int

	// String representation when printed.
	String() string
}

Scheduler defines an interface for task execution management. Task scheduling happens asynchronously without blocking the caller. Implementation may execute tasks sequentially or concurrently.

type SerialScheduler added in v0.0.5

type SerialScheduler interface {
	Serial()
	Scheduler
}

SerialScheduler is a Scheduler that schedules tasks to run sequentially. Tasks scheduled on this scheduler never access shared data at the same time.

func New added in v0.0.3

func New() SerialScheduler

New creates and returns a serial (non-concurrent) scheduler that runs all tasks on a single goroutine. The scheduler is returned as a SerialScheduler interface. Scheduled tasks are dispatched asynchronously: they are added to a serial queue, and when the Wait method is called, all queued tasks are performed in the order in which they were added.

The returned scheduler is not safe to be shared by multiple goroutines concurrently. It should be used purely from a single goroutine to schedule tasks to run sequentially.

Example (ScheduleFuture)

package main

import (
	"fmt"
	"time"

	"github.com/reactivego/scheduler"
)

func main() {
	serial := scheduler.New()

	// Asynchronous & Serial
	serial.ScheduleFuture(10*time.Millisecond, func() {
		fmt.Println("> outer")

		// Asynchronous & Serial
		serial.Schedule(func() {
			fmt.Println("> inner")

			// Asynchronous & Serial
			serial.Schedule(func() {
				fmt.Println("leaf")
			})

			fmt.Println("< inner")
		})

		fmt.Println("< outer")
	})

	fmt.Println("BEFORE WAIT")

	serial.Wait()

	fmt.Printf("AFTER WAIT (tasks = %d)\n", serial.Count())
}
Output:

BEFORE WAIT
> outer
< outer
> inner
< inner
leaf
AFTER WAIT (tasks = 0)

Example (ScheduleFutureRecursive)

package main

import (
	"fmt"
	"time"

	"github.com/reactivego/scheduler"
)

func main() {
	const ms = time.Millisecond

	serial := scheduler.New()

	serial.ScheduleFutureRecursive(0*ms, func(again func(time.Duration)) {
		fmt.Println("> outer")

		serial.ScheduleFutureRecursive(10*ms, func(again func(time.Duration)) {
			fmt.Println("leaf 10ms")
		})

		serial.ScheduleFutureRecursive(5*ms, func(again func(time.Duration)) {
			fmt.Println("leaf 5ms")
		})

		fmt.Println("< outer")
	})

	fmt.Println("BEFORE WAIT")

	serial.Wait()

	fmt.Printf("AFTER WAIT (tasks = %d)\n", serial.Count())
}
Output:

BEFORE WAIT
> outer
< outer
leaf 5ms
leaf 10ms
AFTER WAIT (tasks = 0)

Example (ScheduleLoop)

package main

import (
	"fmt"

	"github.com/reactivego/scheduler"
)

func main() {
	serial := scheduler.New()

	serial.ScheduleLoop(1, func(index int, again func(next int)) {
		fmt.Println(index)
		if index < 3 {
			again(index + 1)
		}
	})

	fmt.Println("BEFORE")
	serial.Wait()
	fmt.Println("AFTER")
	fmt.Println("tasks =", serial.Count())
}
Output:

BEFORE
1
2
3
AFTER
tasks = 0

Example (ScheduleRecursive)

package main

import (
	"fmt"

	"github.com/reactivego/scheduler"
)

func main() {
	serial := scheduler.New()

	i := 0
	serial.ScheduleRecursive(func(again func()) {
		fmt.Println(i)
		i++
		if i < 3 {
			again()
		}
	})

	fmt.Println("BEFORE")
	serial.Wait()
	fmt.Println("AFTER")
	fmt.Println("tasks =", serial.Count())
}
Output:

BEFORE
0
1
2
AFTER
tasks = 0

func NewSerialScheduler added in v0.1.0

func NewSerialScheduler() SerialScheduler
