timer

Version: v0.9.0
Published: May 26, 2025 License: MIT Imports: 8 Imported by: 0

README

timer

Go implementation of Kafka's Hierarchical Timing Wheels.

Feature

  • Unlimited number of hierarchical wheel levels.
  • Inserting, deleting, and scanning tasks is almost O(1).
  • Unlike the Linux timing wheel, it has no maximum delay limit.
  • It does not advance once per TickMs. Instead, it uses a DelayQueue to take out the next Spoke to expire and advances straight to that Spoke's expiration time in one step, which avoids empty advances.
  • A built-in global timer instance with a 1 ms tick and a wheel size of 128, backed by an ants goroutine pool.
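
To make the wheel mechanics above concrete, here is a minimal sketch of how a Kafka-style hierarchical timing wheel could decide which level and slot hold a task. It is an illustration only, not the package's internal code: `locate` and its parameters are hypothetical names, the rounding rules follow the general Kafka design rather than anything confirmed by this README, and integer overflow for extremely distant expirations is ignored.

```go
package main

import "fmt"

// locate is a hypothetical helper (not part of the timer package).
// It reports the wheel level and slot that would hold a task expiring at
// expirationMs when the wheels currently stand at currentMs.
// A level of -1 means the task falls inside the current tick and has
// effectively already expired.
func locate(expirationMs, currentMs, tickMs, wheelSize int64) (level int, slot int64) {
	// Level 0 works on the current time rounded down to a multiple of its tick.
	if expirationMs < currentMs-currentMs%tickMs+tickMs {
		return -1, 0
	}
	tick := tickMs
	for {
		cur := currentMs - currentMs%tick // this level's rounded current time
		interval := tick * wheelSize      // time span covered by this level
		if expirationMs < cur+interval {
			return level, (expirationMs / tick) % wheelSize
		}
		// Too far away for this level: overflow to the next, coarser level,
		// whose tick equals this level's whole interval.
		tick = interval
		level++
	}
}

func main() {
	// Assumed configuration: 1 ms tick, 128 slots per wheel, current time 0.
	for _, exp := range []int64{5, 127, 128, 20000} {
		level, slot := locate(exp, 0, 1, 128)
		fmt.Printf("expires at %d ms -> level %d, slot %d\n", exp, level, slot)
	}
}
```

In this sketch each extra level multiplies the covered time span by the wheel size, which is why a hierarchy of wheels needs no fixed maximum delay.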

Usage

Installation

Use go get.

    go get github.com/thinkgos/timer

Then import the package into your own code.

    import "github.com/thinkgos/timer"

Example

monitor

package main

import (
	"log"
	"math"
	"math/rand/v2"
	"net/http"
	"sync/atomic"
	"time"

	_ "net/http/pprof"

	"github.com/thinkgos/timer"
)

// Stress test that keeps almost 1,000,000 tasks pending.
func main() {
	go func() {
		sum := &atomic.Int64{}
		t := time.NewTicker(time.Second)
		for {
			<-t.C
			added := 0
			ranv := rand.IntN(10)
			// limit avoids shadowing the built-in max function.
			limit := int(rand.Uint32N(math.MaxUint16 << 2))
			for i := 100; i < limit; i += 200 {
				added++
				ii := i + ranv

				timer.Go(func() {
					sum.Add(1)
					delayms := int64(ii) * 20
					task := timer.NewTask(time.Duration(delayms) * time.Millisecond).WithJob(&job{
						sum:          sum,
						expirationMs: time.Now().UnixMilli() + delayms,
					})
					timer.AddTask(task)

					// for test race
					// if ii%0x03 == 0x00 {
					// 	timer.Go(func() {
					// 		task.Cancel()
					// 	})
					// }
				})
			}
			log.Printf("task: %v - %v added: %d", timer.TaskCounter(), sum.Load(), added)
		}
	}()

	addr := ":9990"
	log.Printf("http started '%v'\n", addr)
	log.Println(http.ListenAndServe(addr, nil))
}

type job struct {
	sum          *atomic.Int64
	expirationMs int64
}

func (j *job) Run() {
	j.sum.Add(-1)
	now := time.Now().UnixMilli()
	if diff := now - j.expirationMs; diff > 1 {
		log.Printf("task ran late: now=%d expiration=%d diff=%dms\n", now, j.expirationMs, diff)
	}
}

repetition

package main

import (
	"fmt"
	"time"

	"github.com/thinkgos/timer"
)

// Repeating task whose delay alternates between one and two seconds.
func main() {
	job := NewRepetitionJob()
	_ = timer.AddDerefTask(job)
	select {}
}

type RepetitionJob struct {
	task *timer.Task
	i    int
}

var _ timer.TaskContainer = (*RepetitionJob)(nil)

func NewRepetitionJob() *RepetitionJob {
	j := &RepetitionJob{
		task: timer.NewTask(time.Second),
		i:    1,
	}
	j.task.WithJob(j)
	return j
}

func (j *RepetitionJob) Run() {
	now := time.Now().String()
	j.i++
	_ = timer.AddTask(j.task.SetDelay(time.Second * time.Duration((j.i%2 + 1))))
	fmt.Printf("%s: repetition executed\n", now)
}

func (j *RepetitionJob) DerefTask() *timer.Task { return j.task }

sample

package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/thinkgos/timer"
)

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		index := i
		_, _ = timer.AfterFunc(time.Duration(i)*100*time.Millisecond, func() {
			fmt.Printf("%s: timer task %d is executed, remain task: %d\n", time.Now().String(), index, timer.TaskCounter())
			wg.Done()
		})
	}
	wg.Wait()
}

How it works

References

License

This project is under MIT License. See the LICENSE file for the full license text.

Documentation

Constants

const (
	// DefaultTickMs default tick milliseconds.
	DefaultTickMs = 1
	// DefaultWheelSize default wheel size.
	DefaultWheelSize = 128
)

Variables

var ErrClosed = errors.New("timer: use of closed timer")

ErrClosed is returned when the timer is closed.

Functions

func AddDerefTask added in v0.8.1

func AddDerefTask(task DerefTask) error

AddDerefTask adds a task from DerefTask to the timer.

func AddTask added in v0.7.0

func AddTask(task *Task) error

AddTask adds a task to the timer.

func CompareSpoke added in v0.5.2

func CompareSpoke(sp1, sp2 *Spoke) int

CompareSpoke compares two Spoke instances based on their expiration time.

func Go added in v0.7.0

func Go(f func())

Go runs a function in the `ants` goroutine pool. If submission fails, it falls back to a plain goroutine.

func IsPowOf2

func IsPowOf2(x int) bool

IsPowOf2 reports whether x is a power of 2.

func NextPowOf2

func NextPowOf2(x int) int

NextPowOf2 returns the next power of 2 for x.
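
Power-of-two wheel sizes are commonly used so that a slot index can be computed with a bit mask instead of a modulo. The sketch below shows one conventional way to write such helpers. `isPowOf2`, `nextPowOf2`, and `slot` are hypothetical stand-ins written for illustration, and their behavior for edge cases (zero, negative values, or an input that is already a power of 2) is an assumption, not a statement about the package's `IsPowOf2` and `NextPowOf2`.

```go
package main

import (
	"fmt"
	"math/bits"
)

// isPowOf2 reports whether x is a positive power of two.
// A power of two has exactly one bit set, so x&(x-1) clears it to zero.
func isPowOf2(x int) bool {
	return x > 0 && x&(x-1) == 0
}

// nextPowOf2 returns the smallest power of two that is >= x.
// Assumed semantics: values below 1 yield 1, exact powers are returned as-is.
func nextPowOf2(x int) int {
	if x <= 1 {
		return 1
	}
	return 1 << bits.Len(uint(x-1))
}

// slot maps a tick count onto a wheel whose size is a power of two.
// With mask = size-1, ticks&mask equals ticks%size for non-negative ticks.
func slot(ticks int64, size int) int64 {
	return ticks & int64(size-1)
}

func main() {
	size := nextPowOf2(100)
	fmt.Println(size, isPowOf2(size), isPowOf2(100)) // 128 true false
	fmt.Println(slot(300, size))                     // 44
}
```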

func Start added in v0.7.0

func Start()

Start starts the timer.

func Started added in v0.7.0

func Started() bool

Started reports whether the timer has been started.

func Stop added in v0.7.0

func Stop()

Stop stops the timer.

func TaskCounter added in v0.7.0

func TaskCounter() int64

TaskCounter returns the total number of tasks.

func TickMs added in v0.7.0

func TickMs() int64

TickMs returns the basic time tick in milliseconds.

func WheelSize added in v0.7.0

func WheelSize() int

WheelSize returns the wheel size.

Types

type DerefTask added in v0.8.4

type DerefTask interface {
	DerefTask() *Task
}

DerefTask is a container that holds a task.

type GoPool

type GoPool interface {
	Go(f func())
}

GoPool is a goroutine pool.

type Job

type Job interface {
	Run()
}

Job is the job interface.

type JobFunc

type JobFunc func()

JobFunc is a function that acts as a Job.

func (JobFunc) Run

func (f JobFunc) Run()

Run implements the Job interface.
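
`JobFunc` follows the same adapter idea as `http.HandlerFunc` in the standard library: a plain function type gains a `Run` method so it can be passed wherever a `Job` is expected. The sketch below redeclares `Job` and `JobFunc` locally, with the signatures listed above, so that it runs without the package. The body of `Run` is an assumption about how such an adapter is typically written, and `counter` and `runAll` are hypothetical names.

```go
package main

import "fmt"

// Job and JobFunc mirror the declarations documented above; they are
// redeclared here only so the example is self-contained.
type Job interface {
	Run()
}

type JobFunc func()

// Run calls f itself, which is the usual way a function adapter is written.
func (f JobFunc) Run() { f() }

// counter is a hypothetical struct-based Job, shown for contrast.
type counter struct{ n int }

func (c *counter) Run() { c.n++ }

// runAll executes every job once; it stands in for a scheduler.
func runAll(jobs ...Job) {
	for _, j := range jobs {
		j.Run()
	}
}

func main() {
	c := &counter{}
	called := false
	runAll(c, JobFunc(func() { called = true }), c)
	fmt.Println(c.n, called) // 2 true
}
```

A struct-based `Job` suits tasks that carry state between runs, while `JobFunc` keeps one-off callbacks short.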

type Option

type Option func(*Timer)

Option is a custom option for `Timer`.

func WithGoPool

func WithGoPool(p GoPool) Option

WithGoPool sets the goroutine pool.

func WithTickMs

func WithTickMs(tickMs int64) Option

WithTickMs sets the basic time tick in milliseconds.

func WithWheelSize

func WithWheelSize(size int) Option

WithWheelSize sets the wheel size.

type Result added in v0.7.0

type Result int
const (
	Result_Success        Result = iota // success added
	Result_Canceled                     // already canceled
	Result_AlreadyExpired               // already expired
)

Result is the result of adding a task entry to the timing wheel.

type Spoke

type Spoke struct {
	// contains filtered or unexported fields
}

Spoke is a spoke of the wheel.

func NewSpoke

func NewSpoke(taskCounter *atomic.Int64) *Spoke

func (*Spoke) Add

func (sp *Spoke) Add(te *taskEntry)

Add adds the timer task to this list.

func (*Spoke) Delay added in v0.4.1

func (sp *Spoke) Delay() int64

Delay implements delayqueue.Delayed.

func (*Spoke) Flush

func (sp *Spoke) Flush(f func(*taskEntry))

Flush flushes all task entries and applies the supplied function to each of them.

func (*Spoke) GetExpiration

func (sp *Spoke) GetExpiration() int64

GetExpiration returns the spoke's expiration time.

func (*Spoke) Remove

func (sp *Spoke) Remove(te *taskEntry)

Remove removes the specified timer task from this list.

func (*Spoke) SetExpiration

func (sp *Spoke) SetExpiration(expirationMs int64) bool

SetExpiration sets the spoke's expiration time. It returns true if the expiration time changed.

type Task

type Task struct {
	// contains filtered or unexported fields
}

Task is a timer task.

func AfterFunc added in v0.7.0

func AfterFunc(d time.Duration, f func()) (*Task, error)

AfterFunc adds a function to the timer.

func NewTask

func NewTask(d time.Duration) *Task

NewTask creates a task with the given delay duration and an empty job. The precision is milliseconds.

func NewTaskFunc

func NewTaskFunc(d time.Duration, f func()) *Task

NewTaskFunc creates a task with the given delay duration and a function job. The precision is milliseconds.

func NewTaskJob added in v0.5.2

func NewTaskJob(d time.Duration, job Job) *Task

NewTaskJob creates a task with the given delay duration and a job. The precision is milliseconds.

func (*Task) Activated added in v0.5.0

func (t *Task) Activated() bool

Activated reports whether the task is activated.

func (*Task) Cancel

func (t *Task) Cancel()

Cancel cancels the task.

func (*Task) Delay

func (t *Task) Delay() time.Duration

Delay returns the delay duration.

func (*Task) DerefTask added in v0.8.1

func (t *Task) DerefTask() *Task

DerefTask implements TaskContainer.

func (*Task) Expiry added in v0.7.1

func (t *Task) Expiry() int64

Expiry returns the task's expiration time as Unix milliseconds, that is, the number of milliseconds elapsed since January 1, 1970 UTC. A value of -1 indicates that the task is not activated.

func (*Task) ExpiryAt added in v0.7.1

func (t *Task) ExpiryAt() time.Time

ExpiryAt returns the local time at which the task will expire. The zero time indicates that the task is not activated.

func (*Task) Run

func (t *Task) Run()

Run calls the job immediately. It implements the Job interface.

func (*Task) SetDelay added in v0.5.4

func (t *Task) SetDelay(d time.Duration) *Task

SetDelay sets a new delay duration; the precision is milliseconds. NOTE: It only takes effect when the task is re-added to the `Timer`. It has no effect on a task that is already running!

func (*Task) WithJob

func (t *Task) WithJob(j Job) *Task

WithJob sets the task's job.

func (*Task) WithJobFunc

func (t *Task) WithJobFunc(f func()) *Task

WithJobFunc sets a function as the task's job.

type TaskContainer added in v0.8.1

type TaskContainer = DerefTask

TaskContainer is an alias for DerefTask.

type Timer

type Timer struct {
	// contains filtered or unexported fields
}

Timer is a timer

Example
tm := NewTimer()
tm.Start()
_, _ = tm.AfterFunc(100*time.Millisecond, func() {
	fmt.Println(100)
})
canceledTaskThenAddAgain := NewTask(1100 * time.Millisecond).WithJobFunc(func() {
	fmt.Println("canceled then add again")
})
_ = tm.AddTask(canceledTaskThenAddAgain)
canceledTaskThenAddAgain.Cancel()
_ = tm.AddTask(NewTask(1025 * time.Millisecond).WithJobFunc(func() {
	fmt.Println(200)
}))
_ = tm.AddTask(canceledTaskThenAddAgain)
time.Sleep(time.Second + time.Millisecond*200)
tm.Stop()
Output:

100
200
canceled then add again

func DefaultTimer added in v0.7.0

func DefaultTimer() *Timer

DefaultTimer returns the default timer.

Example
fmt.Println(Started())
Start()
_, _ = AfterFunc(100*time.Millisecond, func() {
	fmt.Println(100)
})
canceledTaskThenAddAgain := NewTask(1100 * time.Millisecond).WithJobFunc(func() {
	fmt.Println("canceled then add again")
})
_ = AddTask(canceledTaskThenAddAgain)
canceledTaskThenAddAgain.Cancel()
_ = AddDerefTask(NewTask(1025 * time.Millisecond).WithJobFunc(func() {
	fmt.Println(200)
}))
_ = AddTask(canceledTaskThenAddAgain)
time.Sleep(time.Second + time.Millisecond*200)
Stop()
Output:

true
100
200
canceled then add again

func NewTimer

func NewTimer(opts ...Option) *Timer

NewTimer creates a new timer instance. The default tick is 1 millisecond (DefaultTickMs) and the default wheel size is 128 (DefaultWheelSize).

func (*Timer) AddDerefTask added in v0.8.1

func (t *Timer) AddDerefTask(tc DerefTask) error

AddDerefTask adds a task from DerefTask to the timer.

func (*Timer) AddTask

func (t *Timer) AddTask(task *Task) error

AddTask adds a task to the timer.

func (*Timer) AfterFunc

func (t *Timer) AfterFunc(d time.Duration, f func()) (*Task, error)

AfterFunc adds a function to the timer.

func (*Timer) Start

func (t *Timer) Start()

Start starts the timer.

func (*Timer) Started

func (t *Timer) Started() bool

Started reports whether the timer has been started.

func (*Timer) Stop

func (t *Timer) Stop()

Stop stops the timer gracefully, waiting for its goroutine to exit.

func (*Timer) TaskCounter

func (t *Timer) TaskCounter() int64

TaskCounter returns the total number of tasks.

func (*Timer) TickMs

func (t *Timer) TickMs() int64

TickMs returns the basic time tick in milliseconds.

func (*Timer) WheelMask

func (t *Timer) WheelMask() int

WheelMask returns the wheel mask.

func (*Timer) WheelSize

func (t *Timer) WheelSize() int

WheelSize returns the wheel size.

type TimingWheel

type TimingWheel struct {
	// contains filtered or unexported fields
}

Directories

Path Synopsis
_examples
go
heap
Package heap provides heap operations for any type that implements heap.Interface.