Documentation
¶
Overview ¶
Package scheduler provides a concurrent and a serial task scheduler with support for task cancellation.
Example (Cancel) ¶
Demonstrate how to cancel a scheduled task. It shows scheduling a future task and a recursive future task, then canceling the recursive task. After waiting for all tasks to complete, it verifies that no tasks remain in the scheduler queue.
package main import ( "fmt" "time" "github.com/reactivego/scheduler" ) func main() { const ms = time.Millisecond concurrent := scheduler.Goroutine concurrent.ScheduleFuture(10*ms, func() { // do nothing.... }) running := concurrent.ScheduleFutureRecursive(10*ms, func(again func(due time.Duration)) { // do nothing.... again(10 * ms) }) running.Cancel() concurrent.Wait() fmt.Println("tasks =", concurrent.Count()) }
Output: tasks = 0
Example (Concurrent) ¶
The concurrent Goroutine scheduler will dispatch a task asynchronously and run it concurrently with previously scheduled tasks. Nested tasks dispatched inside ScheduleRecursive by calling the function again() will be asynchronous and serial.
package main import ( "fmt" "github.com/reactivego/scheduler" ) func main() { concurrent := scheduler.Goroutine i := 0 concurrent.ScheduleRecursive(func(again func()) { fmt.Println(i) i++ if i < 5 { again() } }) // Wait for the goroutine to finish. concurrent.Wait() fmt.Println("tasks =", concurrent.Count()) }
Output: 0 1 2 3 4 tasks = 0
Example (Serial) ¶
The serial scheduler will dispatch tasks asynchronously by adding them to a serial queue and running them when the Wait method is called.
package main import ( "fmt" "github.com/reactivego/scheduler" ) func main() { serial := scheduler.New() // Asynchronous & serial serial.Schedule(func() { fmt.Println("> outer") // Asynchronous & Serial serial.Schedule(func() { fmt.Println("> inner") // Asynchronous & Serial serial.Schedule(func() { fmt.Println("leaf") }) fmt.Println("< inner") }) fmt.Println("< outer") }) fmt.Println("BEFORE WAIT") serial.Wait() fmt.Printf("AFTER WAIT (tasks = %d)\n", serial.Count()) }
Output: BEFORE WAIT > outer < outer > inner < inner leaf AFTER WAIT (tasks = 0)
Index ¶
Examples ¶
Constants ¶
const UnrecognizedGID = Error("unrecognized gid")
Variables ¶
var Goroutine = ConcurrentScheduler(&goroutine{})
Goroutine is a concurrent scheduler. Schedule methods dispatch tasks asynchronously, running them concurrently with previously scheduled tasks. It is safe to call the Goroutine scheduling methods from multiple concurrently running goroutines. Nested tasks dispatched inside e.g. ScheduleRecursive by calling the function again() will be added to a serial queue and run in the order they were dispatched in.
Functions ¶
func Gid ¶
func Gid() uint64
Gid returns the numerical part of the goroutine id as a uint64. So, for: "goroutine 18446744073709551615" it will return uint64:18446744073709551615. If for some reason the id cannot be determined, the function panics with either UnrecognizedGID or the parsing error. The function works by getting a stack trace of the current goroutine, extracting the goroutine ID prefix, and parsing it into an integer. Calling this function takes in the order of 10 microseconds.
Types ¶
type ConcurrentScheduler ¶ added in v0.0.5
type ConcurrentScheduler interface { Concurrent() Scheduler }
ConcurrentScheduler is a Scheduler that schedules tasks concurrently. Tasks will need to use synchronization primitives like mutexes to properly guard against race conditions when accessing shared data.
func NewConcurrentScheduler ¶ added in v0.1.0
func NewConcurrentScheduler() ConcurrentScheduler
type Runner ¶
type Runner interface {
// Cancel the running task.
Cancel()
}
Runner is an interface to a running task. It can be used to cancel the running task by calling its Cancel() method.
type Scheduler ¶
type Scheduler interface { // Now returns the current time according to the scheduler. Now() time.Time // Since returns the time elapsed, is a shorthand for Now().Sub(t). Since(t time.Time) time.Duration // Schedule dispatches a task to the scheduler. Schedule(task func()) Runner // ScheduleRecursive dispatches a task to the scheduler. Use the again // function to schedule another iteration of a repeating algorithm on // the scheduler. ScheduleRecursive(task func(again func())) Runner // ScheduleLoop dispatches a task to the scheduler. Use the again // function to schedule another iteration of a repeating algorithm on // the scheduler. The current loop index is passed to the task. The loop // index starts at the value passed in the from argument. The task is // expected to pass the next loop index to the again function. ScheduleLoop(from int, task func(index int, again func(next int))) Runner // ScheduleFuture dispatches a task to the scheduler to be executed later. // The due time specifies when the task should be executed. ScheduleFuture(due time.Duration, task func()) Runner // ScheduleFutureRecursive dispatches a task to the scheduler to be // executed later. Use the again function to schedule another iteration of a // repeating algorithm on the scheduler. The due time specifies when the // task should be executed. ScheduleFutureRecursive(due time.Duration, task func(again func(due time.Duration))) Runner // Wait will return when there are no more tasks running. Wait() // Gosched will give the scheduler an oportunity to run another task Gosched() // IsConcurrent returns true for a scheduler that runs tasks concurrently. // When using a concurrent scheduler, tasks will need to use synchronization // primitives like mutexes to properly guard against race conditions when // accessing shared data. IsConcurrent() bool // Count returns the number of currently active tasks. Count() int // String representation when printed. String() string }
Scheduler defines an interface for task execution management. Task scheduling happens asynchronously without blocking the caller. Implementation may execute tasks sequentially or concurrently.
type SerialScheduler ¶ added in v0.0.5
type SerialScheduler interface { Serial() Scheduler }
SerialScheduler is a Scheduler that schedules tasks to run sequentially. Tasks scheduled on this scheduler never access shared data at the same time.
func New ¶ added in v0.0.3
func New() SerialScheduler
New creates and returns a serial (non-concurrent) scheduler that runs all tasks on a single goroutine. The returned scheduler is returned as a SerialScheduler interface. Tasks scheduled will be dispatched asynchronously because they are added to a serial queue. When the Wait method is called all tasks scheduled on the serial queue will be performed in the same order in which they were added to the queue.
The returned scheduler is not safe to be shared by multiple goroutines concurrently. It should be used purely from a single goroutine to schedule tasks to run sequentially.
Example (ScheduleFuture) ¶
package main import ( "fmt" "time" "github.com/reactivego/scheduler" ) func main() { serial := scheduler.New() // Asynchronous & Serial serial.ScheduleFuture(10*time.Millisecond, func() { fmt.Println("> outer") // Asynchronous & Serial serial.Schedule(func() { fmt.Println("> inner") // Asynchronous & Serial serial.Schedule(func() { fmt.Println("leaf") }) fmt.Println("< inner") }) fmt.Println("< outer") }) fmt.Println("BEFORE WAIT") serial.Wait() fmt.Printf("AFTER WAIT (tasks = %d)\n", serial.Count()) }
Output: BEFORE WAIT > outer < outer > inner < inner leaf AFTER WAIT (tasks = 0)
Example (ScheduleFutureRecursive) ¶
package main import ( "fmt" "time" "github.com/reactivego/scheduler" ) func main() { const ms = time.Millisecond serial := scheduler.New() serial.ScheduleFutureRecursive(0*ms, func(again func(time.Duration)) { fmt.Println("> outer") serial.ScheduleFutureRecursive(10*ms, func(again func(time.Duration)) { fmt.Println("leaf 10ms") }) serial.ScheduleFutureRecursive(5*ms, func(again func(time.Duration)) { fmt.Println("leaf 5ms") }) fmt.Println("< outer") }) fmt.Println("BEFORE WAIT") serial.Wait() fmt.Printf("AFTER WAIT (tasks = %d)\n", serial.Count()) }
Output: BEFORE WAIT > outer < outer leaf 5ms leaf 10ms AFTER WAIT (tasks = 0)
Example (ScheduleLoop) ¶
package main import ( "fmt" "github.com/reactivego/scheduler" ) func main() { serial := scheduler.New() serial.ScheduleLoop(1, func(index int, again func(next int)) { fmt.Println(index) if index < 3 { again(index + 1) } }) fmt.Println("BEFORE") serial.Wait() fmt.Println("AFTER") fmt.Println("tasks =", serial.Count()) }
Output: BEFORE 1 2 3 AFTER tasks = 0
Example (ScheduleRecursive) ¶
package main import ( "fmt" "github.com/reactivego/scheduler" ) func main() { serial := scheduler.New() i := 0 serial.ScheduleRecursive(func(again func()) { fmt.Println(i) i++ if i < 3 { again() } }) fmt.Println("BEFORE") serial.Wait() fmt.Println("AFTER") fmt.Println("tasks =", serial.Count()) }
Output: BEFORE 0 1 2 AFTER tasks = 0
func NewSerialScheduler ¶ added in v0.1.0
func NewSerialScheduler() SerialScheduler