Documentation
¶
Overview ¶
workpool allows a bounded set of async tasks to execute concurrently
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
Functions ¶
This section is empty.
Types ¶
type Future ¶
type Future interface { // GetWithTimeout waits for the completion of the async task for the given amount of time. // If the task completes within the timeout, result of the task is returned. Otherwise, ErrFutureTimeout // is returned as the error. Get can be called as many times as needed until the task completes. // When a call to Get has returned the results of the completed task, subsequent calls to Get will // return ErrFutureCompleted as an error. GetWithTimeout(timeout time.Duration) (interface{}, error) // GetWithContext waits for the completion of the async task until the context times out or is cancelled. // The behaviour when this function is called multiple times is identical to GetWithTimeout. GetWithContext(ctx context.Context) (interface{}, error) // Cancel attempts to cancel the async task. This is a best efforts attempt to perform the cancellation. // If the task is still in the queue, its' context will be cancelled, causing the WorkPool to reject the // task. If the task is already executing, then it is the user's responsibility to ensure that it honours // the cancellation of the context passed to it by the runtime. Cancel() }
Future is a container for the results of an async task that may or may not have completed yet
type ImmediateFuture ¶
type ImmediateFuture struct { Val interface{} Err error }
func NewImmediateFuture ¶
func NewImmediateFuture(val interface{}, err error) *ImmediateFuture
NewImmediateFuture is a Future that returns immeidately
func (*ImmediateFuture) Cancel ¶
func (f *ImmediateFuture) Cancel()
func (*ImmediateFuture) GetWithContext ¶
func (f *ImmediateFuture) GetWithContext(ctx context.Context) (interface{}, error)
func (*ImmediateFuture) GetWithTimeout ¶
func (f *ImmediateFuture) GetWithTimeout(timeout time.Duration) (interface{}, error)
type Result ¶
type Result struct { Value interface{} Err error }
Result holds the results of a task execution
func ErrorResult ¶
func SuccessResult ¶
func SuccessResult(value interface{}) *Result
type WorkQueue ¶
type WorkQueue struct {
// contains filtered or unexported fields
}
func New ¶
New creates a new WorkQueue that will execute at most maxGoRoutines concurrently and hold queueSize tasks in the backlog awaiting an execution slot.
func (*WorkQueue) IsShutdown ¶
IsShutdown returns true if the queue has been shutdown
func (*WorkQueue) Shutdown ¶
Shutdown gracefully shuts down the queue. It stops accepting any new tasks but will continue executing the already enqueued tasks to their completion. To avoid processing the queue, cancel the contexts associated with the enqueued tasks. Setting the wait parameter to true will cause the call to block until the queue finishes shutting down.
func (*WorkQueue) Submit ¶
Submit attempts to enqueue the given task. If the queue has enough space, it will be accepted and executed as soon as it reaches the front of the queue. The context parameter can be used to cancel the task execution if it has been in the queue for too long. It will also be passed to the task at the start of execution. Returns ErrQueueFull when the queue is full. If the WorkQueue is shutdown, ErrQueueShutdown will be returned.
Example ¶
wq := New(8, 16) defer wq.Shutdown(true) // When the task reaches the front of the queue, the associated context will be used to determine whether // the task should be executed or not. If the context hasn't been cancelled, the task will be started and // the context will be passed to it as the argument. ctx, cancelFunc := context.WithTimeout(context.Background(), 10*time.Second) defer cancelFunc() f, err := wq.Submit(ctx, func(c context.Context) *Result { // do work // in case of error, return ErrorResult(err) instead return SuccessResult("result") }) // If the number of queued tasks exceed the limit, ErrPoolFull will be returned if err == ErrQueueFull { fmt.Println("Work queue is full") return } // Wait for the task to complete for 10 seconds v, err := f.GetWithTimeout(10 * time.Second) if err != nil { if err == ErrFutureTimeout { fmt.Println("Timed out waiting for result") } else { fmt.Printf("Task failed: %+v\n", err) } return } fmt.Printf("Task result: %s\n", v)