Documentation ¶
Index ¶
- Constants
- type Job
- type JobID
- type JobMetadata
- type JobType
- type ScheduledTask
- type Scheduler
- type Task
- type TaskHandler
- type TimingWheel
- type TimingWheelCommonMetadata
- type TimingWheelSlot
- type TimingWheelSlotMetadata
- type TimingWheelTimeSourceEnum
- type TimingWheels
- type TimingWheelsOption
- func WithTimingWheelTimeSource(source TimingWheelTimeSourceEnum) TimingWheelsOption
- func WithTimingWheelsEventBufferSize(size int) TimingWheelsOption
- func WithTimingWheelsIDGen(gen id.Gen) TimingWheelsOption
- func WithTimingWheelsName(name string) TimingWheelsOption
- func WithTimingWheelsSlotSize(slotSize int64) TimingWheelsOption
- func WithTimingWheelsStats() TimingWheelsOption
- func WithTimingWheelsTickMs(basicTickMs time.Duration) TimingWheelsOption
- func WithTimingWheelsWorkerPoolSize(poolSize int) TimingWheelsOption
Constants ¶
const ( ErrTimingWheelStopped = twError("[timing-wheels] stopped") ErrTimingWheelTaskNotFound = twError("[timing-wheels] task not found") ErrTimingWheelTaskEmptyJobID = twError("[timing-wheels] empty job id in task") ErrTimingWheelEmptyJob = twError("[timing-wheels] empty job in task") ErrTimingWheelTaskIsExpired = twError("[timing-wheels] task is expired") ErrTimingWheelTaskUnableToBeAddedToSlot = twError("[timing-wheels] task unable to be added to a flushed slot") ErrTimingWheelTaskUnableToBeRemoved = twError("[timing-wheels] task unable to be removed") ErrTimingWheelTaskTooShortExpiration = twError("[timing-wheels] task expiration is too short") ErrTimingWheelUnknownScheduler = twError("[timing-wheels] unknown schedule") ErrTimingWheelTaskCancelled = twError("[timing-wheels] task cancelled") )
const (
TimingWheelStatsName = "xboot/xtw"
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Job ¶
type Job func(ctx context.Context, metadata JobMetadata)
Job is the function that will be executed by the timing wheel
type JobMetadata ¶
type JobMetadata interface { // GetJobID returns the jobID of the job, unique identifier. GetJobID() JobID // GetExpiredMs returns the expirationMs of the job. GetExpiredMs() int64 // GetRestLoopCount returns the rest loop count. GetRestLoopCount() int64 // GetJobType returns the job type. GetJobType() JobType }
JobMetadata describes the metadata of a job. Each slot in the timing wheel is a linked list of jobs.
type ScheduledTask ¶
type ScheduledTask interface { Task UpdateNextScheduledMs() }
ScheduledTask is the interface that wraps a repeating Job.
func NewRepeatTask ¶
type Scheduler ¶
type Scheduler interface { // GetRestLoopCount returns the rest loop count. // If the rest loop count is -1, it means that the task will run forever unless cancel manually. GetRestLoopCount() int64 // contains filtered or unexported methods }
func NewFiniteScheduler ¶
func NewInfiniteScheduler ¶
type Task ¶
type Task interface { JobMetadata GetJobMetadata() JobMetadata // GetJob returns the job function. GetJob() Job // GetSlot returns the slot of the job. GetSlot() TimingWheelSlot // GetPreviousSlotMetadata returns the previous slot metadata of the job. GetPreviousSlotMetadata() TimingWheelSlotMetadata Cancel() bool Cancelled() bool // contains filtered or unexported methods }
Task is the interface that wraps the Job
type TaskHandler ¶
type TaskHandler func(Task) // Core function
TaskHandler is a function that reinserts a task into the timing wheel. This means that the task should be executed periodically or repeatedly for a certain number of times. Reinserting adds the current task to the next slot, a higher-level slot (overflow wheel), or a same-level slot (current wheel), depending on the expirationMs of the task. When the task is reinserted, its expirationMs should be updated.
- Check if the task is cancelled. If so, stop reinserting.
- Check if the task's loop count is greater than 0. If so, decrease the loop count and reinsert.
- Check if the task's loop count is -1 (run forever unless cancelled manually). If so, reinsert and update the expirationMs.
type TimingWheel ¶
type TimingWheel interface { TimingWheelCommonMetadata GetInterval() int64 GetCurrentTimeMs() int64 }
type TimingWheelSlot ¶
type TimingWheelSlot interface { TimingWheelSlotMetadata // GetMetadata returns the metadata of the slot. GetMetadata() TimingWheelSlotMetadata // AddTask adds a task to the slot. AddTask(Task) error // RemoveTask removes a task from the slot. RemoveTask(Task) bool // Flush flushes all tasks in the slot generally, // but it should be called in a loop. Flush(TaskHandler) }
TimingWheelSlot is the interface that wraps a slot; in Kafka, it is called a bucket.
func NewXSlot ¶
func NewXSlot() TimingWheelSlot
type TimingWheelSlotMetadata ¶
type TimingWheelSlotMetadata interface { // GetExpirationMs returns the expirationMs of the slot. GetExpirationMs() int64 // GetSlotID returns the slotID of the slot, easy for debugging. GetSlotID() int64 // GetLevel returns the level of the slot, easy for debugging. GetLevel() int64 // contains filtered or unexported methods }
type TimingWheelTimeSourceEnum ¶
type TimingWheelTimeSourceEnum int8
const ( SdkDefaultTime TimingWheelTimeSourceEnum = iota GoNativeClock UnixClock )
type TimingWheels ¶
type TimingWheels interface { TimingWheelCommonMetadata // AddTask adds a task to the timing wheels. AddTask(task Task) error // CancelTask cancels a task by jobID. CancelTask(jobID JobID) error // Shutdown stops the timing wheels Shutdown() // AfterFunc schedules a function to run after the duration delayMs. AfterFunc(delayMs time.Duration, fn Job) (Task, error) // ScheduleFunc schedules a function to run at a certain time generated by the schedule. ScheduleFunc(schedFn func() Scheduler, fn Job) (Task, error) }
func NewXTimingWheels ¶
func NewXTimingWheels(ctx context.Context, opts ...TimingWheelsOption) TimingWheels
NewXTimingWheels creates a new timing wheel. As in Kafka, Time.SYSTEM.hiResClockMs() is used.
func NewXTimingWheelsV2 ¶
func NewXTimingWheelsV2(ctx context.Context, opts ...TimingWheelsOption) TimingWheels
NewXTimingWheelsV2 creates a new timing wheel. As in Kafka, Time.SYSTEM.hiResClockMs() is used.
type TimingWheelsOption ¶
type TimingWheelsOption func(option *xTimingWheelsOption)
func WithTimingWheelTimeSource ¶
func WithTimingWheelTimeSource(source TimingWheelTimeSourceEnum) TimingWheelsOption
func WithTimingWheelsEventBufferSize ¶
func WithTimingWheelsEventBufferSize(size int) TimingWheelsOption
func WithTimingWheelsIDGen ¶
func WithTimingWheelsIDGen(gen id.Gen) TimingWheelsOption
func WithTimingWheelsName ¶
func WithTimingWheelsName(name string) TimingWheelsOption
func WithTimingWheelsSlotSize ¶
func WithTimingWheelsSlotSize(slotSize int64) TimingWheelsOption
func WithTimingWheelsStats ¶
func WithTimingWheelsStats() TimingWheelsOption
func WithTimingWheelsTickMs ¶
func WithTimingWheelsTickMs(basicTickMs time.Duration) TimingWheelsOption
func WithTimingWheelsWorkerPoolSize ¶
func WithTimingWheelsWorkerPoolSize(poolSize int) TimingWheelsOption