goworkerpool

package module
v0.10.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 24, 2022 License: MIT Imports: 6 Imported by: 21

README

go.dev reference godoc reference version Go Report Card Build Status codecov

goworkerpool - Pool of workers

Pool of concurrent workers with the ability of increment / decrement / pause / resume workers on demand.

Features

Prerequisites

Golang version >= 1.9

Installation

Execute:

go get github.com/enriquebris/goworkerpool

Documentation

Visit goworkerpool at godoc.org

TODO

Visit the TODO page.

Get started

Simple usage
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/enriquebris/goworkerpool"
)

func main() {
	var (
		maxOperationsInQueue uint = 50
	)
	pool, err := goworkerpool.NewPoolWithOptions(goworkerpool.PoolOptions{
		TotalInitialWorkers:          10,
		MaxWorkers:                   20,
		MaxOperationsInQueue:         maxOperationsInQueue,
		WaitUntilInitialWorkersAreUp: true,
		LogVerbose:                   true,
	})

	if err != nil {
		fmt.Println(err)
		return
	}

	// add the worker handler function
	pool.SetWorkerFunc(func(data interface{}) bool {
		log.Printf("processing %v\n", data)
		// add a 1 second delay (to makes it look as it were processing the job)
		time.Sleep(time.Second)
		log.Printf("processing finished for: %v\n", data)

		// let the pool knows that the worker was able to complete the task
		return true
	})

	// enqueue jobs in a separate goroutine
	go func() {
		for i := 0; i < int(maxOperationsInQueue); i++ {
			pool.AddTask(i)
		}

		// kill all workers after the currently enqueued jobs get processed
		pool.LateKillAllWorkers()
	}()

	// wait while at least one worker is alive
	pool.Wait()
}

Examples

See code examples at examples folder.

How To

Set the worker's handler function
pool.SetWorkerFunc(fn)

This is the way to set the function that will be invoked from a worker each time it pulls a job from the queue. Internally, the worker will pass the job as a parameter to this function.

The function signature (PoolFunc):

func handler(interface{}) bool {
	
}

The handler function should return true to let know that the job was successfully processed, or false in other case.

pool.SetWorkerFunc(func(data interface{}) bool {
		// do the processing

		// let the pool knows that the worker was able to complete the task
		return true
	})
Start up the workers
Start up the workers asynchronously

StartWorkers spins up the workers. The amount of workers to be started up is defined at the Pool instantiation.

pool.StartWorkers()

This is an asynchronous operation, but there is a way to be be notified each time a new worker is started up: through a channel. See SetNewWorkerChan(chan).

Start up the workers synchronously

StartWorkersAndWait spins up the workers and wait until all of them are 100% up. The amount of workers to be started up is defined at the Pool instantiation.

pool.StartWorkersAndWait()

Although this is an synchronous operation, there is a way to be be notified each time a new worker is started up: through a channel. See SetNewWorkerChan(chan). Keep in mind that the channel listener should be running on a different goroutine.

Receive a notification every time a new worker is started up

SetNewWorkerChan(chan) sets a channel to receive notifications every time a new worker is started up.

pool.SetNewWorkerChan(ch chan<- int)

This is optional, no channel is needed to start up new workers. Basically is just a way to give feedback for the worker's start up operation.

Enqueue jobs on demand
Enqueue a simple job

AddTask will enqueue a job into a FIFO queue (a channel).

pool.AddTask(data)

The parameter for the job's data accepts any kind of value (interface{}).

Workers (if alive) will be listening to and picking up jobs from this queue. If no workers are alive nor idle, the job will stay in the queue until any worker will be ready to pick it up and start processing it.

The queue in which this function enqueues the jobs has a limit (it was set up at pool initialization). It means that AddTask will wait for a free queue slot to enqueue a new job in case the queue is at full capacity.

AddTask will return an error if no new tasks could be enqueued at the execution time. No new tasks could be enqueued during a certain amount of time when WaitUntilNSuccesses meets the stop condition.

Enqueue a simple job plus callback function

AddTaskCallback will enqueue a job plus a callback function into a FIFO queue (a channel).

pool.AddTaskCallback(data, callback)

The parameter for the job's data accepts any type (interface{}).

Workers (if alive) will be listening to and picking up jobs from this queue. If no workers are alive nor idle, the job will stay in the queue until any worker will be ready to pick it up and start processing it.

The worker who picks up this job + callback will process the job first and later will invoke the callback function, passing the job's data as a parameter.

The queue in which this function enqueues the jobs has a limit (it was set up at pool initialization). It means that AddTaskCallback will wait for a free queue slot to enqueue a new job in case the queue is at full capacity.

AddTaskCallback will return an error if no new tasks could be enqueued at the execution time. No new tasks could be enqueued during a certain amount of time when WaitUntilNSuccesses meets the stop condition.

Enqueue a callback function

AddCallback will enqueue a callback function into a FIFO queue (a channel).

pool.AddCallback(callback)

Workers (if alive) will be listening to and picking up jobs from this queue. If no workers are alive nor idle, the job will stay in the queue until any worker will be ready to pick it up and start processing it.

The worker who picks up this task will only invoke the callback function, passing nil as a parameter.

The queue in which this function enqueues the jobs has a limit (it was set up at pool initialization). It means that AddCallback will wait for a free queue slot to enqueue a new job in case the queue is at full capacity.

AddCallback will return an error if no new tasks could be enqueued at the execution time. No new tasks could be enqueued during a certain amount of time when WaitUntilNSuccesses meets the stop condition.

Enqueue a job with category and callback

AddComplexTask will enqueue a job into a FIFO queue (a channel).

pool.AddComplexTask(data, category, callback)

This function extends the scope of AddTask adding category and callback.

The job will be grouped based on the given category (for stats purposes).

The callback function (if any) will be invoked just after the job gets processed.

Pass multiple data to be processed by a worker

Let's suppose you have the following struct:

type JobData struct {
	Filename string
	Path     string
	Size     uint64
}

then you can enqueue it as a job (to be processed by a worker):

pool.AddTask(JobData{
	Filename: "file.txt",
	Path:     "/tmp/myfiles/",
	Size:     1500,
})

Keep in mind that the worker's handler function needs to cast the parameter as a JobData (that is on your side).

Wait until at least a worker is alive

Wait blocks until at least one worker is alive.

pool.Wait()
Wait until n jobs get successfully processed

WaitUntilNSuccesses blocks until n jobs get successfully processed.

pool.WaitUntilNSuccesses(n)
Add workers on demand
Add a worker on demand

AddWorker adds a new worker to the pool.

pool.AddWorker()

This is an asynchronous operation, but there is a way to be be notified each time a new worker is started up: through a channel.

Add n workers on demand

AddWorkers adds n new workers to the pool.

pool.AddWorkers(n)

This is an asynchronous operation, but there is a way to be be notified each time a new worker is started up: through a channel.

Multiple ways to kill workers
Kill workers on demand
Kill a worker

KillWorker kills a live worker once it is idle or after it finishes with its current job.

pool.KillWorker()

This is an asynchronous operation, but there is a way to be be notified each time a worker is killed: through a channel. See SetKilledWorkerChan(chan).

Kill n workers

KillWorkers kills all live workers. For those currently processing jobs, it will wait until the work is done.

pool.KillWorkers(n)

This is an asynchronous operation, but there is a way to be be notified each time a worker is killed: through a channel. See SetKilledWorkerChan(chan).

Kill all workers

KillAllWorkers kills all live workers once they are idle or after they finish processing their current jobs.

pool.KillAllWorkers()

This is an asynchronous operation, but there is a way to be be notified each time a worker is killed: through a channel. See SetKilledWorkerChan(chan).

Kill all workers and wait

KillAllWorkersAndWait triggers an action to kill all live workers and blocks until the action is done (meaning that all live workers are down).

pool.KillAllWorkersAndWait()

This is an asynchronous operation, but there is a way to be be notified each time a worker is killed: through a channel. See SetKilledWorkerChan(chan).

Kill workers after currently enqueued jobs get processed
Kill a worker after currently enqueued jobs get processed

LateKillWorker kills a worker after currently enqueued jobs get processed. If the worker is processing a job, it will be killed after the job gets processed.

By "currently enqueued jobs" I mean: the jobs enqueued at the moment this function was invoked.

pool.LateKillWorker()

This is an asynchronous operation, but there is a way to be be notified each time a worker is killed: through a channel. See SetKilledWorkerChan(chan).

Kill n workers after currently enqueued jobs get processed

LateKillWorkers kills n workers after currently enqueued jobs get processed. If the workers are processing jobs, they will be killed after the jobs get processed.

By "currently enqueued jobs" I mean: the jobs enqueued at the moment this function was invoked.

pool.LateKillWorkers(n)

This is an asynchronous operation, but there is a way to be be notified each time a worker is killed: through a channel. See SetKilledWorkerChan(chan).

Kill all workers after currently enqueued jobs get processed

LateKillAllWorkers kills all workers after currently enqueued jobs get processed. For those workers currently processing jobs, they will be killed after the jobs get processed.

By "currently enqueued jobs" I mean: the jobs enqueued at the moment this function was invoked.

pool.LateKillAllWorkers()

This is an asynchronous operation, but there is a way to be be notified each time a worker is killed: through a channel. See SetKilledWorkerChan(chan).

Receive a notification every time a worker is killed

SetKilledWorkerChan(chan) sets a channel to receive notifications every time a worker is killed.

pool.SetKilledWorkerChan(ch chan int)

This is 100% optional.

Update the amount of workers on demand

SetTotalWorkers adjusts the number of live workers.

pool.SetTotalWorkers(n)

In case it needs to kill some workers (in order to adjust the total based on the given parameter), it will wait until their current jobs get processed (if they are processing jobs).

This is an asynchronous operation, but there is a way to be be notified each time a new worker is started up: through a channel.

It returns an error in the following scenarios:

  • The workers were not started yet by StartWorkers.
  • There is a "in course" KillAllWorkers operation.
Pause all workers

PauseAllWorkers immediately pauses all workers (after they finish processing their current jobs). No new jobs will be pulled from the queue until ResumeAllWorkers() be invoked.

pool.PauseAllWorkers()
Resume all workers

ResumeAllWorkers() resumes all workers.

pool.ResumeAllWorkers()

History

v0.10.2
  • Minor readme.md update
v0.10.1
v0.10.0
  • Initial workers automatically start running on pool initialization

    • deprecated StartWorkers()
  • Each new added worker is being automatically started

  • SafeWaitUntilNSuccesses: it waits until n tasks were successfully processed, but if any extra task is already "in progress", this function will wait until it is done. An extra enqueued task could started processing just before the nth expected task was finished.

  • GetTotalWorkersInProgress: returns total workers in progress.

  • KillAllWorkers returns error

  • KillAllWorkersAndWait returns error

  • SetTotalWorkers It won't return error because of the workers were not yet started, workers are now started once they are created.

  • WaitUntilInitialWorkersAreUp: it waits until all initial workers are up and running.

  • StartWorkers is deprecated. It only returns nil.

  • StartWorkersAndWait is deprecated. It returns WaitUntilInitialWorkersAreUp()

v0.9.1
  • Examples: Replaced pool.StartWorkers() by pool.StartWorkersAndWait()
v0.9.0
  • Added a way to know that new workers were started (using an optional channel)
  • Added a way to know if a worker was killed (using an optional channel)
  • StartWorkersAndWait() to start workers (for first time) and wait until all of them are alive
v0.8.0
  • Enqueue jobs plus callback functions
  • Enqueue callback functions without jobs' data
v0.7.4
  • Fixed bug that caused randomly worker initialization error
v0.7.3
  • SetTotalWorkers() returns error in case it is invoked before StartWorkers()
v0.7.2
  • Fixed bug that prevents to start/add new workers after a Wait() function finishes.
v0.7.1
  • LateKillAllWorkers() will kill all alive workers (not only the number of workers that were alive when the function was invoked)
v0.7
  • Repository name modified to "goworkerpool"
v0.6
  • Pause / Resume all workers:
    • PauseAllWorkers()
    • ResumeAllWorkers()
  • Workers will listen to higher priority channels first
  • Workers will listen to broad messages (kill all workers, ...) before get signals from any other channel:
    • KillAllWorkers()
    • KillAllWorkersAndWait()
  • Added function to kill all workers (send a broad message to all workers) and wait until it happens:
    • pool.KillAllWorkersAndWait()
  • Added code examples
v0.5
  • Make Wait() listen to a channel (instead of use an endless for loop)
v0.4
  • Sync actions over workers. A FIFO queue was created for the following actions:
    • Add new worker
    • Kill worker(s)
    • Late kill worker(s)
    • Set total workers
v0.3
  • Added function to adjust number of live workers:
    • pool.SetTotalWorkers(n)
  • Added function to kill all live workers after current jobs get processed:
    • pool.LateKillAllWorkers()
v0.2
  • readme.md
  • godoc
  • code comments
v0.1

First stable BETA version.

Documentation

Index

Constants

View Source
const (
	ErrorEmbeddedErrors                         = "embedded.errors"
	ErrorData                                   = "wrong.data"
	ErrorKillAllWorkersInProgress               = "action.kill.all.workers.in.progress"
	ErrorWaitInProgress                         = "action.wait.in.progress"
	ErrorWaitUntilInitialWorkersAreUpInProgress = "action.wait.until.initial.workers.are.up.in.progress"
	ErrorDispatcherNoActionNoTask               = "dispatcher.no.action.no.task"
	ErrorDispatcherWorkerCouldNotBeEnqueued     = "dispatcher.worker.couldnt.be.enqueued"
	ErrorNoDefaultWorkerFunction                = "no.default.worker.function"
	ErrorDispatcherChannelFull                  = "dispatcher.channel.at.full.capacity"
	ErrorFullCapacityChannel                    = "full.capacity.channel"
	ErrorWorkerNotFound                         = "worker.not.found"
	ErrorWorkerType                             = "worker.wrong.type"
	ErrorWorkerMaxTimeExceeded                  = "worker.max.time.exceeded"
	ErrorWorkerFuncPanic                        = "worker.func.panic"
)
View Source
const (
	ErrorWorkerNilTask    = "worker.nil.task"
	ErrorWorkerNoTaskFunc = "worker.no.func.to.process.task"
	ListenInProgress      = "listen.in.progress"
	WorkerClosed          = "worker.closed"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

func NewPool

func NewPool(initialWorkers int, maxOperationsInQueue int, logVerbose bool) *Pool

NewPool is deprecated since version 0.10.0, please do not use it. Use NewPoolWithOptions instead. NewPool builds and returns a new Pool. This function will be removed on version 1.0.0 Parameters:

initialWorkers = amount of workers to start up at initialization
maxOperationsInQueue = maximum amount of actions/tasks that could be enqueued at the same time
logVerbose = true ==> output logs

func NewPoolWithOptions

func NewPoolWithOptions(options PoolOptions) (*Pool, error)

NewPoolWithOptions builds and returns a Pool. This function will return error if any of the following scenarios occurs:

  • MaxWorkers == 0
  • MaxOperationsInQueue == 0

func (*Pool) AddCallback

func (st *Pool) AddCallback(callbackFn PoolCallback) error

AddCallback enqueues a callback function into a FIFO queue.

Workers (if alive) will be listening to and picking up jobs from this queue. If no workers are alive nor idle, the job will stay in the queue until any worker will be ready to pick it up and start processing it.

The worker who picks up this job will only invoke the callback function, passing nil as a parameter.

The queue in which this function enqueues the jobs has a limit (it was set up at pool initialization). It means that AddCallback will wait for a free queue slot to enqueue a new job in case the queue is at full capacity.

AddCallback will return an error if no new tasks could be enqueued at the execution time. No new tasks could be enqueued during a certain amount of time when WaitUntilNSuccesses meets the stop condition.

func (*Pool) AddTask

func (st *Pool) AddTask(data interface{}) error

AddTask enqueues a task (into a FIFO queue).

The parameter for the task's data accepts any type (interface{}).

Workers (if any) will be listening to and picking up tasks from this queue. If no workers are alive nor idle, the task will stay in the queue until any worker will be ready to pick it up and start processing it.

The queue in which this function enqueues the tasks has a limit (it was set up at pool initialization). It means that AddTask will wait for a free queue slot to enqueue a new task in case the queue is at full capacity.

AddTask returns an error if no new tasks could be enqueued at the execution time. No new tasks could be enqueued during a certain amount of time when WaitUntilNSuccesses meets the stop condition, or in other words: when KillAllWorkers is in progress.

func (*Pool) AddTaskCallback

func (st *Pool) AddTaskCallback(data interface{}, callbackFn PoolCallback) error

AddTaskCallback enqueues a job and a callback function into a FIFO queue.

The parameter for the job's data accepts any data type (interface{}).

Workers will be listening to and picking up jobs from this queue. If no workers are alive nor idle, the job will stay in the queue until any worker will be ready to pick it up and start processing it.

The worker who picks up this job and callback will process the job first and later will invoke the callback function, passing the job's data as a parameter.

The queue in which this function enqueues the jobs has a capacity limit (it was set up at pool initialization). This means that AddTaskCallback will wait for a free queue slot to enqueue a new job in case the queue is at full capacity.

AddTaskCallback will return an error if no new tasks could be enqueued at the execution time. No new tasks could be enqueued during a certain amount of time when WaitUntilNSuccesses meets the stop condition.

func (*Pool) AddWorker

func (st *Pool) AddWorker() error

AddWorker adds a new worker to the pool.

This is an asynchronous operation, but you can be notified through a channel every time a new worker is started. The channel (optional) could be set using SetNewWorkerChan(chan) or at initialization (PoolOptions.NewWorkerChan in NewPoolWithOptions).

AddWorker returns an error if at least one of the following statements is true:

  • the worker could not be started
  • there is a "in course" KillAllWorkers operation

func (*Pool) AddWorkers

func (st *Pool) AddWorkers(n int) error

AddWorkers adds n extra workers to the pool.

This is an asynchronous operation, but you can be notified through a channel every time a new worker is started. The channel (optional) could be set using SetNewWorkerChan(chan) or at initialization (PoolOptions.NewWorkerChan in NewPoolWithOptions).

AddWorkers returns an error if at least one of the following statements are true:

  • parameter n <= 0
  • a worker could not be started
  • there is a "in course" KillAllWorkers operation

func (*Pool) GetPanicTasks

func (st *Pool) GetPanicTasks() goconcurrentqueue.Queue

GetPanicTasks returns the queue containing all tasks that failed because the worker's panicked while processing.

func (*Pool) GetTotalWorkers

func (st *Pool) GetTotalWorkers() int

GetTotalWorkers returns the number of registered workers.

func (*Pool) GetTotalWorkersInProgress

func (st *Pool) GetTotalWorkersInProgress() int

GetTotalWorkersInProgress returns the amount of workers currently processing data (tasks/actions)

func (*Pool) KillAllWorkers

func (st *Pool) KillAllWorkers() error

KillAllWorkers kills all live workers (the number of live workers is determined at the moment this action is processed). If a worker is processing a job, it will not be immediately killed, the pool will wait until the current job gets processed.

The following functions will return error if invoked during KillAllWorkers execution:

  • KillWorker
  • KillWorkers
  • LateKillWorker
  • LateKillWorkers
  • LateKillAllWorkers
  • AddWorker
  • AddWorkers
  • SetTotalWorkers

func (*Pool) KillAllWorkersAndWait

func (st *Pool) KillAllWorkersAndWait() error

KillAllWorkersAndWait kills all workers (the amount of workers is determined at the moment this action is processed). This function waits until current alive workers are down. This is the difference between KillAllWorkersAndWait() and KillAllWorkers() If a worker is processing a job, it will not be immediately terminated, the pool will wait until the current job gets processed.

The following functions will return error if invoked during this function execution:

  • KillWorker
  • KillWorkers
  • LateKillWorker
  • LateKillWorkers
  • LateKillAllWorkers
  • AddWorker
  • AddWorkers
  • SetTotalWorkers

func (*Pool) KillWorker

func (st *Pool) KillWorker() error

KillWorker kills an idle worker. The kill signal has a higher priority than the enqueued jobs. It means that a worker will be killed once it finishes its current job although there are unprocessed jobs in the queue. Use LateKillWorker() in case you need to wait until current enqueued jobs get processed. It returns an error in case there is a "in course" KillAllWorkers operation.

func (*Pool) KillWorkers

func (st *Pool) KillWorkers(n int) error

KillWorkers kills n idle workers. If n > GetTotalWorkers(), only current amount of workers will be terminated. The kill signal has a higher priority than the enqueued jobs. It means that a worker will be killed once it finishes its current job, no matter if there are unprocessed jobs in the queue. Use LateKillAllWorkers() ot LateKillWorker() in case you need to wait until current enqueued jobs get processed. It returns an error in the following scenarios:

  • there is a "in course" KillAllWorkers operation -

func (*Pool) LateKillAllWorkers

func (st *Pool) LateKillAllWorkers() error

LateKillAllWorkers kills all live workers only after all current jobs get processed. By "current jobs" it means: the number of enqueued jobs in the exact moment this function get executed. It returns an error for the following scenarios:

  • there is a "in course" KillAllWorkers operation.
  • the operations' queue (where tasks/actions get enqueued) is at full capacity

func (*Pool) LateKillWorker

func (st *Pool) LateKillWorker() error

LateKillWorker kills a worker only after current enqueued jobs get processed. It returns an error in case there is a "in course" KillAllWorkers operation.

func (*Pool) LateKillWorkers

func (st *Pool) LateKillWorkers(n int) error

LateKillWorkers kills n workers only after all current jobs get processed. If n > GetTotalWorkers(), only current amount of workers will be terminated. It returns an error in case there is a "in course" KillAllWorkers operation.

func (*Pool) PauseAllWorkers

func (st *Pool) PauseAllWorkers()

PauseAllWorkers pauses all workers. This action will wait until previous invoked actions get processed, but it will skip the line of enqueued jobs(tasks). The jobs that are being processed at the time this action is processed will not be interrupted. From the moment this action gets processed, all enqueued jobs/actions will not be processed until the workers get resumed by ResumeAllWorkers.

func (*Pool) ResumeAllWorkers

func (st *Pool) ResumeAllWorkers()

ResumeAllWorkers resumes all workers. Nothing will happen if this function is invoked while no workers are paused.

func (*Pool) SafeWaitUntilNSuccesses

func (st *Pool) SafeWaitUntilNSuccesses(n int) error

SafeWaitUntilNSuccesses waits until n workers finished their job successfully, then kills all active workers. A job/task is considered successful if the worker that processed it returned true. It could happen that other workers were processing jobs at the time the nth job was successfully processed, so this function will wait until all those extra workers were done. In other words, SafeWaitUntilNSuccesses guarantees that when it finishes no worker would be processing data. TODO ::: this function fails if n >= total workers

func (*Pool) SetKilledWorkerChan

func (st *Pool) SetKilledWorkerChan(ch chan int)

SetKilledWorkerChan sets a channel to receive a signal after a worker is terminated

func (*Pool) SetLogChan

func (st *Pool) SetLogChan(ch chan PoolLog)

SetLogChan sets log channel. System logs will be sent over this channel.

func (*Pool) SetNewWorkerChan

func (st *Pool) SetNewWorkerChan(ch chan int)

SetNewWorkerChan sets a channel to receive a signal once each new worker is started

func (*Pool) SetTotalWorkers

func (st *Pool) SetTotalWorkers(n int) error

SetTotalWorkers adjusts the number of live workers.

In case it needs to kill some workers (in order to adjust the total based on the given parameter), it will wait until their current jobs get processed (in case they are processing jobs).

This is an asynchronous operation, but you can be notified through a channel every time a new worker is started. The channel (optional) can be defined at SetNewWorkerChan(chan).

SetTotalWorkers returns an error in the following scenarios:

  • There is a "in course" KillAllWorkers operation.

func (*Pool) SetWorkerFunc

func (st *Pool) SetWorkerFunc(fn PoolFunc)

SetWorkerFunc sets the worker's default function handler. This function will be invoked each time a worker pulls a new job, and should return true to let know that the job was successfully completed, or false in other case.

func (*Pool) StartWorkers

func (st *Pool) StartWorkers() error

StartWorkers is deprecated since version 0.10.0, please do not use it. Why is StartWorkers deprecated? There is no need to explicitly starts up the workers as they automatically start up once created.

For backward compatibility, this function only returns nil. StartWorkers will be removed on version 1.0.0

Prior to version 0.10.0, the goal for this function was to start up the amount of workers pre-defined at pool instantiation.

func (*Pool) StartWorkersAndWait

func (st *Pool) StartWorkersAndWait() error

StartWorkersAndWait is deprecated since version 0.10.0, please do not use it. Use WaitUntilInitialWorkersAreUp instead. Why is StartWorkersAndWait deprecated? Workers are started up once they get created, there is no need to do it explicitly.

For backward compatibility, this function will return WaitUntilInitialWorkersAreUp. StartWorkersAndWait will be removed on version 1.0.0

Prior to version 0.10.0, the goal of this function was to start up all workers pre-defined at pool instantiation and wait until until all them were up and running.

func (*Pool) Wait

func (st *Pool) Wait() error

Wait waits while at least a worker is up and running

func (*Pool) WaitUntilInitialWorkersAreUp

func (st *Pool) WaitUntilInitialWorkersAreUp() error

WaitUntilInitialWorkersAreUp waits until all initial workers are up and running. The amount of initial workers were pre-defined at pool instantiation.

func (*Pool) WaitUntilNSuccesses

func (st *Pool) WaitUntilNSuccesses(n int) error

WaitUntilNSuccesses waits until n workers finished their job successfully, then kills all active workers. A worker is considered successfully if the associated worker function returned true. TODO ::: An error will be returned if the worker's function is not already set.

type PoolCallback

type PoolCallback func(interface{})

PoolCallback defines the callback function signature

type PoolError

type PoolError struct {
	// contains filtered or unexported fields
}

func (*PoolError) AddEmbeddedErrors

func (st *PoolError) AddEmbeddedErrors(errors []error)

AddEmbeddedErrors adds a slice of errors

func (*PoolError) Code

func (st *PoolError) Code() string

func (*PoolError) Error

func (st *PoolError) Error() string

func (*PoolError) GetEmbeddedErrors

func (st *PoolError) GetEmbeddedErrors() []error

type PoolFunc

type PoolFunc func(interface{}) bool

PoolFunc defines the function signature to be implemented by the workers

type PoolLog

type PoolLog struct {
	Code    string
	Message string
	Error   error
}

Pool Log data

type PoolOptions

type PoolOptions struct {
	// amount of initial workers
	TotalInitialWorkers uint
	// maximum amount of workers
	MaxWorkers uint
	// maximum actions/tasks to be enqueued at the same time
	MaxOperationsInQueue uint
	// channel to send notifications once new workers are created
	NewWorkerChan chan int
	// wait (NewPoolWithOptions function) until all initial workers are up and running
	WaitUntilInitialWorkersAreUp bool
	// log's verbose
	LogVerbose bool
}

PoolOptions contains configuration data to build a Pool using NewPoolWithOptions

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL