Documentation
¶
Index ¶
- Constants
- type Pool
- func (st *Pool) AddCallback(callbackFn PoolCallback) error
- func (st *Pool) AddTask(data interface{}) error
- func (st *Pool) AddTaskCallback(data interface{}, callbackFn PoolCallback) error
- func (st *Pool) AddWorker() error
- func (st *Pool) AddWorkers(n int) error
- func (st *Pool) GetPanicTasks() goconcurrentqueue.Queue
- func (st *Pool) GetTotalWorkers() int
- func (st *Pool) GetTotalWorkersInProgress() int
- func (st *Pool) KillAllWorkers() error
- func (st *Pool) KillAllWorkersAndWait() error
- func (st *Pool) KillWorker() error
- func (st *Pool) KillWorkers(n int) error
- func (st *Pool) LateKillAllWorkers() error
- func (st *Pool) LateKillWorker() error
- func (st *Pool) LateKillWorkers(n int) error
- func (st *Pool) PauseAllWorkers()
- func (st *Pool) ResumeAllWorkers()
- func (st *Pool) SafeWaitUntilNSuccesses(n int) error
- func (st *Pool) SetKilledWorkerChan(ch chan int)
- func (st *Pool) SetLogChan(ch chan PoolLog)
- func (st *Pool) SetNewWorkerChan(ch chan int)
- func (st *Pool) SetTotalWorkers(n int) error
- func (st *Pool) SetWorkerFunc(fn PoolFunc)
- func (st *Pool) StartWorkers() error
- func (st *Pool) StartWorkersAndWait() error
- func (st *Pool) Wait() error
- func (st *Pool) WaitUntilInitialWorkersAreUp() error
- func (st *Pool) WaitUntilNSuccesses(n int) error
- type PoolCallback
- type PoolError
- type PoolFunc
- type PoolLog
- type PoolOptions
Constants ¶
const ( ErrorEmbeddedErrors = "embedded.errors" ErrorData = "wrong.data" ErrorKillAllWorkersInProgress = "action.kill.all.workers.in.progress" ErrorWaitInProgress = "action.wait.in.progress" ErrorWaitUntilInitialWorkersAreUpInProgress = "action.wait.until.initial.workers.are.up.in.progress" ErrorDispatcherNoActionNoTask = "dispatcher.no.action.no.task" ErrorDispatcherWorkerCouldNotBeEnqueued = "dispatcher.worker.couldnt.be.enqueued" ErrorNoDefaultWorkerFunction = "no.default.worker.function" ErrorDispatcherChannelFull = "dispatcher.channel.at.full.capacity" ErrorFullCapacityChannel = "full.capacity.channel" ErrorWorkerNotFound = "worker.not.found" ErrorWorkerType = "worker.wrong.type" ErrorWorkerMaxTimeExceeded = "worker.max.time.exceeded" ErrorWorkerFuncPanic = "worker.func.panic" )
const ( ErrorWorkerNilTask = "worker.nil.task" ErrorWorkerNoTaskFunc = "worker.no.func.to.process.task" ListenInProgress = "listen.in.progress" WorkerClosed = "worker.closed" )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
func NewPool ¶
NewPool is deprecated since version 0.10.0, please do not use it. Use NewPoolWithOptions instead. NewPool builds and returns a new Pool. This function will be removed on version 1.0.0 Parameters:
initialWorkers = amount of workers to start up at initialization maxOperationsInQueue = maximum amount of actions/tasks that could be enqueued at the same time logVerbose = true ==> output logs
func NewPoolWithOptions ¶
func NewPoolWithOptions(options PoolOptions) (*Pool, error)
NewPoolWithOptions builds and returns a Pool. This function will return error if any of the following scenarios occurs:
- MaxWorkers == 0
- MaxOperationsInQueue == 0
func (*Pool) AddCallback ¶
func (st *Pool) AddCallback(callbackFn PoolCallback) error
AddCallback enqueues a callback function into a FIFO queue.
Workers (if alive) will be listening to and picking up jobs from this queue. If no workers are alive nor idle, the job will stay in the queue until any worker will be ready to pick it up and start processing it.
The worker who picks up this job will only invoke the callback function, passing nil as a parameter.
The queue in which this function enqueues the jobs has a limit (it was set up at pool initialization). It means that AddCallback will wait for a free queue slot to enqueue a new job in case the queue is at full capacity.
AddCallback will return an error if no new tasks could be enqueued at the execution time. No new tasks could be enqueued during a certain amount of time when WaitUntilNSuccesses meets the stop condition.
func (*Pool) AddTask ¶
AddTask enqueues a task (into a FIFO queue).
The parameter for the task's data accepts any type (interface{}).
Workers (if any) will be listening to and picking up tasks from this queue. If no workers are alive nor idle, the task will stay in the queue until any worker will be ready to pick it up and start processing it.
The queue in which this function enqueues the tasks has a limit (it was set up at pool initialization). It means that AddTask will wait for a free queue slot to enqueue a new task in case the queue is at full capacity.
AddTask returns an error if no new tasks could be enqueued at the execution time. No new tasks could be enqueued during a certain amount of time when WaitUntilNSuccesses meets the stop condition, or in other words: when KillAllWorkers is in progress.
func (*Pool) AddTaskCallback ¶
func (st *Pool) AddTaskCallback(data interface{}, callbackFn PoolCallback) error
AddTaskCallback enqueues a job and a callback function into a FIFO queue.
The parameter for the job's data accepts any data type (interface{}).
Workers will be listening to and picking up jobs from this queue. If no workers are alive nor idle, the job will stay in the queue until any worker will be ready to pick it up and start processing it.
The worker who picks up this job and callback will process the job first and later will invoke the callback function, passing the job's data as a parameter.
The queue in which this function enqueues the jobs has a capacity limit (it was set up at pool initialization). This means that AddTaskCallback will wait for a free queue slot to enqueue a new job in case the queue is at full capacity.
AddTaskCallback will return an error if no new tasks could be enqueued at the execution time. No new tasks could be enqueued during a certain amount of time when WaitUntilNSuccesses meets the stop condition.
func (*Pool) AddWorker ¶
AddWorker adds a new worker to the pool.
This is an asynchronous operation, but you can be notified through a channel every time a new worker is started. The channel (optional) could be set using SetNewWorkerChan(chan) or at initialization (PoolOptions.NewWorkerChan in NewPoolWithOptions).
AddWorker returns an error if at least one of the following statements is true:
- the worker could not be started
- there is a "in course" KillAllWorkers operation
func (*Pool) AddWorkers ¶
AddWorkers adds n extra workers to the pool.
This is an asynchronous operation, but you can be notified through a channel every time a new worker is started. The channel (optional) could be set using SetNewWorkerChan(chan) or at initialization (PoolOptions.NewWorkerChan in NewPoolWithOptions).
AddWorkers returns an error if at least one of the following statements are true:
- parameter n <= 0
- a worker could not be started
- there is a "in course" KillAllWorkers operation
func (*Pool) GetPanicTasks ¶
func (st *Pool) GetPanicTasks() goconcurrentqueue.Queue
GetPanicTasks returns the queue containing all tasks that failed because the worker's panicked while processing.
func (*Pool) GetTotalWorkers ¶
GetTotalWorkers returns the number of registered workers.
func (*Pool) GetTotalWorkersInProgress ¶
GetTotalWorkersInProgress returns the amount of workers currently processing data (tasks/actions)
func (*Pool) KillAllWorkers ¶
KillAllWorkers kills all live workers (the number of live workers is determined at the moment this action is processed). If a worker is processing a job, it will not be immediately killed, the pool will wait until the current job gets processed.
The following functions will return error if invoked during KillAllWorkers execution:
- KillWorker
- KillWorkers
- LateKillWorker
- LateKillWorkers
- LateKillAllWorkers
- AddWorker
- AddWorkers
- SetTotalWorkers
func (*Pool) KillAllWorkersAndWait ¶
KillAllWorkersAndWait kills all workers (the amount of workers is determined at the moment this action is processed). This function waits until current alive workers are down. This is the difference between KillAllWorkersAndWait() and KillAllWorkers() If a worker is processing a job, it will not be immediately terminated, the pool will wait until the current job gets processed.
The following functions will return error if invoked during this function execution:
- KillWorker
- KillWorkers
- LateKillWorker
- LateKillWorkers
- LateKillAllWorkers
- AddWorker
- AddWorkers
- SetTotalWorkers
func (*Pool) KillWorker ¶
KillWorker kills an idle worker. The kill signal has a higher priority than the enqueued jobs. It means that a worker will be killed once it finishes its current job although there are unprocessed jobs in the queue. Use LateKillWorker() in case you need to wait until current enqueued jobs get processed. It returns an error in case there is a "in course" KillAllWorkers operation.
func (*Pool) KillWorkers ¶
KillWorkers kills n idle workers. If n > GetTotalWorkers(), only current amount of workers will be terminated. The kill signal has a higher priority than the enqueued jobs. It means that a worker will be killed once it finishes its current job, no matter if there are unprocessed jobs in the queue. Use LateKillAllWorkers() ot LateKillWorker() in case you need to wait until current enqueued jobs get processed. It returns an error in the following scenarios:
- there is a "in course" KillAllWorkers operation -
func (*Pool) LateKillAllWorkers ¶
LateKillAllWorkers kills all live workers only after all current jobs get processed. By "current jobs" it means: the number of enqueued jobs in the exact moment this function get executed. It returns an error for the following scenarios:
- there is a "in course" KillAllWorkers operation.
- the operations' queue (where tasks/actions get enqueued) is at full capacity
func (*Pool) LateKillWorker ¶
LateKillWorker kills a worker only after current enqueued jobs get processed. It returns an error in case there is a "in course" KillAllWorkers operation.
func (*Pool) LateKillWorkers ¶
LateKillWorkers kills n workers only after all current jobs get processed. If n > GetTotalWorkers(), only current amount of workers will be terminated. It returns an error in case there is a "in course" KillAllWorkers operation.
func (*Pool) PauseAllWorkers ¶
func (st *Pool) PauseAllWorkers()
PauseAllWorkers pauses all workers. This action will wait until previous invoked actions get processed, but it will skip the line of enqueued jobs(tasks). The jobs that are being processed at the time this action is processed will not be interrupted. From the moment this action gets processed, all enqueued jobs/actions will not be processed until the workers get resumed by ResumeAllWorkers.
func (*Pool) ResumeAllWorkers ¶
func (st *Pool) ResumeAllWorkers()
ResumeAllWorkers resumes all workers. Nothing will happen if this function is invoked while no workers are paused.
func (*Pool) SafeWaitUntilNSuccesses ¶
SafeWaitUntilNSuccesses waits until n workers finished their job successfully, then kills all active workers. A job/task is considered successful if the worker that processed it returned true. It could happen that other workers were processing jobs at the time the nth job was successfully processed, so this function will wait until all those extra workers were done. In other words, SafeWaitUntilNSuccesses guarantees that when it finishes no worker would be processing data. TODO ::: this function fails if n >= total workers
func (*Pool) SetKilledWorkerChan ¶
SetKilledWorkerChan sets a channel to receive a signal after a worker is terminated
func (*Pool) SetLogChan ¶
SetLogChan sets log channel. System logs will be sent over this channel.
func (*Pool) SetNewWorkerChan ¶
SetNewWorkerChan sets a channel to receive a signal once each new worker is started
func (*Pool) SetTotalWorkers ¶
SetTotalWorkers adjusts the number of live workers.
In case it needs to kill some workers (in order to adjust the total based on the given parameter), it will wait until their current jobs get processed (in case they are processing jobs).
This is an asynchronous operation, but you can be notified through a channel every time a new worker is started. The channel (optional) can be defined at SetNewWorkerChan(chan).
SetTotalWorkers returns an error in the following scenarios:
- There is a "in course" KillAllWorkers operation.
func (*Pool) SetWorkerFunc ¶
SetWorkerFunc sets the worker's default function handler. This function will be invoked each time a worker pulls a new job, and should return true to let know that the job was successfully completed, or false in other case.
func (*Pool) StartWorkers ¶
StartWorkers is deprecated since version 0.10.0, please do not use it. Why is StartWorkers deprecated? There is no need to explicitly starts up the workers as they automatically start up once created.
For backward compatibility, this function only returns nil. StartWorkers will be removed on version 1.0.0
Prior to version 0.10.0, the goal for this function was to start up the amount of workers pre-defined at pool instantiation.
func (*Pool) StartWorkersAndWait ¶
StartWorkersAndWait is deprecated since version 0.10.0, please do not use it. Use WaitUntilInitialWorkersAreUp instead. Why is StartWorkersAndWait deprecated? Workers are started up once they get created, there is no need to do it explicitly.
For backward compatibility, this function will return WaitUntilInitialWorkersAreUp. StartWorkersAndWait will be removed on version 1.0.0
Prior to version 0.10.0, the goal of this function was to start up all workers pre-defined at pool instantiation and wait until until all them were up and running.
func (*Pool) WaitUntilInitialWorkersAreUp ¶
WaitUntilInitialWorkersAreUp waits until all initial workers are up and running. The amount of initial workers were pre-defined at pool instantiation.
func (*Pool) WaitUntilNSuccesses ¶
WaitUntilNSuccesses waits until n workers finished their job successfully, then kills all active workers. A worker is considered successfully if the associated worker function returned true. TODO ::: An error will be returned if the worker's function is not already set.
type PoolCallback ¶
type PoolCallback func(interface{})
PoolCallback defines the callback function signature
type PoolError ¶
type PoolError struct {
// contains filtered or unexported fields
}
func (*PoolError) AddEmbeddedErrors ¶
AddEmbeddedErrors adds a slice of errors
func (*PoolError) GetEmbeddedErrors ¶
type PoolFunc ¶
type PoolFunc func(interface{}) bool
PoolFunc defines the function signature to be implemented by the workers
type PoolOptions ¶
type PoolOptions struct { // amount of initial workers TotalInitialWorkers uint // maximum amount of workers MaxWorkers uint // maximum actions/tasks to be enqueued at the same time MaxOperationsInQueue uint // channel to send notifications once new workers are created NewWorkerChan chan int // wait (NewPoolWithOptions function) until all initial workers are up and running WaitUntilInitialWorkersAreUp bool // log's verbose LogVerbose bool }
PoolOptions contains configuration data to build a Pool using NewPoolWithOptions