spmc

package
v7.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 30, 2023 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Option

type Option func(opts *Options)

Option represents the optional function.

func WithExpiryDuration

func WithExpiryDuration(expiryDuration time.Duration) Option

WithExpiryDuration sets up the interval time of cleaning up goroutines.

func WithMaxBlockingTasks

func WithMaxBlockingTasks(maxBlockingTasks int) Option

WithMaxBlockingTasks sets up the maximum number of goroutines that are blocked when it reaches the capacity of pool.

func WithNonblocking

func WithNonblocking(nonblocking bool) Option

WithNonblocking indicates that pool will return nil when there is no available workers.

func WithPanicHandler

func WithPanicHandler(panicHandler func(interface{})) Option

WithPanicHandler sets up panic handler.

type Options

type Options struct {
	// PanicHandler is used to handle panics from each worker goroutine.
	// if nil, panics will be thrown out again from worker goroutines.
	PanicHandler func(interface{})

	// ExpiryDuration is a period for the scavenger goroutine to clean up those expired workers,
	// the scavenger scans all workers every `ExpiryDuration` and clean up those workers that haven't been
	// used for more than `ExpiryDuration`.
	ExpiryDuration time.Duration

	// LimitDuration is a period in the limit mode.
	LimitDuration time.Duration

	// Max number of goroutine blocking on pool.Submit.
	// 0 (default value) means no such limit.
	MaxBlockingTasks int

	// When Nonblocking is true, Pool.AddProduce will never be blocked.
	// ErrPoolOverload will be returned when Pool.Submit cannot be done at once.
	// When Nonblocking is true, MaxBlockingTasks is inoperative.
	Nonblocking bool
}

Options contains all options which will be applied when instantiating an pool.

func DefaultOption

func DefaultOption() *Options

DefaultOption is the default option.

type Pool

type Pool[T any, U any, C any, CT any, TF pooltask.Context[CT]] struct {
	gpool.BasePool
	// contains filtered or unexported fields
}

Pool is a single producer, multiple consumer goroutine pool. T is the type of the task. We can treat it as input. U is the type of the result. We can treat it as output. C is the type of the const parameter. if Our task look like y = ax + b, C acts like b as const parameter. CT is the type of the context. It needs to be read/written parallel. TF is the type of the context getter. It is used to get a context. if we don't need to use CT/TF, we can define CT as any and TF as NilContext.

func NewSPMCPool

func NewSPMCPool[T any, U any, C any, CT any, TF pooltask.Context[CT]](name string, size int32, component util.Component, options ...Option) (*Pool[T, U, C, CT, TF], error)

NewSPMCPool create a single producer, multiple consumer goroutine pool.

func (*Pool[T, U, C, CT, TF]) AddProduceBySlice

func (p *Pool[T, U, C, CT, TF]) AddProduceBySlice(producer func() ([]T, error), constArg C, contextFn TF, options ...TaskOption) (<-chan U, pooltask.TaskController[T, U, C, CT, TF])

AddProduceBySlice is to add Produce by a slice. Producer need to return ErrProducerClosed when to exit.

func (*Pool[T, U, C, CT, TF]) AddProducer

func (p *Pool[T, U, C, CT, TF]) AddProducer(producer func() (T, error), constArg C, contextFn TF, options ...TaskOption) (<-chan U, pooltask.TaskController[T, U, C, CT, TF])

AddProducer is to add producer. Producer need to return ErrProducerClosed when to exit.

func (*Pool[T, U, C, CT, TF]) Cap

func (p *Pool[T, U, C, CT, TF]) Cap() int

Cap returns the capacity of this pool.

func (*Pool[T, U, C, CT, TF]) DeleteTask

func (p *Pool[T, U, C, CT, TF]) DeleteTask(id uint64)

DeleteTask is to delete task. Please don't use it manually.

func (*Pool[T, U, C, CT, TF]) ExitSubTask

func (p *Pool[T, U, C, CT, TF]) ExitSubTask(id uint64)

ExitSubTask is to reduce the number of subtasks.

func (*Pool[T, U, C, CT, TF]) Free

func (p *Pool[T, U, C, CT, TF]) Free() int

Free returns the number of available goroutines to work, -1 indicates this pool is unlimited.

func (*Pool[T, U, C, CT, TF]) IsClosed

func (p *Pool[T, U, C, CT, TF]) IsClosed() bool

IsClosed indicates whether the pool is closed.

func (*Pool[T, U, C, CT, TF]) ReleaseAndWait

func (p *Pool[T, U, C, CT, TF]) ReleaseAndWait()

ReleaseAndWait is like Release, it waits all workers to exit.

func (*Pool[T, U, C, CT, TF]) Running

func (p *Pool[T, U, C, CT, TF]) Running() int

Running returns the number of workers currently running.

func (*Pool[T, U, C, CT, TF]) SetConsumerFunc

func (p *Pool[T, U, C, CT, TF]) SetConsumerFunc(consumerFunc func(T, C, CT) U)

SetConsumerFunc is to set ConsumerFunc which is to process the task.

func (*Pool[T, U, C, CT, TF]) StopTask

func (p *Pool[T, U, C, CT, TF]) StopTask(id uint64)

StopTask is to stop task by id Please don't use it manually.

func (*Pool[T, U, C, CT, TF]) Tune

func (p *Pool[T, U, C, CT, TF]) Tune(size int)

Tune changes the capacity of this pool, note that it is noneffective to the infinite or pre-allocation pool.

func (*Pool[T, U, C, CT, TF]) Waiting

func (p *Pool[T, U, C, CT, TF]) Waiting() int

Waiting returns the number of tasks which are waiting be executed.

type TaskOption

type TaskOption func(opts *TaskOptions)

TaskOption represents the optional function.

func WithConcurrency

func WithConcurrency(c int) TaskOption

WithConcurrency is to set the concurrency of task.

func WithResultChanLen

func WithResultChanLen(resultChanLen uint64) TaskOption

WithResultChanLen is to set the length of result channel.

func WithTaskChanLen

func WithTaskChanLen(taskChanLen uint64) TaskOption

WithTaskChanLen is to set the length of task channel.

type TaskOptions

type TaskOptions struct {
	Concurrency   int
	ResultChanLen uint64
	TaskChanLen   uint64
}

TaskOptions contains all options

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL