ringbuffer

package module
v0.0.0-...-0da97b5 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 17, 2025 License: MIT Imports: 6 Imported by: 20

README

ringbuffer

A circular buffer (ring buffer) in Go that implements the io.ReadWriter interface.

Usage

package main

import (
	"fmt"

	"github.com/smallnest/ringbuffer"
)

func main() {
	rb := ringbuffer.New(1024)

	// write
	rb.Write([]byte("abcd"))
	fmt.Println(rb.Length())
	fmt.Println(rb.Free())

	// read
	buf := make([]byte, 4)
	rb.Read(buf)
	fmt.Println(string(buf))
}

It is possible to use an existing buffer by replacing New with NewBuffer.
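To make the idea of a ring over a caller-supplied slice concrete, here is a minimal sketch using only the standard library. The toyRing type, newToyRing and errFull are hypothetical names invented for this illustration; they are not the package's implementation and ignore locking and blocking entirely.

```go
package main

import (
	"errors"
	"fmt"
)

// errFull is a hypothetical sentinel used only by this sketch.
var errFull = errors.New("toy ring is full")

// toyRing is an illustrative circular buffer over a caller-supplied slice.
// It is not the library's implementation.
type toyRing struct {
	buf  []byte
	r, w int // read and write positions
	n    int // number of unread bytes
}

func newToyRing(buf []byte) *toyRing { return &toyRing{buf: buf} }

func (t *toyRing) Length() int { return t.n }
func (t *toyRing) Free() int   { return len(t.buf) - t.n }

// Write copies p into the ring, wrapping around the end of the slice.
// It refuses the whole write if p does not fit.
func (t *toyRing) Write(p []byte) (int, error) {
	if len(p) > t.Free() {
		return 0, errFull
	}
	for _, b := range p {
		t.buf[t.w] = b
		t.w = (t.w + 1) % len(t.buf)
	}
	t.n += len(p)
	return len(p), nil
}

// Read copies up to len(p) unread bytes into p and advances the read position.
func (t *toyRing) Read(p []byte) int {
	n := 0
	for n < len(p) && t.n > 0 {
		p[n] = t.buf[t.r]
		t.r = (t.r + 1) % len(t.buf)
		t.n--
		n++
	}
	return n
}

func main() {
	backing := make([]byte, 8) // existing buffer owned by the caller
	rb := newToyRing(backing)

	rb.Write([]byte("abcdef"))
	out := make([]byte, 4)
	rb.Read(out) // consumes "abcd"

	rb.Write([]byte("ghij")) // wraps: "gh" at the end, "ij" at the start
	fmt.Println(rb.Length(), rb.Free()) // 6 2
	fmt.Println(string(backing))        // ijcdefgh
}
```

The second Write shows the wrap-around: the backing slice is reused in place, so the caller's slice ends up holding the newest bytes at its start.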

Blocking vs Non-blocking

The default behavior of the ring buffer is non-blocking, meaning that reads and writes will return immediately with an error if the operation cannot be completed. If you want to block when reading or writing, you must enable it:

	rb := ringbuffer.New(1024).SetBlocking(true)

Enabling blocking will cause the ring buffer to behave like a buffered io.Pipe.

Regular Reads will block until data is available, but will not wait for the buffer to fill. Writes will block until there is space available, and writes bigger than the buffer will wait for reads to make space.

TryRead and TryWrite are still available for non-blocking reads and writes.
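The difference between the two modes can be sketched with a buffered channel, which is only an analogy and not how the package works internally. The helpers tryPut and tryGet and the errors errFull and errEmpty are hypothetical stand-ins for the non-blocking calls and their sentinel errors.

```go
package main

import (
	"errors"
	"fmt"
)

// errFull and errEmpty are stand-ins for this sketch, not the package's errors.
var (
	errFull  = errors.New("full")
	errEmpty = errors.New("empty")
)

// tryPut returns immediately: it either enqueues b or reports errFull.
func tryPut(ch chan<- byte, b byte) error {
	select {
	case ch <- b:
		return nil
	default:
		return errFull
	}
}

// tryGet returns immediately: it either dequeues a byte or reports errEmpty.
func tryGet(ch <-chan byte) (byte, error) {
	select {
	case b := <-ch:
		return b, nil
	default:
		return 0, errEmpty
	}
}

func main() {
	ch := make(chan byte, 2)

	// Non-blocking style: the third put fails at once because the queue is full.
	fmt.Println(tryPut(ch, 'a'), tryPut(ch, 'b'), tryPut(ch, 'c')) // <nil> <nil> full

	// Blocking style: the send waits until a receive makes room.
	done := make(chan struct{})
	go func() {
		ch <- 'c'
		close(done)
	}()
	first := <-ch // frees a slot, letting the goroutine finish
	<-done

	second, _ := tryGet(ch)
	third, _ := tryGet(ch)
	_, err := tryGet(ch)
	fmt.Println(string([]byte{first, second, third}), err) // abc empty
}
```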

To signify the end of the stream, close the ring buffer from the writer side with rb.CloseWriter().

Either side can use rb.CloseWithError(err error) to signal an error and close the ring buffer. Any reads or writes will return the error on next call.

In blocking mode, errors are stateful: the same error will be returned until rb.Reset() is called.

It is possible to set a timeout for blocking Read/Write operations using rb.WithTimeout(time.Duration), or for one side only using rb.WithReadTimeout and rb.WithWriteTimeout.

io.Copy replacement

The ring buffer can replace io.Copy and io.CopyBuffer to do async copying through the ring buffer.

The copy operation will happen directly on the buffer, so between reads and writes there is no memory copy.

Here is a simple example where the copy operation is replaced by a ring buffer:

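func saveWebsite(url, file string) {
	in, err := http.Get(url)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer in.Body.Close()

	out, err := os.Create(file)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer out.Close()

	// Copy with regular buffered copy
	// n, err := io.Copy(out, in.Body)

	// Copy with ring buffer
	n, err := ringbuffer.New(1024).Copy(out, in.Body)
	fmt.Println(n, err)
}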

The ring buffer implements the io.ReaderFrom and io.WriterTo interfaces, which makes it possible to fill the write side from an io.Reader and to drain the read side into an io.Writer.

This provides an async method for writing or reading directly into the ring buffer. These functions require that "blocking" is set on the ring buffer.

Example:

func readWebsite(url string) io.ReadCloser {
	in, _ := http.Get(url)

	// Create blocking ring buffer
	ring := ringbuffer.New(1024).SetBlocking(true)

	// Read from the input in a goroutine into the ring buffer
	go func() {
		ring.ReadFrom(in.Body)
		ring.CloseWriter()
	}()
	return ring.ReadCloser()
}

io.Pipe replacement

The ring buffer can be used as a compatible, but asynchronous, replacement for io.Pipe.

That means that Reads and Writes will go to the ring buffer. Writes will complete as long as the data fits within the ring buffer.

Reads will attempt to satisfy reads with data from the ring buffer. The read will only block if the ring buffer is empty.

In the common case, where the Read and Write side can run concurrently, it is safe to replace io.Pipe() with (*Ringbuffer).Pipe().

Compare the following to the io.Pipe example:

func main() {
	// Create pipe from a 4KB ring buffer.
	r, w := ringbuffer.New(4 << 10).Pipe()

	go func() {
		fmt.Fprint(w, "some io.Reader stream to be read\n")
		w.Close()
	}()

	if _, err := io.Copy(os.Stdout, r); err != nil {
		log.Fatal(err)
	}
}

When creating the pipe, the ring buffer is internally switched to blocking mode.

Error reporting on Close and CloseWithError functions is similar to io.Pipe.

It is possible to use the original ring buffer alongside the pipe functions. So for example it is possible to "seed" the ring buffer with data, so reads can complete at once.

Documentation

Constants

This section is empty.

Variables

var (
	// ErrTooMuchDataToWrite is returned when the data to write is more than the buffer size.
	ErrTooMuchDataToWrite = errors.New("too much data to write")

	// ErrIsFull is returned when the buffer is full and not blocking.
	ErrIsFull = errors.New("ringbuffer is full")

	// ErrIsEmpty is returned when the buffer is empty and not blocking.
	ErrIsEmpty = errors.New("ringbuffer is empty")

	// ErrIsNotEmpty is returned when the buffer is not empty and not blocking.
	ErrIsNotEmpty = errors.New("ringbuffer is not empty")

	// ErrAcquireLock is returned when the lock is not acquired on Try operations.
	ErrAcquireLock = errors.New("unable to acquire lock")

	// ErrWriteOnClosed is returned when writing to a closed ringbuffer.
	ErrWriteOnClosed = errors.New("write on closed ringbuffer")

	// ErrReaderClosed is returned when a ReadCloser closed the ringbuffer.
	ErrReaderClosed = errors.New("reader closed")
)

Functions

This section is empty.

Types

type PipeReader

type PipeReader struct {
	// contains filtered or unexported fields
}

A PipeReader is the read half of a pipe.

func (*PipeReader) Close

func (r *PipeReader) Close() error

Close closes the reader; subsequent writes to the write half of the pipe will return the error io.ErrClosedPipe.

func (*PipeReader) CloseWithError

func (r *PipeReader) CloseWithError(err error) error

CloseWithError closes the reader; subsequent writes to the write half of the pipe will return the error err.

CloseWithError never overwrites the previous error if it exists and always returns nil.

func (*PipeReader) Read

func (r *PipeReader) Read(data []byte) (n int, err error)

Read implements the standard Read interface: it reads data from the pipe, blocking until a writer arrives or the write end is closed. If the write end is closed with an error, that error is returned as err; otherwise err is io.EOF.

type PipeWriter

type PipeWriter struct {
	// contains filtered or unexported fields
}

A PipeWriter is the write half of a pipe.

func (*PipeWriter) Close

func (w *PipeWriter) Close() error

Close closes the writer; subsequent reads from the read half of the pipe will return no bytes and EOF.

func (*PipeWriter) CloseWithError

func (w *PipeWriter) CloseWithError(err error) error

CloseWithError closes the writer; subsequent reads from the read half of the pipe will return no bytes and the error err, or EOF if err is nil.

CloseWithError never overwrites the previous error if it exists and always returns nil.

func (*PipeWriter) Write

func (w *PipeWriter) Write(data []byte) (n int, err error)

Write implements the standard Write interface: it writes data to the pipe. The Write will block until all data has been written to the ring buffer. If the read end is closed with an error, that error is returned as err; otherwise err is io.ErrClosedPipe.

type RingBuffer

type RingBuffer struct {
	// contains filtered or unexported fields
}

RingBuffer is a circular buffer that implements the io.ReadWriter interface. It operates like a buffered pipe, where data is written to a RingBuffer and can be read back from another goroutine. It is safe to concurrently read and write RingBuffer.

Example
rb := New(1024)
rb.Write([]byte("abcd"))
fmt.Println(rb.Length())
fmt.Println(rb.Free())
buf := make([]byte, 4)

rb.Read(buf)
fmt.Println(string(buf))
Output:

4
1020
abcd

func New

func New(size int) *RingBuffer

New returns a new RingBuffer whose buffer has the given size.

func NewBuffer

func NewBuffer(b []byte) *RingBuffer

NewBuffer returns a new RingBuffer whose buffer is provided.

func (*RingBuffer) Bytes

func (r *RingBuffer) Bytes(dst []byte) []byte

Bytes returns all available read bytes. It does not move the read pointer and only copies the available data. If dst is big enough, it will be used as the destination; otherwise a new buffer will be allocated.
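The "reuse dst if it is big enough" convention can be sketched as follows. The snapshot function is hypothetical; whether the package compares against len(dst) or cap(dst) is not stated in the documentation, so the use of cap here is an assumption.

```go
package main

import "fmt"

// snapshot copies src into dst when dst has room and allocates otherwise.
// Assumption: capacity, not length, decides whether dst is reused.
func snapshot(dst, src []byte) []byte {
	if cap(dst) < len(src) {
		dst = make([]byte, len(src))
	}
	dst = dst[:len(src)]
	copy(dst, src)
	return dst
}

func main() {
	data := []byte("abcd")

	scratch := make([]byte, 0, 16)
	got := snapshot(scratch, data)
	fmt.Println(string(got), &got[0] == &scratch[:1][0]) // abcd true (scratch reused)

	small := make([]byte, 2)
	got = snapshot(small, data)
	fmt.Println(string(got), &got[0] == &small[0]) // abcd false (new allocation)
}
```

Passing a scratch slice that is reused across calls avoids an allocation per call.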

func (*RingBuffer) Capacity

func (r *RingBuffer) Capacity() int

Capacity returns the size of the underlying buffer.

func (*RingBuffer) CloseWithError

func (r *RingBuffer) CloseWithError(err error)

CloseWithError closes the writer; reads will return no bytes and the error err, or EOF if err is nil.

CloseWithError never overwrites the previous error if it exists.

func (*RingBuffer) CloseWriter

func (r *RingBuffer) CloseWriter()

CloseWriter closes the writer. Reads will return any remaining bytes and io.EOF.

func (*RingBuffer) Copy

func (r *RingBuffer) Copy(dst io.Writer, src io.Reader) (written int64, err error)

Copy will pipe all data from the reader to the writer through the ringbuffer. The ringbuffer will switch to blocking mode. Reads and writes will be done async. No internal mem-copies are used for the transfer.

Calling CloseWithError will cancel the transfer and make the function return when any ongoing reads or writes have finished.

Calling Read or Write functions concurrently with running this will lead to unpredictable results.

func (*RingBuffer) Flush

func (r *RingBuffer) Flush() error

Flush waits for the buffer to be empty and fully read. In non-blocking mode, ErrIsNotEmpty is returned if the buffer still contains data.

func (*RingBuffer) Free

func (r *RingBuffer) Free() int

Free returns the number of bytes that can be written without blocking.

func (*RingBuffer) IsEmpty

func (r *RingBuffer) IsEmpty() bool

IsEmpty returns true when the ringbuffer is empty.

func (*RingBuffer) IsFull

func (r *RingBuffer) IsFull() bool

IsFull returns true when the ringbuffer is full.

func (*RingBuffer) Length

func (r *RingBuffer) Length() int

Length returns the number of bytes that can be read without blocking.

func (*RingBuffer) Peek

func (r *RingBuffer) Peek(p []byte) (n int, err error)

Peek reads up to len(p) bytes into p without moving the read pointer.

func (*RingBuffer) Pipe

func (r *RingBuffer) Pipe() (*PipeReader, *PipeWriter)

Pipe creates an asynchronous in-memory pipe compatible with io.Pipe. It can be used to connect code expecting an io.Reader with code expecting an io.Writer.

Reads and Writes will go to the ring buffer. Writes will complete as long as the data fits within the ring buffer. Reads will attempt to satisfy reads with data from the ring buffer. Only if the ring buffer is empty will the read block.

It is safe (and intended) to call Read and Write in parallel with each other or with Close.

Example
// Create pipe from a 4KB ring buffer.
r, w := New(4 << 10).Pipe()

go func() {
	fmt.Fprint(w, "some io.Reader stream to be read\n")
	w.Close()
}()

if _, err := io.Copy(os.Stdout, r); err != nil {
	log.Fatal(err)
}
Output:

some io.Reader stream to be read

func (*RingBuffer) Read

func (r *RingBuffer) Read(p []byte) (n int, err error)

Read reads up to len(p) bytes into p. It returns the number of bytes read (0 <= n <= len(p)) and any error encountered. Even if Read returns n < len(p), it may use all of p as scratch space during the call. If some data is available but not len(p) bytes, Read conventionally returns what is available instead of waiting for more. When Read encounters an error or end-of-file condition after successfully reading n > 0 bytes, it returns the number of bytes read. It may return the (non-nil) error from the same call or return the error (and n == 0) from a subsequent call. Callers should always process the n > 0 bytes returned before considering the error err. Doing so correctly handles I/O errors that happen after reading some bytes and also both of the allowed EOF behaviors.

func (*RingBuffer) ReadByte

func (r *RingBuffer) ReadByte() (b byte, err error)

ReadByte reads and returns the next byte from the input or ErrIsEmpty.

func (*RingBuffer) ReadCloser

func (r *RingBuffer) ReadCloser() io.ReadCloser

ReadCloser returns an io.ReadCloser that reads from the ring buffer. When the returned ReadCloser is closed, ErrReaderClosed will be returned on any writes done afterwards.

func (*RingBuffer) ReadFrom

func (r *RingBuffer) ReadFrom(rd io.Reader) (n int64, err error)

ReadFrom will fulfill the write side of the ringbuffer. It writes directly into the buffer, avoiding the memory copy that Write incurs.

ReadFrom will not automatically close the buffer even after returning. For that call CloseWriter().

ReadFrom reads data from rd until EOF or error. The return value n is the number of bytes read. Any error except EOF encountered during the read is also returned, and the error will cause the Read side to fail as well. ReadFrom is only available in blocking mode.

func (*RingBuffer) Reset

func (r *RingBuffer) Reset()

Reset resets the read pointer and write pointer to zero.

func (*RingBuffer) SetBlocking

func (r *RingBuffer) SetBlocking(block bool) *RingBuffer

SetBlocking sets the blocking mode of the ring buffer. If block is true, Read and Write will block when there is no data to read or no space to write. If block is false, Read and Write will return ErrIsEmpty or ErrIsFull immediately. By default, the ring buffer is not blocking. This setting should be called before any Read or Write operation or after a Reset.

func (*RingBuffer) TryRead

func (r *RingBuffer) TryRead(p []byte) (n int, err error)

TryRead reads up to len(p) bytes into p like Read, but it never blocks. If it fails to acquire the lock, it returns ErrAcquireLock.

func (*RingBuffer) TryWrite

func (r *RingBuffer) TryWrite(p []byte) (n int, err error)

TryWrite writes len(p) bytes from p to the underlying buf like Write, but it never blocks. If it fails to acquire the lock, it returns ErrAcquireLock.

func (*RingBuffer) TryWriteByte

func (r *RingBuffer) TryWriteByte(c byte) error

TryWriteByte writes one byte into the buffer without blocking. If it fails to acquire the lock, it returns ErrAcquireLock.

func (*RingBuffer) WithCancel

func (r *RingBuffer) WithCancel(ctx context.Context) *RingBuffer

WithCancel sets a context to cancel the ring buffer. When the context is canceled, the ring buffer will be closed with the context error. A goroutine will be started and run until the provided context is canceled.

func (*RingBuffer) WithReadTimeout

func (r *RingBuffer) WithReadTimeout(d time.Duration) *RingBuffer

WithReadTimeout will set a blocking read timeout. Read refers to any call that reads data from the buffer. If no writes occur within the timeout, the ringbuffer will be closed and context.DeadlineExceeded will be returned. A timeout of 0 or less will disable timeouts (default).

func (*RingBuffer) WithTimeout

func (r *RingBuffer) WithTimeout(d time.Duration) *RingBuffer

WithTimeout will set a blocking read/write timeout. If no reads or writes occur within the timeout, the ringbuffer will be closed and context.DeadlineExceeded will be returned. A timeout of 0 or less will disable timeouts (default).

func (*RingBuffer) WithWriteTimeout

func (r *RingBuffer) WithWriteTimeout(d time.Duration) *RingBuffer

WithWriteTimeout will set a blocking write timeout. Write refers to any call that writes data into the buffer. If no reads occur within the timeout, the ringbuffer will be closed and context.DeadlineExceeded will be returned. A timeout of 0 or less will disable timeouts (default).

func (*RingBuffer) Write

func (r *RingBuffer) Write(p []byte) (n int, err error)

Write writes len(p) bytes from p to the underlying buf. It returns the number of bytes written from p (0 <= n <= len(p)) and any error encountered that caused the write to stop early. In blocking mode, n < len(p) will be returned only if an error occurred. Write returns a non-nil error if it returns n < len(p). Write will not modify the slice data, even temporarily.

func (*RingBuffer) WriteByte

func (r *RingBuffer) WriteByte(c byte) error

WriteByte writes one byte into buffer, and returns ErrIsFull if the buffer is full.

func (*RingBuffer) WriteCloser

func (r *RingBuffer) WriteCloser() io.WriteCloser

WriteCloser returns a WriteCloser that writes to the ring buffer. When the returned WriteCloser is closed, it will wait for all data to be read before returning.

func (*RingBuffer) WriteString

func (r *RingBuffer) WriteString(s string) (n int, err error)

WriteString writes the contents of the string s to the buffer, in the same way that Write does for a slice of bytes.

func (*RingBuffer) WriteTo

func (r *RingBuffer) WriteTo(w io.Writer) (n int64, err error)

WriteTo writes data to w until there's no more data to write or when an error occurs. The return value n is the number of bytes written. Any error encountered during the write is also returned.

If a non-nil error is returned the write side will also see the error.
