conpats

module
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 4, 2026 License: MIT

README

conpats banner

conpats

conpats contains several common concurrency patterns for convenient use.

Go Reference Go Report Card codecov Tag

go get github.com/kiriyms/conpats

Table of Contents

Quick Rundown

  • conpats provides Worker Pool, Pipeline and Tee.
Worker Pool
  • Use pool.Pool when you need to run jobs concurrently with a goroutine limit.
  • Use pool.ErrorPool when you need to run jobs that return errors concurrently with a giroutine limit.
  • Use pool.ContextPool when you need to run jobs that return errors and receive a context.Context argument concurrently with a giroutine limit.

Every Pool must be created using pool.New(...). To convert it use:

  • .New(...).WithErrors() to get a pool.ErrorPool.
  • .New(...).WithErrors().WithContext(ctx) to get a pool.ContextPool, where the ctx paramater specifies your parent context that needs to be passed to all your jobs.
Pipeline
  • Use pipe.PipeFromChan(...) when you need to run all input values from a given channel through a function concurrently.
  • Use pipe.PipeFromSlice(...) when you need to run all values of a given slice through a function concurrently.

Both Pipe functions return channels, making it easy to chain several pipes together or using the output channel in other ways, for example:

  • Use pipe.Collect(chan) when you want to block and collect results from a channel into a slice until it is closed.

The Pipeline implementation uses the pool.Pool by default, but can be modified:

  • Use pipe.WithPool(pool) option parameter to specify the Worker Pool implementation that the Pipe will use.
Tee
  • Use tee.NewTee(chan) to create several channels (buffered or unbuffered) that each receive a copy of a value from a provided chan channel.

Goals

Main goals of this package are:

  1. Make concurrency easier and reduce boilerplate
  2. Provide a variety of common concurrency patterns in one place
  3. Avoid any third-party dependencies

Usage

This section provides simple usage examples of Worker Pool, Pipeline and Tee usage compared to manual implementation. More examples can be found in these patterns' respective READMEs: Pool, Pipe, Tee.

Worker Pool
Manual Using pool.Pool
func main() {
	wg := sync.WaitGroup{}
	jobs := make(chan func())
	for i := 0; i < 10; i++ {
		wg.Go(func() {
			for job := range jobs {
				job()
			}
		})
	}

	for i := 0; i < 100; i++ {
		jobs <- doWork
	}
	close(jobs)
	wg.Wait()
}
func main() {
	p := pool.New(10)
	for i := 0; i < 100; i++ {
		p.Go(doWork)
	}
	p.Wait()
}
Pipeline
Manual Using pipe.PipeFromChan()
func main() {
		nums := []int{1, 2, 3, 4, 5}

	in := make(chan int)
	go func() {
		defer close(in)
		for _, n := range nums {
			in <- n
		}
	}()

	sqrtChan := make(chan float64)
	wgSqrt := sync.WaitGroup{}
	go func() {
		defer close(sqrtChan)
		defer wgSqrt.Wait()
		for i := 0; i < 5; i++ {
			wgSqrt.Add(1)
			go func() {
				defer wgSqrt.Done()
				for n := range in {
					sqrtChan <- float64(math.Sqrt(float64(n)))
				}
			}()
		}
	}()

	logChan := make(chan string)
	wgLog := sync.WaitGroup{}
	go func() {
		defer close(logChan)
		defer wgLog.Wait()
		for i := 0; i < 3; i++ {
			wgLog.Add(1)
			go func() {
				defer wgLog.Done()
				for sq := range sqrtChan {
					logChan <- fmt.Sprintf("Sqrt: %.2f", sq)
				}
			}()
		}
	}()

	results := make([]string, 0)
	for log := range logChan {
		results = append(results, log)
	}
}
func main() {
	nums := []int{1, 2, 3, 4, 5}

	sqrtChan := pipe.PipeFromSlice(func(n int) float64 {
		return math.Sqrt(float64(n))
	}, nums, 5)

	logChan := pipe.PipeFromChan(func(n float64) string {
		return fmt.Sprintf("Sqrt: %.2f", n)
	}, sqrtChan, 2)

	results := pipe.Collect(logChan)
}
Tee
Manual Using tee.NewTee()
func main() {
	in := make(chan int)
	outs := make([]chan int, 3)
	for i := range 3 {
		outs[i] = make(chan int)
	}

	go func() {
		defer func() {
			for _, out := range outs {
				close(out)
			}
		}()

		for item := range in {
			for _, out := range outs {
				out <- item
			}
		}
	}()
}
func main() {
	in := make(chan int)
	outs := tee.NewTee(in, 3, 0)
}

Note: if one of the output channels is blocked and waiting to be read from, it will cause all other output channels to block too.

Cookbook

The concurrency pattern abstractions in conpats can be easily combined with each other.

To see usage examples that are more complex and closer to real-world problems, check out the Cookbook.

Thoughts & Notes

Making a small Go package has been an enlightening and interesting experience. As a result of this endeavor, I've jotted down some final thoughts.

Status

v1 (core API settled).

Common concurrency patterns are implemented. Possible future improvements:

  • Add more patters & utility functions (like Fan-in/Fan-out, Pub-Sub, etc.)
  • Add more cookbook examples

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL