xsync

README

Concurrency utilities

This Go library contains a mix of simple wrappers and utilities that we have found useful. We will keep adding more concurrency primitives and utilities here.

AtMost every

Often when we are logging, printing debugging information, or gathering metrics, we want to do so only every so often, so that we do not spam the receivers. Ideally, a logging library should provide such a primitive, but several don't.

For example, to log at most once per minute, you can write the following:

import "github.com/nauto/xsync"

...

atMostEveryMinute := xsync.AtMost{Every: time.Minute}
...
for i := 0; true; i++ {
    ...
    // The following line will print at most once per minute
    atMostEveryMinute.Run(func(){ log.Infof("Iteration %v\n", i)})
    ...
}
...
LimitedWaitGroup

Often we want to run several goroutines in parallel while limiting how many are launched and running concurrently. Go's sync.WaitGroup lets you wait for goroutines to finish, but its semantics provide no way to limit the number launched; that is what a semaphore helps with. LimitedWaitGroup is a very thin wrapper around Go's semaphore package that provides WaitGroup-like semantics and can function as a drop-in replacement for WaitGroup. The only difference is that the Add method blocks till resources are available.

Example:

import "github.com/nauto/xsync"

...

wg := xsync.NewLimitedWaitGroup(100)

for i := 0; i < 1000; i++ {
    wg.Add(1)

    // At most 100 of the goroutines below will be launched and
    // running at the same time.
    go func() {
        defer wg.Done()
        // your code
    }()
}
wg.Wait()
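
For a sense of what the wrapper saves you, here is roughly the equivalent written by hand with golang.org/x/sync/semaphore, the kind of package LimitedWaitGroup builds on (a sketch; the wrapper's internals may differ):

import (
    "context"
    "sync"

    "golang.org/x/sync/semaphore"
)

...

var wg sync.WaitGroup
sem := semaphore.NewWeighted(100)

for i := 0; i < 1000; i++ {
    // Acquire blocks until one of the 100 slots frees up; this is
    // what LimitedWaitGroup.Add does for you.
    if err := sem.Acquire(context.Background(), 1); err != nil {
        break // Only possible if the context is cancelled.
    }
    wg.Add(1)
    go func() {
        defer wg.Done()
        defer sem.Release(1)
        // your code
    }()
}
wg.Wait()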
Worker Pool

When a stream of possibly expensive jobs comes in faster than a single thread can keep up with, we have no choice but to process them in parallel. One way is to run many instances of the service in parallel. This is often wasteful, especially on larger machines and when each instance must load a lot of resources in order to process the jobs. In Go, one might instead use goroutines to process the incoming stream: launch a new goroutine for each job, and use semaphores or LimitedWaitGroup to keep the number of simultaneously running goroutines manageable.

Given that goroutines are cheap to create and destroy, this is almost always a good solution. In languages that have only threads and no equivalent of goroutines, one would typically create a pool of threads to process the jobs; once created, the threads are never destroyed, because threads are expensive to create, destroy, and switch between.

Even though thread pools have their downsides, there are sometimes reasons to keep some data between jobs and perhaps process all related jobs in the same thread, which may avoid the need for some locks or transactions.

Here, we provide a simple interface to create worker pools in Go for those situations where other solutions would not work. There are two kinds of worker pools:

Whichever is free

Whichever worker becomes free first picks up the next job. The following example shows how to create such a worker pool:

import "github.com/nauto/xsync"

...

ctx, cancel := context.WithCancel(context.Background())
pool := xsync.WorkerPool{
    Ctx:        ctx,
    Cancel:     cancel,
    NumWorkers: 5,
    BufferSize: 10,  // Maximum backlog before Posting blocks.
    Worker: func(ctx context.Context, messages <-chan interface{}, worker int) error {
        for {
            select {
            case <-ctx.Done():  // The context was cancelled.
                return nil  
            case msg := <-messages:
                // Process message
            }
        }  
        return nil
    },
}
// Start the Worker pool. This does not return till the pool shuts down. 
pool.Run()
...
...
// In another goroutine/thread, post messages for processing by this worker pool.
pool.Post(msg1)
pool.Post(msg2)
...
Sharded

Say several related jobs come in, e.g., account updates that are persisted in an in-memory store. If these went to different workers, each worker would have to lock the entry corresponding to the account before making any updates. If, on the other hand, all updates for a particular id are done from the same thread, there is no need for locking, which can greatly improve performance.

Creating a sharded worker pool is exactly the same as before except that you need to provide a hash function:

...
pool := xsync.WorkerPool{
    ...
    Hash: func(i interface{}) uint32 {
        return uint32(/* hash of i */)
    },
    ...
}
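
For completeness, here is a self-contained sketch of a sharded pool. The update type, the FNV-based hash, and the shutdown choreography are illustrative assumptions, not part of the library:

package main

import (
    "context"
    "fmt"
    "hash/fnv"

    "github.com/nauto/xsync"
)

// update is a hypothetical message type; all updates for one
// accountID should land on the same shard.
type update struct {
    accountID string
    delta     int
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    pool := xsync.WorkerPool{
        Ctx:        ctx,
        Cancel:     cancel,
        NumWorkers: 4,
        BufferSize: 16,
        Hash: func(i interface{}) uint32 {
            h := fnv.New32a()
            h.Write([]byte(i.(update).accountID))
            return h.Sum32()
        },
        Worker: func(ctx context.Context, messages <-chan interface{}, shard int) error {
            // Owned exclusively by this shard, so no locking is needed.
            balances := map[string]int{}
            for {
                select {
                case <-ctx.Done():
                    return nil
                case msg := <-messages:
                    u := msg.(update)
                    balances[u.accountID] += u.delta
                    fmt.Printf("shard %d: %s -> %d\n", shard, u.accountID, balances[u.accountID])
                }
            }
        },
    }
    go func() {
        pool.Post(update{accountID: "alice", delta: 10})
        pool.Post(update{accountID: "bob", delta: -3})
        pool.Post(update{accountID: "alice", delta: 5})
        // NOTE: cancelling right away can race with in-flight messages;
        // a real program would coordinate shutdown more carefully.
        cancel()
    }()
    pool.Run() // Blocks till all workers have returned.
}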

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AtMost

type AtMost struct {
	Every time.Duration
	// contains filtered or unexported fields
}

AtMost can be used to execute a function at most once in the specified duration. It is safe to call Run from multiple goroutines.

AtMost tries to keep the critical region as small as possible. As a result, if the given function f takes longer to run than the duration, another invocation of f may be started while the first one is still running.
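
If overlapping runs would be a problem, the caller can serialize them itself; a minimal sketch (the mutex is our addition, not something AtMost provides):

var mu sync.Mutex
atMost := xsync.AtMost{Every: time.Second}
...
atMost.Run(func() {
    mu.Lock() // Serializes invocations that AtMost may otherwise overlap.
    defer mu.Unlock()
    // Slow work that must not overlap a previous invocation.
})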

func (*AtMost) Run

func (a *AtMost) Run(f func())

Run will execute the given function at most once in the specified duration.

type HashFunc

type HashFunc func(interface{}) uint32

HashFunc is the signature of the hash function used by WorkerPool.

type LimitedWaitGroup

type LimitedWaitGroup struct {
	// contains filtered or unexported fields
}

LimitedWaitGroup is similar to sync.WaitGroup, but allows for a limit for the number of concurrently running routines. Essentially it provides an interface that combines Semaphore and WaitGroup semantics.

Its usage is similar, but the Add method now blocks when the limit is reached; this is how the number of concurrently executing routines is controlled.

Example:

wg := NewLimitedWaitGroup(100)

for i := 0; i < 1000; i++ {
	wg.Add(1)

	go func() {
		defer wg.Done()

		// your code
	}()
}

wg.Wait()

func NewLimitedWaitGroup

func NewLimitedWaitGroup(limit int) *LimitedWaitGroup

func (*LimitedWaitGroup) Add

func (wg *LimitedWaitGroup) Add(n int64)

Add uses a resource. It blocks if no resources are available. Resources can be released by calls to Done().

func (*LimitedWaitGroup) Done

func (wg *LimitedWaitGroup) Done()

Done is similar to sync.WaitGroup.Done. It releases a resource back to the pool.

func (*LimitedWaitGroup) Wait

func (wg *LimitedWaitGroup) Wait()

Wait is similar to sync.WaitGroup.Wait. It waits till all acquired resources have been released.

type WorkerFunc

type WorkerFunc func(ctx context.Context, messages <-chan interface{}, shard int) error

WorkerFunc is the signature of the worker function used by WorkerPool.

type WorkerPool

type WorkerPool struct {

	// Set these fields before calling Run.
	Ctx        context.Context
	Cancel     context.CancelFunc
	NumWorkers int
	BufferSize int

	// "Override" these function pointers.
	Hash   HashFunc
	Worker WorkerFunc
	// contains filtered or unexported fields
}

WorkerPool manages a pool of workers where incoming posted items are optionally sharded and sent to the appropriate worker or the next available worker. The sharding is useful when, for example, one needs to have a pool of workers, but have messages from the same id (used for hashing) go to the same worker each time.

See tests for sample uses.

func (*WorkerPool) Post

func (p *WorkerPool) Post(datum interface{}) error

Post routes the given datum to the appropriate shard. If the channel buffer for this shard is full, this method blocks. In the case of an ordinary worker pool, it's posted on a common channel, which is shared between all workers.

If the running context has been cancelled (or timed out), it returns the corresponding error and does not post to the channels.
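
For example, a producer loop can treat the returned error as its signal to stop (a minimal sketch; msg is a placeholder):

if err := pool.Post(msg); err != nil {
    // The pool's context was cancelled or timed out; stop producing.
    return
}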

func (*WorkerPool) Run

func (p *WorkerPool) Run()

Run initializes and starts a new worker pool using the set worker function with the specified number of workers/shards and buffer size. It also takes a context with the context's cancel function.

If a worker finishes with a non-nil error, the context, which is the same as the passed-in context and is shared with all the workers, is cancelled. Any workers checking for the context cancellation can then choose to gracefully finish whatever they are doing. This mechanism can be used if a single worker wants to request a graceful shutdown for the whole system including other worker pools.

If the set hash function is nil, an ordinary worker pool is created; if it is not nil, a sharded worker pool with one shard for each worker is created.

Run does not return till all the workers have finished.
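
As a sketch of that mechanism, a worker can request a pool-wide shutdown by returning a non-nil error (isPoisonPill is a hypothetical helper):

Worker: func(ctx context.Context, messages <-chan interface{}, shard int) error {
    for {
        select {
        case <-ctx.Done():
            return nil // Another worker failed; wind down gracefully.
        case msg := <-messages:
            if isPoisonPill(msg) {
                // Returning non-nil cancels the shared context,
                // which asks every other worker to finish up.
                return errors.New("shutdown requested")
            }
        }
    }
},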
