conc

Version: v0.0.0-...-bd5b8a1
Published: Aug 12, 2023 | License: MIT | Imports: 7 | Imported by: 0

README

conc

Status: Alpha. Please file issues for anything, big or small (including the naming of types and functions).

A library that dynamically adjusts the concurrency of a worker pool based on observed latency changes, continuously trying to achieve optimal throughput without incurring latency costs.

conc is a concurrency library inspired by Netflix's concurrency-limits library. For background on the approach, read Netflix's article Performance Under Load.
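To make the idea of latency-driven limits concrete, here is a minimal sketch of a gradient-style update, loosely modeled on the approach described for Netflix's concurrency-limits. It is not taken from conc's source: `nextLimit` and all of its parameters are hypothetical, and the mapping of `tolerance`, `smoothing`, `queue`, `minLimit`, and `maxLimit` to options such as WithRTTTolerance, WithSmoothing, WithQueueSize, WithMinLimit, and WithMaxLimit is an assumption (WithQueueSize, for instance, takes a function, while this sketch uses a constant).

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// nextLimit is a hypothetical helper, not part of conc's API. It sketches a
// gradient-style update: shrink the limit when the sampled RTT rises above
// the best observed RTT, otherwise let it grow by a small queue allowance.
func nextLimit(limit float64, minRTT, sampleRTT time.Duration,
	tolerance, smoothing, queue, minLimit, maxLimit float64) float64 {
	if sampleRTT <= 0 {
		return limit // no usable sample; leave the limit unchanged
	}
	// gradient is 1.0 while latency stays within tolerance and falls
	// towards 0.5 as the sampled RTT grows.
	gradient := tolerance * float64(minRTT) / float64(sampleRTT)
	gradient = math.Max(0.5, math.Min(1.0, gradient))

	target := limit*gradient + queue
	// Exponential smoothing keeps a single slow sample from halving the limit.
	smoothed := limit*(1-smoothing) + target*smoothing
	return math.Max(minLimit, math.Min(maxLimit, smoothed))
}

func main() {
	limit := 20.0
	samples := []time.Duration{10 * time.Millisecond, 10 * time.Millisecond, 40 * time.Millisecond}
	for _, rtt := range samples {
		limit = nextLimit(limit, 10*time.Millisecond, rtt, 1.5, 0.2, 2, 1, 100)
		fmt.Printf("rtt=%v limit=%.2f\n", rtt, limit)
	}
}
```

With fast samples the limit creeps upward by a fraction of the queue allowance; a sample well above the best RTT pulls it back down, and the result is always clamped to the configured bounds.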

Compared to Netflix's library, which is thread pool-oriented, this library is worker pool-oriented. The benefit is that it can more easily be used for both

  • thread pool scenarios (by submitting work/functions to a shared channel); and
  • worker pool scenarios such as processing things from a worker queue.

There's a simulator that allows you to test out the library. To play around with it:

$ git clone https://github.com/JensRantil/conc
$ cd conc/simulator
$ go run . -help

Documentation

Index

Constants

const DefaultMaxConcurrency = 20

Variables

This section is empty.

Functions

This section is empty.

Types

type Execution

type Execution struct {
	InFlight uint
	RTT      time.Duration
	Err      error
}

type GradientController

type GradientController struct {
	// contains filtered or unexported fields
}

GradientController delegates concurrency limits to SimplifiedController, adding basic limits such as minimum and maximum concurrency.

func NewGradientController

func NewGradientController(n Notifier, pool *WorkerPool, opts ...GradientOpts) *GradientController

NewGradientController creates a new GradientController. Call the Start() method to make it run. Once done, make sure to call Stop() to clean up resources. After it has been stopped, the controller can be started again.

func (*GradientController) Start

func (c *GradientController) Start()

func (*GradientController) Stop

func (c *GradientController) Stop(ctx context.Context)

type GradientOpts

type GradientOpts func(*GradientController)
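GradientOpts has the shape of the functional-options pattern: each option is a function that mutates the value being constructed. The sketch below shows that pattern in isolation. The `limiter` type, its fields, its default values, and the lowercase `with...` helpers are all hypothetical stand-ins chosen for illustration; they are not conc's GradientController or its real defaults.

```go
package main

import "fmt"

// limiter and its fields are hypothetical stand-ins used only to show the
// functional-options pattern; they are not conc's GradientController.
type limiter struct {
	minLimit, maxLimit uint
}

// option mirrors the shape of GradientOpts: a function that mutates the
// value under construction.
type option func(*limiter)

func withMinLimit(n uint) option { return func(l *limiter) { l.minLimit = n } }
func withMaxLimit(n uint) option { return func(l *limiter) { l.maxLimit = n } }

// newLimiter sets (arbitrary) defaults first, then applies the options in
// order, so a later option overrides an earlier one.
func newLimiter(opts ...option) *limiter {
	l := &limiter{minLimit: 1, maxLimit: 20}
	for _, opt := range opts {
		opt(l)
	}
	return l
}

func main() {
	l := newLimiter(withMaxLimit(50))
	fmt.Println(l.minLimit, l.maxLimit) // prints "1 50"
}
```

The appeal of this design is that new settings can be added later without changing the constructor's signature, and callers only mention the settings they want to override.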

func WithBackoffRatio

func WithBackoffRatio(b float64) GradientOpts

WithBackoffRatio sets

func WithInitialLimit

func WithInitialLimit(b uint) GradientOpts

func WithMaxLimit

func WithMaxLimit(b uint) GradientOpts

func WithMinLimit

func WithMinLimit(b uint) GradientOpts

func WithProbeInterval

func WithProbeInterval(i uint) GradientOpts

func WithQueueSize

func WithQueueSize(q func(uint) uint) GradientOpts

func WithRTTTolerance

func WithRTTTolerance(rttt float64) GradientOpts

func WithRandomSource

func WithRandomSource(s rand.Source) GradientOpts

func WithSmoothing

func WithSmoothing(s float64) GradientOpts

type NonBlockingReporter

type NonBlockingReporter struct {
	// contains filtered or unexported fields
}

func NewNonBlockingReporter

func NewNonBlockingReporter(chanSize int) *NonBlockingReporter

func (*NonBlockingReporter) NoWork

func (r *NonBlockingReporter) NoWork()

NoWork signals there was no work to be performed.

func (*NonBlockingReporter) NoWorkChan

func (r *NonBlockingReporter) NoWorkChan() chan struct{}

func (*NonBlockingReporter) NotifyChan

func (r *NonBlockingReporter) NotifyChan() chan Execution

func (*NonBlockingReporter) Work

func (r *NonBlockingReporter) Work(unit func() error)

Every time

type Notifier

type Notifier interface {
	NotifyChan() chan Execution
	NoWorkChan() chan struct{}
}

type Reporter

type Reporter interface {
	NoWork()
	Work(unit func() error)
}

Reporter receives feedback from processes about latencies and errors.
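As a rough illustration of what a Reporter might do with that feedback, the following sketch times each unit of work and publishes an Execution without ever blocking the worker. `timingReporter` and `newTimingReporter` are hypothetical names; this is not the implementation of NonBlockingReporter, and the Execution struct is copied locally only so the example compiles on its own.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// Execution copies the struct documented on this page so the sketch is
// self-contained.
type Execution struct {
	InFlight uint
	RTT      time.Duration
	Err      error
}

// timingReporter is a hypothetical Reporter implementation.
type timingReporter struct {
	inFlight int64 // accessed atomically; kept first for 64-bit alignment
	results  chan Execution
	noWork   chan struct{}
}

func newTimingReporter(size int) *timingReporter {
	return &timingReporter{
		results: make(chan Execution, size),
		noWork:  make(chan struct{}, size),
	}
}

// Work runs unit, measures how long it took, and records how many units
// were in flight when it started. The sample is dropped if the buffer is full.
func (r *timingReporter) Work(unit func() error) {
	inFlight := atomic.AddInt64(&r.inFlight, 1)
	defer atomic.AddInt64(&r.inFlight, -1)

	start := time.Now()
	err := unit()
	e := Execution{InFlight: uint(inFlight), RTT: time.Since(start), Err: err}
	select {
	case r.results <- e:
	default: // never block the worker on a slow consumer
	}
}

// NoWork signals that a worker found nothing to do, again without blocking.
func (r *timingReporter) NoWork() {
	select {
	case r.noWork <- struct{}{}:
	default:
	}
}

func main() {
	r := newTimingReporter(4)
	r.Work(func() error {
		time.Sleep(5 * time.Millisecond)
		return errors.New("boom")
	})
	r.NoWork()
	e := <-r.results
	fmt.Println(e.InFlight, e.RTT, e.Err)
}
```

Dropping samples when the buffer is full trades some measurement accuracy for the guarantee that reporting never slows down the work being measured.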

type Runner

type Runner interface {
	// Start is called when your application should start another processing
	// thread. The Start function must be blocking. Start must stop processing
	// when there is an element that can be read from stopper. All processing
	// in Start must report its latency, possible errors, and if it has run out
	// of work, to r.
	Start(stopper <-chan struct{}, r Reporter)
}

Runner is an interface implemented by you. It starts a process.
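A minimal sketch of a Runner for the worker-queue scenario might look like the following. It assumes a hypothetical `next` function that polls some queue and reports whether a job was available; `pollRunner`, `countingReporter`, and `runDemo` are illustrative names that do not exist in conc, and the Reporter interface is copied locally so the example compiles on its own. Start selects on a receive from stopper, which returns both when a value is sent and when the channel is closed; which of the two conc does is not stated on this page.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Reporter copies the interface documented on this page.
type Reporter interface {
	NoWork()
	Work(unit func() error)
}

// pollRunner is a hypothetical Runner that polls a queue for jobs.
type pollRunner struct {
	next func() (job func() error, ok bool) // hypothetical queue poll
	idle time.Duration                      // pause after an empty poll
}

// Start blocks until something can be read from stopper. Jobs are wrapped
// in r.Work so latency and errors are reported; empty polls call r.NoWork.
func (p pollRunner) Start(stopper <-chan struct{}, r Reporter) {
	for {
		select {
		case <-stopper:
			return
		default:
		}
		if job, ok := p.next(); ok {
			r.Work(job)
			continue
		}
		r.NoWork()
		select {
		case <-stopper:
			return
		case <-time.After(p.idle):
		}
	}
}

// countingReporter is a test double that counts calls and closes stop the
// first time NoWork is reported, which ends the demo run.
type countingReporter struct {
	worked, idle int
	stop         chan struct{}
	once         sync.Once
}

func (c *countingReporter) Work(unit func() error) { c.worked++; _ = unit() }
func (c *countingReporter) NoWork() {
	c.idle++
	c.once.Do(func() { close(c.stop) })
}

// runDemo feeds n jobs through a pollRunner and returns the reporter's counts.
func runDemo(n int) (worked, idle int) {
	remaining := n
	next := func() (func() error, bool) {
		if remaining == 0 {
			return nil, false
		}
		remaining--
		return func() error { return nil }, true
	}
	rep := &countingReporter{stop: make(chan struct{})}
	pollRunner{next: next, idle: time.Second}.Start(rep.stop, rep)
	return rep.worked, rep.idle
}

func main() {
	worked, idle := runDemo(3)
	fmt.Println(worked, idle) // prints "3 1"
}
```

Checking stopper both before polling and while idle keeps shutdown prompt even when the queue is empty for long stretches.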

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool keeps track of current running processes. It starts and stops them.

func NewWorkerPool

func NewWorkerPool(r Runner, re Reporter, opts ...WorkerPoolOpts) *WorkerPool

NewWorkerPool creates a WorkerPool. The orchestrator starts with WantedN set to zero. Call Stop(...) to properly clean up after usage.

func (*WorkerPool) ActualN

func (o *WorkerPool) ActualN() uint

ActualN returns the number of processes currently running.

func (*WorkerPool) Decr

func (o *WorkerPool) Decr(n uint)

Decr reduces the number of running processes. They are stopped asynchronously. To wait for them to have shut down, call SettleDown().

func (*WorkerPool) Incr

func (o *WorkerPool) Incr(n uint)

Incr increases the number of running processes. To wait for them to have started, call SettleDown().

func (*WorkerPool) SettleDown

func (o *WorkerPool) SettleDown(ctx context.Context)

SettleDown waits for ActualN to be the same as WantedN.

func (*WorkerPool) WantedN

func (o *WorkerPool) WantedN() uint

WantedN returns the number of processes we want running.

type WorkerPoolMetrics

type WorkerPoolMetrics interface {
	Incr(n uint)
	Decr(n uint)
	Restart()
}

WorkerPoolMetrics is called for different events in the orchestrator.

type WorkerPoolOpts

type WorkerPoolOpts func(*WorkerPool)

func WithMetrics

func WithMetrics(metrics WorkerPoolMetrics) WorkerPoolOpts
