README

Data Flow Rate Control

To download and install this package run:

go get github.com/mxk/go-flowrate/flowrate

The documentation is available at:

http://godoc.org/github.com/mxk/go-flowrate/flowrate

Documentation

Overview

    Package flowrate provides the tools for monitoring and limiting the flow rate of an arbitrary data stream.

Variables

var ErrLimit = errors.New("flowrate: flow rate limit exceeded")

    ErrLimit is returned by the Writer when a non-blocking write is short due to the transfer rate limit.

Types

type Limiter

type Limiter interface {
	Done() int64
	Status() Status
	SetTransferSize(bytes int64)
	SetLimit(new int64) (old int64)
	SetBlocking(new bool) (old bool)
}

    Limiter is implemented by the Reader and Writer to provide a consistent interface for monitoring and controlling data transfer.

type Monitor

type Monitor struct {
	// contains filtered or unexported fields
}

    Monitor monitors and limits the transfer rate of a data stream.

func New

func New(sampleRate, windowSize time.Duration) *Monitor

    New creates a new flow control monitor. Instantaneous transfer rate is measured and updated for each sampleRate interval. windowSize determines the weight of each sample in the exponential moving average (EMA) calculation. The exact formulas are:

        sampleTime = currentTime - prevSampleTime
        sampleRate = byteCount / sampleTime
        weight     = 1 - exp(-sampleTime/windowSize)
        newRate    = weight*sampleRate + (1-weight)*oldRate

    The default values for sampleRate and windowSize (if <= 0) are 100ms and 1s, respectively.
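
    The update rule above can be sketched in a few lines of Go. This is a minimal illustration of the formulas, not the package's internal code; the function name emaUpdate and its parameters are assumptions introduced for the example.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// emaUpdate applies one sample to the moving average using the formulas
// from the New documentation. byteCount is the number of bytes observed
// during sampleTime. The name and signature are hypothetical.
func emaUpdate(oldRate float64, byteCount int64, sampleTime, windowSize time.Duration) float64 {
	sampleRate := float64(byteCount) / sampleTime.Seconds()
	weight := 1 - math.Exp(-sampleTime.Seconds()/windowSize.Seconds())
	return weight*sampleRate + (1-weight)*oldRate
}

func main() {
	rate := 0.0
	// Feed ten 100ms samples of 1000 bytes each (a steady 10000 B/s)
	// using the documented defaults of 100ms and 1s.
	for i := 0; i < 10; i++ {
		rate = emaUpdate(rate, 1000, 100*time.Millisecond, time.Second)
	}
	fmt.Printf("EMA after one window: %.0f B/s\n", rate)
}
```

    With the default 100ms sample and 1s window, each sample carries a weight of about 0.095, so after one second of steady traffic the average has covered roughly 63% of the distance to the true rate.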

func (*Monitor) Done

func (m *Monitor) Done() int64

    Done marks the transfer as finished and prevents any further updates or limiting. Instantaneous and current transfer rates drop to 0. Update, IO, and Limit methods become NOOPs. It returns the total number of bytes transferred.

func (*Monitor) IO

func (m *Monitor) IO(n int, err error) (int, error)

    IO is a convenience method intended to wrap io.Reader and io.Writer method execution. It calls m.Update(n) and then returns (n, err) unmodified.
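
    The wrapping pattern that IO is designed for might look like the sketch below. To keep the example self-contained, a stand-in type replaces *flowrate.Monitor: ioMonitor, countingMonitor, and monitoredReader are all hypothetical names, and the stand-in only totals bytes where the real method calls m.Update(n).

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// ioMonitor is a hypothetical interface naming the one method used here.
// A *flowrate.Monitor has a method with this signature.
type ioMonitor interface {
	IO(n int, err error) (int, error)
}

// countingMonitor is a stand-in for the real monitor. It only totals
// the byte counts it is given.
type countingMonitor struct{ total int64 }

func (c *countingMonitor) IO(n int, err error) (int, error) {
	c.total += int64(n) // the real method calls m.Update(n) at this point
	return n, err       // the result pair is returned unmodified
}

// monitoredReader reports the result of every Read to the monitor.
type monitoredReader struct {
	src io.Reader
	mon ioMonitor
}

func (r *monitoredReader) Read(p []byte) (int, error) {
	// The (n, err) pair from Read is passed straight through IO.
	return r.mon.IO(r.src.Read(p))
}

func main() {
	mon := &countingMonitor{}
	r := &monitoredReader{src: strings.NewReader("hello, flowrate"), mon: mon}
	data, err := io.ReadAll(r)
	fmt.Println(len(data), mon.total, err)
}
```

    Because IO accepts and returns the same (int, error) pair, the call can wrap a Read or Write expression directly without temporary variables.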

func (*Monitor) Limit

func (m *Monitor) Limit(want int, rate int64, block bool) (n int)

    Limit restricts the instantaneous (per-sample) data flow to rate bytes per second. It returns the maximum number of bytes (0 <= n <= want) that may be transferred immediately without exceeding the limit. If block == true, the call blocks until n > 0. want is returned unmodified if want < 1, rate < 1, or the transfer is inactive (after a call to Done).

    At least one byte is always allowed to be transferred in any given sampling period. Thus, if the sampling rate is 100ms, the lowest achievable flow rate is 10 bytes per second.

    For usage examples, see the implementation of Reader and Writer in io.go.
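
    The arithmetic behind the one-byte floor can be checked with a small sketch. This is a simplified model of a per-sample byte budget, not the algorithm Limit actually uses; perSampleBudget and minRate are hypothetical helpers introduced for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// perSampleBudget is a simplified model: the number of bytes that a limit
// of rate bytes per second permits in one sampling period, never less
// than one byte. Integer arithmetic avoids floating-point rounding.
func perSampleBudget(rate int64, sample time.Duration) int64 {
	n := rate * int64(sample) / int64(time.Second)
	if n < 1 {
		n = 1 // at least one byte is always allowed per period
	}
	return n
}

// minRate returns the lowest achievable rate, in bytes per second, when
// one byte may always be transferred in each sampling period.
func minRate(sample time.Duration) float64 {
	return 1 / sample.Seconds()
}

func main() {
	sample := 100 * time.Millisecond
	fmt.Println(perSampleBudget(1000, sample)) // 1000 B/s over 100ms
	fmt.Println(perSampleBudget(5, sample))    // below the floor, so one byte
	fmt.Printf("%.0f B/s\n", minRate(sample))
}
```

    Under this model a requested limit of 5 bytes per second with a 100ms sampling period still lets through one byte per period, which is why the effective floor is 10 bytes per second.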

func (*Monitor) SetREMA

func (m *Monitor) SetREMA(rEMA float64)

    SetREMA is a hack that directly sets the current rEMA, the exponential moving average of the transfer rate.

func (*Monitor) SetTransferSize

func (m *Monitor) SetTransferSize(bytes int64)

    SetTransferSize specifies the total size of the data transfer, which allows the Monitor to calculate the overall progress and time to completion.

func (*Monitor) Status

func (m *Monitor) Status() Status

    Status returns current transfer status information. The returned value becomes static after a call to Done.

func (*Monitor) Update

func (m *Monitor) Update(n int) int

    Update records the transfer of n bytes and returns n. It should be called after each Read/Write operation, even if n is 0.

type Percent

type Percent uint32

    Percent represents a percentage in increments of 1/1000th of a percent.

func (Percent) Float

func (p Percent) Float() float64

func (Percent) String

func (p Percent) String() string
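
    Since Float and String are undocumented, the sketch below shows one way a fixed-point percentage in thousandths of a percent could behave. The local type percent, its methods, and the helper percentOf are assumptions made for illustration; the output format of the real String method is not specified here.

```go
package main

import "fmt"

// percent is a hypothetical local mirror of the Percent idea:
// a value of 1 means 0.001%, so 100000 means 100%.
type percent uint32

// Float converts the fixed-point value to an ordinary percentage.
func (p percent) Float() float64 { return float64(p) / 1000 }

// String formats the value with three decimal places using integer
// arithmetic, so no floating-point rounding is involved.
func (p percent) String() string {
	return fmt.Sprintf("%d.%03d%%", uint32(p)/1000, uint32(p)%1000)
}

// percentOf returns done/total as a percent, or 0 if total is unknown.
func percentOf(done, total int64) percent {
	if total <= 0 {
		return 0
	}
	return percent(done * 100000 / total)
}

func main() {
	p := percentOf(1, 3)
	fmt.Println(p, p.Float())
}
```

    Storing the value as an integer keeps comparisons exact and makes the three-decimal formatting a matter of division and remainder.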

type Reader

type Reader struct {
	io.Reader // Data source
	*Monitor  // Flow control monitor
	// contains filtered or unexported fields
}

    Reader implements io.ReadCloser with a restriction on the rate of data transfer.

func NewReader

func NewReader(r io.Reader, limit int64) *Reader

    NewReader restricts all Read operations on r to limit bytes per second.

func (*Reader) Close

func (r *Reader) Close() error

    Close closes the underlying reader if it implements the io.Closer interface.

func (*Reader) Read

func (r *Reader) Read(p []byte) (n int, err error)

    Read reads up to len(p) bytes into p without exceeding the current transfer rate limit. It returns (0, nil) immediately if r is non-blocking and no new bytes can be read at this time.

func (*Reader) SetBlocking

func (r *Reader) SetBlocking(new bool) (old bool)

    SetBlocking changes the blocking behavior and returns the previous setting. A Read call on a non-blocking reader returns immediately if no additional bytes may be read at this time due to the rate limit.

func (*Reader) SetLimit

func (r *Reader) SetLimit(new int64) (old int64)

    SetLimit changes the transfer rate limit to new bytes per second and returns the previous setting.

type Status

type Status struct {
	Active   bool          // Flag indicating an active transfer
	Start    time.Time     // Transfer start time
	Duration time.Duration // Time period covered by the statistics
	Idle     time.Duration // Time since the last transfer of at least 1 byte
	Bytes    int64         // Total number of bytes transferred
	Samples  int64         // Total number of samples taken
	InstRate int64         // Instantaneous transfer rate
	CurRate  int64         // Current transfer rate (EMA of InstRate)
	AvgRate  int64         // Average transfer rate (Bytes / Duration)
	PeakRate int64         // Maximum instantaneous transfer rate
	BytesRem int64         // Number of bytes remaining in the transfer
	TimeRem  time.Duration // Estimated time to completion
	Progress Percent       // Overall transfer progress
}

    Status represents the current Monitor status. All transfer rates are in bytes per second rounded to the nearest byte.
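
    The relationship between some of these fields can be sketched as follows. AvgRate follows the documented Bytes / Duration definition; the TimeRem estimate assumes the remaining bytes arrive at the current rate, which is an assumption of this example and not a documented formula. The summary type and summarize function are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// summary holds a hypothetical subset of derived status values.
type summary struct {
	AvgRate  int64         // bytes per second, rounded to the nearest byte
	BytesRem int64         // bytes still to transfer
	TimeRem  time.Duration // estimated time to completion
}

// summarize derives the values from the bytes transferred so far, the
// total transfer size, the current rate, and the elapsed time.
func summarize(bytes, size, curRate int64, elapsed time.Duration) summary {
	var s summary
	if elapsed > 0 {
		// AvgRate = Bytes / Duration, rounded to the nearest byte.
		s.AvgRate = int64(float64(bytes)/elapsed.Seconds() + 0.5)
	}
	if size > bytes {
		s.BytesRem = size - bytes
		if curRate > 0 {
			// Assumption: the rest of the transfer proceeds at curRate.
			secs := float64(s.BytesRem) / float64(curRate)
			s.TimeRem = time.Duration(secs * float64(time.Second))
		}
	}
	return s
}

func main() {
	s := summarize(2500000, 10000000, 500000, 5*time.Second)
	fmt.Printf("%+v\n", s)
}
```

    The remaining-bytes and time estimates only make sense once the total size is known, which is the purpose of SetTransferSize.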

type Writer

type Writer struct {
	io.Writer // Data destination
	*Monitor  // Flow control monitor
	// contains filtered or unexported fields
}

    Writer implements io.WriteCloser with a restriction on the rate of data transfer.

func NewWriter

func NewWriter(w io.Writer, limit int64) *Writer

    NewWriter restricts all Write operations on w to limit bytes per second. The transfer rate and the default blocking behavior (true) can be changed directly on the returned *Writer.

func (*Writer) Close

func (w *Writer) Close() error

    Close closes the underlying writer if it implements the io.Closer interface.

func (*Writer) SetBlocking

func (w *Writer) SetBlocking(new bool) (old bool)

    SetBlocking changes the blocking behavior and returns the previous setting. A Write call on a non-blocking writer returns as soon as no additional bytes may be written at this time due to the rate limit.

func (*Writer) SetLimit

func (w *Writer) SetLimit(new int64) (old int64)

    SetLimit changes the transfer rate limit to new bytes per second and returns the previous setting.

func (*Writer) Write

func (w *Writer) Write(p []byte) (n int, err error)

    Write writes len(p) bytes from p to the underlying data stream without exceeding the current transfer rate limit. It returns (n, ErrLimit) if w is non-blocking and no additional bytes can be written at this time.
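
    A caller of a non-blocking writer has to handle short writes that come back with ErrLimit. The sketch below models that contract with stand-ins so it runs on its own: errLimit, budgetWriter, sink, and writeAll are hypothetical, and budgetWriter simply caps each call at a fixed number of bytes instead of measuring a real rate.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// errLimit stands in for flowrate.ErrLimit in this sketch.
var errLimit = errors.New("sketch: flow rate limit exceeded")

// sink is a trivial in-memory destination.
type sink struct{ data []byte }

func (s *sink) Write(p []byte) (int, error) {
	s.data = append(s.data, p...)
	return len(p), nil
}

// budgetWriter is a hypothetical stand-in for a non-blocking limited
// writer: each call accepts at most budget bytes (budget must be > 0).
type budgetWriter struct {
	dst    io.Writer
	budget int
}

func (w *budgetWriter) Write(p []byte) (int, error) {
	if len(p) <= w.budget {
		return w.dst.Write(p)
	}
	n, err := w.dst.Write(p[:w.budget])
	if err != nil {
		return n, err
	}
	return n, errLimit // short write caused by the limit
}

// writeAll retries after each limit-induced short write until all of p
// has been written. It returns the number of Write calls it made.
func writeAll(w io.Writer, p []byte) (int, error) {
	calls := 0
	for len(p) > 0 {
		n, err := w.Write(p)
		calls++
		p = p[n:]
		if err != nil && !errors.Is(err, errLimit) {
			return calls, err
		}
		// A real caller would wait or do other work before retrying.
	}
	return calls, nil
}

func main() {
	s := &sink{}
	calls, err := writeAll(&budgetWriter{dst: s, budget: 4}, []byte("0123456789"))
	fmt.Println(string(s.data), calls, err)
}
```

    Treating the limit error as a signal to retry the unwritten remainder, and any other error as fatal, keeps the loop compatible with the usual io.Writer short-write rules.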