Documentation ¶
Index ¶
- Constants
- type Job
- type JobAction
- type JobEnqueueResult
- type JobQueue
- func (q *JobQueue[K]) Dequeue() (job Job[K], shouldAcquireMore bool)
- func (q *JobQueue[K]) Enqueue(job Job[K]) JobEnqueueResult
- func (q *JobQueue[K]) Find(key K) (_ Job[K], found bool)
- func (q *JobQueue[K]) Pause()
- func (q *JobQueue[K]) Remaining(buf []Job[K]) (n int, shortbuf bool)
- func (q *JobQueue[K]) Remove(job Job[K]) bool
- func (q *JobQueue[K]) Resume()
- type SeqDataHandleFunc
- type SeqQueue
- type TimeoutData
- type TimeoutQueue
- func (q *TimeoutQueue[K, V]) Allow(key K)
- func (q *TimeoutQueue[K, V]) Clear()
- func (q *TimeoutQueue[K, V]) Find(key K) (_ V, _ bool)
- func (q *TimeoutQueue[K, V]) Forbid(key K)
- func (q *TimeoutQueue[K, V]) Len() int
- func (q *TimeoutQueue[K, V]) Less(i, j int) bool
- func (q *TimeoutQueue[K, V]) OfferWithDelay(key K, val V, wait time.Duration) error
- func (q *TimeoutQueue[K, V]) OfferWithTime(key K, val V, at time.Time) error
- func (q *TimeoutQueue[K, V]) Remains() []TimeoutData[K, V]
- func (q *TimeoutQueue[K, V]) Remove(key K) (ret V, _ bool)
- func (q *TimeoutQueue[K, V]) Start(stop <-chan struct{})
- func (q *TimeoutQueue[K, V]) Swap(i, j int)
- func (q *TimeoutQueue[K, V]) TakeCh() <-chan *TimeoutData[K, V]
Constants ¶
const ( ErrNotStarted errhelper.ErrString = "not started" ErrStopped errhelper.ErrString = "stopped" ErrKeyNotAllowed errhelper.ErrString = "key not allowed" )
Errors for timeout queue
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Job ¶
type Job[K comparable] struct { Action JobAction Key K }
Job item to record action and related resource object
type JobAction ¶
type JobAction uint8
const ( // JobActionAdd for jobs to add or create some resources JobActionAdd JobAction = 1 + iota // JobActionUpdate for jobs to update some resources JobActionUpdate // JobActionDelete for jobs to delete some resources JobActionDelete // JobActionCleanup for jobs to cleanup resources JobActionCleanup )
type JobEnqueueResult ¶ added in v0.12.0
type JobEnqueueResult uint8
JobEnqueueResult is the result of enqueuing a job into a JobQueue.
const ( // JobEnqueueSuccess indicates the job enqueued successfully (with some side effects) // // Side effects: // // - enqueue a "Delete" job cancels "Update" job (also see JobEnqueueCounteractive) JobEnqueueSuccess JobEnqueueResult = iota // JobEnqueueDup indicates the job to enqueue was discarded due to duplication, it // is common for job scheduling and should be considered as success for most cases // // Dup condition happens if there is a job with the same action and key JobEnqueueDup // JobEnqueueConflict indicates the job to enqueue was rejected because of conflict // this result is usually caused by inconsistent application state and should be // considered unrecoverable // // Conflict conditions: // // - enqueue an "Add" job, but there is an "Update" job // - enqueue an "Update", but there is an "Add" or "Delete" job JobEnqueueConflict // JobEnqueueCounteractive indicates the job to enqueue was canceled as it // counteracted with existing job, and that counteracted job also got canceled // // Counteractive conditions: // // - enqueue a "Delete" job, and there is an "Add" job (will cancel that "Add" job) JobEnqueueCounteractive // JobEnqueueInvalidAction indicates the job to enqueue was rejected because of its // invalid JobAction JobEnqueueInvalidAction )
func (JobEnqueueResult) Error ¶ added in v0.12.0
func (r JobEnqueueResult) Error() string
func (JobEnqueueResult) String ¶ added in v0.12.0
func (r JobEnqueueResult) String() string
type JobQueue ¶
type JobQueue[K comparable] struct { // contains filtered or unexported fields }
JobQueue implements a waiting, interruptible blocking queue for job scheduling and deduplication. It defines four kinds of job by intention: add, update, delete and cleanup (see JobAction).
TODO: refactor it with priority queue (B+Tree)
func NewJobQueue ¶
func NewJobQueue[K comparable]() (q *JobQueue[K])
NewJobQueue creates a paused job queue.
func (*JobQueue[K]) Dequeue ¶ added in v0.12.0
Dequeue takes a job from the queue. It waits until a job is available or the job queue is paused; shouldAcquireMore indicates whether the queue was paused (true means paused).
func (*JobQueue[K]) Enqueue ¶ added in v0.12.0
func (q *JobQueue[K]) Enqueue(job Job[K]) JobEnqueueResult
Enqueue enqueues a job, regardless of the state of the JobQueue.
See JobEnqueueResult for details of the return value.
func (*JobQueue[K]) Pause ¶
func (q *JobQueue[K]) Pause()
Pause pauses the Dequeue operations; existing Dequeue() calls will return immediately.
func (*JobQueue[K]) Remaining ¶ added in v0.12.0
Remaining copies a list of jobs remaining into buf.
type SeqDataHandleFunc ¶ added in v0.5.4
type SeqQueue ¶
type SeqQueue[T any] struct { // contains filtered or unexported fields }
SeqQueue is the sequence queue for randomly enqueued ordered data.
func NewSeqQueue ¶
func NewSeqQueue[T any](handleData SeqDataHandleFunc[T]) *SeqQueue[T]
NewSeqQueue returns an empty SeqQueue.
type TimeoutData ¶
type TimeoutData[K comparable, V any] struct { Key K Data V // contains filtered or unexported fields }
TimeoutData is the data set used internally
type TimeoutQueue ¶
type TimeoutQueue[K comparable, V any] struct { // contains filtered or unexported fields }
TimeoutQueue arranges timeout events in a single queue, so that you can access them in sequence through a channel.
func NewTimeoutQueue ¶
func NewTimeoutQueue[K comparable, V any]() *TimeoutQueue[K, V]
NewTimeoutQueue returns an idle TimeoutQueue
func (*TimeoutQueue[K, V]) Allow ¶
func (q *TimeoutQueue[K, V]) Allow(key K)
Allow allows tasks with key; future tasks with the key can be offered.
func (*TimeoutQueue[K, V]) Clear ¶
func (q *TimeoutQueue[K, V]) Clear()
Clear out all timeout key-value pairs
func (*TimeoutQueue[K, V]) Find ¶
func (q *TimeoutQueue[K, V]) Find(key K) (_ V, _ bool)
Find timeout key-value pair according to the key
func (*TimeoutQueue[K, V]) Forbid ¶
func (q *TimeoutQueue[K, V]) Forbid(key K)
Forbid forbids tasks with key; future tasks with the key cannot be offered.
func (*TimeoutQueue[K, V]) Len ¶
func (q *TimeoutQueue[K, V]) Len() int
Len is used internally for timeout data sort
func (*TimeoutQueue[K, V]) Less ¶
func (q *TimeoutQueue[K, V]) Less(i, j int) bool
Less is used internally for timeout data sort
func (*TimeoutQueue[K, V]) OfferWithDelay ¶
func (q *TimeoutQueue[K, V]) OfferWithDelay(key K, val V, wait time.Duration) error
OfferWithDelay enqueues a key-value pair that times out after `wait`. If you would like to call Remove to delete the timeout object, `key` must be unique in this queue.
func (*TimeoutQueue[K, V]) OfferWithTime ¶
func (q *TimeoutQueue[K, V]) OfferWithTime(key K, val V, at time.Time) error
OfferWithTime enqueues a key-value pair that times out at the time `at`. If you would like to call Remove to delete the timeout object, `key` must be unique in this queue.
func (*TimeoutQueue[K, V]) Remains ¶
func (q *TimeoutQueue[K, V]) Remains() []TimeoutData[K, V]
Remains returns the key-value pairs that have not timed out yet.
func (*TimeoutQueue[K, V]) Remove ¶
func (q *TimeoutQueue[K, V]) Remove(key K) (ret V, _ bool)
Remove a timeout object from the queue according to the key
func (*TimeoutQueue[K, V]) Start ¶
func (q *TimeoutQueue[K, V]) Start(stop <-chan struct{})
Start routine to generate timeout data
func (*TimeoutQueue[K, V]) Swap ¶
func (q *TimeoutQueue[K, V]) Swap(i, j int)
Swap is used internally for timeout data sort
func (*TimeoutQueue[K, V]) TakeCh ¶
func (q *TimeoutQueue[K, V]) TakeCh() <-chan *TimeoutData[K, V]
TakeCh returns the channel from which you can receive timed-out key-value pairs one by one.