gopherpool

v0.0.2
Published: Jul 6, 2022 License: Apache-2.0 Imports: 9 Imported by: 0

README

gopher-pool

Yet another Worker Pool implementation.

Gophers are swimming!

Documentation

Index

Constants

This section is empty.

Variables

var (
	ErrAlreadyStarted = errors.New("already started")
	ErrAlreadyEnded   = errors.New("already ended")
)

Functions

This section is empty.

Types

type Option

type Option = func(w *WorkerPool) *WorkerPool

Option is an option that changes a WorkerPool instance. It can be passed to NewWorkerPool.
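The shape of Option (a function that receives the pool and returns it) is the functional-options pattern. Below is a minimal, self-contained sketch of that pattern, assuming options are applied in order. The pool type, withName, withOnError and newPool are hypothetical stand-ins, not part of this package.

```go
package main

import "fmt"

// pool is a hypothetical stand-in for WorkerPool; its fields are illustrative only.
type pool struct {
	name    string
	onError func(err error)
}

// option mirrors the shape of Option: it receives the pool and returns it.
type option = func(p *pool) *pool

// withName is a hypothetical option that sets the name field.
func withName(name string) option {
	return func(p *pool) *pool {
		p.name = name
		return p
	}
}

// withOnError is a hypothetical option that installs an error callback.
func withOnError(cb func(err error)) option {
	return func(p *pool) *pool {
		p.onError = cb
		return p
	}
}

// newPool applies options in order, so a later option overrides an earlier one.
func newPool(options ...option) *pool {
	p := &pool{name: "default"}
	for _, opt := range options {
		p = opt(p)
	}
	return p
}

func main() {
	p := newPool(withName("a"), withName("b"))
	fmt.Println(p.name) // b
}
```

With this shape, a constructor can keep a stable signature while new settings are added as new option functions.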

func SetAbnormalReturnCb

func SetAbnormalReturnCb(cb func(err error)) Option

SetAbnormalReturnCb is an Option that overrides the abnormal-return cb with cb.

cb is called if and only if WorkFn returns abnormally. cb may be called multiple times concurrently.

func SetDefaultAbnormalReturnCb

func SetDefaultAbnormalReturnCb() Option

SetDefaultAbnormalReturnCb is an Option that

  • overrides the abnormal-return cb, and
  • simply passes the runtime-panic or runtime.Goexit-was-called error to log.Println.

cb is called if and only if WorkFn returns abnormally. cb may be called multiple times concurrently.
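An "abnormal return" here means a panic or a call to runtime.Goexit. The following sketch shows one common way to tell those two apart from a normal return, using a flag and recover inside a deferred function. It is not taken from this package's source; runGuarded, exitEarly and errGoexit are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"runtime"
)

// errGoexit is a hypothetical sentinel for the runtime.Goexit case.
var errGoexit = errors.New("runtime.Goexit was called")

// runGuarded runs fn in a new goroutine. It returns nil if fn returned
// normally, or an error describing the panic or Goexit otherwise.
func runGuarded(fn func()) error {
	result := make(chan error, 1)
	go func() {
		normal := false
		defer func() {
			if normal {
				result <- nil
				return
			}
			// recover returns a non-nil value for a panic and nil for Goexit.
			if r := recover(); r != nil {
				result <- fmt.Errorf("panic: %v", r)
				return
			}
			result <- errGoexit
		}()
		fn()
		normal = true // reached only when fn returns normally
	}()
	return <-result
}

// exitEarly is a hypothetical work function that terminates its goroutine.
func exitEarly() { runtime.Goexit() }

func main() {
	fmt.Println(runGuarded(func() {}))                // <nil>
	fmt.Println(runGuarded(func() { panic("boom") })) // panic: boom
	fmt.Println(runGuarded(exitEarly))                // runtime.Goexit was called
}
```

Because several workers can fail at the same moment, a callback fed by this kind of guard should be safe for concurrent use, which matches the note that cb may be called multiple times concurrently.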

func SetDefaultWorkerConstructor

func SetDefaultWorkerConstructor(workCh chan WorkFn, onTaskReceived func(), onTaskDone func()) Option

SetDefaultWorkerConstructor sets the default worker constructor implementation, built from the given args. workFn must be sent through this workCh. Both onTaskReceived and onTaskDone can be nil.

func SetWorkerConstructor

func SetWorkerConstructor(workCh chan WorkFn, workerConstructor WorkerConstructor) Option

SetWorkerConstructor sets workerConstructor and the associated workCh. workFn must be sent through this workCh.

type WorkFn

type WorkFn = func(ctx context.Context)

type Worker

type Worker[T any] struct {
	*state.WorkingStateChecker

	*state.EndedStateChecker
	// contains filtered or unexported fields
}

Worker represents a single task executor. It works on a single task at a time. It is in one of three states: stopped, where the loop is not running; working, where the loop is running in a goroutine; or ended, where it can never step into the working state again.

func NewWorker

func NewWorker[T any](id T, workCh <-chan WorkFn, onTaskReceived, onWorkDone func()) (*Worker[T], error)

func (*Worker[T]) Id

func (w *Worker[T]) Id() T

Id returns the id of this worker.

func (*Worker[T]) Kill

func (w *Worker[T]) Kill()

Kill kills this worker. If a task is being worked on at the time of invocation, the context passed to the task is cancelled immediately. Kill moves this worker into the ended state, after which it cannot be started again.

func (*Worker[T]) Start

func (w *Worker[T]) Start() (err error)

Start starts the worker loop. It blocks until Stop or Kill is called, or returns immediately with an error under the conditions below. w is ended if workCh is closed or workFn returns abnormally.

  • Start returns `ErrAlreadyEnded` if worker is already ended.
  • Start returns `ErrAlreadyStarted` if worker is already started.

func (*Worker[T]) Stop

func (w *Worker[T]) Stop()

Stop stops an active Start loop. Stop does not cancel the context passed to workFn; it just waits until workFn returns.

If Start is not running its loop when Stop is called, the next Start call is stopped immediately.
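The Start, Stop and Kill semantics described above can be sketched with a mutex-guarded state, a one-slot stop channel and a cancellable context. This is a simplified illustration under assumptions, not the package's Worker: miniWorker, newMiniWorker and ignoreCtx are hypothetical, a single context is shared by all tasks, and abnormal returns are not handled.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

var (
	errAlreadyStarted = errors.New("already started")
	errAlreadyEnded   = errors.New("already ended")
)

type workFn = func(ctx context.Context)

// miniWorker is a hypothetical, simplified worker.
type miniWorker struct {
	mu      sync.Mutex
	working bool
	ended   bool
	workCh  <-chan workFn
	stopCh  chan struct{} // capacity 1, so one stop request can stay pending
	ctx     context.Context
	cancel  context.CancelFunc
}

func newMiniWorker(workCh <-chan workFn) *miniWorker {
	ctx, cancel := context.WithCancel(context.Background())
	return &miniWorker{workCh: workCh, stopCh: make(chan struct{}, 1), ctx: ctx, cancel: cancel}
}

// Start runs the loop until Stop or Kill is called or workCh is closed.
func (w *miniWorker) Start() error {
	w.mu.Lock()
	switch {
	case w.ended:
		w.mu.Unlock()
		return errAlreadyEnded
	case w.working:
		w.mu.Unlock()
		return errAlreadyStarted
	}
	w.working = true
	w.mu.Unlock()
	defer func() {
		w.mu.Lock()
		w.working = false
		w.mu.Unlock()
	}()
	for {
		select {
		case <-w.ctx.Done(): // Kill was called
			return nil
		case <-w.stopCh: // Stop was called, possibly before Start
			return nil
		case fn, ok := <-w.workCh:
			if !ok {
				w.Kill() // a closed workCh ends the worker
				return nil
			}
			fn(w.ctx)
		}
	}
}

// Stop asks the loop to return once the current task is done; ctx is untouched.
func (w *miniWorker) Stop() {
	select {
	case w.stopCh <- struct{}{}:
	default: // a stop request is already pending
	}
}

// Kill cancels the context given to tasks and ends the worker for good.
func (w *miniWorker) Kill() {
	w.mu.Lock()
	w.ended = true
	w.mu.Unlock()
	w.cancel()
}

// ignoreCtx adapts a plain function to workFn.
func ignoreCtx(f func()) workFn { return func(context.Context) { f() } }

func main() {
	workCh := make(chan workFn)
	w := newMiniWorker(workCh)
	done := make(chan error, 1)
	go func() { done <- w.Start() }()
	// Calling Start from inside a task hits the already-started guard.
	workCh <- ignoreCtx(func() { fmt.Println(w.Start()) }) // already started
	w.Stop()
	fmt.Println(<-done) // <nil>
	w.Kill()
	fmt.Println(w.Start()) // already ended
}
```

The one-slot stopCh is what lets a Stop issued while no loop is running take effect on the next Start: the request simply waits in the channel buffer.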

type WorkerConstructor

type WorkerConstructor = func(id int, onTaskReceived func(), onTaskDone func()) *Worker[int]

WorkerConstructor is an alias for the worker constructor function type. id must be a unique value; overlapping ids cause undefined behavior. onTaskReceived and onTaskDone can be nil.
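Since both hooks may be nil, whatever invokes them has to check before calling. The sketch below shows nil-safe hook invocation and one plausible use of the pair, counting in-flight tasks. It is an assumption-laden illustration: runner, task and demo are hypothetical, and task omits the context for brevity.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// task is a simplified stand-in for WorkFn (no context, for brevity).
type task = func()

// runner is a hypothetical stand-in for a constructed worker.
type runner struct {
	id             int
	onTaskReceived func()
	onTaskDone     func()
}

// run executes t, invoking each hook around it only when it is non-nil.
func (r *runner) run(t task) {
	if r.onTaskReceived != nil {
		r.onTaskReceived()
	}
	if r.onTaskDone != nil {
		defer r.onTaskDone()
	}
	t()
}

// demo counts in-flight tasks with the two hooks and reports the count
// observed while the task runs and after it has finished.
func demo() (during, after int64) {
	var active int64
	r := &runner{
		id:             0,
		onTaskReceived: func() { atomic.AddInt64(&active, 1) },
		onTaskDone:     func() { atomic.AddInt64(&active, -1) },
	}
	r.run(func() { during = atomic.LoadInt64(&active) })
	after = atomic.LoadInt64(&active)
	return during, after
}

func main() {
	fmt.Println(demo()) // 1 0

	// A runner with nil hooks still runs tasks.
	(&runner{id: 1}).run(func() {})
}
```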

func BuildWorkerConstructor

func BuildWorkerConstructor(workCh <-chan WorkFn, onTaskReceived_ func(), onTaskDone_ func()) WorkerConstructor

BuildWorkerConstructor is a helper function that builds a WorkerConstructor. workCh must not be nil. onTaskReceived_ and onTaskDone_ can be nil.

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool is a container for workers.

func NewWorkerPool

func NewWorkerPool(
	options ...Option,
) *WorkerPool

func (*WorkerPool) ActiveWorkerNum

func (p *WorkerPool) ActiveWorkerNum() int64

ActiveWorkerNum returns the number of workers actively working on a task.

func (*WorkerPool) Add

func (p *WorkerPool) Add(delta uint32) (newAliveLen int)

Add adds delta workers to this pool. This creates delta goroutines.

func (*WorkerPool) Kill

func (p *WorkerPool) Kill()

Kill kills all workers.

func (*WorkerPool) Len

func (p *WorkerPool) Len() (alive int, sleeping int)

Len returns the number of workers. alive is the number of running workers. sleeping is the number of workers that were removed by Remove while still working on their tasks.

func (*WorkerPool) Remove

func (p *WorkerPool) Remove(delta uint32) (alive int, sleeping int)

Remove removes delta randomly selected workers from this pool. Removed workers may be held as sleeping if they are still working on a workFn.

func (*WorkerPool) Send

func (p *WorkerPool) Send(workFn WorkFn)

Send is a wrapper that sends workFn to the internal workCh. Send blocks until workFn is received.

func (*WorkerPool) SenderChan

func (p *WorkerPool) SenderChan() chan<- WorkFn

SenderChan returns the send-only side of the WorkFn channel.
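Because a plain send blocks until a worker receives, a caller that cannot wait indefinitely may prefer to use a send-only channel inside a select. The sketch below shows that idea with a timeout, assuming the channel is unbuffered. trySend and demo are hypothetical helpers, and the local channel stands in for the one a pool would expose.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

type workFn = func(ctx context.Context)

// trySend attempts to hand fn to a receiver on ch and gives up after timeout.
// It reports whether a receiver took the value.
func trySend(ch chan<- workFn, fn workFn, timeout time.Duration) bool {
	t := time.NewTimer(timeout)
	defer t.Stop()
	select {
	case ch <- fn:
		return true
	case <-t.C:
		return false
	}
}

// demo tries one send with no receiver and one with a receiver running.
func demo() (withoutReceiver, withReceiver bool) {
	ch := make(chan workFn) // unbuffered stand-in for the pool's channel
	noop := func(ctx context.Context) {}

	// No receiver yet: the send cannot complete and the timer wins.
	withoutReceiver = trySend(ch, noop, 10*time.Millisecond)

	// With a receiver running, the send completes.
	go func() { (<-ch)(context.Background()) }()
	withReceiver = trySend(ch, noop, time.Second)
	return withoutReceiver, withReceiver
}

func main() {
	fmt.Println(demo()) // false true
}
```

The same select could watch a context's Done channel instead of a timer when the caller already carries a cancellable context.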

func (*WorkerPool) Wait

func (p *WorkerPool) Wait()

Wait waits for all workers to stop. Calling Wait without first stopping every worker through Kill and/or Remove may block forever.
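The reason Wait can block forever is easiest to see in a reduced model: workers loop until told to quit, so a wait on them only returns after something ends the loops. The following sketch models add, send, kill and wait with a sync.WaitGroup and a shared context. miniPool and its methods are hypothetical and much simpler than WorkerPool (no Remove, no sleeping workers).

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

type workFn = func(ctx context.Context)

// miniPool is a hypothetical, reduced model of a worker pool.
type miniPool struct {
	workCh chan workFn
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

func newMiniPool() *miniPool {
	ctx, cancel := context.WithCancel(context.Background())
	return &miniPool{workCh: make(chan workFn), ctx: ctx, cancel: cancel}
}

// add starts n worker goroutines, each looping until the pool is killed.
func (p *miniPool) add(n int) {
	for i := 0; i < n; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for {
				select {
				case <-p.ctx.Done():
					return
				case fn := <-p.workCh:
					fn(p.ctx)
				}
			}
		}()
	}
}

// send blocks until some worker receives fn.
func (p *miniPool) send(fn workFn) { p.workCh <- fn }

// kill cancels the shared context, which ends every worker loop.
func (p *miniPool) kill() { p.cancel() }

// wait blocks until every worker goroutine has returned.
func (p *miniPool) wait() { p.wg.Wait() }

// runDemo sends ten tasks to three workers and returns how many completed.
func runDemo() int64 {
	var done int64
	p := newMiniPool()
	p.add(3)
	for i := 0; i < 10; i++ {
		p.send(func(context.Context) { atomic.AddInt64(&done, 1) })
	}
	p.kill() // without this call, wait would block forever
	p.wait()
	return atomic.LoadInt64(&done)
}

func main() {
	fmt.Println(runDemo()) // 10
}
```

In this model every task that was received runs to completion before its worker exits, so all ten tasks are counted by the time wait returns.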
