workerpool

package
v1.69.13 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 27, 2025 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrWorkerPoolResultChannelIsClosed = errors.New("worker job is already closed")

Functions

func ConsumeResultStream added in v1.68.6

func ConsumeResultStream[T any](ctx context.Context, job JobResultPipe[T], consumer func(T) error) error

func SafeChannelWrite

func SafeChannelWrite[T any](ctx context.Context, ch chan<- JobResult[T], value JobResult[T]) error

SafeChannelWrite writes a value to a channel, returning an error if the context is canceled.

func SubmitJob

func SubmitJob[T any](ctx context.Context, m Manager, job Job[T]) error

SubmitJob used to submit jobs to our worker pool for processing. Once a job is submitted the end user does not need to do any further tasks One can ideally also wait for the results of their processing for their specific job by listening to the job's ResultChan.

Types

type Job

type Job[T any] interface {
	JobResultPipe[T]
	F() func(ctx context.Context, result JobResultPipe[T]) error
	ID() string
	CanRun() bool
	Retries() int
	Runs() int
	IncreaseRuns()
}

Job represents a task that can be executed and produce results of type T.

func NewJob

func NewJob[T any](process func(ctx context.Context, result JobResultPipe[T]) error) Job[T]

NewJob creates a new job with default buffer size and retry count.

func NewJobWithBuffer

func NewJobWithBuffer[T any](process func(ctx context.Context, result JobResultPipe[T]) error, buffer int) Job[T]

NewJobWithBuffer creates a new job with a specified buffer size.

func NewJobWithBufferAndRetry

func NewJobWithBufferAndRetry[T any](
	process func(ctx context.Context, result JobResultPipe[T]) error,
	resultBufferSize, retries int,
) Job[T]

NewJobWithBufferAndRetry creates a new job with specified buffer size and retry count.

func NewJobWithRetry

func NewJobWithRetry[T any](process func(ctx context.Context, result JobResultPipe[T]) error, retries int) Job[T]

NewJobWithRetry creates a new job with a specified retry count.

type JobImpl

type JobImpl[T any] struct {
	// contains filtered or unexported fields
}

JobImpl is the concrete implementation of a Job.

func (*JobImpl[T]) CanRun

func (ji *JobImpl[T]) CanRun() bool

func (*JobImpl[T]) Close

func (ji *JobImpl[T]) Close()

func (*JobImpl[T]) F

func (ji *JobImpl[T]) F() func(ctx context.Context, result JobResultPipe[T]) error

func (*JobImpl[T]) ID

func (ji *JobImpl[T]) ID() string

func (*JobImpl[T]) IncreaseRuns

func (ji *JobImpl[T]) IncreaseRuns()

func (*JobImpl[T]) ReadResult

func (ji *JobImpl[T]) ReadResult(ctx context.Context) (JobResult[T], bool)

func (*JobImpl[T]) ResultBufferSize

func (ji *JobImpl[T]) ResultBufferSize() int

func (*JobImpl[T]) ResultChan

func (ji *JobImpl[T]) ResultChan() <-chan JobResult[T]

func (*JobImpl[T]) Retries

func (ji *JobImpl[T]) Retries() int

func (*JobImpl[T]) Runs

func (ji *JobImpl[T]) Runs() int

func (*JobImpl[T]) WriteError

func (ji *JobImpl[T]) WriteError(ctx context.Context, val error) error

func (*JobImpl[T]) WriteResult

func (ji *JobImpl[T]) WriteResult(ctx context.Context, val T) error

type JobResult

type JobResult[T any] interface {
	IsError() bool
	Error() error
	Item() T
}

JobResult represents the result of a job execution, which can be either a value of type T or an error.

func ErrorResult

func ErrorResult[T any](err error) JobResult[T]

func Result

func Result[T any](item T) JobResult[T]

func SafeChannelRead

func SafeChannelRead[T any](ctx context.Context, ch <-chan JobResult[T]) (JobResult[T], bool)

SafeChannelRead reads a value from a channel, returning false if the channel is closed or the context is canceled.

type JobResultPipe

type JobResultPipe[T any] interface {
	ResultBufferSize() int
	ResultChan() <-chan JobResult[T]
	WriteError(ctx context.Context, val error) error
	WriteResult(ctx context.Context, val T) error
	ReadResult(ctx context.Context) (JobResult[T], bool)
	Close()
}

JobResultPipe is a channel-based pipeline for passing job results.

type Manager

type Manager interface {
	GetPool() (WorkerPool, error)
	StopError(context.Context, error)
}

func NewManager

func NewManager(
	ctx context.Context,
	cfg config.ConfigurationWorkerPool,
	stopOnErr func(ctx context.Context, err error),
	opts ...Option,
) Manager

type Option

type Option func(*Options)

Option defines a function that configures worker pool options.

func WithConcurrency

func WithConcurrency(concurrency int) Option

WithConcurrency sets the concurrency for the worker pool.

func WithPoolCount

func WithPoolCount(count int) Option

WithPoolCount sets the number of worker pools.

func WithPoolDisablePurge

func WithPoolDisablePurge(disable bool) Option

WithPoolDisablePurge disables the purge mechanism in the pool.

func WithPoolExpiryDuration

func WithPoolExpiryDuration(duration time.Duration) Option

WithPoolExpiryDuration sets the expiry duration for workers.

func WithPoolLogger

func WithPoolLogger(logger *util.LogEntry) Option

WithPoolLogger sets a logger for the pool.

func WithPoolNonblocking

func WithPoolNonblocking(nonblocking bool) Option

WithPoolNonblocking sets the non-blocking option for the pool.

func WithPoolPanicHandler

func WithPoolPanicHandler(handler func(any)) Option

WithPoolPanicHandler sets a panic handlers for the pool.

func WithPoolPreAlloc

func WithPoolPreAlloc(preAlloc bool) Option

WithPoolPreAlloc pre-allocates memory for the pool.

func WithSinglePoolCapacity

func WithSinglePoolCapacity(capacity int) Option

WithSinglePoolCapacity sets the capacity for a single worker pool.

type Options

type Options struct {
	PoolCount          int
	SinglePoolCapacity int
	Concurrency        int
	ExpiryDuration     time.Duration
	Nonblocking        bool
	PreAlloc           bool
	PanicHandler       func(any)
	Logger             *util.LogEntry
	DisablePurge       bool
}

Options defines configurable options for the service's internal worker pool.

type WorkerPool

type WorkerPool interface {
	Submit(ctx context.Context, task func()) error
	Shutdown()
}

WorkerPool defines the common methods for worker pool operations. This allows the Service to hold either a single ants.Pool or an ants.MultiPool.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL