gobs

package module
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 24, 2021 License: MIT Imports: 2 Imported by: 0

README

Gobs is a tiny worker pool

Go Reference License Build Status Coverage Status Go Report Card

Package gobs implements a simple job queue where each individual job is run concurrently in its own goroutine while ensuring that no more than a given number of jobs can be ran at a time. It provides methods to ensure all jobs have been completed, and to capture errors.

Documentation

Overview

Package gobs implements a simple job queue where each individual job is run concurrently in its own goroutine while ensuring that no more than a given number of jobs can be ran at a time. It provides methods to ensure all jobs have been completed, and to capture errors

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Batch

type Batch struct {
	// contains filtered or unexported fields
}

Batch is a holder for a group of jobs to be run in the Pool. Batch exposes a Wait() function which allows code to block while waiting for all jobs of the batch to be completed

func (*Batch) Submit

func (b *Batch) Submit(job Job)

Submit adds a new job to the batch in the worker pool. Submit blocks until the pool's concurrency setting allows the job to start running, then launches the job in a new goroutine. To track the job's completion, use Batch.Wait()

func (*Batch) Wait

func (b *Batch) Wait() error

Wait blocks until all job submitted to the batch have completed. Once Wait has been called, no further jobs should be submitted to the batch.

Wait returns a MultiError that can be used to inspect individual job errors

type Job

type Job func() error

Job is a unit of work that returns a non-nil error in case of failure

type MultiError

type MultiError interface {
	error
	Errors() []error
}

MultiError is an error that handles multiple unrelated errors

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool is a worker pool that accepts a bounded number of Jobs

func NewPool

func NewPool(concurrency int) *Pool

NewPool creates a worker pool garanteeing that no more than concurrency jobs will be running at a given instant

Example
pool := NewPool(2)
pool.Submit(func() error {
	time.Sleep(10 * time.Millisecond)
	return nil
})
pool.Submit(func() error {
	time.Sleep(10 * time.Millisecond)
	return nil
})
pool.Stop()
// both goroutines are now complete

func (*Pool) Batch

func (p *Pool) Batch() *Batch

Batch creates a new holder for a group of jobs to be run in the Pool.

Example
pool := NewPool(2)
status := pool.Submit(func() error {
	time.Sleep(100 * time.Millisecond)
	return nil
})
batch := pool.Batch()
batch.Submit(func() error {
	time.Sleep(10 * time.Millisecond)
	return fmt.Errorf("an error 1 occured")
})
batch.Submit(func() error {
	time.Sleep(10 * time.Millisecond)
	return fmt.Errorf("an error 2 occured")
})
batch.Submit(func() error {
	time.Sleep(10 * time.Millisecond)
	return nil
})
err := batch.Wait()
if err != nil {
	//err.Error() is "an error 1 occured (and 1 other errors)" or "an error 2 occured (and 1 other errors)"
	//err.(MultiError).Errors() is a slice of 2 errors
}

// this job is still running in the pool, i.e. not affected by batch.Wait()
_ = status.Wait()

func (*Pool) Stop

func (p *Pool) Stop()

Stop blocks until all submitted jobs have completed, then frees all resources created by the pool.

Submitting a new Job to the Pool once Stop has been called will deadlock and/or panic. Calling Stop more than once will deadlock and/or panic

func (*Pool) Submit

func (p *Pool) Submit(job Job) *Status

Submit adds a new job to the worker pool. Submit blocks until the pool's concurrency setting allows the job to start running, then launches the job in a new goroutine. It returns a Status that can be used to track the job completion and/or error

Example
pool := NewPool(2)
status := pool.Submit(func() error {
	time.Sleep(10 * time.Millisecond)
	return fmt.Errorf("an error occured")
})

//do some other stuff while waiting for job to complete

err := status.Wait()
if err != nil {
	//err.Error() == "an error occured"
}

type Status

type Status struct {
	// contains filtered or unexported fields
}

Status tracks the completion of a Job

func (*Status) Wait

func (s *Status) Wait() error

Wait blocks until the associated Job has terminated. It returns the error returned by the Job

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL