Documentation ¶
Overview ¶
Package queue implements structures and routines for organizing tasks and items into managed queues. It also provides logic for obtaining progress information and metrics on these managed queues.
Index ¶
- Constants
- type EnumerationManager
- type EnumerationSourceQueue
- type EnumerationTask
- type EvaluationManager
- type EvaluationShareQueue
- type GenericManager
- type GenericQueue
- func (q *GenericQueue[V]) Dequeue() (V, bool)
- func (q *GenericQueue[V]) DequeueAndProcess(ctx context.Context, processFunc func(V) int) error
- func (q *GenericQueue[V]) DequeueAndProcessConc(ctx context.Context, maxWorkers int, processFunc func(V) int) error
- func (q *GenericQueue[V]) Enqueue(items ...V)
- func (q *GenericQueue[V]) GetSuccessful() []V
- func (q *GenericQueue[V]) HasRemainingItems() bool
- func (q *GenericQueue[V]) PostProcess(p schema.Pipeline[V]) bool
- func (q *GenericQueue[V]) PreProcess(p schema.Pipeline[V]) bool
- func (q *GenericQueue[V]) Progress() Progress
- func (q *GenericQueue[V]) SetProcessing(items ...V)
- func (q *GenericQueue[V]) SetSkipped(items ...V)
- func (q *GenericQueue[V]) SetSuccess(items ...V)
- type GenericQueueType
- type IOManager
- type IOTargetQueue
- type Manager
- type Progress
- type TaskManager
Constants ¶
const (
// DecisionSuccess is returned by a processFunc when an item was processed.
DecisionSuccess = 1
// DecisionSkipped is returned by a processFunc when an item was skipped.
DecisionSkipped = 0
// DecisionRequeue is returned by a processFunc when an item needs
// requeueing.
DecisionRequeue = -1
)
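As a rough illustration of how a processFunc might map outcomes onto these decisions, the sketch below redeclares the three constants locally so that it compiles on its own. The classify function and its size thresholds are hypothetical and not part of the package.

```go
package main

import "fmt"

// Local mirrors of the package's decision constants, redeclared here only
// so that the sketch is self-contained.
const (
	DecisionSuccess = 1
	DecisionSkipped = 0
	DecisionRequeue = -1
)

// classify is a hypothetical processFunc over file sizes: empty files are
// skipped, oversized files are requeued, and everything else succeeds.
func classify(size int) int {
	switch {
	case size == 0:
		return DecisionSkipped
	case size > 1000:
		return DecisionRequeue
	default:
		return DecisionSuccess
	}
}

func main() {
	for _, size := range []int{0, 512, 4096} {
		fmt.Println(size, classify(size))
	}
}
```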
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type EnumerationManager ¶
type EnumerationManager struct {
*GenericManager[schema.Storage, *EnumerationTask, *EnumerationSourceQueue]
}
EnumerationManager is a queue manager for enumeration operations. It manages a number of independent EnumerationSourceQueue instances, each bucketized by source storage name.
EnumerationManager embeds a GenericManager. It is thread-safe and can be both accessed and processed concurrently.
The items contained within EnumerationSourceQueue are EnumerationTask.
func NewEnumerationManager ¶
func NewEnumerationManager() *EnumerationManager
NewEnumerationManager returns a pointer to a new EnumerationManager.
func (*EnumerationManager) Enqueue ¶
func (m *EnumerationManager) Enqueue(items ...*EnumerationTask)
Enqueue adds EnumerationTask(s) into the correct EnumerationSourceQueue, as managed by EnumerationManager, based on their respective source storage name.
func (*EnumerationManager) Progress ¶
func (m *EnumerationManager) Progress() Progress
Progress returns the Progress of the EnumerationManager.
type EnumerationSourceQueue ¶
type EnumerationSourceQueue struct {
*GenericQueue[*EnumerationTask]
}
EnumerationSourceQueue is a queue where items of a common source storage name were previously enqueued and aggregated by their EnumerationManager.
EnumerationSourceQueue embeds a GenericQueue. It is thread-safe and can both be accessed and processed concurrently.
The items contained within EnumerationSourceQueue are EnumerationTask.
func NewEnumerationSourceQueue ¶
func NewEnumerationSourceQueue() *EnumerationSourceQueue
NewEnumerationSourceQueue returns a pointer to a new EnumerationSourceQueue. This function is generally only called by the respective EnumerationManager.
func (*EnumerationSourceQueue) Progress ¶
func (q *EnumerationSourceQueue) Progress() Progress
Progress returns the Progress of the EnumerationSourceQueue.
type EnumerationTask ¶
EnumerationTask is an enumeration task for later execution.
func (*EnumerationTask) Run ¶
func (e *EnumerationTask) Run() int
Run executes the stored enumeration function of an EnumerationTask.
type EvaluationManager ¶
type EvaluationManager struct {
*GenericManager[schema.Share, *schema.Moveable, *EvaluationShareQueue]
}
EvaluationManager is a queue manager for evaluation operations. It manages a number of independent EvaluationShareQueue instances, each bucketized by share name.
EvaluationManager embeds a GenericManager. It is thread-safe and can be both accessed and processed concurrently.
The items contained within EvaluationShareQueue are schema.Moveable.
func NewEvaluationManager ¶
func NewEvaluationManager() *EvaluationManager
NewEvaluationManager returns a pointer to a new EvaluationManager.
func (*EvaluationManager) Enqueue ¶
func (m *EvaluationManager) Enqueue(items ...*schema.Moveable)
Enqueue adds schema.Moveable(s) into the correct EvaluationShareQueue, as managed by EvaluationManager, based on their respective share names.
func (*EvaluationManager) Progress ¶
func (m *EvaluationManager) Progress() Progress
Progress returns the Progress of the EvaluationManager.
type EvaluationShareQueue ¶
type EvaluationShareQueue struct {
*GenericQueue[*schema.Moveable]
}
EvaluationShareQueue is a queue where items of a common share name were previously enqueued and aggregated by their EvaluationManager.
EvaluationShareQueue embeds a GenericQueue. It is thread-safe and can both be accessed and processed concurrently.
The items contained within EvaluationShareQueue are schema.Moveable.
func NewEvaluationShareQueue ¶
func NewEvaluationShareQueue() *EvaluationShareQueue
NewEvaluationShareQueue returns a pointer to a new EvaluationShareQueue. This function is generally only called by the respective EvaluationManager.
func (*EvaluationShareQueue) Progress ¶
func (q *EvaluationShareQueue) Progress() Progress
Progress returns the Progress of the EvaluationShareQueue.
type GenericManager ¶
type GenericManager[K comparable, V comparable, Q GenericQueueType[V]] struct {
sync.RWMutex
// contains filtered or unexported fields
}
GenericManager is a generic queue manager for queues of GenericQueueType.
func NewGenericManager ¶
func NewGenericManager[K comparable, V comparable, Q GenericQueueType[V]]() *GenericManager[K, V, Q]
NewGenericManager returns a pointer to a new GenericManager.
func (*GenericManager[K, V, Q]) Enqueue ¶
func (m *GenericManager[K, V, Q]) Enqueue(item V, getKeyFunc func(V) K, newQueueFunc func() Q)
Enqueue bucketizes items into queues according to a getKeyFunc, creating new queues as required using a newQueueFunc.
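The bucketizing step can be pictured roughly as follows. This is a minimal sketch under assumptions, not the package's implementation: the queue type, the free-standing enqueue function and the path-based key function are all hypothetical, and locking is omitted for brevity.

```go
package main

import (
	"fmt"
	"strings"
)

// queue is a hypothetical stand-in for a managed queue.
type queue[V any] struct{ items []V }

func (q *queue[V]) Enqueue(items ...V) { q.items = append(q.items, items...) }

// enqueue looks up the bucket for an item via getKeyFunc and creates the
// bucket with newQueueFunc the first time a key is seen.
func enqueue[K comparable, V any](queues map[K]*queue[V], item V, getKeyFunc func(V) K, newQueueFunc func() *queue[V]) {
	key := getKeyFunc(item)
	q, ok := queues[key]
	if !ok {
		q = newQueueFunc()
		queues[key] = q
	}
	q.Enqueue(item)
}

func main() {
	queues := map[string]*queue[string]{}
	// firstDir uses the first path element as the bucket key.
	firstDir := func(path string) string { return strings.SplitN(path, "/", 2)[0] }
	newQueue := func() *queue[string] { return &queue[string]{} }

	for _, p := range []string{"disk1/a", "disk2/b", "disk1/c"} {
		enqueue(queues, p, firstDir, newQueue)
	}
	fmt.Println(len(queues), queues["disk1"].items)
}
```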
func (*GenericManager[K, V, Q]) GetQueues ¶
func (m *GenericManager[K, V, Q]) GetQueues() map[K]Q
GetQueues returns a copy of the internal map holding pointers to all managed queues.
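Returning a copy of a map of pointers has a consequence worth spelling out: changes to the returned map do not affect the manager, but the queues it points to are shared. The sketch below shows this with hypothetical manager and queue types; only the embedded sync.RWMutex and the method name are taken from the documentation above.

```go
package main

import (
	"fmt"
	"sync"
)

// queue and manager are hypothetical stand-ins for illustration.
type queue struct{ items []string }

type manager struct {
	sync.RWMutex
	queues map[string]*queue
}

// GetQueues copies the map under a read lock. The copy is shallow: the
// map is new, but the queue pointers inside it are the original ones.
func (m *manager) GetQueues() map[string]*queue {
	m.RLock()
	defer m.RUnlock()
	out := make(map[string]*queue, len(m.queues))
	for k, q := range m.queues {
		out[k] = q
	}
	return out
}

func main() {
	m := &manager{queues: map[string]*queue{"disk1": &queue{}}}

	snapshot := m.GetQueues()
	delete(snapshot, "disk1") // only the copy loses the entry

	shared := m.GetQueues()
	shared["disk1"].items = append(shared["disk1"].items, "file") // visible through the manager

	fmt.Println(len(m.queues), len(snapshot), len(m.queues["disk1"].items))
}
```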
func (*GenericManager[K, V, Q]) GetSuccessful ¶
func (m *GenericManager[K, V, Q]) GetSuccessful() []V
GetSuccessful returns a slice of the successfully processed items of all managed queues.
func (*GenericManager[K, V, Q]) Progress ¶
func (m *GenericManager[K, V, Q]) Progress() Progress
Progress returns the Progress for the GenericManager.
type GenericQueue ¶
type GenericQueue[V comparable] struct {
sync.RWMutex
// contains filtered or unexported fields
}
GenericQueue is a generic queue that can hold any comparable type of items.
func NewGenericQueue ¶
func NewGenericQueue[V comparable]() *GenericQueue[V]
NewGenericQueue returns a pointer to a new GenericQueue.
func (*GenericQueue[V]) Dequeue ¶
func (q *GenericQueue[V]) Dequeue() (V, bool)
Dequeue returns an item from the queue and advances the queue head.
func (*GenericQueue[V]) DequeueAndProcess ¶
func (q *GenericQueue[V]) DequeueAndProcess(ctx context.Context, processFunc func(V) int) error
DequeueAndProcess sequentially dequeues and processes items using the given processFunc. An error is returned only if the context is cancelled. The processFunc is otherwise expected to return only an integer carrying its decision for that item.
Possible decisions to be returned: DecisionSuccess, DecisionSkipped, DecisionRequeue.
func (*GenericQueue[V]) DequeueAndProcessConc ¶
func (q *GenericQueue[V]) DequeueAndProcessConc(ctx context.Context, maxWorkers int, processFunc func(V) int) error
DequeueAndProcessConc concurrently dequeues and processes items using the given processFunc. An error is returned only if the context is cancelled. The processFunc is otherwise expected to return only an integer carrying its decision for that item.
Possible decisions to be returned: DecisionSuccess, DecisionSkipped, DecisionRequeue.
It is the responsibility of the processFunc to ensure thread-safety for anything happening inside the processFunc, with the GenericQueue only guaranteeing thread-safety for itself.
func (*GenericQueue[V]) Enqueue ¶
func (q *GenericQueue[V]) Enqueue(items ...V)
Enqueue adds items to the queue.
func (*GenericQueue[V]) GetSuccessful ¶
func (q *GenericQueue[V]) GetSuccessful() []V
GetSuccessful returns a copy of the internal slice holding all successful items.
func (*GenericQueue[V]) HasRemainingItems ¶
func (q *GenericQueue[V]) HasRemainingItems() bool
HasRemainingItems returns whether a queue has remaining items to process.
func (*GenericQueue[V]) PostProcess ¶
func (q *GenericQueue[V]) PostProcess(p schema.Pipeline[V]) bool
PostProcess runs a schema.Pipeline's contained post-processors on all successfully processed queue items.
func (*GenericQueue[V]) PreProcess ¶
func (q *GenericQueue[V]) PreProcess(p schema.Pipeline[V]) bool
PreProcess runs a schema.Pipeline's contained pre-processors on all yet unprocessed queue items.
If the queue is being operated on concurrently, sorting functions should not be used as pre-processors. Such functions should be used as post-processors instead, to guarantee that the order is preserved at the end of the queue.
func (*GenericQueue[V]) Progress ¶
func (q *GenericQueue[V]) Progress() Progress
Progress returns the Progress for the GenericQueue.
func (*GenericQueue[V]) SetProcessing ¶
func (q *GenericQueue[V]) SetProcessing(items ...V)
SetProcessing sets given items as in progress (processing).
func (*GenericQueue[V]) SetSkipped ¶
func (q *GenericQueue[V]) SetSkipped(items ...V)
SetSkipped sets given in-progress queue items as skipped. The items are removed from the in-progress map in the process.
func (*GenericQueue[V]) SetSuccess ¶
func (q *GenericQueue[V]) SetSuccess(items ...V)
SetSuccess sets given in-progress queue items as successfully processed. The items are removed from the in-progress map in the process.
type GenericQueueType ¶
type GenericQueueType[V comparable] interface {
Enqueue(items ...V)
GetSuccessful() []V
Progress() Progress
}
GenericQueueType defines methods that a managed queue needs to have.
type IOManager ¶
type IOManager struct {
*GenericManager[schema.Storage, *schema.Moveable, *IOTargetQueue]
}
IOManager is a queue manager for IO operations. It is used to manage a number of different IOTargetQueue that are each independent and bucketized by their target storage name.
IOManager embeds a GenericManager. The manager is generally thread-safe and can be accessed concurrently.
Beware that the items contained in an IOTargetQueue can only be processed sequentially, so that no concurrent operations take place within the same destination target storage.
The items contained within IOTargetQueue are schema.Moveable.
func NewIOManager ¶
func NewIOManager() *IOManager
NewIOManager returns a pointer to a new IOManager.
func (*IOManager) Enqueue ¶
Enqueue adds schema.Moveable(s) into the correct IOTargetQueue, as managed by IOManager, based on their respective target storage name.
type IOTargetQueue ¶
type IOTargetQueue struct {
*GenericQueue[*schema.Moveable]
// contains filtered or unexported fields
}
IOTargetQueue is a queue where items of a common target storage name were previously enqueued and aggregated by their IOManager.
IOTargetQueue embeds a GenericQueue.
Beware that the items contained in an IOTargetQueue can only be processed sequentially, so that no concurrent operations take place within the same destination target storage.
The items contained within IOTargetQueue are schema.Moveable.
func NewIOTargetQueue ¶
func NewIOTargetQueue() *IOTargetQueue
NewIOTargetQueue returns a pointer to a new IOTargetQueue. This function is generally only called by the respective IOManager.
func (*IOTargetQueue) AddBytesTransfered ¶
func (q *IOTargetQueue) AddBytesTransfered(bytes uint64)
AddBytesTransfered adds given transferred bytes to the total amount transferred for that IOTargetQueue.
func (*IOTargetQueue) DequeueAndProcessConc ¶
func (q *IOTargetQueue) DequeueAndProcessConc(ctx context.Context, maxWorkers int, processFunc func(*schema.Moveable) int) error
DequeueAndProcessConc is unsupported by IOTargetQueue and will result in a panic when used.
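One way a type that embeds another can disable a promoted method is to declare a method of the same name that panics. The sketch below demonstrates that Go mechanism with hypothetical base and sequentialOnly types. It assumes, without confirmation from the source, that IOTargetQueue uses a similar shadowing approach.

```go
package main

import "fmt"

// base is a hypothetical type offering both processing modes.
type base struct{}

func (b *base) Process() string     { return "sequential" }
func (b *base) ProcessConc() string { return "concurrent" }

// sequentialOnly embeds base but must never be processed concurrently.
type sequentialOnly struct {
	*base
}

// ProcessConc shadows the method promoted from base and panics instead.
func (s *sequentialOnly) ProcessConc() string {
	panic("ProcessConc is unsupported; use Process")
}

// tryConc reports whether calling ProcessConc panicked.
func tryConc(s *sequentialOnly) (recovered bool) {
	defer func() {
		if r := recover(); r != nil {
			recovered = true
		}
	}()
	s.ProcessConc()
	return false
}

func main() {
	q := &sequentialOnly{base: &base{}}
	fmt.Println(q.Process(), tryConc(q))
}
```

Callers that hold such a queue behind an interface cannot see at compile time that the concurrent method is unsupported, so the panic only surfaces at run time.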
func (*IOTargetQueue) Progress ¶
func (q *IOTargetQueue) Progress() Progress
Progress returns the Progress of the IOTargetQueue.
type Manager ¶
type Manager struct {
sync.Mutex
// EnumerationManager contains all enumeration tasks.
EnumerationManager *EnumerationManager
// EvaluationManager contains all [schema.Moveable] candidates.
EvaluationManager *EvaluationManager
// IOManager contains all sorted, allocated and validated [schema.Moveable].
IOManager *IOManager
}
Manager is the principal queue manager implementation containing:
- queue.EnumerationManager for enumeration of all schema.Moveable.
- queue.EvaluationManager for sorting, allocating and validating them.
- queue.IOManager for moving all schema.Moveable to their destinations.
type Progress ¶
type Progress struct {
HasStarted bool
HasFinished bool
StartTime time.Time
FinishTime time.Time
ProgressPct float64
TotalItems int
ProcessedItems int
InProgressItems int
SuccessItems int
SkippedItems int
ETA time.Time
TimeLeft time.Duration
TransferSpeed float64
TransferSpeedUnit string
}
Progress holds information about the progress of a queue (or manager). It is meant to be passed by value.
type TaskManager ¶
TaskManager is a simple task manager for delayed function execution.
func NewTaskManager ¶
func NewTaskManager() *TaskManager
NewTaskManager returns a pointer to a new TaskManager.
func (*TaskManager) Add ¶
func (t *TaskManager) Add(taskedFunc func())
Add adds a new taskedFunc to the TaskManager. Functions with parameters can be added by invoking a parameterized function that immediately returns a func(), capturing any parameters in the closure.
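The closure technique mentioned above can be sketched as follows. The announce function and the plain slice of tasks are hypothetical. With a real TaskManager, the returned func() values would be passed to Add.

```go
package main

import "fmt"

// announce is a hypothetical parameterized function. It returns a func()
// that captures name and out in its closure.
func announce(name string, out *[]string) func() {
	return func() {
		*out = append(*out, "moved "+name)
	}
}

func main() {
	var log []string
	var tasks []func()

	// Each call to announce binds its own copy of name, so every task
	// remembers the parameter it was created with.
	for _, name := range []string{"a.txt", "b.txt"} {
		tasks = append(tasks, announce(name, &log))
	}

	// Run the stored tasks later, in order.
	for _, task := range tasks {
		task()
	}
	fmt.Println(log)
}
```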
func (*TaskManager) Launch ¶
func (t *TaskManager) Launch(ctx context.Context) error
Launch sequentially launches the functions stored in a TaskManager. An error is only returned in case of a mid-flight context cancellation.
func (*TaskManager) LaunchConcAndWait ¶
func (t *TaskManager) LaunchConcAndWait(ctx context.Context, maxWorkers int) error
LaunchConcAndWait concurrently launches the functions stored in a TaskManager. An error is only returned in case of a mid-flight context cancellation.
It is the responsibility of the taskedFunc to ensure thread-safety for anything happening inside the taskedFunc, with the TaskManager only guaranteeing thread-safety for itself.