timer

package
v0.0.0-...-c23f155 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 4, 2024 License: MIT Imports: 28 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ErrTimingWheelStopped                   = twError("[timing-wheels] stopped")
	ErrTimingWheelTaskNotFound              = twError("[timing-wheels] task not found")
	ErrTimingWheelTaskEmptyJobID            = twError("[timing-wheels] empty job id in task")
	ErrTimingWheelEmptyJob                  = twError("[timing-wheels] empty job in task")
	ErrTimingWheelTaskIsExpired             = twError("[timing-wheels] task is expired")
	ErrTimingWheelTaskUnableToBeAddedToSlot = twError("[timing-wheels] task unable to be added to a flushed slot")
	ErrTimingWheelTaskUnableToBeRemoved     = twError("[timing-wheels] task unable to be removed")
	ErrTimingWheelTaskTooShortExpiration    = twError("[timing-wheels] task expiration is too short")
	ErrTimingWheelUnknownScheduler          = twError("[timing-wheels] unknown schedule")
	ErrTimingWheelTaskCancelled             = twError("[timing-wheels] task cancelled")
)
View Source
const (
	TimingWheelStatsName = "xboot/xtw"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Job

type Job func(ctx context.Context, metadata JobMetadata)

Job is the function that will be executed by the timing wheel

type JobID

type JobID string

JobID is the unique identifier of a job

type JobMetadata

type JobMetadata interface {
	// GetJobID returns the jobID of the job, unique identifier.
	GetJobID() JobID
	// GetExpiredMs returns the expirationMs of the job.
	GetExpiredMs() int64
	// GetRestLoopCount returns the rest loop count.
	GetRestLoopCount() int64
	// GetJobType returns the job type.
	GetJobType() JobType
}

JobMetadata describes the metadata of a job Each slot in the timing wheel is a linked list of jobs

type JobType

type JobType uint8
const (
	OnceJob JobType = iota
	RepeatedJob
)

func (JobType) String

func (t JobType) String() string

type ScheduledTask

type ScheduledTask interface {
	Task
	UpdateNextScheduledMs()
}

ScheduledTask is the interface that wraps the repeat Job

func NewRepeatTask

func NewRepeatTask(
	ctx context.Context,
	jobID JobID,
	beginMs int64,
	scheduler Scheduler,
	job Job,
) ScheduledTask

type Scheduler

type Scheduler interface {

	// GetRestLoopCount returns the rest loop count.
	// If the rest loop count is -1, it means that the task will run forever unless cancel manually.
	GetRestLoopCount() int64
	// contains filtered or unexported methods
}

func NewFiniteScheduler

func NewFiniteScheduler(intervals ...time.Duration) Scheduler

func NewInfiniteScheduler

func NewInfiniteScheduler(intervals ...time.Duration) Scheduler

type Task

type Task interface {
	JobMetadata
	GetJobMetadata() JobMetadata
	// GetJob returns the job function.
	GetJob() Job
	// GetSlot returns the slot of the job.
	GetSlot() TimingWheelSlot

	// GetPreviousSlotMetadata returns the previous slot metadata of the job.
	GetPreviousSlotMetadata() TimingWheelSlotMetadata

	Cancel() bool
	Cancelled() bool
	// contains filtered or unexported methods
}

Task is the interface that wraps the Job

func NewOnceTask

func NewOnceTask(
	ctx context.Context,
	jobID JobID,
	expiredMs int64,
	job Job,
) Task

type TaskHandler

type TaskHandler func(Task) // Core function

TaskHandler is a function that reinserts a task into the timing wheel. It means that the task should be executed periodically or repeatedly for a certain times. Reinsert will add current task to next slot, higher level slot (overflow wheel) or the same level slot (current wheel) depending on the expirationMs of the task. When the task is reinserted, the expirationMs of the task should be updated.

  1. Check if the task is cancelled. If so, stop reinserting.
  2. Check if the task's loop count is greater than 0. If so, decrease the loop count and reinsert.
  3. Check if the task's loop count is -1 (run forever unless cancel manually). If so, reinsert and update the expirationMs.

type TimingWheel

type TimingWheel interface {
	TimingWheelCommonMetadata
	GetInterval() int64
	GetCurrentTimeMs() int64
}

type TimingWheelCommonMetadata

type TimingWheelCommonMetadata interface {
	// GetTickMs returns the baseline tick ms (interval) of the timing-wheel.
	GetTickMs() int64
	// GetStartMs returns the start ms of the timing-wheel.
	GetStartMs() int64
}

type TimingWheelSlot

type TimingWheelSlot interface {
	TimingWheelSlotMetadata
	// GetMetadata returns the metadata of the slot.
	GetMetadata() TimingWheelSlotMetadata
	// AddTask adds a task to the slot.
	AddTask(Task) error
	// RemoveTask removes a task from the slot.
	RemoveTask(Task) bool
	// Flush flushes all tasks in the slot generally,
	// but it should be called in a loop.
	Flush(TaskHandler)
}

TimingWheelSlot is the interface that wraps the slot, in kafka, it is called bucket.

func NewXSlot

func NewXSlot() TimingWheelSlot

type TimingWheelSlotMetadata

type TimingWheelSlotMetadata interface {
	// GetExpirationMs returns the expirationMs of the slot.
	GetExpirationMs() int64

	// GetSlotID returns the slotID of the slot, easy for debugging.
	GetSlotID() int64

	// GetLevel returns the level of the slot, easy for debugging.
	GetLevel() int64
	// contains filtered or unexported methods
}

type TimingWheelTimeSourceEnum

type TimingWheelTimeSourceEnum int8
const (
	SdkDefaultTime TimingWheelTimeSourceEnum = iota
	GoNativeClock
	UnixClock
)

type TimingWheels

type TimingWheels interface {
	TimingWheelCommonMetadata
	// AddTask adds a task to the timing wheels.
	AddTask(task Task) error
	// CancelTask cancels a task by jobID.
	CancelTask(jobID JobID) error
	// Shutdown stops the timing wheels
	Shutdown()
	// AfterFunc schedules a function to run after the duration delayMs.
	AfterFunc(delayMs time.Duration, fn Job) (Task, error)
	// ScheduleFunc schedules a function to run at a certain time generated by the schedule.
	ScheduleFunc(schedFn func() Scheduler, fn Job) (Task, error)
}

func NewXTimingWheels

func NewXTimingWheels(ctx context.Context, opts ...TimingWheelsOption) TimingWheels

NewXTimingWheels creates a new timing wheel. The same as the kafka, Time.SYSTEM.hiResClockMs() is used.

func NewXTimingWheelsV2

func NewXTimingWheelsV2(ctx context.Context, opts ...TimingWheelsOption) TimingWheels

NewXTimingWheelsV2 creates a new timing wheel. The same as the kafka, Time.SYSTEM.hiResClockMs() is used.

type TimingWheelsOption

type TimingWheelsOption func(option *xTimingWheelsOption)

func WithTimingWheelTimeSource

func WithTimingWheelTimeSource(source TimingWheelTimeSourceEnum) TimingWheelsOption

func WithTimingWheelsEventBufferSize

func WithTimingWheelsEventBufferSize(size int) TimingWheelsOption

func WithTimingWheelsIDGen

func WithTimingWheelsIDGen(gen id.Gen) TimingWheelsOption

func WithTimingWheelsName

func WithTimingWheelsName(name string) TimingWheelsOption

func WithTimingWheelsSlotSize

func WithTimingWheelsSlotSize(slotSize int64) TimingWheelsOption

func WithTimingWheelsStats

func WithTimingWheelsStats() TimingWheelsOption

func WithTimingWheelsTickMs

func WithTimingWheelsTickMs(basicTickMs time.Duration) TimingWheelsOption

func WithTimingWheelsWorkerPoolSize

func WithTimingWheelsWorkerPoolSize(poolSize int) TimingWheelsOption

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL