worker

package module
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 9, 2024 License: MIT Imports: 3 Imported by: 0

README

worker

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ContextIsNilError is an error indicating that a nil context was provided.
	// This error is used to signal that an operation requiring a non-nil context
	// encountered an invalid input, which could disrupt the operation's control flow.
	ContextIsNilError = errors.New("context is nil")

	// ChanIsCloseError is an error indicating that a channel has been closed.
	// This error is typically used when an operation attempts to send or receive
	// on a closed channel, which is an illegal operation in Go.
	ChanIsCloseError = errors.New("chan is close")

	// ChanIsEmptyError is an error indicating that a channel is empty.
	// This error might be used when an operation expected a channel to have available data,
	// but found it empty, preventing the operation from proceeding as intended.
	ChanIsEmptyError = errors.New("chan is empty")

	// MaxWorkersReachedError is an error indicating that the maximum number of workers has been reached.
	// This error is used to prevent the creation of additional workers beyond the allowed limit,
	// ensuring that the system's resources are managed efficiently and within expected constraints.
	MaxWorkersReachedError = errors.New("maximum number of workers reached")

	// WorkerIsNilError is an error that indicates an operation was attempted with a nil worker.
	// This error is used to signal that a worker instance expected to be valid was found to be nil,
	// and therefore the operation cannot proceed.
	WorkerIsNilError = errors.New("worker is nil")

	// WorkerPoolStopError is an error that indicates an operation was attempted on a stopped worker pool.
	// This error is used to signal that the worker pool has been stopped, meaning no further work can be assigned
	// or processed, and the operation should be terminated.
	WorkerPoolStopError = errors.New("worker pool is stopped")
)

Functions

func Exclude

func Exclude[T comparable](elements []T, element T) []T

Exclude removes all instances of a specified value from the provided slice. It creates a new slice containing only the elements that are not equal to the specified value. This approach efficiently constructs the result slice by reusing the original slice's underlying array, avoiding unnecessary memory allocations.

func GetRecoverError

func GetRecoverError(rec any) error

GetRecoverError extracts an error from a recoverable panic. It checks if the recovered value is an error type, and if so, returns it. If the recovered value is not an error type, it returns nil.

Types

type Error

type Error struct {
	Error    error  // The actual error that occurred.
	Instance Worker // Reference to the Worker instance where the error occurred.
}

Error encapsulates an error along with the reference to the Worker instance where the error occurred. It is used to convey error information from a Worker to external components through the error channel.

type Options

type Options struct {
	// Context is the context used to control the lifecycle of the tasks and workers.
	// It allows for task cancellation, timeouts, and passing values between functions.
	Context context.Context

	// Queue is the channel where tasks are submitted to be processed by the workers.
	// The size of the channel buffer determines how many tasks can be queued before
	// workers are required to start processing them.
	//
	// Note: the channel must be closed after the worker pool is stopped.
	Queue chan Task
	// WorkerCount specifies the number of workers that will be spawned in the pool.
	// This determines how many tasks can be processed concurrently.
	WorkerCount int32
	// MaxRetryWorkerRestart defines the maximum number of times a worker can be restarted
	// in case of failure before it is considered as a critical issue and further restarts
	// are stopped. This helps in preventing endless restarts in case of persistent errors.
	MaxRetryWorkerRestart int32
}

Options defines the configuration settings for a worker pool. It contains the context for task processing, the task queue, and settings related to the number of workers and retry behavior.

type Pool

type Pool interface {
	// Run starts the pool and initializes the worker management loop.
	// This function is responsible for handling task processing, managing worker errors, and ensuring
	// the lifecycle of workers, including graceful shutdowns when required.
	Run()

	// AddTaskInQueue attempts to add a new task to the pool's task queue for processing by the workers.
	// If the task cannot be added due to the pool being closed or other reasons, an error is returned.
	// This method ensures that the task is handled in a non-blocking way and leverages context-based
	// cancellation or timeout handling if needed.
	AddTaskInQueue(task Task) error

	// AddWorker registers a new worker to the pool. It initializes the worker with the pool's context,
	// task queue, and error handling mechanisms. The worker begins processing tasks after being added.
	// If the pool has been stopped or the worker cannot be added, an error is returned.
	AddWorker(wr Worker) error

	// RunningWorkers returns the number of currently active workers in the pool.
	// This value is managed atomically to ensure accurate results, even in a concurrent environment.
	RunningWorkers() int32

	// Stop gracefully shuts down the pool, signaling all workers to stop processing tasks.
	// It ensures all workers finish their ongoing tasks before the pool is fully shut down.
	Stop()
}

Pool defines the core interface for managing a pool of workers that execute tasks concurrently. It provides methods to run the pool, add tasks, manage workers, and control the pool's lifecycle.

type Processing

type Processing interface {
	// Processing is responsible for executing the basic logic for processing the task.
	// It receives the task to be processed and the context.
	// The result of the execution can be obtained via the Result channel
	Processing(ctx context.Context, input interface{})
}

Processing defines an interface for processing tasks. It includes methods for performing the main processing logic and handling errors.

type Status

type Status int

Status represents the current state of a worker within a worker pool system. It is used to indicate and manage the different phases of a worker's lifecycle.

const (
	// StatusWorkerIdle represents a state where the worker is waiting for a job to process.
	// In this state, the worker is ready and available to take on new tasks.
	StatusWorkerIdle Status = iota
	// StatusWorkerRunning represents a state where the worker is actively running a job.
	// This state indicates that the worker is engaged in processing a task and is not available for new jobs.
	StatusWorkerRunning
	// StatusWorkerStopped represents a state where the worker has stopped processing jobs.
	// This state indicates that the worker has been shut down or is no longer operational.
	StatusWorkerStopped
)

The possible values for WorkerStatus. These constants are used to manage and monitor the state of workers, allowing for efficient coordination and handling of workers within the pool.

type Task

type Task interface {
	// SetWaitGroup assigns a sync.WaitGroup to the task.
	// This allows the task to signal completion when it has finished its execution.
	// The wait group is used to synchronize with other goroutines, ensuring that all tasks
	// complete before proceeding.
	SetWaitGroup(wg *sync.WaitGroup) error

	// SetDoneChannel sets the channel that will be used to signal when the task is done.
	// This channel is expected to be closed once the task completes its execution. It is
	// used to notify other parts of the system that the task has finished.
	SetDoneChannel(done chan struct{}) error

	// SetContext assigns a context to the task.
	// The context can be used to manage task execution, including handling cancellation and
	// timeouts. It allows the task to respond to external signals for stopping or modifying
	// its behavior.
	SetContext(ctx context.Context) error

	// GetError retrieves the error encountered during task execution, if any.
	// This method allows checking for errors that occurred during the task's run and helps
	// in debugging and error handling.
	GetError() error

	// String returns a string representation of the task.
	// This is useful for logging and debugging purposes, providing a textual description
	// of the task to aid in understanding its state and behavior during execution.
	String() string

	// Run starts the execution of the task.
	// This method should contain the logic for performing the task's work. It is typically
	// called in a separate goroutine to allow asynchronous execution of the task.
	Run()

	// Stop signals the task to stop executing.
	// This method is used to gracefully terminate the task before it completes. It should
	// handle cleanup and termination logic to ensure the task is stopped in a controlled manner.
	Stop()
}

Task defines the contract for a task that can be executed, monitored, and managed. It provides methods for setting context, wait groups, and completion channels, as well as methods for handling errors and stopping execution. Implementations of this interface are expected to provide specific behavior for processing tasks, handling interruptions, and managing execution state.

type Worker

type Worker interface {
	// String returns a string representation of the Worker.
	// Typically, this is the worker's name, which is useful for logging and debugging purposes.
	String() string

	// SetContext assigns a context to the worker.
	// The context is used to control the worker's execution and can be used to
	// cancel operations or signal timeouts. Returns an error if the context is nil
	// or if there's an issue setting the context.
	//
	// NOTE: Context is passed from the worker pool (parent) to manage the state of the worker
	SetContext(ctx context.Context) error

	// SetQueue assigns a task queue to the worker.
	// The worker will listen to this queue for incoming tasks to process.
	// Returns an error if the queue is nil or invalid.
	//
	// NOTE: The channel must be closed after the worker pool is stopped.
	// Otherwise all the workers will simply be stopped and stop working.
	SetQueue(queue chan Task) error

	// SetWorkerErrChannel assigns an error channel to the worker.
	// This channel is used to report serious errors or panics that occur
	// during task processing. The pool listens to this channel to detect
	// when a worker encounters a severe issue and needs to be restarted.
	// Returns an error if the channel is closed at the time of assignment.
	SetWorkerErrChannel(errCh chan *Error) error

	// Restart attempts to restart the worker by incrementing the retry count
	// and then invoking the Start method to resume the worker's operation.
	// This method is used to recover a worker that may have encountered an issue,
	// tracking the number of recovery attempts.
	Restart(wg *sync.WaitGroup)

	// Start begins the worker's operation, processing tasks from the assigned queue.
	// It requires a sync.WaitGroup to manage the concurrent execution of workers.
	// The WaitGroup is used to ensure that all workers complete their tasks before
	// the program exits or the pool is shut down.
	Start(wg *sync.WaitGroup)

	// Stop initiates the worker's shutdown process.
	// This method returns a channel that is closed when the worker has completely
	// stopped. It allows for graceful shutdowns by signaling when it is safe to
	// release resources or proceed with other operations.
	Stop() <-chan struct{}

	// GetStatus retrieves the current status of the worker.
	// The status provides information about the worker's state, such as whether
	// it is running, idle, or stopped. This is useful for monitoring and managing
	// the worker's activity.
	GetStatus() Status

	// GetError returns a channel that the worker uses to report errors.
	// The channel carries errors encountered during task processing. This allows
	// the system to log, handle, or react to errors in a centralized manner.
	GetError() chan *Error

	// GetRetry returns the current retry count for the worker.
	// The retry count indicates the number of attempts made to restart the worker
	// in an effort to restore its operation after encountering an issue.
	GetRetry() int32
}

Worker defines the interface for a worker in a worker pool system. A Worker is responsible for processing tasks from a queue, managing its lifecycle (start, stop), and providing status and error information.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL