cuba

package module
v0.3.2 (note: this is not the latest version of the module)
Published: Nov 8, 2019 License: MIT Imports: 5 Imported by: 8

README

Project Cuba

Experiment in allowing workers to own the means of production.

Go makes many parallel workloads easy to implement. Cuba aims to simplify some of the cases that aren't as easy.

If your algorithm can handle unbounded parallelism, then just spawn thousands of goroutines and let the Go runtime figure out how to make it work. However, this doesn't work if you have limits on external resources such as open file descriptors or database connection handles. It may also use more memory than necessary, since each goroutine needs its own call stack.

For bounded parallelism, Go's channels allow splitting a task into a sequence of steps that run in parallel. This model also supports fanning in and out to increase parallelism beyond the number of sequential steps.

The limitation of channels is that they only work so long as the pipeline of work is unidirectional. Simple linear sequences are easiest, but you can construct any directed acyclic graph.

Cuba aims to support parallelism where the dataflow may be cyclical.

One example is a crawler-style algorithm, which involves both taking a node to process and pushing newly discovered nodes back onto the queue.

Another example is backing off and retrying without head-of-line blocking. Work items could be pushed to the back of the queue to be retried when they come around again.

Usage

First, define a worker function:

func doWork(handle *cuba.Handle) {
	item := handle.Item().(myItemType)

	// Do something with item.

	for _, newItem := range newItemsFound {
		handle.Push(newItem)
		// Optionally: handle.Sync()
	}
}

Normally, handle.Push() buffers new items before releasing them back to the work pool. When the worker function returns, the pool mutex is acquired once and the buffered items are pushed as a batch. This means that other workers may sit idle until the function returns.

Calling handle.Sync() immediately acquires the lock and pushes any buffered items. This increases lock contention, but may improve parallelism if you have long-running workers.

Then initialize a pool, seed it with initial items, and wait for processing to complete.

	// Initialize a new pool.
	// Swap cuba.NewQueue() for cuba.NewStack() for LIFO order.
	pool := cuba.New(doWork, cuba.NewQueue())

	// Optionally: pool.SetMaxWorkers(n)

	// Seed the pool with initial work.
	// Workers are started as soon as something is available to process.
	pool.Push(myFirstItem)

	// Wait for the workers to finish processing all work and terminate.
	pool.Finish()

By default, cuba pools have a maximum worker count equal to runtime.NumCPU(). This can be changed by calling pool.SetMaxWorkers().

Documentation

Index

Constants

const (
	POOL_RUN = iota
	POOL_FINISH
	POOL_ABORT
)

Variables

var PoolAbortedErr = errors.New("pool has been aborted")

Functions

This section is empty.

Types

type Bucket added in v0.2.0

type Bucket interface {
	Push(interface{})
	PushAll([]interface{})
	Pop() interface{}
	IsEmpty() bool
	Empty()
}

type Handle added in v0.2.0

type Handle struct {
	// contains filtered or unexported fields
}

func (*Handle) Item added in v0.2.0

func (handle *Handle) Item() interface{}

func (*Handle) Push added in v0.2.0

func (handle *Handle) Push(item interface{})

func (*Handle) Sync added in v0.2.0

func (handle *Handle) Sync()

type Pool added in v0.2.0

type Pool struct {
	// contains filtered or unexported fields
}

func New added in v0.2.0

func New(task Task, bucket Bucket) *Pool

Constructs a new Cuba thread pool.

The worker callback will be called by multiple goroutines in parallel, so it is expected to be thread-safe.

The bucket determines the order in which items are processed: cuba.NewQueue() provides FIFO ordering, while cuba.NewStack() provides LIFO ordering.

func (*Pool) Abort added in v0.3.0

func (pool *Pool) Abort()

func (*Pool) Finish added in v0.2.0

func (pool *Pool) Finish()

Calling Finish() waits for all work to complete and allows the worker goroutines to shut down.

func (*Pool) Push added in v0.2.0

func (pool *Pool) Push(item interface{}) error

Push an item into the worker pool. The item will be scheduled onto a worker immediately.

func (*Pool) PushAll added in v0.2.0

func (pool *Pool) PushAll(items []interface{}) error

Push multiple items into the worker pool.

Compared to Push(), this acquires the lock only once, so it may reduce lock contention.

func (*Pool) SetMaxWorkers added in v0.2.0

func (pool *Pool) SetMaxWorkers(n int32)

Sets the maximum number of worker goroutines.

Default: runtime.NumCPU() (i.e. the number of CPU cores available)

type Queue added in v0.2.0

type Queue struct {
	// contains filtered or unexported fields
}

func NewQueue added in v0.2.0

func NewQueue() *Queue

func (*Queue) Empty added in v0.2.0

func (queue *Queue) Empty()

func (*Queue) IsEmpty added in v0.3.0

func (queue *Queue) IsEmpty() bool

func (*Queue) Pop added in v0.2.0

func (queue *Queue) Pop() interface{}

func (*Queue) Push added in v0.2.0

func (queue *Queue) Push(item interface{})

func (*Queue) PushAll added in v0.2.0

func (queue *Queue) PushAll(items []interface{})

type Stack added in v0.2.0

type Stack struct {
	// contains filtered or unexported fields
}

func NewStack

func NewStack() *Stack

func (*Stack) Empty added in v0.2.0

func (stack *Stack) Empty()

func (*Stack) IsEmpty added in v0.3.0

func (stack *Stack) IsEmpty() bool

func (*Stack) Pop added in v0.2.0

func (stack *Stack) Pop() interface{}

func (*Stack) Push added in v0.2.0

func (stack *Stack) Push(item interface{})

func (*Stack) PushAll added in v0.2.0

func (stack *Stack) PushAll(items []interface{})

type Task added in v0.2.0

type Task func(*Handle)

Directories

Path Synopsis
examples
