channels

A collection of helper functions and special types for working with and extending Go's existing channels. Due to limitations of Go's type system, importing this library directly is often not practical for production code. It serves equally well, however, as a reference guide and template for implementing many common idioms; if you use it in this way I would appreciate the inclusion of some sort of credit in the resulting code.

See https://godoc.org/github.com/eapache/channels for full documentation or https://gopkg.in/eapache/channels.v1 for a versioned import path.

Requires Go version 1.1 or later, as certain necessary elements of the reflect package were not present in 1.0.

Most of the buffered channel types in this package are backed by a very fast queue implementation that used to be built into this package but has now been extracted into its own package at https://github.com/eapache/queue.

Note: Several types in this package provide so-called "infinite" buffers. Be very careful using these, as no buffer is truly infinite. If such a buffer grows too large your program will run out of memory and crash. Caveat emptor.

Documentation

Overview

    Package channels provides a collection of helper functions, interfaces and implementations for working with and extending the capabilities of Go's existing channels. The main interface of interest is Channel, though sub-interfaces are also provided for cases where the full Channel interface cannot be met (for example, InChannel for write-only channels).

    For integration with native typed Go channels, the functions Wrap and Unwrap are provided, which do the appropriate type conversions. The NativeChannel, NativeInChannel and NativeOutChannel type definitions are also provided for use with native channels which already carry values of type interface{}.

    The heart of the package consists of several distinct implementations of the Channel interface, including channels backed by special buffers (resizable, infinite, ring buffers, etc) and other useful types. A "black hole" channel for discarding unwanted values (similar in purpose to ioutil.Discard or /dev/null) rounds out the set.

    Helper functions for operating on Channels include Pipe and Tee (which behave much like their Unix namesakes), as well as Multiplex and Distribute. "Weak" versions of these functions also exist, which do not close their output channel(s) on completion.

    Functions

    func Distribute

    func Distribute(input SimpleOutChannel, outputs ...SimpleInChannel)

      Distribute takes a single input channel and an arbitrary number of output channels and sends each input value to exactly *one* available output. If multiple outputs are waiting for a value, one is chosen at random. When the input channel is closed, all output channels are closed. Distribute with a single output channel is equivalent to Pipe (though slightly less efficient).
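The behaviour described above can be sketched with native channels and the standard library alone. This is only an illustration under stated assumptions, not the package's implementation: the name `distribute` is hypothetical, it uses `reflect.Select` to wait on a variable number of outputs, and it runs synchronously until the input is closed.

```go
package main

import (
	"fmt"
	"reflect"
)

// distribute is a hypothetical stand-in for the idea behind Distribute.
// It hands each value from input to exactly one output, whichever is ready,
// and closes every output once input is closed.
func distribute(input <-chan interface{}, outputs ...chan<- interface{}) {
	// One send case per output; the value to send is filled in per element.
	cases := make([]reflect.SelectCase, len(outputs))
	for i, out := range outputs {
		cases[i] = reflect.SelectCase{Dir: reflect.SelectSend, Chan: reflect.ValueOf(out)}
	}
	for v := range input {
		// Taking Elem of a pointer keeps the interface{} type, so nil values work too.
		val := reflect.ValueOf(&v).Elem()
		for i := range cases {
			cases[i].Send = val
		}
		// reflect.Select chooses pseudo-randomly among the cases that can proceed.
		reflect.Select(cases)
	}
	for _, out := range outputs {
		close(out)
	}
}

func main() {
	in := make(chan interface{}, 4)
	for i := 0; i < 4; i++ {
		in <- i
	}
	close(in)

	a := make(chan interface{}, 4)
	b := make(chan interface{}, 4)
	distribute(in, a, b)

	// Each value landed in exactly one of the two outputs.
	fmt.Println("delivered:", len(a)+len(b))
}
```

Which output receives which value is not deterministic; only the total is.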

      func Multiplex

      func Multiplex(output SimpleInChannel, inputs ...SimpleOutChannel)

        Multiplex takes an arbitrary number of input channels and multiplexes their output into a single output channel. When all input channels have been closed, the output channel is closed. Multiplex with a single input channel is equivalent to Pipe (though slightly less efficient).
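A minimal fan-in sketch of the same idea, assuming native `chan interface{}` values instead of the package's interfaces. The name `multiplex` is hypothetical, and unlike a production helper this version blocks until all inputs are closed, so a caller would typically run it in its own goroutine.

```go
package main

import (
	"fmt"
	"sync"
)

// multiplex is a hypothetical fan-in helper: it copies every value from
// each input to output and closes output once all inputs are closed.
func multiplex(output chan<- interface{}, inputs ...<-chan interface{}) {
	var wg sync.WaitGroup
	wg.Add(len(inputs))
	for _, in := range inputs {
		// One forwarding goroutine per input channel.
		go func(in <-chan interface{}) {
			defer wg.Done()
			for v := range in {
				output <- v
			}
		}(in)
	}
	wg.Wait()
	close(output)
}

func main() {
	x := make(chan interface{}, 2)
	y := make(chan interface{}, 2)
	x <- 1
	x <- 2
	y <- 10
	y <- 20
	close(x)
	close(y)

	out := make(chan interface{}, 4)
	multiplex(out, x, y)

	sum := 0
	for v := range out {
		sum += v.(int)
	}
	fmt.Println("sum:", sum)
}
```

The relative order of values from different inputs is not defined; only the order within a single input is preserved.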

        func Pipe

        func Pipe(input SimpleOutChannel, output SimpleInChannel)

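          Pipe connects the input channel to the output channel so that they behave as if they were a single channel. When the input channel is closed, the output channel is closed too (see WeakPipe for a variant that leaves it open).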

          func Tee

          func Tee(input SimpleOutChannel, outputs ...SimpleInChannel)

            Tee (like its Unix namesake) takes a single input channel and an arbitrary number of output channels and duplicates each input into every output. When the input channel is closed, all output channels are closed. Tee with a single output channel is equivalent to Pipe (though slightly less efficient).
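As a rough sketch of the duplication described above, here is a stdlib-only version over native channels. The name `tee` is hypothetical, and the sequential sends are a simplification of this sketch: one slow output stalls the others.

```go
package main

import "fmt"

// tee is a hypothetical helper that copies every value from input to
// each output in turn, then closes all outputs when input is closed.
func tee(input <-chan interface{}, outputs ...chan<- interface{}) {
	for v := range input {
		for _, out := range outputs {
			out <- v // blocks until this particular output accepts the value
		}
	}
	for _, out := range outputs {
		close(out)
	}
}

func main() {
	in := make(chan interface{}, 2)
	in <- "x"
	in <- "y"
	close(in)

	a := make(chan interface{}, 2)
	b := make(chan interface{}, 2)
	tee(in, a, b)

	for v := range a {
		fmt.Println("a got", v)
	}
	for v := range b {
		fmt.Println("b got", v)
	}
}
```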

            func Unwrap

            func Unwrap(input SimpleOutChannel, output interface{})

              Unwrap takes a SimpleOutChannel and uses reflection to pipe it to a typed native channel for easy integration with existing channel sources. Output can be any writable channel type (chan or chan<-). It panics if the output is not a writable channel, or if a value is received that cannot be sent on the output channel.
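The reflection step can be illustrated with a small sketch, assuming a native `<-chan interface{}` as the source. The name `unwrap` is hypothetical, and closing the typed output when the input ends is an assumption of this sketch rather than a documented guarantee.

```go
package main

import (
	"fmt"
	"reflect"
)

// unwrap is a hypothetical helper that forwards interface{} values onto
// a typed native channel supplied as an interface{}.
func unwrap(input <-chan interface{}, output interface{}) {
	out := reflect.ValueOf(output)
	if out.Kind() != reflect.Chan || out.Type().ChanDir()&reflect.SendDir == 0 {
		panic("unwrap: output is not a writable channel")
	}
	for v := range input {
		// Send panics if v is not assignable to the channel's element type.
		out.Send(reflect.ValueOf(v))
	}
	out.Close() // assumption: the sketch closes the output when input ends
}

func main() {
	in := make(chan interface{}, 3)
	in <- 1
	in <- 2
	in <- 3
	close(in)

	ints := make(chan int, 3)
	unwrap(in, ints)

	sum := 0
	for n := range ints {
		sum += n // no type assertion needed: the channel is typed
	}
	fmt.Println("sum:", sum)
}
```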

              func WeakDistribute

              func WeakDistribute(input SimpleOutChannel, outputs ...SimpleInChannel)

                WeakDistribute behaves like Distribute (distributing a single input amongst multiple outputs) except that it does not close the output channels when the input channel is closed.

                func WeakMultiplex

                func WeakMultiplex(output SimpleInChannel, inputs ...SimpleOutChannel)

                  WeakMultiplex behaves like Multiplex (multiplexing multiple inputs into a single output) except that it does not close the output channel when the input channels are closed.

                  func WeakPipe

                  func WeakPipe(input SimpleOutChannel, output SimpleInChannel)

                    WeakPipe behaves like Pipe (connecting the two channels) except that it does not close the output channel when the input channel is closed.
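The difference between the strong and weak variants comes down to a single `close` call. A minimal sketch over native channels, with hypothetical names `pipe` and `weakPipe`, both running synchronously:

```go
package main

import "fmt"

// weakPipe is a hypothetical helper that forwards values until input is
// closed and leaves output open for other writers.
func weakPipe(input <-chan interface{}, output chan<- interface{}) {
	for v := range input {
		output <- v
	}
}

// pipe does the same and then closes output.
func pipe(input <-chan interface{}, output chan<- interface{}) {
	weakPipe(input, output)
	close(output)
}

func main() {
	out := make(chan interface{}, 4)

	first := make(chan interface{}, 1)
	first <- 1
	close(first)
	weakPipe(first, out) // out stays open, so a second source can follow

	second := make(chan interface{}, 1)
	second <- 2
	close(second)
	pipe(second, out) // out is closed once second is drained

	for v := range out {
		fmt.Println(v)
	}
}
```

The weak form is useful when several sources feed one output in sequence, since closing the output twice would panic.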

                    func WeakTee

                    func WeakTee(input SimpleOutChannel, outputs ...SimpleInChannel)

                      WeakTee behaves like Tee (duplicating a single input into multiple outputs) except that it does not close the output channels when the input channel is closed.

                      Types

                      type BatchingChannel

                      type BatchingChannel struct {
                      	// contains filtered or unexported fields
                      }

                        BatchingChannel implements the Channel interface, with the change that instead of producing individual elements on Out(), it batches together the entire internal buffer each time. Constructing an unbuffered batching channel is not supported and will panic (such a channel would provide no benefit over an unbuffered NativeChannel).

                        func NewBatchingChannel

                        func NewBatchingChannel(size BufferCap) *BatchingChannel

                        func (*BatchingChannel) Cap

                        func (ch *BatchingChannel) Cap() BufferCap

                        func (*BatchingChannel) Close

                        func (ch *BatchingChannel) Close()

                        func (*BatchingChannel) In

                        func (ch *BatchingChannel) In() chan<- interface{}

                        func (*BatchingChannel) Len

                        func (ch *BatchingChannel) Len() int

                        func (*BatchingChannel) Out

                        func (ch *BatchingChannel) Out() <-chan interface{}

                          Out returns a <-chan interface{} so that BatchingChannel conforms to the standard Channel interface provided by this package. However, each output value is guaranteed to be of type []interface{}: a slice collecting the most recent batch of values sent on the In channel. The slice is guaranteed to be neither nil nor empty. In practice, this means you need an additional type assertion to access the underlying values.
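To show what the extra type assertion looks like on the consumer side, here is a stdlib-only sketch of a batching loop. The function `batch` is hypothetical and is not the package's implementation; it assumes a positive batch size and delivers whatever has accumulated as soon as a reader is ready.

```go
package main

import "fmt"

// batch is a hypothetical helper that collects values from in and offers
// them on out as []interface{} slices of at most size elements.
func batch(in <-chan interface{}, out chan<- interface{}, size int) {
	if size < 1 {
		panic("batch: size must be at least 1")
	}
	var buf []interface{}
	for in != nil || len(buf) > 0 {
		// Offer a batch only when there is something to deliver.
		var send chan<- interface{}
		if len(buf) > 0 {
			send = out
		}
		// Stop accepting input while the buffer is full.
		recv := in
		if len(buf) >= size {
			recv = nil
		}
		select {
		case v, ok := <-recv:
			if !ok {
				in = nil // input closed: flush what is left, then exit
				continue
			}
			buf = append(buf, v)
		case send <- buf:
			buf = nil // start a fresh slice so the delivered one is not reused
		}
	}
	close(out)
}

func main() {
	in := make(chan interface{}, 5)
	for i := 0; i < 5; i++ {
		in <- i
	}
	close(in)

	out := make(chan interface{})
	go batch(in, out, 2)

	for b := range out {
		values := b.([]interface{}) // the additional type assertion
		fmt.Println("batch of", len(values))
	}
}
```

How the five values are split into batches depends on scheduling, so the sketch prints only batch sizes.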

                          type BlackHole

                          type BlackHole struct {
                          	// contains filtered or unexported fields
                          }

                            BlackHole implements the InChannel interface and provides an analogue for the "Discard" variable in the ioutil package - it never blocks, and simply discards every value it reads. The number of items discarded in this way is counted and returned from Len.
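A discard-and-count channel of this kind can be approximated with one draining goroutine. The type `blackHole` below is a hypothetical sketch; having `Close` wait for the drain goroutine to exit is an assumption made here so that the final count is stable.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// blackHole is a hypothetical sink that discards everything sent to it
// and counts how many values it has swallowed.
type blackHole struct {
	in    chan interface{}
	done  chan struct{}
	count int64
}

func newBlackHole() *blackHole {
	b := &blackHole{in: make(chan interface{}), done: make(chan struct{})}
	go func() {
		defer close(b.done)
		for range b.in {
			atomic.AddInt64(&b.count, 1) // discard the value, keep the tally
		}
	}()
	return b
}

func (b *blackHole) In() chan<- interface{} { return b.in }
func (b *blackHole) Len() int               { return int(atomic.LoadInt64(&b.count)) }

// Close stops the sink and waits for the drain goroutine to finish.
func (b *blackHole) Close() {
	close(b.in)
	<-b.done
}

func main() {
	hole := newBlackHole()
	for i := 0; i < 5; i++ {
		hole.In() <- i
	}
	hole.Close()
	fmt.Println("discarded:", hole.Len())
}
```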

                            func NewBlackHole

                            func NewBlackHole() *BlackHole

                            func (*BlackHole) Cap

                            func (ch *BlackHole) Cap() BufferCap

                            func (*BlackHole) Close

                            func (ch *BlackHole) Close()

                            func (*BlackHole) In

                            func (ch *BlackHole) In() chan<- interface{}

                            func (*BlackHole) Len

                            func (ch *BlackHole) Len() int

                            type Buffer

                            type Buffer interface {
                            	Len() int       // The number of elements currently buffered.
                            	Cap() BufferCap // The maximum number of elements that can be buffered.
                            }

                              Buffer is an interface for any channel that provides access to query the state of its buffer. Even unbuffered channels can implement this interface by simply returning 0 from Len() and None from Cap().

                              type BufferCap

                              type BufferCap int

                                BufferCap represents the capacity of the buffer backing a channel. Valid values consist of all positive integers, as well as the special values below.

                                const (
                                	// None is the capacity for channels that have no buffer at all.
                                	None BufferCap = 0
                                	// Infinity is the capacity for channels with no limit on their buffer size.
                                	Infinity BufferCap = -1
                                )

                                type Channel

                                type Channel interface {
                                	SimpleChannel
                                	Buffer
                                }

                                  Channel is an interface representing a channel that is readable, writeable and implements the Buffer interface.

                                  type DeadChannel

                                  type DeadChannel struct{}

                                    DeadChannel is a placeholder implementation of the Channel interface with no buffer that is never ready for reading or writing. Closing a dead channel is a no-op. Behaves almost like NativeChannel(nil) except that closing a nil NativeChannel will panic.
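The comparison with a nil native channel can be checked directly with the standard library. In this sketch the helper names `probe` and `closePanics` are hypothetical; the behaviour they exercise (a nil channel is never ready, and closing it panics) is defined by the Go language.

```go
package main

import "fmt"

// probe reports whether a send and a receive on ch can proceed right now.
func probe(ch chan interface{}) (canSend, canRecv bool) {
	select {
	case ch <- 0:
		canSend = true
	default:
	}
	select {
	case <-ch:
		canRecv = true
	default:
	}
	return canSend, canRecv
}

// closePanics reports whether close(ch) panics.
func closePanics(ch chan interface{}) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	close(ch)
	return false
}

func main() {
	var dead chan interface{} // nil: never ready for reading or writing
	s, r := probe(dead)
	fmt.Println("nil channel ready:", s, r)
	fmt.Println("closing nil panics:", closePanics(dead))

	live := make(chan interface{}, 1)
	s, r = probe(live)
	fmt.Println("buffered channel ready:", s, r)
}
```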

                                    func NewDeadChannel

                                    func NewDeadChannel() DeadChannel

                                    func (DeadChannel) Cap

                                    func (ch DeadChannel) Cap() BufferCap

                                    func (DeadChannel) Close

                                    func (ch DeadChannel) Close()

                                    func (DeadChannel) In

                                    func (ch DeadChannel) In() chan<- interface{}

                                    func (DeadChannel) Len

                                    func (ch DeadChannel) Len() int

                                    func (DeadChannel) Out

                                    func (ch DeadChannel) Out() <-chan interface{}

                                    type InChannel

                                    type InChannel interface {
                                    	SimpleInChannel
                                    	Buffer
                                    }

                                      InChannel is an interface representing a writeable channel with a buffer.

                                      type InfiniteChannel

                                      type InfiniteChannel struct {
                                      	// contains filtered or unexported fields
                                      }

                                        InfiniteChannel implements the Channel interface with an infinite buffer between the input and the output.
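One common way to get an unbounded buffer between two native channels is a goroutine that owns a growable queue. The sketch below is hypothetical (the function `infinite` and the plain slice queue are assumptions of this example, not the package's code), and it inherits the same caveat: the queue grows until memory runs out if nobody reads.

```go
package main

import "fmt"

// infinite is a hypothetical helper that buffers values from in without
// limit and delivers them to out in order, closing out when done.
func infinite(in <-chan interface{}, out chan<- interface{}) {
	var queue []interface{}
	for in != nil || len(queue) > 0 {
		// Enable the send case only when something is queued.
		var send chan<- interface{}
		var next interface{}
		if len(queue) > 0 {
			send = out
			next = queue[0]
		}
		select {
		case v, ok := <-in:
			if !ok {
				in = nil // input closed: drain the queue, then exit
				continue
			}
			queue = append(queue, v)
		case send <- next:
			queue = queue[1:]
		}
	}
	close(out)
}

func main() {
	in := make(chan interface{})
	out := make(chan interface{})
	go infinite(in, out)

	// These sends complete even though nobody is reading from out yet.
	for i := 0; i < 100; i++ {
		in <- i
	}
	close(in)

	n := 0
	for range out {
		n++
	}
	fmt.Println("received:", n)
}
```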

                                        func NewInfiniteChannel

                                        func NewInfiniteChannel() *InfiniteChannel

                                        func (*InfiniteChannel) Cap

                                        func (ch *InfiniteChannel) Cap() BufferCap

                                        func (*InfiniteChannel) Close

                                        func (ch *InfiniteChannel) Close()

                                        func (*InfiniteChannel) In

                                        func (ch *InfiniteChannel) In() chan<- interface{}

                                        func (*InfiniteChannel) Len

                                        func (ch *InfiniteChannel) Len() int

                                        func (*InfiniteChannel) Out

                                        func (ch *InfiniteChannel) Out() <-chan interface{}

                                        type NativeChannel

                                        type NativeChannel chan interface{}

                                          NativeChannel implements the Channel interface by wrapping a native Go channel.

                                          func NewNativeChannel

                                          func NewNativeChannel(size BufferCap) NativeChannel

                                            NewNativeChannel makes a new NativeChannel with the given buffer size. Just a convenience wrapper to avoid having to cast the result of make().
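The wrapping idea is small enough to sketch in full. The lowercase names below (`bufferCap`, `nativeChannel`, `newNativeChannel`) are hypothetical mirrors of the documented types, written against the standard library only.

```go
package main

import "fmt"

// bufferCap is a hypothetical mirror of the documented BufferCap type.
type bufferCap int

// nativeChannel adds accessor methods to a plain chan interface{}.
type nativeChannel chan interface{}

// newNativeChannel saves the caller a conversion on the result of make.
func newNativeChannel(size bufferCap) nativeChannel {
	return make(chan interface{}, size)
}

func (ch nativeChannel) In() chan<- interface{}  { return ch }
func (ch nativeChannel) Out() <-chan interface{} { return ch }
func (ch nativeChannel) Len() int                { return len(ch) }
func (ch nativeChannel) Cap() bufferCap          { return bufferCap(cap(ch)) }
func (ch nativeChannel) Close()                  { close(ch) }

func main() {
	ch := newNativeChannel(3)
	ch.In() <- "hello"
	fmt.Println("len:", ch.Len(), "cap:", ch.Cap())
	fmt.Println(<-ch.Out())
	ch.Close()
}
```

Because the named type has the same underlying type as `chan interface{}`, it can still be used directly with `select`, `range` and the built-in `len` and `cap`.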

                                            func (NativeChannel) Cap

                                            func (ch NativeChannel) Cap() BufferCap

                                            func (NativeChannel) Close

                                            func (ch NativeChannel) Close()

                                            func (NativeChannel) In

                                            func (ch NativeChannel) In() chan<- interface{}

                                            func (NativeChannel) Len

                                            func (ch NativeChannel) Len() int

                                            func (NativeChannel) Out

                                            func (ch NativeChannel) Out() <-chan interface{}

                                            type NativeInChannel

                                            type NativeInChannel chan<- interface{}

                                              NativeInChannel implements the InChannel interface by wrapping a native Go write-only channel.

                                              func (NativeInChannel) Cap

                                              func (ch NativeInChannel) Cap() BufferCap

                                              func (NativeInChannel) Close

                                              func (ch NativeInChannel) Close()

                                              func (NativeInChannel) In

                                              func (ch NativeInChannel) In() chan<- interface{}

                                              func (NativeInChannel) Len

                                              func (ch NativeInChannel) Len() int

                                              type NativeOutChannel

                                              type NativeOutChannel <-chan interface{}

                                                NativeOutChannel implements the OutChannel interface by wrapping a native Go read-only channel.

                                                func (NativeOutChannel) Cap

                                                func (ch NativeOutChannel) Cap() BufferCap

                                                func (NativeOutChannel) Len

                                                func (ch NativeOutChannel) Len() int

                                                func (NativeOutChannel) Out

                                                func (ch NativeOutChannel) Out() <-chan interface{}

                                                type OutChannel

                                                type OutChannel interface {
                                                	SimpleOutChannel
                                                	Buffer
                                                }

                                                  OutChannel is an interface representing a readable channel implementing the Buffer interface.

                                                  type OverflowingChannel

                                                  type OverflowingChannel struct {
                                                  	// contains filtered or unexported fields
                                                  }

                                                    OverflowingChannel implements the Channel interface in a way that never blocks the writer. Specifically, if a value is written to an OverflowingChannel when its buffer is full (or, in an unbuffered case, when the recipient is not ready) then that value is simply discarded. Note that Go's scheduler can cause discarded values when they could be avoided, simply by scheduling the writer before the reader, so caveat emptor. For the opposite behaviour (discarding the oldest element, not the newest) see RingChannel.
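With a native buffered channel, the "drop the newest value when full" policy is the familiar non-blocking send. A minimal sketch, where the helper name `trySend` is hypothetical:

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether the value was
// accepted. When the buffer is full the value is simply dropped.
func trySend(ch chan<- interface{}, v interface{}) bool {
	select {
	case ch <- v:
		return true
	default:
		return false // buffer full (or no receiver ready): discard v
	}
}

func main() {
	ch := make(chan interface{}, 2)
	for i := 1; i <= 3; i++ {
		fmt.Println("sent", i, "accepted:", trySend(ch, i))
	}
	close(ch)
	for v := range ch {
		fmt.Println("kept", v)
	}
}
```

With an unbuffered channel the same helper drops every value unless a receiver is already waiting, which is the scheduler sensitivity the note above warns about.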

                                                    func NewOverflowingChannel

                                                    func NewOverflowingChannel(size BufferCap) *OverflowingChannel

                                                    func (*OverflowingChannel) Cap

                                                    func (ch *OverflowingChannel) Cap() BufferCap

                                                    func (*OverflowingChannel) Close

                                                    func (ch *OverflowingChannel) Close()

                                                    func (*OverflowingChannel) In

                                                    func (ch *OverflowingChannel) In() chan<- interface{}

                                                    func (*OverflowingChannel) Len

                                                    func (ch *OverflowingChannel) Len() int

                                                    func (*OverflowingChannel) Out

                                                    func (ch *OverflowingChannel) Out() <-chan interface{}

                                                    type ResizableChannel

                                                    type ResizableChannel struct {
                                                    	// contains filtered or unexported fields
                                                    }

                                                      ResizableChannel implements the Channel interface with a resizable buffer between the input and the output. The channel initially has a buffer size of 1, but can be resized by calling Resize().

                                                      Resizing to a buffer capacity of None is, unfortunately, not supported and will panic (see https://github.com/eapache/channels/issues/1). Resizing back and forth between a finite and infinite buffer is fully supported.

                                                      func NewResizableChannel

                                                      func NewResizableChannel() *ResizableChannel

                                                      func (*ResizableChannel) Cap

                                                      func (ch *ResizableChannel) Cap() BufferCap

                                                      func (*ResizableChannel) Close

                                                      func (ch *ResizableChannel) Close()

                                                      func (*ResizableChannel) In

                                                      func (ch *ResizableChannel) In() chan<- interface{}

                                                      func (*ResizableChannel) Len

                                                      func (ch *ResizableChannel) Len() int

                                                      func (*ResizableChannel) Out

                                                      func (ch *ResizableChannel) Out() <-chan interface{}

                                                      func (*ResizableChannel) Resize

                                                      func (ch *ResizableChannel) Resize(newSize BufferCap)

                                                      type RingChannel

                                                      type RingChannel struct {
                                                      	// contains filtered or unexported fields
                                                      }

                                                        RingChannel implements the Channel interface in a way that never blocks the writer. Specifically, if a value is written to a RingChannel when its buffer is full then the oldest value in the buffer is discarded to make room (just like a standard ring-buffer). Note that Go's scheduler can cause discarded values when they could be avoided, simply by scheduling the writer before the reader, so caveat emptor. For the opposite behaviour (discarding the newest element, not the oldest) see OverflowingChannel.
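The "drop the oldest value when full" policy can be approximated on a native buffered channel by evicting one element and retrying. The helper `ringSend` is a hypothetical sketch that assumes a buffered channel and is easiest to reason about with a single writer.

```go
package main

import "fmt"

// ringSend is a hypothetical helper that never blocks the writer: if the
// buffer is full it discards the oldest queued value and retries.
func ringSend(ch chan interface{}, v interface{}) {
	for {
		select {
		case ch <- v:
			return
		default:
			// Buffer full: evict the oldest element, if it is still there.
			select {
			case <-ch:
			default:
			}
		}
	}
}

func main() {
	ch := make(chan interface{}, 2)
	for i := 1; i <= 3; i++ {
		ringSend(ch, i)
	}
	close(ch)
	for v := range ch {
		fmt.Println("kept", v)
	}
}
```

Here the value 1 is evicted to make room for 3, so the channel ends up holding 2 and 3.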

                                                        func NewRingChannel

                                                        func NewRingChannel(size BufferCap) *RingChannel

                                                        func (*RingChannel) Cap

                                                        func (ch *RingChannel) Cap() BufferCap

                                                        func (*RingChannel) Close

                                                        func (ch *RingChannel) Close()

                                                        func (*RingChannel) In

                                                        func (ch *RingChannel) In() chan<- interface{}

                                                        func (*RingChannel) Len

                                                        func (ch *RingChannel) Len() int

                                                        func (*RingChannel) Out

                                                        func (ch *RingChannel) Out() <-chan interface{}

                                                        type SharedBuffer

                                                        type SharedBuffer struct {
                                                        	// contains filtered or unexported fields
                                                        }

                                                          SharedBuffer implements the Buffer interface, and permits multiple SimpleChannel instances to "share" a single buffer. Each channel spawned by NewChannel has its own internal queue (so values flowing through do not get mixed up with other channels) but the total number of elements buffered by all spawned channels is limited to a single capacity. This means *all* such channels block and unblock for writing together. The primary use case is for implementing pipeline-style parallelism with goroutines, limiting the total number of elements in the pipeline without limiting the number of elements at any particular step.

                                                          func NewSharedBuffer

                                                          func NewSharedBuffer(size BufferCap) *SharedBuffer

                                                          func (*SharedBuffer) Cap

                                                          func (buf *SharedBuffer) Cap() BufferCap

                                                          func (*SharedBuffer) Close

                                                          func (buf *SharedBuffer) Close()

                                                            Close shuts down the SharedBuffer. It is an error to call Close while channels are still using the buffer (I'm not really sure what would happen if you do so).

                                                            func (*SharedBuffer) Len

                                                            func (buf *SharedBuffer) Len() int

                                                            func (*SharedBuffer) NewChannel

                                                            func (buf *SharedBuffer) NewChannel() SimpleChannel

                                                              NewChannel spawns and returns a new channel sharing the underlying buffer.

                                                              type SimpleChannel

                                                              type SimpleChannel interface {
                                                              	SimpleInChannel
                                                              	SimpleOutChannel
                                                              }

                                                                SimpleChannel is an interface representing a channel that is both readable and writeable, but does not necessarily implement the Buffer interface.

                                                                type SimpleInChannel

                                                                type SimpleInChannel interface {
                                                                	In() chan<- interface{} // The writeable end of the channel.
                                                                	Close()                 // Closes the channel. It is an error to write to In() after calling Close().
                                                                }

                                                                  SimpleInChannel is an interface representing a writeable channel that does not necessarily implement the Buffer interface.

                                                                  type SimpleOutChannel

                                                                  type SimpleOutChannel interface {
                                                                  	Out() <-chan interface{} // The readable end of the channel.
                                                                  }

                                                                    SimpleOutChannel is an interface representing a readable channel that does not necessarily implement the Buffer interface.

                                                                    func Wrap

                                                                    func Wrap(ch interface{}) SimpleOutChannel

                                                                      Wrap takes any readable channel type (chan or <-chan but not chan<-) and exposes it as a SimpleOutChannel for easy integration with existing channel sources. It panics if the input is not a readable channel.
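The wrapping direction can be sketched with reflection as well. The function `wrap` below is hypothetical: it returns a plain `<-chan interface{}` rather than a SimpleOutChannel, and closing the result when the source closes is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"reflect"
)

// wrap is a hypothetical helper that exposes any readable native channel
// as a <-chan interface{}.
func wrap(ch interface{}) <-chan interface{} {
	in := reflect.ValueOf(ch)
	if in.Kind() != reflect.Chan || in.Type().ChanDir()&reflect.RecvDir == 0 {
		panic("wrap: input is not a readable channel")
	}
	out := make(chan interface{})
	go func() {
		defer close(out) // assumption: close the result when the source closes
		for {
			v, ok := in.Recv()
			if !ok {
				return
			}
			out <- v.Interface()
		}
	}()
	return out
}

func main() {
	words := make(chan string, 2)
	words <- "a"
	words <- "b"
	close(words)

	joined := ""
	for v := range wrap(words) {
		joined += v.(string) // values come back as interface{}
	}
	fmt.Println(joined)
}
```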