queue

package
v0.0.0-...-24f08a5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 4, 2025 License: GPL-2.0 Imports: 6 Imported by: 0

Documentation

Overview

Package queue implements structures and routines for organizing tasks and items into managed queues. It also provides logic for obtaining progress information and metrics on these managed queues.

Index

Constants

View Source
const (
	// DecisionSuccess is returned by a processFunc when an item was processed.
	DecisionSuccess = 1

	// DecisionSkipped is returned by a processFunc when an item was skipped.
	DecisionSkipped = 0

	// DecisionRequeue is returned by a processFunc when an item needs
	// requeueing.
	DecisionRequeue = -1
)

Variables

This section is empty.

Functions

This section is empty.

Types

type EnumerationManager

type EnumerationManager struct {
	*GenericManager[schema.Storage, *EnumerationTask, *EnumerationSourceQueue]
}

EnumerationManager is a queue manager for enumeration operations. It is used to manage a number of different EnumerationSourceQueue that are each independent and bucketized by their source storage name.

EnumerationManager embeds a GenericManager. It is thread-safe and can both be accessed and processed concurrently.

The items contained within EnumerationSourceQueue are EnumerationTask.

func NewEnumerationManager

func NewEnumerationManager() *EnumerationManager

NewEnumerationManager returns a pointer to a new EnumerationManager.

func (*EnumerationManager) Enqueue

func (m *EnumerationManager) Enqueue(items ...*EnumerationTask)

Enqueue adds EnumerationTask(s) into the correct EnumerationSourceQueue, as managed by EnumerationManager, based on their respective source storage name.

func (*EnumerationManager) Progress

func (m *EnumerationManager) Progress() Progress

Progress returns the Progress of the EnumerationManager.

type EnumerationSourceQueue

type EnumerationSourceQueue struct {
	*GenericQueue[*EnumerationTask]
}

EnumerationSourceQueue is a queue where items of a common source storage name were previously enqueued and aggregated by their EnumerationManager.

EnumerationSourceQueue embeds a GenericQueue. It is thread-safe and can both be accessed and processed concurrently.

The items contained within EnumerationSourceQueue are EnumerationTask.

func NewEnumerationSourceQueue

func NewEnumerationSourceQueue() *EnumerationSourceQueue

NewEnumerationSourceQueue returns a pointer to a new EnumerationSourceQueue. This method is generally only called from the respective EnumerationManager.

func (*EnumerationSourceQueue) Progress

func (q *EnumerationSourceQueue) Progress() Progress

Progress returns the Progress of the EnumerationSourceQueue.

type EnumerationTask

type EnumerationTask struct {
	Share    schema.Share
	Source   schema.Storage
	Function func() int
}

EnumerationTask is an enumeration task for later execution.

func (*EnumerationTask) Run

func (e *EnumerationTask) Run() int

Run executes the stored enumeration function of an EnumerationTask.

type EvaluationManager

type EvaluationManager struct {
	*GenericManager[schema.Share, *schema.Moveable, *EvaluationShareQueue]
}

EvaluationManager is a queue manager for evaluation operations. It is used to manage a number of different EvaluationShareQueue that are each independent and bucketized by using their share name.

EvaluationManager embeds a GenericManager. It is thread-safe and can both be accessed and processed concurrently.

The items contained within EvaluationShareQueue are schema.Moveable.

func NewEvaluationManager

func NewEvaluationManager() *EvaluationManager

NewEvaluationManager returns a pointer to a new EvaluationManager.

func (*EvaluationManager) Enqueue

func (m *EvaluationManager) Enqueue(items ...*schema.Moveable)

Enqueue adds schema.Moveable(s) into the correct EvaluationShareQueue, as managed by EvaluationManager, based on their respective shares names.

func (*EvaluationManager) Progress

func (m *EvaluationManager) Progress() Progress

Progress returns the Progress of the EvaluationManager.

type EvaluationShareQueue

type EvaluationShareQueue struct {
	*GenericQueue[*schema.Moveable]
}

EvaluationShareQueue is a queue where items of a common share name were previously enqueued and aggregated by their EvaluationManager.

EvaluationShareQueue embeds a GenericQueue. It is thread-safe and can both be accessed and processed concurrently.

The items contained within EvaluationShareQueue are schema.Moveable.

func NewEvaluationShareQueue

func NewEvaluationShareQueue() *EvaluationShareQueue

NewEvaluationShareQueue returns a pointer to a new EvaluationShareQueue. This method is generally only called from the respective EvaluationManager.

func (*EvaluationShareQueue) Progress

func (q *EvaluationShareQueue) Progress() Progress

Progress returns the Progress of the EvaluationShareQueue.

type GenericManager

type GenericManager[K comparable, V comparable, Q GenericQueueType[V]] struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

GenericManager is a generic queue manager for queues of GenericQueueType.

func NewGenericManager

func NewGenericManager[K comparable, V comparable, Q GenericQueueType[V]]() *GenericManager[K, V, Q]

NewGenericManager returns a pointer to a new GenericManager.

func (*GenericManager[K, V, Q]) Enqueue

func (m *GenericManager[K, V, Q]) Enqueue(item V, getKeyFunc func(V) K, newQueueFunc func() Q)

Enqueue bucketizes items into queues according to a getKeyFunc, creating new queues as required using a newQueueFunc.

func (*GenericManager[K, V, Q]) GetQueues

func (m *GenericManager[K, V, Q]) GetQueues() map[K]Q

GetQueues returns a copy of the internal map holding pointers to all managed queues.

func (*GenericManager[K, V, Q]) GetSuccessful

func (m *GenericManager[K, V, Q]) GetSuccessful() []V

GetSuccessful returns a slice of all queues successfully processed items.

func (*GenericManager[K, V, Q]) Progress

func (m *GenericManager[K, V, Q]) Progress() Progress

Progress returns the Progress for the GenericManager.

type GenericQueue

type GenericQueue[V comparable] struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

GenericQueue is a generic queue that can hold any comparable type of items.

func NewGenericQueue

func NewGenericQueue[V comparable]() *GenericQueue[V]

NewGenericQueue returns a pointer to a new GenericQueue.

func (*GenericQueue[V]) Dequeue

func (q *GenericQueue[V]) Dequeue() (V, bool)

Dequeue returns an item from the queue and advances the queue head.

func (*GenericQueue[V]) DequeueAndProcess

func (q *GenericQueue[V]) DequeueAndProcess(ctx context.Context, processFunc func(V) int) error

DequeueAndProcess sequentially dequeues and processes items using the given processFunc. An error is only returned in case of a context cancellation, the processFunc is otherwise expected to return only an integer with the processing function's decision for that item.

Possible decisions to be returned: DecisionSuccess, DecisionSkipped, DecisionRequeue.

func (*GenericQueue[V]) DequeueAndProcessConc

func (q *GenericQueue[V]) DequeueAndProcessConc(ctx context.Context, maxWorkers int, processFunc func(V) int) error

DequeueAndProcessConc concurrently dequeues and processes items using given processFunc. An error is only returned in case of a context cancellation, the processFunc is otherwise expected to return only an integer with the processing function's decision for that item.

Possible decisions to be returned: DecisionSuccess, DecisionSkipped, DecisionRequeue.

It is the responsibility of the processFunc to ensure thread-safety for anything happening inside the processFunc, with the GenericQueue only guaranteeing thread-safety for itself.

func (*GenericQueue[V]) Enqueue

func (q *GenericQueue[V]) Enqueue(items ...V)

Enqueue adds items to the queue.

func (*GenericQueue[V]) GetSuccessful

func (q *GenericQueue[V]) GetSuccessful() []V

GetSuccessful returns a copy of the internal slice holding all successful items.

func (*GenericQueue[V]) HasRemainingItems

func (q *GenericQueue[V]) HasRemainingItems() bool

HasRemainingItems returns whether a queue has remaining items to process.

func (*GenericQueue[V]) PostProcess

func (q *GenericQueue[V]) PostProcess(p schema.Pipeline[V]) bool

PostProcess runs a schema.Pipeline's contained post-processors on all successfully processed queue items.

func (*GenericQueue[V]) PreProcess

func (q *GenericQueue[V]) PreProcess(p schema.Pipeline[V]) bool

PreProcess runs a schema.Pipeline's contained pre-processors on all yet unprocessed queue items.

If the queue is being operated on concurrently, sorting functions should not be used as pre-processors. Instead, such functions should be post-processors instead to guarantee that the order is preserved at the end of the queue.

func (*GenericQueue[V]) Progress

func (q *GenericQueue[V]) Progress() Progress

Progress returns the Progress for the GenericQueue.

func (*GenericQueue[V]) SetProcessing

func (q *GenericQueue[V]) SetProcessing(items ...V)

SetProcessing sets given items as in progress (processing).

func (*GenericQueue[V]) SetSkipped

func (q *GenericQueue[V]) SetSkipped(items ...V)

SetSkipped sets given in-progress queue items as skipped. The items are removed from the in-progress map in the process.

func (*GenericQueue[V]) SetSuccess

func (q *GenericQueue[V]) SetSuccess(items ...V)

SetSuccess sets given in-progress queue items as successfully processed. The items are removed from the in-progress map in the process.

type GenericQueueType

type GenericQueueType[V comparable] interface {
	Enqueue(items ...V)
	GetSuccessful() []V
	Progress() Progress
}

GenericQueueType defines methods that a managed queue needs to have.

type IOManager

type IOManager struct {
	*GenericManager[schema.Storage, *schema.Moveable, *IOTargetQueue]
}

IOManager is a queue manager for IO operations. It is used to manage a number of different IOTargetQueue that are each independent and bucketized by their target storage name.

IOManager embeds a GenericManager. The manager is generally thread-safe and can be accessed concurrently.

Beware that IOTargetQueue contained items can only be processed sequentially, in order not to operate concurrently within the same destination target storage.

The items contained within IOTargetQueue are schema.Moveable.

func NewIOManager

func NewIOManager() *IOManager

NewIOManager returns a pointer to a new IOManager.

func (*IOManager) Enqueue

func (m *IOManager) Enqueue(items ...*schema.Moveable)

Enqueue adds schema.Moveable(s) into the correct IOTargetQueue, as managed by IOManager, based on their respective target storage name.

func (*IOManager) Progress

func (m *IOManager) Progress() Progress

Progress returns the Progress of the IOManager.

type IOTargetQueue

type IOTargetQueue struct {
	*GenericQueue[*schema.Moveable]
	// contains filtered or unexported fields
}

IOTargetQueue is a queue where items of a common target storage name were previously enqueued and aggregated by their IOManager.

IOTargetQueue embeds a GenericQueue.

Beware that IOTargetQueue contained items can only be processed sequentially, in order not to operate concurrently within the same destination target storage.

The items contained within IOTargetQueue are schema.Moveable.

func NewIOTargetQueue

func NewIOTargetQueue() *IOTargetQueue

NewIOTargetQueue returns a pointer to a new IOTargetQueue. This method is generally only called from the respective IOManager.

func (*IOTargetQueue) AddBytesTransfered

func (q *IOTargetQueue) AddBytesTransfered(bytes uint64)

AddBytesTransfered adds given transferred bytes to the total amount transferred for that IOTargetQueue.

func (*IOTargetQueue) DequeueAndProcessConc

func (q *IOTargetQueue) DequeueAndProcessConc(ctx context.Context, maxWorkers int, processFunc func(*schema.Moveable) int) error

DequeueAndProcessConc is unsupported by IOTargetQueue and will result in a panic when used.

func (*IOTargetQueue) Progress

func (q *IOTargetQueue) Progress() Progress

Progress returns the Progress of the IOTargetQueue.

type Manager

type Manager struct {
	sync.Mutex

	// EnumerationManager contains all enumeration tasks.
	EnumerationManager *EnumerationManager

	// EvaluationManager contains all [schema.Moveable] candidates.
	EvaluationManager *EvaluationManager

	// IOManager contains all sorted, allocated and validated [schema.Moveable].
	IOManager *IOManager
}

Manager is the principal queue manager implementation containing:

func NewManager

func NewManager() *Manager

NewManager returns a pointer to a new queue Manager.

type Progress

type Progress struct {
	HasStarted        bool
	HasFinished       bool
	StartTime         time.Time
	FinishTime        time.Time
	ProgressPct       float64
	TotalItems        int
	ProcessedItems    int
	InProgressItems   int
	SuccessItems      int
	SkippedItems      int
	ETA               time.Time
	TimeLeft          time.Duration
	TransferSpeed     float64
	TransferSpeedUnit string
}

Progress holds information about the progress of a queue (or manager). It is meant to be passed by value.

type TaskManager

type TaskManager struct {
	sync.Mutex

	Tasks []func()
}

TaskManager is a simple task manager for delayed function execution.

func NewTaskManager

func NewTaskManager() *TaskManager

NewTaskManager returns a pointer to a new TaskManager.

func (*TaskManager) Add

func (t *TaskManager) Add(taskedFunc func())

Add adds a new taskedFunc to the TaskManager. Functions with parameters can be added by invoking a parameterized function that immediately returns a func(), capturing any parameters in the closure.

func (*TaskManager) Launch

func (t *TaskManager) Launch(ctx context.Context) error

Launch sequentially launches the functions stored in a TaskManager. An error is only returned in case of a mid-flight context cancellation.

func (*TaskManager) LaunchConcAndWait

func (t *TaskManager) LaunchConcAndWait(ctx context.Context, maxWorkers int) error

LaunchConcAndWait concurrently launches the functions stored in a TaskManager. An error is only returned in case of a mid-flight context cancellation.

It is the responsibility of the taskedFunc to ensure thread-safety for anything happening inside the taskedFunc, with the TaskManager only guaranteeing thread-safety for itself.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL