delaywheel

package module
v1.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 3, 2024 License: MIT Imports: 11 Imported by: 0

README

Go Reference

DelayWheel

A Go-implemented, Kafka-style timing wheel that utilizes a DelayQueue to minimize frequent tick operations, making it suitable for managing large volumes of frequent delayed and scheduled tasks.

It features built-in support for Graceful Shutdown, allowing it to wait for all tasks to finish upon shutdown, thus ensuring tasks are executed correctly.

The aim of this project is to create a universal, efficient, and user-friendly timing wheel solution.

Features

  • A Kafka-like timing wheel implementation, based on a min heap DelayQueue.
  • Supports graceful shutdown, waiting for all tasks to complete before terminating.
  • Supports both one-time execution and scheduled execution.
  • Supports custom task execution by implementing the Executor interface.
  • Features built-in logging functionality, with options to customize the logger used.

Getting Started

Delaywheel requires Go 1.18 or later. Start by installing in your project:

go get github.com/saweima12/delaywheel
Exmaple
func main() {
	// Create and startup the delaywheel
	dw, _ := delaywheel.New(1*time.Second, 20,
		delaywheel.WithLogLevel(delaywheel.LOG_DEBUG),
	)
	dw.Start()

	// Insert a new task
	dw.AfterFunc(2*time.Second, func(taskCtx *delaywheel.TaskCtx) {
		fmt.Println(taskCtx)
	})

	// Insert a scheduled task to execute every 3 seconds.
	taskId, _ := dw.ScheduleFunc(3*time.Second, func(taskCtx *delaywheel.TaskCtx) {
		fmt.Println(taskCtx)
	})
    // Cancel the target task.
	wheel.CancelTask(taskId)
    
	// Launch a separate goroutine for executing tasks, 
    // which can be replaced with a goroutine pool.
	go func() {
		for task := range dw.PendingChan() {
			task()
		}
	}()

	// Shutdown the delaywheel
	dw.Stop(func(ctx *delaywheel.StopCtx) error {
		ctx.WaitForDone(context.Background())
		return nil
	})
}
Submit a task

Use AfterFunc to submit a one-time delayed task or ScheduleFunc to submit a regularly scheduled task.

// Insert a new task
dw.AfterFunc(2*time.Second, func(taskCtx *delaywheel.TaskCtx) {
    fmt.Println(taskCtx)
})

// Insert a scheduled task to execute every 3 seconds.
dw.ScheduleFunc(3*time.Second, func(taskCtx *delaywheel.TaskCtx) {
    fmt.Println(taskCtx)
})
Customize the Executor.

After creating a struct and implementing the Executor interface, use AfterExecute and ScheduleExecute to submit tasks. This allows tasks to carry more parameters, enabling the implementation of more complex tasks.

type TestExecutor struct {
	Name string
}

func (te *TestExecutor) Execute(taskCtx *delaywheel.TaskCtx) {
	fmt.Println(taskCtx.TaskID(), te.Name)
}

func main() {
	wheel, _ := delaywheel.New(time.Second, 20, delaywheel.WithAutoRun())
	wheel.Start()
    // Execute once
	wheel.AfterExecute(time.Second, &TestExecutor{Name: "John"})
    // Execute per second
	wheel.ScheduleExecute(time.Second, &TestExecutor{Name: "John"})
}
About TaskCtx

TaskCtx provides auxiliary functions, such as:

  • TaskID() -> Obtaining the task ID
  • ExpireTime() -> Getting the expiration time
  • Cancel() -> Canceling the task
  • ReSchedule(time.Duration) -> Rescheduling the task (Note: ReSchedule will not have an effect when using ScheduleExec and ScheduleFunc)
func (te *TestExecutor) Execute(taskCtx *delaywheel.TaskCtx) {
	fmt.Println(taskCtx.TaskID(), taskCtx.ExpireTime())
	if te.Name == "Herry" {
		taskCtx.Cancel()
	} else {
		taskCtx.ReSchedule(5 * time.Second)
	}
}
About StopCtx

StopCtx assists with graceful shutdown-related functionalities, such as:

  • GetAllTask() -> Retrieves a list of all scheduled tasks
  • WaitForDone(context.Context) -> Block until all tasks are completed.
// Wait until all tasks are completed or the context times out, allowing for flexible flow control in combination with the context.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
wheel.Stop(func(stopCtx *delaywheel.StopCtx) error {
    stopCtx.WaitForDone(ctx)
    return nil
})
About Options

DelayWheel provides a few simple configuration options:

  • WithAutoRun() -> Automatically launches a goroutine to execute tasks when they expire. This is suitable for scenarios where performance demands are low and there is no desire to introduce a separate goroutine pool.
  • WithCurTaskID() -> Sets the initial task ID number, with the default being 0. This is useful for cases where synchronization with database IDs is desired.
  • WithPendingBufferSize() -> Sets the size of the pendingTask channel, with the default being 1. Evaluate the number of tasks and set accordingly, recommended to match the size of the corresponding goroutine pool.
  • WithLogLevel() -> Sets the logging level, with WARN as the default. Adjusting to INFO or DEBUG can provide more detailed log information.
  • WithLogger() -> Integrates a custom logger by implementing the Logger interface, making it compatible with other popular logging packages such as logrus, zerolog, or zap.
type TestLogger struct {
	logger *zap.SugaredLogger
}

func newTestLogger() *TestLogger {
	logger, _ := zap.NewProduction()
	return &TestLogger{
		logger: logger.Sugar(),
	}
}

// Implement from delaywheel.Logger
func (te *TestLogger) Debug(format string, args ...any) {
	te.logger.Debugf(format, args...)
}

// Implement from delaywheel.Logger
func (te *TestLogger) Info(format string, args ...any) {
	te.logger.Infof(format, args...)
}

// Implement from delaywheel.Logger
func (te *TestLogger) Error(format string, args ...any) {
	te.logger.Errorf(format, args...)
}

// Implement from delaywheel.Logger
func (te *TestLogger) Warn(format string, args ...any) {
	te.logger.Warnf(format, args...)
}

func main() {
	wheel, _ := delaywheel.New(time.Second, 20,
		delaywheel.WithAutoRun(),
		delaywheel.WithCurTaskID(30),
		delaywheel.WithLogLevel(delaywheel.LOG_ERROR),
		delaywheel.WithPendingBufferSize(30),
		delaywheel.WithLogger(newTestLogger()),
	)
	wheel.Start()
}

License

This project is licensed under the MIT License - see the LICENSE file for details.

Documentation

Index

Constants

View Source
const (
	STATE_TERMINATED wheelState = iota
	STATE_READY
	STATE_SHUTTING_DOWN
)

Variables

This section is empty.

Functions

This section is empty.

Types

type DelayWheel

type DelayWheel struct {
	// contains filtered or unexported fields
}

func New

func New(tick time.Duration, wheelSize int, options ...Option) (*DelayWheel, error)

func (*DelayWheel) AfterExecute

func (de *DelayWheel) AfterExecute(d time.Duration, executor Executor) (taskId uint64, err error)

Submit a delayed execution of a executor.

func (*DelayWheel) AfterFunc

func (de *DelayWheel) AfterFunc(d time.Duration, f func(task *TaskCtx)) (taskId uint64, err error)

Submit a delayed execution of a function.

func (*DelayWheel) CancelTask

func (de *DelayWheel) CancelTask(taskID uint64)

Cancel a task by TaskID

func (*DelayWheel) PendingChan

func (de *DelayWheel) PendingChan() <-chan func()

Acquire the channel for pending running tasks

func (*DelayWheel) ScheduleExecute

func (de *DelayWheel) ScheduleExecute(d time.Duration, executor Executor) (taskId uint64, err error)

Schedule a delayed execution of a executor with a time interval.

func (*DelayWheel) ScheduleFunc

func (de *DelayWheel) ScheduleFunc(d time.Duration, f func(ctx *TaskCtx)) (taskId uint64, err error)

Schedule a delayed execution of a function with a time interval.

func (*DelayWheel) Start

func (de *DelayWheel) Start()

Start the delaywheel.

func (*DelayWheel) Stop

func (de *DelayWheel) Stop(stopFunc StopFunc) error

Send a stop signal to delaywheel

type Executor

type Executor interface {
	Execute(taskCtx *TaskCtx)
}

Executor contains an Execute() method, > where TaskCtx is passed in to obtain the relevant parameters of the current task.

type LogLevel

type LogLevel int
const (
	LOG_DEBUG LogLevel = iota
	LOG_INFO
	LOG_WARN
	LOG_ERROR
)

type Logger

type Logger interface {
	Debug(format string, args ...any)
	Info(format string, args ...any)
	Error(format string, args ...any)
	Warn(format string, args ...any)
}

type Option

type Option func(*DelayWheel)

func WithAutoRun

func WithAutoRun() Option

WithAutoRun enables the automatic execution of tasks when they are due. When enabled, tasks will be executed in their own goroutines as soon as they are scheduled.

func WithCurTaskID

func WithCurTaskID(num uint64) Option

WithCurTaskID sets the initial task ID for the DelayWheel. This can be used to start the task IDs from a specific number.

func WithLogLevel

func WithLogLevel(level LogLevel) Option

WithLogLevel sets the logging level for the DelayWheel. This controls the verbosity of the logs produced by the DelayWheel, allowing for finer control over what is logged.

func WithLogger

func WithLogger(logger Logger) Option

WithLogger sets the logger to be used by the DelayWheel. This allows for custom logging implementations to be integrated into the DelayWheel, facilitating logging of internal events according to the user's preferences.

func WithPendingBufferSize

func WithPendingBufferSize(size int) Option

WithPendingBufferSize sets the size of the buffer for the channel that holds pending tasks. This can be used to control the maximum number of tasks that can be held in the queue before being processed, which can help in managing memory usage and controlling how tasks are batched for execution.

type StopCtx

type StopCtx struct {
	// contains filtered or unexported fields
}

func (*StopCtx) GetAllTask

func (ctx *StopCtx) GetAllTask() []*Task

func (*StopCtx) WaitForDone

func (ctx *StopCtx) WaitForDone(inputCtx context.Context)

type StopFunc

type StopFunc func(stopCtx *StopCtx) error

type Task

type Task struct {
	// contains filtered or unexported fields
}

func (*Task) Cancel

func (dt *Task) Cancel()

Cancel the task

func (*Task) Execute

func (dt *Task) Execute()

Execute the task; Notice: The task will self-recycle and clear relevant data after execution.

func (*Task) Executor

func (dt *Task) Executor() Executor

Get the executor.

func (*Task) Expiration

func (dt *Task) Expiration() int64

Get the task expiration.

func (*Task) IsCanceled added in v1.0.2

func (dt *Task) IsCanceled() bool

func (*Task) TaskID

func (dt *Task) TaskID() uint64

Get the taskID

type TaskCtx

type TaskCtx struct {
	// contains filtered or unexported fields
}

func (*TaskCtx) Cancel

func (ctx *TaskCtx) Cancel()

func (*TaskCtx) Expiration

func (ctx *TaskCtx) Expiration() int64

func (*TaskCtx) ExpireTime

func (ctx *TaskCtx) ExpireTime() time.Time

func (*TaskCtx) IsCancelled

func (ctx *TaskCtx) IsCancelled() bool

func (*TaskCtx) ReSchedule

func (ctx *TaskCtx) ReSchedule(d time.Duration)

func (*TaskCtx) TaskID

func (ctx *TaskCtx) TaskID() uint64

Directories

Path Synopsis
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL