Documentation
¶
Index ¶
- type MockPanicTask
- func (t *MockPanicTask) GetError() error
- func (t *MockPanicTask) Run()
- func (t *MockPanicTask) SetContext(_ context.Context) error
- func (t *MockPanicTask) SetDoneChannel(_ chan struct{}) error
- func (t *MockPanicTask) SetWaitGroup(wg *sync.WaitGroup) error
- func (t *MockPanicTask) Stop()
- func (t *MockPanicTask) String() string
- type MockProcessingLongTask
- type MockProcessingLongTaskResult
- type MockProcessingWithPanic
- type Pool
- type Task
- type Worker
- func (w *Worker) GetError() chan *worker.Error
- func (w *Worker) GetRetry() int32
- func (w *Worker) GetStatus() worker.Status
- func (w *Worker) Restart(wg *sync.WaitGroup)
- func (w *Worker) SetContext(ctx context.Context) error
- func (w *Worker) SetQueue(queue chan worker.Task) error
- func (w *Worker) SetWorkerErrChannel(errCh chan *worker.Error) error
- func (w *Worker) Start(wg *sync.WaitGroup)
- func (w *Worker) Stop() <-chan struct{}
- func (w *Worker) String() string
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type MockPanicTask ¶
type MockPanicTask struct{}
MockPanicTask is a mock implementation of the Task interface designed to simulate a task that panics during execution. This can be used in testing scenarios where you need to verify the behavior of a worker or system when a task causes a panic.
func (*MockPanicTask) GetError ¶
func (t *MockPanicTask) GetError() error
GetError is a mock implementation that returns nil. This simulates a task that does not encounter any error.
func (*MockPanicTask) Run ¶
func (t *MockPanicTask) Run()
Run simulates the execution of the task and intentionally causes a panic. This is used to test how the system handles a task that panics.
func (*MockPanicTask) SetContext ¶
func (t *MockPanicTask) SetContext(_ context.Context) error
SetContext is a mock implementation that does nothing with the provided context. It simply returns nil, indicating success in setting the context.
func (*MockPanicTask) SetDoneChannel ¶
func (t *MockPanicTask) SetDoneChannel(_ chan struct{}) error
SetDoneChannel is a mock implementation that does nothing with the done channel. It simply returns nil, indicating success in setting the channel.
func (*MockPanicTask) SetWaitGroup ¶
func (t *MockPanicTask) SetWaitGroup(wg *sync.WaitGroup) error
SetWaitGroup simulates setting a wait group for the task. It immediately marks the wait group as done, as if the task has completed.
func (*MockPanicTask) Stop ¶
func (t *MockPanicTask) Stop()
Stop is a mock implementation that does nothing. It simulates stopping the task, though no operation is performed here.
func (*MockPanicTask) String ¶
func (t *MockPanicTask) String() string
String returns an empty string as a mock representation of the task.
type MockProcessingLongTask ¶
type MockProcessingLongTask struct {
// contains filtered or unexported fields
}
MockProcessingLongTask is a mock implementation of the Processing interface designed to simulate long-running tasks. It is primarily used for testing purposes to validate how a task behaves when it takes a considerable amount of time to complete.
func (*MockProcessingLongTask) Processing ¶
func (m *MockProcessingLongTask) Processing(ctx context.Context, input interface{})
Processing simulates the execution of a long-running task. It takes a context and an input parameter (both of which are ignored in this mock implementation) and sleeps for the specified timeout duration. This method is used to mimic the behavior of a task that consumes time and to test how the task handling mechanism responds to such delays.
type MockProcessingLongTaskResult ¶
type MockProcessingLongTaskResult struct { // ContextIsDone indicates whether the context was properly canceled during the task processing. // This is set to true if the context's Done channel was closed, signaling the task to stop. ContextIsDone bool // ContextIsNotDone indicates that the context was not canceled during the task processing. // This is set to true if the context's Done channel remained open, meaning the task continued without interruption. ContextIsNotDone bool }
MockProcessingLongTaskResult represents the result of processing a long-running task in a mock environment. This structure is used in tests to verify the state of the task's context after it has been processed. It contains flags indicating whether the context was canceled (done) or not during task execution.
type MockProcessingWithPanic ¶
type MockProcessingWithPanic struct{}
MockProcessingWithPanic is a mock implementation of the Processing interface. It is used in tests to simulate scenarios where the Processing method deliberately causes a panic to test error handling and recovery mechanisms.
func (*MockProcessingWithPanic) Processing ¶
func (m *MockProcessingWithPanic) Processing(_ context.Context, _ interface{})
Processing simulates a processing operation and deliberately causes a panic. This method is used to test the behavior of the system when a panic occurs during processing. It returns false, but the primary purpose is to trigger a panic with a predefined error message to test panic recovery mechanisms.
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool represents a worker pool that manages a collection of worker instances. It handles task processing, worker lifecycle management, and provides mechanisms for graceful start and stop operations.
func NewWorkerPool ¶
NewWorkerPool creates a new instance of a worker pool with the specified options. It initializes the pool, sets up the context, and prepares the pool for managing workers and tasks.
func (*Pool) AddTaskInQueue ¶
AddTaskInQueue attempts to add a task to the pool's task queue for processing. It performs several safety checks, including recovering from potential panics and ensuring the queue is valid before adding the task. If the queue is not initialized or has been closed, appropriate errors are returned.
func (*Pool) AddWorker ¶
AddWorker adds a new worker to the worker pool and starts its execution. It ensures that the worker is correctly initialized with the pool's context and task queue, and handles any errors or panics that occur during the process.
func (*Pool) Run ¶
func (p *Pool) Run()
Run starts the worker pool and worker goroutines. It creates and launches a specified number of worker goroutines, each of which is responsible for processing jobs from a shared collector. This method also continuously listens for stop signals or context cancellation and reacts accordingly, ensuring a clean and controlled shutdown of the worker pool. For safety once is used, this is done in case someone will use worker pool in more than one place, and it will happen that Start will be started again, nothing critical will happen and we will not lose the past handlers.
func (*Pool) RunningWorkers ¶
RunningWorkers returns the current number of running workers in the pool. It retrieves the count of active workers using atomic operations to ensure thread safety.
type Task ¶
type Task struct {
// contains filtered or unexported fields
}
Task is a struct that encapsulates the execution logic of a concurrent task. It manages task execution, including timeout, error handling, and coordination with external components through contexts, channels, and WaitGroups.
func NewTask ¶
func NewTask(timeout time.Duration, taskName string, processing worker.Processing, processingInput interface{}) *Task
NewTask initializes a new Task instance with the provided parameters. The function takes the maximum timeout, task name, processing module, and inputs for processing and error handling.
func (*Task) GetError ¶
GetError returns a channel that can be used to receive error messages. In case of panic, the worker will be able to log the error in his or her own account, which can help when looking for problems.
func (*Task) Run ¶
func (t *Task) Run()
Run orchestrates the execution of a task with proper lifecycle management using contexts. It handles both short and long-running tasks, manages execution time through context timeouts, and ensures proper cleanup and error handling.
func (*Task) SetContext ¶
SetContext sets the parent context for the task. This allows updating the parent context during the execution of the task.
func (*Task) SetDoneChannel ¶
SetDoneChannel sets the provided channel as the done channel for the task. It first checks if the provided channel is nil or already closed to prevent setting an invalid channel, which could lead to runtime errors.
Note: the channel must be closed after you receive a signal that the process is complete.
func (*Task) SetWaitGroup ¶
SetWaitGroup assigns the provided WaitGroup to the task's wg field. This allows the task to signal when it is done by using the WaitGroup.
func (*Task) Stop ¶
func (t *Task) Stop()
Stop gracefully stops the job by signaling through the stop channel and closing it. This method ensures that the job's stop channel is only closed once, even if Stop is called multiple times. It uses sync.Once to guarantee that the channel is closed exactly once, preventing multiple closure attempts which could lead to a panic or undefined behavior.
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker represents a worker in a task processing pool. It holds all necessary information and channels to process tasks, manage its state, and handle errors.
func NewWorker ¶
NewWorker initializes a new Worker instance with the provided workerName. It sets up necessary channels and a logger for the worker, and returns a pointer to the Worker.
func (*Worker) GetError ¶
GetError returns the channel through which worker errors are communicated. This allows external components to listen for and handle errors generated by the worker. The channel is used to send instances of Error, containing information about the error and the worker instance. Note:In some cases I was able to get panic when running task. This is a very critical situation, I could not control all the workers so that in case of a panic I would not lose all the workers. Now workers in case of panic can notify the worker-pool that controls them and that worker-pool will restore a particular worker in case the pool is not stopped. This should help avoid problems, especially since we might lose all workers.
func (*Worker) GetRetry ¶
GetRetry returns the current retry count for the worker. The retry count indicates the number of attempts made to restart the worker in an effort to restore its operation if it encountered an issue.
func (*Worker) GetStatus ¶
GetStatus is a method that retrieves the current status of a Worker instance. It ensures that the status is accessed in a thread-safe manner by locking the worker's mutex before reading the status. This prevents race conditions and ensures that the value returned is consistent, even in a concurrent environment.
func (*Worker) Restart ¶
Restart attempts to restart the worker by incrementing the retry count and then invoking the Start method to resume the worker's operation. The retry count is incremented to track the number of recovery attempts.
func (*Worker) SetContext ¶
SetContext sets the context for the worker. This method is used to provide a new context for the worker, which can be used to control its operations and manage its lifecycle. The method ensures that the provided context is not nil before setting it, maintaining the integrity of the worker's context.
func (*Worker) SetQueue ¶
SetQueue sets the task queue channel for the worker. This method allows the worker to be assigned a new task queue channel, which it will use to receive tasks. The method ensures that the provided channel is open before setting it, returning an error if the channel is closed.
func (*Worker) SetWorkerErrChannel ¶
SetWorkerErrChannel assigns the provided error channel to the worker for reporting serious panic errors that occur during its operation. This channel is used to notify the worker pool of such errors, so the pool can take appropriate action, such as restarting the worker.
func (*Worker) Start ¶
Start begins the worker's execution cycle. It initializes the worker's status, manages tasks from the job queue, and handles errors and context cancellations. The method uses a WaitGroup to signal when the worker has finished its work and includes mechanisms for recovery from panics to ensure that the worker continues operating smoothly even if unexpected errors occur.
func (*Worker) Stop ¶
func (w *Worker) Stop() <-chan struct{}
Stop signals the worker to stop processing tasks and returns a channel to indicate completion. It closes the stop channel, causing the worker to exit its processing loop and finish the current job. If there is a task in processing at the time of worker termination, it will be stopped.