Documentation ¶
Index ¶
- Variables
- func ConsumeResultStream[T any](ctx context.Context, job JobResultPipe[T], consumer func(T) error) error
- func SafeChannelWrite[T any](ctx context.Context, ch chan<- JobResult[T], value JobResult[T]) error
- func SubmitJob[T any](ctx context.Context, m Manager, job Job[T]) error
- type Job
- func NewJob[T any](process func(ctx context.Context, result JobResultPipe[T]) error) Job[T]
- func NewJobWithBuffer[T any](process func(ctx context.Context, result JobResultPipe[T]) error, buffer int) Job[T]
- func NewJobWithBufferAndRetry[T any](process func(ctx context.Context, result JobResultPipe[T]) error, ...) Job[T]
- func NewJobWithRetry[T any](process func(ctx context.Context, result JobResultPipe[T]) error, retries int) Job[T]
- type JobImpl
- func (ji *JobImpl[T]) CanRun() bool
- func (ji *JobImpl[T]) Close()
- func (ji *JobImpl[T]) F() func(ctx context.Context, result JobResultPipe[T]) error
- func (ji *JobImpl[T]) ID() string
- func (ji *JobImpl[T]) IncreaseRuns()
- func (ji *JobImpl[T]) ReadResult(ctx context.Context) (JobResult[T], bool)
- func (ji *JobImpl[T]) ResultBufferSize() int
- func (ji *JobImpl[T]) ResultChan() <-chan JobResult[T]
- func (ji *JobImpl[T]) Retries() int
- func (ji *JobImpl[T]) Runs() int
- func (ji *JobImpl[T]) WriteError(ctx context.Context, val error) error
- func (ji *JobImpl[T]) WriteResult(ctx context.Context, val T) error
- type JobResult
- type JobResultPipe
- type Manager
- type Option
- func WithConcurrency(concurrency int) Option
- func WithPoolCount(count int) Option
- func WithPoolDisablePurge(disable bool) Option
- func WithPoolExpiryDuration(duration time.Duration) Option
- func WithPoolLogger(logger *util.LogEntry) Option
- func WithPoolNonblocking(nonblocking bool) Option
- func WithPoolPanicHandler(handler func(any)) Option
- func WithPoolPreAlloc(preAlloc bool) Option
- func WithSinglePoolCapacity(capacity int) Option
- type Options
- type WorkerPool
Constants ¶
This section is empty.
Variables ¶
var ErrWorkerPoolResultChannelIsClosed = errors.New("worker job is already closed")
Functions ¶
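func ConsumeResultStream ¶ added in v1.68.6
func ConsumeResultStream[T any](ctx context.Context, job JobResultPipe[T], consumer func(T) error) error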
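func SafeChannelWrite ¶
func SafeChannelWrite[T any](ctx context.Context, ch chan<- JobResult[T], value JobResult[T]) error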
SafeChannelWrite writes a value to a channel, returning an error if the context is canceled.
Types ¶
type Job ¶
type Job[T any] interface {
	JobResultPipe[T]
	F() func(ctx context.Context, result JobResultPipe[T]) error
	ID() string
	CanRun() bool
	Retries() int
	Runs() int
	IncreaseRuns()
}
Job represents a task that can be executed and produce results of type T.
func NewJobWithBuffer ¶
func NewJobWithBuffer[T any](process func(ctx context.Context, result JobResultPipe[T]) error, buffer int) Job[T]
NewJobWithBuffer creates a new job with a specified buffer size.
func NewJobWithBufferAndRetry ¶
func NewJobWithBufferAndRetry[T any](
	process func(ctx context.Context, result JobResultPipe[T]) error,
	resultBufferSize, retries int,
) Job[T]
NewJobWithBufferAndRetry creates a new job with specified buffer size and retry count.
func NewJobWithRetry ¶
func NewJobWithRetry[T any](process func(ctx context.Context, result JobResultPipe[T]) error, retries int) Job[T]
NewJobWithRetry creates a new job with a specified retry count.
type JobImpl ¶
type JobImpl[T any] struct {
	// contains filtered or unexported fields
}
JobImpl is the concrete implementation of a Job.
func (*JobImpl[T]) F ¶
func (ji *JobImpl[T]) F() func(ctx context.Context, result JobResultPipe[T]) error
func (*JobImpl[T]) IncreaseRuns ¶
func (ji *JobImpl[T]) IncreaseRuns()
func (*JobImpl[T]) ReadResult ¶
func (*JobImpl[T]) ResultBufferSize ¶
func (*JobImpl[T]) ResultChan ¶
func (*JobImpl[T]) WriteError ¶
type JobResult ¶
JobResult represents the result of a job execution, which can be either a value of type T or an error.
func ErrorResult ¶
type JobResultPipe ¶
type JobResultPipe[T any] interface {
	ResultBufferSize() int
	ResultChan() <-chan JobResult[T]
	WriteError(ctx context.Context, val error) error
	WriteResult(ctx context.Context, val T) error
	ReadResult(ctx context.Context) (JobResult[T], bool)
	Close()
}
JobResultPipe is a channel-based pipeline for passing job results.
type Manager ¶
type Manager interface {
GetPool() (WorkerPool, error)
StopError(context.Context, error)
}
type Option ¶
type Option func(*Options)
Option defines a function that configures worker pool options.
func WithConcurrency ¶
WithConcurrency sets the concurrency for the worker pool.
func WithPoolCount ¶
WithPoolCount sets the number of worker pools.
func WithPoolDisablePurge ¶
WithPoolDisablePurge sets whether the pool's purge mechanism is disabled.
func WithPoolExpiryDuration ¶
WithPoolExpiryDuration sets the expiry duration for workers.
func WithPoolLogger ¶
WithPoolLogger sets a logger for the pool.
func WithPoolNonblocking ¶
WithPoolNonblocking sets the non-blocking option for the pool.
func WithPoolPanicHandler ¶
WithPoolPanicHandler sets a panic handler for the pool.
func WithPoolPreAlloc ¶
WithPoolPreAlloc sets whether memory for the pool is pre-allocated.
func WithSinglePoolCapacity ¶
WithSinglePoolCapacity sets the capacity for a single worker pool.
type Options ¶
type Options struct {
PoolCount int
SinglePoolCapacity int
Concurrency int
ExpiryDuration time.Duration
Nonblocking bool
PreAlloc bool
PanicHandler func(any)
Logger *util.LogEntry
DisablePurge bool
}
Options defines configurable options for the service's internal worker pool.
type WorkerPool ¶
WorkerPool defines the common methods for worker pool operations. This allows the Service to hold either a single ants.Pool or an ants.MultiPool.