gostickyworkerpool

package module
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 5, 2024 License: MIT Imports: 6 Imported by: 0

README

go-sticky-worker-pool

Coverage

This is a very simple implementation of key-sticky concurrency over go routines.

Imagine you have an FIFO queue and you want to run jobs concurrently keeping ordering. Well, you can't guarantee though, but you can guarantee ordering of correlated workloads (e.g. workloads of the same user id, or the same aggregate id). Besides that, it may reduce your lock management overhead, since correlated jobs will run in order, and possibly will help you avoid deadlocks.

This is achieved by using a consistent hash that delivers workloads with the same key to the same channel.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrStopTimeout       = errors.New("stop timeout")
	ErrWorkerPoolStopped = errors.New("worker pool has stopped")
)

Functions

func WithChannelsBufferSize

func WithChannelsBufferSize(channelsBufferSize uint) stickyWorkerPoolOption

Set ChannelsBufferSize of the worker pool config

func WithConcurrency

func WithConcurrency(concurrency uint) stickyWorkerPoolOption

Set Concurrency of the worker pool config

func WithStopTimeout

func WithStopTimeout(timeout time.Duration) stickyWorkerPoolOption

Set Stop timeout of the worker pool config

Types

type StickyWorkerPool

type StickyWorkerPool struct {
	// contains filtered or unexported fields
}

This worker pool uses a hash function to stick keys with workers in order to provide a first-in-first-out behaviour among the workloads with the same key. There is no guarantee of ordering between diferent keys.

func NewStickyWorkerPool

func NewStickyWorkerPool(workFn WorkFn, options ...stickyWorkerPoolOption) *StickyWorkerPool

Creates a new StickyWorkerPool

func (*StickyWorkerPool) Send

func (s *StickyWorkerPool) Send(ctx context.Context, key string, args any) (<-chan error, error)

func (*StickyWorkerPool) Start

func (s *StickyWorkerPool) Start(ctx context.Context)

Start starts the workers of the pool It creates starts <concurrency> go routines a.k.a. workers

func (*StickyWorkerPool) Stop

func (s *StickyWorkerPool) Stop() error

Stop stops the worker It prevents worker to receive new workloads returning error and await for the workloads flush or timeout

type WorkFn

type WorkFn func(ctx context.Context, key string, args any) error

WorkFn is the function responsible for handling workloads. It will receive the context, key and args passed to Send function The error will be send to the error channel returned from Send function

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL