README

nursery: structured concurrency in Go

GoDoc GoReportCard CircleCI gopherbadger-tag-do-not-edit

RunConcurrently(
    // Job 1
    func(context.Context, chan error) {
        time.Sleep(time.Millisecond * 10)
        log.Println("Job 1 done...")
    },
    // Job 2
    func(context.Context, chan error) {
        time.Sleep(time.Millisecond * 5)
        log.Println("Job 2 done...")
    },
)
log.Println("All jobs done...")

Installation

go get -u github.com/arunsworld/nursery

Notes on structured concurrency, or: Go statement considered harmful is an article that compares the dangers of goto with the go statement.

While I don't necessarily agree with the entire content I can appreciate that even with Go's high-level abstraction of concurrency using Goroutines, Channels & the select statement it is possible to end up with unreadable code, deadlocks, leaked goroutines, race conditions and poor error handling.

Implementing a higher-level abstraction for the use-cases mentioned is very straightforward in Go and this simple package provides just that.

The following functions are provided:

  • RunConcurrently(jobs ...ConcurrentJob) error: takes an array of ConcurrentJobs and runs them concurrently ensuring that all jobs are completed before the call terminates. If all jobs terminate cleanly error is nil; otherwise the first non-nil error is returned.
  • RunConcurrentlyWithContext(parentCtx context.Context, jobs ...ConcurrentJob) error: is the RunConcurrently behavior but additionally wraps a context that's passed in allowing cancellations of the parentCtx to get propagated.
  • RunMultipleCopiesConcurrently(copies int, job ConcurrentJob) error: makes copies of the given job and runs them concurrently. This is useful for cases where we want to execute multiple slow consumers taking jobs from a channel until the job is finished. The channel itself can be fed by a producer that is run concurrently with the job running the consumers. Each job's context is also passed an unique index with key nursery.JobID - a 0 based int - that maybe used as a job identity if required.
  • RunMultipleCopiesConcurrentlyWithContext(ctx context.Context, copies int, job ConcurrentJob) error: is the RunMultipleCopiesConcurrently behavior with a context that allows cancellation to be propagated to the jobs.
  • RunUntilFirstCompletion(jobs ...ConcurrentJob) error: takes an array of ConcurrentJobs and runs them concurrently but terminates after the completion of the earliest completing job. A key point here is that despite early termination it blocks until all jobs have terminated (ie. released any used resources). If all jobs terminate cleanly error is nil; otherwise the first non-nil error is returned.
  • RunUntilFirstCompletionWithContext(parentCtx context.Context, jobs ...ConcurrentJob) error: is the RunUntilFirstCompletion behavior but additionally wraps a context that's passed in allowing cancellations of the parentCtx to get propagated.
  • RunConcurrentlyWithTimeout(timeout time.Duration, jobs ...ConcurrentJob) error: is similar in behavior to RunConcurrently except it also takes a timeout and can cause the function to terminate earlier if timeout has expired. As before we wait for all jobs to have cleanly terminated.
  • RunUntilFirstCompletionWithTimeout(timeout time.Duration, jobs ...ConcurrentJob) error: is similar in behavior to RunUntilFirstCompletion with an additional timeout clause.

ConcurrentJob is a simple function that takes a context and error channel. We need to ensure that we're listening to the Done() channel on context and if invoked to clean-up resources and bail out. Errors are to be published to the error channel for proper handling.

Note: while this package simplifies the semantics of defining and executing concurrent code it cannot protect against bad concurrent programming such as using shared resources across jobs leading to data corruption or panics due to race conditions.

You may also be interested in reading Structured Concurrency in Go.

The library includes a utility function: IsContextDone(context.Context) to check if the passed in context is done or not. This can be used as a guard clause in a for loop within a ConcurrentJob using the passed in context to decide whether to stop processing and return or continue.

Documentation

Overview

    Package nursery implements "structured concurrency" in Go.

    It's based on this blog post: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/

    Example
    Output:
    
    

    Index

    Examples

    Constants

    View Source
    const JobID = jobIDKey("id")

      JobID is the key used to identify the JobID from the context for jobs running in copies

      Variables

      This section is empty.

      Functions

      func IsContextDone

      func IsContextDone(ctx context.Context) bool

        IsContextDone is a utility function to check if the context is Done/Cancelled.

        func RunConcurrently

        func RunConcurrently(jobs ...ConcurrentJob) error

          RunConcurrently runs jobs concurrently until all jobs have either finished or any one job encountered an error.

          func RunConcurrentlyWithContext

          func RunConcurrentlyWithContext(parentCtx context.Context, jobs ...ConcurrentJob) error

            RunConcurrentlyWithContext runs jobs concurrently until all jobs have either finished or any one job encountered an error. It wraps the parent context - so if the parent context is Done the jobs get the signal to wrap up

            func RunConcurrentlyWithTimeout

            func RunConcurrentlyWithTimeout(timeout time.Duration, jobs ...ConcurrentJob) error

              RunConcurrentlyWithTimeout runs jobs concurrently until all jobs have either finished or any one job encountered an error. or the timeout has expired

              func RunMultipleCopiesConcurrently

              func RunMultipleCopiesConcurrently(copies int, job ConcurrentJob) error

                RunMultipleCopiesConcurrently runs multiple copies of the given job until they have all finished or any one has encountered an error. The passed context can be optionally checked for an int value with key JobID counting up from 0 to identify uniquely the copy that is run.

                func RunMultipleCopiesConcurrentlyWithContext

                func RunMultipleCopiesConcurrentlyWithContext(ctx context.Context, copies int, job ConcurrentJob) error

                  RunMultipleCopiesConcurrentlyWithContext runs multiple copies of the given job until they have all finished or any one has encountered an error. The passed context can be optionally checked for an int value with key JobID counting up from 0 to identify uniquely the copy that is run. It wraps the parent context - so if the parent context is Done the jobs get the signal to wrap up

                  func RunUntilFirstCompletion

                  func RunUntilFirstCompletion(jobs ...ConcurrentJob) error

                    RunUntilFirstCompletion runs jobs concurrently until atleast one job has finished or any job has encountered an error.

                    func RunUntilFirstCompletionWithContext

                    func RunUntilFirstCompletionWithContext(parentCtx context.Context, jobs ...ConcurrentJob) error

                      RunUntilFirstCompletionWithContext runs jobs concurrently until atleast one job has finished or any job has encountered an error.

                      func RunUntilFirstCompletionWithTimeout

                      func RunUntilFirstCompletionWithTimeout(timeout time.Duration, jobs ...ConcurrentJob) error

                        RunUntilFirstCompletionWithTimeout runs jobs concurrently until atleast one job has finished or any job has encountered an error or the timeout has expired.

                        Types

                        type ConcurrentJob

                        type ConcurrentJob func(context.Context, chan error)

                          ConcurrentJob contains procedural code that can run concurrently to another. Please ensure that you're listening to `context.Done()` - at which point you're required to clean up and exit. Publish any errors into the error channel but note that only the first error across the jobs will be returned. Finally ensure that you're not unsafely modifying shared state without protection and using go's built in channels for communicating rather than sharing memory.

                          Example
                          Output:
                          
                          

                          Source Files