timewheel

v0.4.0 Latest
Published: Jun 11, 2026 License: MIT Imports: 7 Imported by: 0

README

timewheel

A generic timer wheel for Go.

Requirements

Go 1.25 or later.

Installation

go get github.com/lib-x/timewheel

Quick Start

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/lib-x/timewheel"
)

func main() {
    tw, err := timewheel.New[string](
        100*time.Millisecond,
        60,
        func(msg string) {
            fmt.Println("fired:", msg)
        },
    )
    if err != nil {
        log.Fatal(err)
    }

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    if err := tw.Start(ctx); err != nil {
        log.Fatal(err)
    }
    defer tw.Close()

    id, err := tw.AddTimer(500*time.Millisecond, "hello")
    if err != nil {
        log.Fatal(err)
    }

    if fireAt, ok := tw.NextFireTime(id); ok {
        fmt.Printf("'hello' fires in %s\n", time.Until(fireAt).Round(time.Millisecond))
    }

    // Wait past the 500ms delay so the timer fires before the deferred Close.
    time.Sleep(time.Second)
}

API

Construction
func New[T any](
    interval   time.Duration,
    slotNum    int,
    defaultJob Job[T],
    opts       ...Option[T],
) (*TimeWheel[T], error)

interval is the tick resolution. Delays shorter than one interval are rounded up to one tick. slotNum controls how many buckets the wheel uses before circle counting handles longer delays.

Lifecycle
func (tw *TimeWheel[T]) Start(ctx context.Context) error
func (tw *TimeWheel[T]) Stop() error
func (tw *TimeWheel[T]) Close() error
func (tw *TimeWheel[T]) Wait()

Lifecycle is explicit:

new -> running -> closed
  • Start(nil) returns ErrNilContext.
  • Start can succeed at most once.
  • Starting an already running wheel returns ErrRunning.
  • Starting a closed wheel returns ErrClosed.
  • Stop before Start returns ErrNotStarted.
  • Stop is idempotent after the wheel is running or closed.
  • Close is idempotent, stops the wheel, and waits for the event loop and worker pool.
  • After Close returns, PendingTimers is empty and Stats().Pending is zero.
  • Canceling the context passed to Start stops the wheel.

Timer registration requires a running wheel. Add* and RemoveTimer return ErrNotStarted before Start and ErrClosed after shutdown begins.

Timer IDs
type TimerID uint64

All timer APIs use TimerID instead of raw integers.

One-Shot Timers
type Job[T any] func(T)
type JobContext[T any] func(context.Context, T) error

func (tw *TimeWheel[T]) AddTimer(delay time.Duration, data T) (TimerID, error)
func (tw *TimeWheel[T]) AddTimerWithJob(delay time.Duration, data T, job Job[T]) (TimerID, error)
func (tw *TimeWheel[T]) AddTimerWithContextJob(delay time.Duration, data T, job JobContext[T]) (TimerID, error)
func (tw *TimeWheel[T]) AddTimerFunc(delay time.Duration, fn func()) (TimerID, error)

Passing a nil per-timer job returns ErrNilJob. A nil default job is allowed only when every timer provides its own job. If a timer fires without any job, the wheel removes the timer and, when a logger is configured, logs a warning.

Repeating Timers
type RepeatMode uint8

const (
    FixedRate RepeatMode = iota
    FixedDelay
    SkipIfRunning
)

type RepeatOptions struct {
    Mode RepeatMode
}

func (tw *TimeWheel[T]) AddRepeatingTimer(delay time.Duration, data T, opts RepeatOptions) (TimerID, error)
func (tw *TimeWheel[T]) AddRepeatingTimerWithJob(delay time.Duration, data T, job Job[T], opts RepeatOptions) (TimerID, error)
func (tw *TimeWheel[T]) AddRepeatingTimerWithContextJob(delay time.Duration, data T, job JobContext[T], opts RepeatOptions) (TimerID, error)

Repeat modes:

  • FixedRate: schedules the next fire when the current fire is dispatched. Jobs may overlap.
  • FixedDelay: waits for the previous job to return, then waits the delay. Jobs do not overlap.
  • SkipIfRunning: keeps a fixed-rate cadence but skips a fire if the previous job is still running.
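One common way to get the SkipIfRunning behavior is a per-timer atomic flag that a fire must claim before it runs. The sketch below shows only that guard; skipGuard and tryRun are hypothetical names, and whether the package uses this exact mechanism is an assumption.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// skipGuard is a hypothetical per-timer guard for this sketch.
type skipGuard struct {
	running atomic.Bool
}

// tryRun runs job unless a previous run is still in progress.
// It reports whether the job ran (true) or the fire was skipped (false).
func (g *skipGuard) tryRun(job func()) bool {
	if !g.running.CompareAndSwap(false, true) {
		return false // previous job still running: skip this fire
	}
	defer g.running.Store(false)
	job()
	return true
}

func main() {
	var g skipGuard
	ran := g.tryRun(func() {
		// A fire that arrives while the job is still running is skipped.
		fmt.Println("overlapping fire ran:", g.tryRun(func() {}))
	})
	fmt.Println("first fire ran:", ran)
	fmt.Println("later fire ran:", g.tryRun(func() {}))
}
```

FixedRate would call the job without the guard, and FixedDelay would instead postpone scheduling the next fire until the job returns.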

FixedRate and SkipIfRunning stay anchored to the schedule grid: occurrence n is scheduled for start + n*delay, so per-dispatch lateness does not accumulate into long-term drift. When the wheel falls more than one full period behind (for example after a stall), missed periods are skipped and the timer realigns to the next future grid point instead of firing a burst. JobEvent.Lateness reports how far behind an execution started. FixedDelay intentionally re-anchors at job completion time.

The zero-value RepeatOptions{} uses FixedRate.

Removing Timers
func (tw *TimeWheel[T]) RemoveTimer(id TimerID) error

Unknown, already-fired, and already-removed timer IDs are successful no-ops. After RemoveTimer returns nil, the wheel will not dispatch any future not-yet-started execution for that timer.

RemoveTimer does not cancel a job that has already started. Use a JobContext job when the job needs to observe wheel shutdown through its context.

Observability
type JobEvent[T any] struct {
    TimerID      TimerID
    Data         T
    StartedAt    time.Time
    FinishedAt   time.Time
    ScheduledFor time.Time
    Lateness     time.Duration
    Duration     time.Duration
    Err          error
    Panic        any
    Dropped      bool
    Skipped      bool
}

type JobObserver[T any] func(JobEvent[T])

func WithJobObserver[T any](observer JobObserver[T]) Option[T]
func WithErrorHandler[T any](h func(recovered any)) Option[T]
func WithLogger[T any](l Logger) Option[T]
func WithClock[T any](clk Clock) Option[T]

JobContext errors are reported through JobEvent.Err. Panics are recovered when an error handler or observer is configured. Without either, a panic keeps normal Go behavior and crashes the program.
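The conditional recovery described above can be sketched with a deferred recover that is installed only when a handler exists. runGuarded is a hypothetical helper, not the package's dispatch code; it shows why a panic still crashes the program when nothing is configured to receive it.

```go
package main

import "fmt"

// runGuarded runs job and forwards a recovered panic to onPanic.
// With a nil handler no recover is installed, so a panic propagates as usual.
func runGuarded(job func(), onPanic func(recovered any)) {
	if onPanic == nil {
		job()
		return
	}
	defer func() {
		if r := recover(); r != nil {
			onPanic(r)
		}
	}()
	job()
}

func main() {
	runGuarded(func() { panic("boom") }, func(r any) {
		fmt.Println("recovered:", r)
	})
	fmt.Println("still running")
}
```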

WithClock overrides the wheel's time source through the Clock and Ticker interfaces. It exists so tests can drive the wheel deterministically with a fake clock.

Worker Pool
type BackpressurePolicy uint8

const (
    Block BackpressurePolicy = iota
    Drop
    RunInline
)

func WithWorkerPool[T any](workers int, queueSize int, policy BackpressurePolicy) Option[T]

workers <= 0 disables the pool and runs jobs in independent goroutines. queueSize bounds the worker queue when the pool is enabled.

Backpressure policies:

  • Block: wait for queue capacity unless shutdown starts.
  • Drop: record a dropped job and do not run it when the queue is full.
  • RunInline: run the job on the event loop when the queue is full.

RunInline preserves execution but can delay ticks. When a repeating execution is dropped, the timer remains active and schedules its next attempt according to its repeat mode.
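The three policies differ only in what happens once a non-blocking enqueue fails. The sketch below models that decision with a buffered channel; submit, outcome, and the locally redeclared policy constants are hypothetical and stand in for the package's internal dispatch path.

```go
package main

import "fmt"

// BackpressurePolicy is redeclared locally so the sketch is self-contained.
type BackpressurePolicy uint8

const (
	Block BackpressurePolicy = iota
	Drop
	RunInline
)

// outcome is a hypothetical result label used only in this sketch.
type outcome string

const (
	queued   outcome = "queued"
	dropped  outcome = "dropped"
	inline   outcome = "inline"
	shutdown outcome = "shutdown"
)

// submit tries to enqueue job and applies the policy when the queue is full.
func submit(queue chan func(), job func(), p BackpressurePolicy, done <-chan struct{}) outcome {
	select {
	case queue <- job:
		return queued
	default: // queue is full: fall through to the policy
	}
	switch p {
	case Drop:
		return dropped
	case RunInline:
		job() // runs on the caller, which is the event loop in the wheel
		return inline
	default: // Block: wait for capacity unless shutdown starts
		select {
		case queue <- job:
			return queued
		case <-done:
			return shutdown
		}
	}
}

func main() {
	queue := make(chan func(), 1)
	done := make(chan struct{})
	noop := func() {}

	fmt.Println(submit(queue, noop, Drop, done))      // queued
	fmt.Println(submit(queue, noop, Drop, done))      // dropped
	fmt.Println(submit(queue, noop, RunInline, done)) // inline
	close(done)
	fmt.Println(submit(queue, noop, Block, done)) // shutdown
}
```

The RunInline branch makes the trade-off visible: the job runs on the submitting goroutine, so a slow job there holds up whatever that goroutine does next.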

Stats
type Stats struct {
    Pending  int64
    Executed int64
    Removed  int64
    Queued   int64
    Running  int64
    Dropped  int64
    Skipped  int64
}

Pending counts timers currently waiting in wheel slots. It does not count jobs waiting in the worker queue or jobs already running.

Inspecting Timers
func (tw *TimeWheel[T]) NextFireTime(id TimerID) (time.Time, bool)
func (tw *TimeWheel[T]) PendingTimers() []TimerInfo
func (tw *TimeWheel[T]) Stats() Stats

NextFireTime and PendingTimers are snapshots. They are estimates based on the wheel state when queried, not hard real-time guarantees. Actual dispatch happens no earlier than the scheduled time and can be delayed by up to one tick plus runtime scheduling jitter.

For FixedDelay timers, there is no pending next fire while the previous job is still running; the next fire is scheduled after that job returns.

Design Notes

Time Wheel Placement
ticks  = ceil(delay / interval)
offset = ticks - 1
circle = offset / slotNum
pos    = (currentPos + offset) % slotNum

The event loop scans one slot on every tick, dispatches due tasks, then advances the wheel pointer.

Timing Guarantees

Dispatch happens no earlier than the scheduled time and can be delayed by up to one tick plus runtime scheduling jitter. The wheel position advances one slot per received tick: if the event loop stalls long enough that the runtime ticker drops ticks (for example under the Block backpressure policy or a slow RunInline job), the wheel falls behind wall-clock time and dispatches late rather than compensating. Grid anchoring keeps repeating timers from accumulating that lag into permanent drift, and JobEvent.Lateness makes it observable.

Concurrency Model

All slot and index mutations are serialized onto the event loop goroutine through an internal command channel; wheel slots are accessed without locks. Add* and RemoveTimer block until the event loop acknowledges the command, so a timer is queryable through NextFireTime as soon as its Add* call returns. Inspection APIs read a shared index guarded by an RWMutex.
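The command-and-acknowledge pattern can be sketched in a few lines. Everything here is hypothetical (the wheel, addCmd, Add, and Has names and the shape of the index); the point is only that a mutation is applied by one goroutine and the caller waits for the acknowledgment, so a read issued right after Add returns already sees the timer.

```go
package main

import (
	"fmt"
	"sync"
)

// addCmd asks the event loop to register a timer and signals ack when done.
type addCmd struct {
	id  uint64
	ack chan struct{}
}

// wheel is a hypothetical, heavily reduced stand-in for the real type.
type wheel struct {
	cmds  chan addCmd
	done  chan struct{}
	mu    sync.RWMutex        // guards index for readers outside the loop
	index map[uint64]struct{} // shared lookup index
}

func newWheel() *wheel {
	w := &wheel{
		cmds:  make(chan addCmd),
		done:  make(chan struct{}),
		index: make(map[uint64]struct{}),
	}
	go w.loop()
	return w
}

// loop is the only goroutine that applies mutations.
func (w *wheel) loop() {
	for {
		select {
		case c := <-w.cmds:
			w.mu.Lock()
			w.index[c.id] = struct{}{}
			w.mu.Unlock()
			close(c.ack) // acknowledge after the mutation is visible
		case <-w.done:
			return
		}
	}
}

// Add blocks until the loop has applied the command. It returns false if
// the wheel shut down before the command was accepted.
func (w *wheel) Add(id uint64) bool {
	ack := make(chan struct{})
	select {
	case w.cmds <- addCmd{id: id, ack: ack}:
		<-ack
		return true
	case <-w.done:
		return false
	}
}

// Has is an inspection call that only takes the read lock.
func (w *wheel) Has(id uint64) bool {
	w.mu.RLock()
	defer w.mu.RUnlock()
	_, ok := w.index[id]
	return ok
}

func (w *wheel) Close() { close(w.done) }

func main() {
	w := newWheel()
	defer w.Close()
	w.Add(42)
	fmt.Println(w.Has(42)) // true
}
```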

Deletion Complexity

The wheel keeps a TimerID -> slot/index location index. RemoveTimer uses the index to find the timer in O(1), then removes it from the slot with swap-and-shrink. When another task is swapped into the removed position, its index is updated immediately.
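Swap-and-shrink with a location index can be sketched as follows. The wheel and location types are hypothetical and store bare IDs instead of full timer records; the part that matters is updating the index entry of the element that gets swapped into the vacated position.

```go
package main

import "fmt"

// location records where a timer lives: which slot, and where in it.
type location struct{ slot, idx int }

// wheel is a hypothetical reduced wheel that stores only timer IDs.
type wheel struct {
	slots [][]uint64          // each slot holds timer IDs
	index map[uint64]location // timer ID -> position in slots
}

func newWheel(slotNum int) *wheel {
	return &wheel{slots: make([][]uint64, slotNum), index: make(map[uint64]location)}
}

func (w *wheel) add(slot int, id uint64) {
	w.index[id] = location{slot, len(w.slots[slot])}
	w.slots[slot] = append(w.slots[slot], id)
}

// remove deletes id in O(1): the last element of the slot is moved into
// the vacated position and the slice is shortened by one.
func (w *wheel) remove(id uint64) bool {
	loc, ok := w.index[id]
	if !ok {
		return false
	}
	s := w.slots[loc.slot]
	last := len(s) - 1
	if loc.idx != last {
		s[loc.idx] = s[last]
		w.index[s[loc.idx]] = loc // keep the moved timer's index entry correct
	}
	w.slots[loc.slot] = s[:last]
	delete(w.index, id)
	return true
}

func main() {
	w := newWheel(4)
	w.add(0, 1)
	w.add(0, 2)
	w.add(0, 3)
	w.remove(1)
	fmt.Println(w.slots[0], w.index[3]) // [3 2] {0 0}
}
```

The trade-off is that order within a slot is not preserved, which is acceptable when every timer in a slot is examined on the same tick.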

Core Scope

The core package handles delay, repeat, cancel, execution, and inspection. Cron expressions, persistent scheduling, orchestration, and business-key mapping belong in separate packages layered on top.

Keyed Scheduler

The scheduler subpackage provides a keyed dynamic scheduler built on top of the core time wheel. It keeps cron-like, calendar, and business schedule logic outside the core package by requiring callers to provide the next-run calculation.

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/lib-x/timewheel/scheduler"
)

func main() {
    s, err := scheduler.NewScheduler[string, string](
        scheduler.Options[string, string]{
            Next: func(now time.Time, key string, data string) (time.Time, bool, error) {
                return now.Add(time.Minute), true, nil
            },
            Run: func(ctx context.Context, key string, data string) error {
                fmt.Println("run", key, data)
                return nil
            },
        },
        scheduler.WithReschedulePolicy(scheduler.RescheduleAfterFinish),
        scheduler.WithWheel(time.Second, 3600),
    )
    if err != nil {
        log.Fatal(err)
    }

    if err := s.Upsert(scheduler.Item[string, string]{
        Key:  "daily-report",
        Data: "payload",
    }); err != nil {
        log.Fatal(err)
    }

    if err := s.Start(context.Background()); err != nil {
        log.Fatal(err)
    }
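    defer s.Close()

    // Keep the process alive long enough for the first run, one minute out.
    time.Sleep(2 * time.Minute)
}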

Scheduler features:

  • Upsert, ReplaceAll, and Remove manage items by key.
  • Snapshot returns pending, running, disabled, and invalid runtime state.
  • NextFunc is the only place that calculates the next execution time.
  • Generation tracking prevents stale timers and stale completions from rescheduling removed or replaced items.
  • CancelRunningOnRemove, CancelRunningOnReplace, WaitRunningOnClose, and RunTimeout control running job lifecycle and can be configured with functional options.
  • WithClock injects a fake clock into the scheduler and its wheel for deterministic tests.
  • RescheduleAfterFinish avoids self-overlap, RescheduleBeforeRun supports a fixed cadence, and NoAutoReschedule leaves rescheduling to the caller.
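Generation tracking is typically a counter stamped onto each scheduled timer and compared when the timer fires or a run completes. The sketch below is an assumption about the general technique, not the scheduler's code: registry, upsert, remove, and isCurrent are hypothetical names, and the type is not safe for concurrent use as written.

```go
package main

import "fmt"

// registry is a hypothetical per-key generation table.
type registry struct {
	next uint64            // global counter, so generations are never reused
	gen  map[string]uint64 // key -> current generation
}

func newRegistry() *registry { return &registry{gen: make(map[string]uint64)} }

// upsert installs or replaces key and returns the generation to stamp on
// the timer scheduled for it.
func (r *registry) upsert(key string) uint64 {
	r.next++
	r.gen[key] = r.next
	return r.next
}

func (r *registry) remove(key string) { delete(r.gen, key) }

// isCurrent reports whether a timer stamped with g still belongs to the
// live item for key. Stale timers and completions fail this check.
func (r *registry) isCurrent(key string, g uint64) bool {
	cur, ok := r.gen[key]
	return ok && cur == g
}

func main() {
	r := newRegistry()
	old := r.upsert("daily-report")
	cur := r.upsert("daily-report") // replaced before the old timer fired
	fmt.Println(r.isCurrent("daily-report", old), r.isCurrent("daily-report", cur)) // false true
	r.remove("daily-report")
	fmt.Println(r.isCurrent("daily-report", cur)) // false
}
```

Using one global counter rather than a per-key counter means a key that is removed and re-added cannot collide with a timer left over from its earlier life.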

License

MIT

Documentation

Overview

Package timewheel provides a generic timer wheel implementation.

TimeWheel is safe for concurrent use. Timer placement and deletion are serialized through the wheel event loop; job execution is dispatched outside the event loop. Wheel slots are owned exclusively by the event loop goroutine and are accessed without locks.

Index

Constants

This section is empty.

Variables

var (
	ErrNotStarted   = errors.New("timewheel: not started")
	ErrRunning      = errors.New("timewheel: already running")
	ErrClosed       = errors.New("timewheel: closed")
	ErrNilContext   = errors.New("timewheel: nil context")
	ErrNilJob       = errors.New("timewheel: nil job")
	ErrUnknownTimer = errors.New("timewheel: unknown timer")
	ErrQueueFull    = errors.New("timewheel: worker queue full")
)

Functions

This section is empty.

Types

type BackpressurePolicy added in v0.2.0

type BackpressurePolicy uint8

BackpressurePolicy controls behavior when the bounded worker queue is full.

const (
	// Block waits for queue capacity unless the wheel is shutting down.
	Block BackpressurePolicy = iota

	// Drop records a dropped job and does not run it when the queue is full.
	Drop

	// RunInline runs the job on the event loop when the queue is full.
	RunInline
)

type Clock added in v0.4.0

type Clock interface {
	Now() time.Time
	NewTicker(time.Duration) Ticker
}

Clock abstracts the time source used by the wheel. Implementations must be safe for concurrent use. The default clock uses the time package.

type Job

type Job[T any] func(data T)

Job is the callback signature invoked when a timer fires.

type JobContext added in v0.2.0

type JobContext[T any] func(context.Context, T) error

JobContext is the context-aware callback signature invoked when a timer fires.

type JobEvent added in v0.2.0

type JobEvent[T any] struct {
	TimerID      TimerID
	Data         T
	StartedAt    time.Time
	FinishedAt   time.Time
	ScheduledFor time.Time
	Lateness     time.Duration
	Duration     time.Duration
	Err          error
	Panic        any
	Dropped      bool
	Skipped      bool
}

JobEvent describes the result of one job scheduling or execution attempt.

type JobObserver added in v0.2.0

type JobObserver[T any] func(JobEvent[T])

JobObserver receives job execution, drop, and skip events.

type Logger

type Logger interface {
	Info(msg string, args ...any)
	Warn(msg string, args ...any)
}

Logger is the minimal logging interface used by TimeWheel.

type Option

type Option[T any] func(*config[T])

Option is a functional option for New.

func WithClock added in v0.4.0

func WithClock[T any](clk Clock) Option[T]

WithClock overrides the wheel's time source. A nil clock keeps the default real-time clock. Intended for tests that need deterministic time.

func WithErrorHandler

func WithErrorHandler[T any](h func(recovered any)) Option[T]

WithErrorHandler registers a function called with recovered job panics.

func WithJobObserver added in v0.2.0

func WithJobObserver[T any](observer JobObserver[T]) Option[T]

WithJobObserver registers a function called for job execution, drop, and skip events.

func WithLogger

func WithLogger[T any](l Logger) Option[T]

WithLogger configures the logger used for internal diagnostic messages.

func WithWorkerPool

func WithWorkerPool[T any](workers int, queueSize int, policy BackpressurePolicy) Option[T]

WithWorkerPool configures a fixed worker pool and bounded queue.

workers <= 0 disables the pool and runs each job in its own goroutine.

type RepeatMode added in v0.2.0

type RepeatMode uint8

RepeatMode controls how a repeating timer schedules its next execution.

const (
	// FixedRate schedules the next fire when the current fire is dispatched.
	// Jobs may overlap.
	FixedRate RepeatMode = iota

	// FixedDelay schedules the next fire after the previous job returns.
	// Jobs do not overlap.
	FixedDelay

	// SkipIfRunning keeps a fixed-rate cadence but skips a fire when the
	// previous job for the same timer is still running.
	SkipIfRunning
)

type RepeatOptions added in v0.2.0

type RepeatOptions struct {
	// Mode defaults to FixedRate.
	Mode RepeatMode
}

RepeatOptions configures repeating timer behavior.

type Stats

type Stats struct {
	Pending  int64
	Executed int64
	Removed  int64
	Queued   int64
	Running  int64
	Dropped  int64
	Skipped  int64
}

Stats is a snapshot of runtime counters. All fields are read atomically.

type Ticker added in v0.4.0

type Ticker interface {
	C() <-chan time.Time
	Stop()
}

Ticker is the minimal ticker surface consumed by the event loop.

type TimeWheel

type TimeWheel[T any] struct {
	// contains filtered or unexported fields
}

TimeWheel is a generic timer wheel.

Use New to construct a wheel, Start to run it, and Close to stop and wait. The zero value is not usable.

func New

func New[T any](interval time.Duration, slotNum int, defaultJob Job[T], opts ...Option[T]) (*TimeWheel[T], error)

New creates and initialises a new TimeWheel.

func (*TimeWheel[T]) AddRepeatingTimer added in v0.2.0

func (tw *TimeWheel[T]) AddRepeatingTimer(delay time.Duration, data T, opts RepeatOptions) (TimerID, error)

AddRepeatingTimer enqueues a repeating timer that uses the default job.

func (*TimeWheel[T]) AddRepeatingTimerWithContextJob added in v0.2.0

func (tw *TimeWheel[T]) AddRepeatingTimerWithContextJob(delay time.Duration, data T, job JobContext[T], opts RepeatOptions) (TimerID, error)

AddRepeatingTimerWithContextJob enqueues a repeating timer with a context-aware job.

func (*TimeWheel[T]) AddRepeatingTimerWithJob added in v0.2.0

func (tw *TimeWheel[T]) AddRepeatingTimerWithJob(delay time.Duration, data T, job Job[T], opts RepeatOptions) (TimerID, error)

AddRepeatingTimerWithJob enqueues a repeating timer with a per-timer job.

func (*TimeWheel[T]) AddTimer

func (tw *TimeWheel[T]) AddTimer(delay time.Duration, data T) (TimerID, error)

AddTimer enqueues a one-shot timer that uses the wheel's default job.

func (*TimeWheel[T]) AddTimerFunc

func (tw *TimeWheel[T]) AddTimerFunc(delay time.Duration, fn func()) (TimerID, error)

AddTimerFunc enqueues a one-shot timer that calls fn after delay.

func (*TimeWheel[T]) AddTimerWithContextJob added in v0.2.0

func (tw *TimeWheel[T]) AddTimerWithContextJob(delay time.Duration, data T, job JobContext[T]) (TimerID, error)

AddTimerWithContextJob enqueues a one-shot timer with a context-aware job.

func (*TimeWheel[T]) AddTimerWithJob

func (tw *TimeWheel[T]) AddTimerWithJob(delay time.Duration, data T, job Job[T]) (TimerID, error)

AddTimerWithJob enqueues a one-shot timer with a per-timer job.

func (*TimeWheel[T]) Close added in v0.2.0

func (tw *TimeWheel[T]) Close() error

Close stops the wheel and waits for the event loop and worker pool to exit.

func (*TimeWheel[T]) NextFireTime

func (tw *TimeWheel[T]) NextFireTime(id TimerID) (time.Time, bool)

NextFireTime returns the estimated next fire time for a pending timer.

func (*TimeWheel[T]) PendingTimers

func (tw *TimeWheel[T]) PendingTimers() []TimerInfo

PendingTimers returns a sorted snapshot of pending timers.

func (*TimeWheel[T]) RemoveTimer

func (tw *TimeWheel[T]) RemoveTimer(id TimerID) error

RemoveTimer cancels future not-yet-started executions for id.

func (*TimeWheel[T]) Start

func (tw *TimeWheel[T]) Start(ctx context.Context) error

Start launches the event loop. It may be called once successfully.

func (*TimeWheel[T]) Stats

func (tw *TimeWheel[T]) Stats() Stats

Stats returns a snapshot of runtime counters.

func (*TimeWheel[T]) Stop added in v0.2.0

func (tw *TimeWheel[T]) Stop() error

Stop begins shutdown. It does not wait for the event loop or workers to exit.

func (*TimeWheel[T]) Wait

func (tw *TimeWheel[T]) Wait()

Wait blocks until the event loop and worker pool have exited.

type TimerID added in v0.2.0

type TimerID uint64

TimerID uniquely identifies a timer within one TimeWheel instance.

type TimerInfo

type TimerInfo struct {
	Key        TimerID
	NextFireAt time.Time
	Delay      time.Duration
	Repeating  bool
	RepeatMode RepeatMode
}

TimerInfo describes a pending timer returned by PendingTimers.

Directories

Path       Synopsis
scheduler  Package scheduler provides a keyed dynamic scheduling layer on top of github.com/lib-x/timewheel.
