workqueue

package module
v0.0.0-...-ca00916 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 8, 2017 License: Apache-2.0 Imports: 4 Imported by: 0

README

WorkQueue

GoDoc

A Go library for queuing and executing a set of tasks with a user-defined concurrency level. Includes support for timing out and/or cancelling the tasks in the queue.

Provides a primitive Future-like abstraction for obtaining the results of a task.

go get github.com/charithe/workqueue

Usage

wq := workqueue.New(8, 16)
defer wq.Shutdown(true)

// When the task reaches the front of the queue, the associated context will be used to determine whether
// the task should be executed or not. If the context hasn't been cancelled, the task will be started and
// the context will be passed to it as the argument.
ctx, cancelFunc := context.WithTimeout(context.Background(), 10*time.Second)
defer cancelFunc()

f, err := wq.Submit(ctx, func(c context.Context) *workqueue.Result {
    // do work
    // in case of error, return workqueue.ErrorResult(err) instead
    return workqueue.SuccessResult("result")
})

// If the number of queued tasks exceed the limit, ErrQueueFull will be returned
if err == workqueue.ErrQueueFull {
    fmt.Println("Pool queue is full")
    return
}

// Wait for the task to complete for 10 seconds
v, err := f.GetWithTimeout(10 * time.Second)
if err != nil {
    if err == workqueue.ErrFutureTimeout {
        fmt.Println("Timed out waiting for result")
    } else {
        fmt.Printf("Task failed: %+v\n", err)
    }
    return
}

fmt.Printf("Task result: %s\n", v)

Documentation

Overview

workpool allows a bounded set of async tasks to execute concurrently

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	ErrQueueFull       = errors.New("Queue is full")
	ErrQueueShutdown   = errors.New("Queue is shutdown")
	ErrFutureCompleted = errors.New("Future completed")
	ErrFutureTimeout   = errors.New("Timeout")
)

Functions

This section is empty.

Types

type Future

type Future interface {
	// GetWithTimeout waits for the completion of the async task for the given amount of time.
	// If the task completes within the timeout, result of the task is returned. Otherwise, ErrFutureTimeout
	// is returned as the error. Get can be called as many times as needed until the task completes.
	// When a call to Get has returned the results of the completed task, subsequent calls to Get will
	// return ErrFutureCompleted as an error.
	GetWithTimeout(timeout time.Duration) (interface{}, error)

	// GetWithContext waits for the completion of the async task until the context times out or is cancelled.
	// The behaviour when this function is called multiple times is identical to GetWithTimeout.
	GetWithContext(ctx context.Context) (interface{}, error)

	// Cancel attempts to cancel the async task. This is a best efforts attempt to perform the cancellation.
	// If the task is still in the queue, its' context will be cancelled, causing the WorkPool to reject the
	// task. If the task is already executing, then it is the user's responsibility to ensure that it honours
	// the cancellation of the context passed to it by the runtime.
	Cancel()
}

Future is a container for the results of an async task that may or may not have completed yet

type ImmediateFuture

type ImmediateFuture struct {
	Val interface{}
	Err error
}

func NewImmediateFuture

func NewImmediateFuture(val interface{}, err error) *ImmediateFuture

NewImmediateFuture is a Future that returns immeidately

func (*ImmediateFuture) Cancel

func (f *ImmediateFuture) Cancel()

func (*ImmediateFuture) GetWithContext

func (f *ImmediateFuture) GetWithContext(ctx context.Context) (interface{}, error)

func (*ImmediateFuture) GetWithTimeout

func (f *ImmediateFuture) GetWithTimeout(timeout time.Duration) (interface{}, error)

type Result

type Result struct {
	Value interface{}
	Err   error
}

Result holds the results of a task execution

func ErrorResult

func ErrorResult(err error) *Result

func SuccessResult

func SuccessResult(value interface{}) *Result

type Task

type Task func(context.Context) *Result

type WorkQueue

type WorkQueue struct {
	// contains filtered or unexported fields
}

func New

func New(maxGoRoutines, queueSize int) *WorkQueue

New creates a new WorkQueue that will execute at most maxGoRoutines concurrently and hold queueSize tasks in the backlog awaiting an execution slot.

func (*WorkQueue) IsShutdown

func (wq *WorkQueue) IsShutdown() bool

IsShutdown returns true if the queue has been shutdown

func (*WorkQueue) Shutdown

func (wq *WorkQueue) Shutdown(wait bool)

Shutdown gracefully shuts down the queue. It stops accepting any new tasks but will continue executing the already enqueued tasks to their completion. To avoid processing the queue, cancel the contexts associated with the enqueued tasks. Setting the wait parameter to true will cause the call to block until the queue finishes shutting down.

func (*WorkQueue) Submit

func (wq *WorkQueue) Submit(ctx context.Context, task Task) (Future, error)

Submit attempts to enqueue the given task. If the queue has enough space, it will be accepted and executed as soon as it reaches the front of the queue. The context parameter can be used to cancel the task execution if it has been in the queue for too long. It will also be passed to the task at the start of execution. Returns ErrQueueFull when the queue is full. If the WorkQueue is shutdown, ErrQueueShutdown will be returned.

Example
wq := New(8, 16)
defer wq.Shutdown(true)

// When the task reaches the front of the queue, the associated context will be used to determine whether
// the task should be executed or not. If the context hasn't been cancelled, the task will be started and
// the context will be passed to it as the argument.
ctx, cancelFunc := context.WithTimeout(context.Background(), 10*time.Second)
defer cancelFunc()

f, err := wq.Submit(ctx, func(c context.Context) *Result {
	// do work
	// in case of error, return ErrorResult(err) instead
	return SuccessResult("result")
})

// If the number of queued tasks exceed the limit, ErrPoolFull will be returned
if err == ErrQueueFull {
	fmt.Println("Work queue is full")
	return
}

// Wait for the task to complete for 10 seconds
v, err := f.GetWithTimeout(10 * time.Second)
if err != nil {
	if err == ErrFutureTimeout {
		fmt.Println("Timed out waiting for result")
	} else {
		fmt.Printf("Task failed: %+v\n", err)
	}
	return
}

fmt.Printf("Task result: %s\n", v)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL