streambuf

Version: v0.6.1

Published: Mar 17, 2026 License: MIT Imports: 6 Imported by: 0

README

streambuf

streambuf is a Go library that provides an append-only buffer with multiple independent readers.

It allows a single writer to continuously append bytes to a buffer, while any number of readers consume the data at their own pace, without interfering with each other.

The buffer can be backed by memory or by a file, making it suitable for both lightweight in-memory streaming and durable, disk-backed use cases.

In practice, this is useful when you want more than "a mutex around an io.Writer". A plain writer gives you serialized writes, but it does not give each consumer its own read cursor, late-joining readers, coordinated blocking reads, or a shared file-backed stream that avoids opening separate descriptors per reader.

Motivation

Go’s standard library provides excellent primitives for streaming (io.Reader, io.Writer, bufio, channels), but it lacks a native abstraction for:

  • Append-only data
  • Multiple independent readers
  • Late-joining readers
  • Sequential, ordered reads
  • Optional file-backed persistence

streambuf fills this gap by behaving like a shared, growing stream where readers maintain their own cursor.

This pattern shows up frequently in systems programming, including:

  • Chat and messaging services
  • Log streaming
  • Fan-out pipelines
  • Event feeds
  • Streaming ingestion systems
  • Testing and replay of streamed data

Why This Exists

It is reasonable to ask: why not just protect an io.Writer with a mutex?

That solves a different problem.

A mutex around a writer helps multiple goroutines write safely, but it does not provide:

  • Independent readers with their own offsets
  • Readers that can join after data has already been written
  • Reads that block until more data arrives
  • A single shared file-backed source for many readers

streambuf is for cases where one side is continuously appending data and many readers need to observe the same ordered byte stream without consuming it from each other.
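
The difference from a mutex-guarded writer can be sketched with the standard library alone. This is a minimal illustration, not streambuf's implementation: sharedLog, cursor, and readAll are hypothetical names, and unlike streambuf readers, this cursor returns io.EOF at the end of the data instead of blocking.

```go
package main

import (
	"io"
	"sync"
)

// sharedLog is a hypothetical append-only byte log; it is not streambuf's type.
type sharedLog struct {
	mu   sync.RWMutex
	data []byte
}

// Append adds bs to the end of the log; existing bytes are never modified.
func (l *sharedLog) Append(bs []byte) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.data = append(l.data, bs...)
}

// cursor reads from a sharedLog at its own offset.
type cursor struct {
	log *sharedLog
	off int
}

// NewCursor returns a reader positioned at the start of the log,
// even if data has already been written.
func (l *sharedLog) NewCursor() *cursor { return &cursor{log: l} }

// Read copies unread bytes into p and advances only this cursor.
// It returns io.EOF when the cursor has caught up with the writer.
func (c *cursor) Read(p []byte) (int, error) {
	c.log.mu.RLock()
	defer c.log.mu.RUnlock()
	if c.off >= len(c.log.data) {
		return 0, io.EOF
	}
	n := copy(p, c.log.data[c.off:])
	c.off += n
	return n, nil
}

// readAll drains whatever is currently available to r.
func readAll(r io.Reader) string {
	bs, _ := io.ReadAll(r)
	return string(bs)
}
```

In this sketch, a cursor that has already consumed "hello " reads only "world" after the next append, while a cursor created later still reads "hello world" from the start. Neither cursor consumes data from the other.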

Example Use Case

Imagine a service that receives a live byte stream from one upstream connection and needs to expose it to several downstream consumers:

  • One consumer forwards the stream to an HTTP client
  • One consumer writes it to disk for later replay
  • One consumer parses it for metrics or events

With a normal io.Writer, you still need to build the fan-out, track read positions, and coordinate blocking behavior yourself.

With streambuf, the producer writes once, each consumer gets its own reader, and a file-backed buffer can keep everything on a single shared file descriptor instead of opening one per consumer.

Examples

Below are quick API examples. For runnable end-to-end examples, see examples/.

New

func ExampleNew() {
	var err error
	if exampleBuffer, err = New("path/to/file"); err != nil {
		log.Fatal(err)
	}
}

NewStream

func ExampleNewStream() {
	var err error
	// NewStream constructs a read-only file-backed stream.
	if exampleStream, err = NewStream("path/to/file"); err != nil {
		log.Fatal(err)
	}
}

NewMemory

func ExampleNewMemory() {
	exampleBuffer = NewMemory()
}

NewMemoryStream

func ExampleNewMemoryStream() {
	bs := []byte("hello world")
	exampleStream = NewMemoryStream(bs)
}

Buffer.Write

func ExampleBuffer_Write() {
	if _, err := exampleBuffer.Write([]byte("hello world")); err != nil {
		log.Fatal(err)
	}
}

Buffer.Reader

func ExampleBuffer_Reader() {
	var err error
	if _, err = exampleBuffer.Write([]byte("hello world")); err != nil {
		log.Fatal(err)
	}

	var (
		r1 io.ReadSeekCloser
		r2 io.ReadSeekCloser
		r3 io.ReadSeekCloser
	)

	if r1, err = exampleBuffer.Reader(); err != nil {
		log.Fatal(err)
	}
	defer r1.Close()

	if r2, err = exampleBuffer.Reader(); err != nil {
		log.Fatal(err)
	}
	defer r2.Close()

	if r3, err = exampleBuffer.Reader(); err != nil {
		log.Fatal(err)
	}
	defer r3.Close()

	// Each reader is independent and maintains its own read offset.
	// Reads or seeks on r1 do not affect r2 or r3.
}

Buffer.Close

func ExampleBuffer_Close() {
	// Close closes the backend immediately and does not wait for readers to finish.
	if err := exampleBuffer.Close(); err != nil {
		log.Fatal(err)
	}
}

Buffer.CloseAndWait

func ExampleBuffer_CloseAndWait() {
	// CloseAndWait blocks until the backend is closed and all readers are closed,
	// or until the provided context is done.
	if err := exampleBuffer.CloseAndWait(context.Background()); err != nil {
		log.Fatal(err)
	}
}

Stream.Reader

func ExampleStream_Reader() {
	var (
		r1  io.ReadSeekCloser
		r2  io.ReadSeekCloser
		r3  io.ReadSeekCloser
		err error
	)

	if r1, err = exampleStream.Reader(); err != nil {
		log.Fatal(err)
	}
	defer r1.Close()

	if r2, err = exampleStream.Reader(); err != nil {
		log.Fatal(err)
	}
	defer r2.Close()

	if r3, err = exampleStream.Reader(); err != nil {
		log.Fatal(err)
	}
	defer r3.Close()

	// Each reader is independent and maintains its own read offset.
	// Reads or seeks on r1 do not affect r2 or r3.
}

Stream.Close

func ExampleStream_Close() {
	// Close closes the readable backend immediately and does not wait for readers.
	if err := exampleStream.Close(); err != nil {
		log.Fatal(err)
	}
}

Stream.CloseAndWait

func ExampleStream_CloseAndWait() {
	// CloseAndWait blocks until the readable backend is closed and all readers are
	// closed, or until the provided context is done.
	if err := exampleStream.CloseAndWait(context.Background()); err != nil {
		log.Fatal(err)
	}
}

Core Concepts

Append-only buffer

Data is written once and never modified in place.

Writes always append to the end of the buffer.

Independent readers

Each reader maintains its own read position. Readers do not block or consume data from each other.

Readers may:

  • Start from the beginning
  • Start from the current end
  • Join after data has already been written

Blocking reads

Readers block when no data is available and resume automatically when new data is appended.

For streams created with NewStream, a read that reaches the current end of the readable file also blocks, and it stays blocked until the stream or the reader is closed.

If you are treating a stream as a finite snapshot, call Close() (or CloseAndWait(...)) on the stream after readers finish consuming data, or close the reader directly, to unblock waiting reads and complete shutdown cleanly.
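
One common way to get this kind of blocking read in Go is a condition variable. The sketch below is an assumption about how such behavior can be built, not a description of streambuf's internals: blockingLog and WaitRead are hypothetical names, and errClosed stands in for ErrIsClosed.

```go
package main

import (
	"errors"
	"sync"
)

// errClosed is a stand-in for this sketch; streambuf reports ErrIsClosed.
var errClosed = errors.New("closed")

// blockingLog is a hypothetical append-only log whose reads wait for data.
type blockingLog struct {
	mu     sync.Mutex
	cond   *sync.Cond
	data   []byte
	closed bool
}

func newBlockingLog() *blockingLog {
	l := &blockingLog{}
	l.cond = sync.NewCond(&l.mu)
	return l
}

// Append adds bytes and wakes every waiting reader.
func (l *blockingLog) Append(bs []byte) {
	l.mu.Lock()
	l.data = append(l.data, bs...)
	l.mu.Unlock()
	l.cond.Broadcast()
}

// Close marks the log closed and wakes waiting readers so they can return.
func (l *blockingLog) Close() {
	l.mu.Lock()
	l.closed = true
	l.mu.Unlock()
	l.cond.Broadcast()
}

// WaitRead blocks until bytes exist at or after off, then copies them into p.
// It returns errClosed once the log is closed and off has reached the end.
func (l *blockingLog) WaitRead(off int, p []byte) (int, error) {
	l.mu.Lock()
	defer l.mu.Unlock()
	for off >= len(l.data) && !l.closed {
		l.cond.Wait() // releases mu while waiting, reacquires it on wake-up
	}
	if off >= len(l.data) {
		return 0, errClosed
	}
	return copy(p, l.data[off:]), nil
}
```

A goroutine calling WaitRead(0, buf) on an empty log stays parked until Append is called, and a goroutine waiting at the end of the data returns errClosed as soon as Close is called. Note that this sketch still lets readers drain bytes written before Close, which is a choice made for the illustration and may differ from the library's Close().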

Shutdown behavior

  • Close() closes immediately. Existing unread bytes may no longer be available to readers.
  • CloseAndWait(ctx) closes the write side, then waits until all readers are closed or ctx is done, whichever comes first.
  • ctx can be a timeout/deadline context to bound how long shutdown waits.
  • Terminal reads after either buffer close or reader close return ErrIsClosed.
  • To preserve reader drain behavior, finish reading first, then call CloseAndWait (or coordinate with reader Close calls and context cancellation).
  • If ctx is canceled before readers close, CloseAndWait still returns and the buffer stays closed; close outstanding readers afterward to finish internal wait cleanup.
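
The wait-or-give-up pattern behind a bounded shutdown can be sketched with a sync.WaitGroup and a context. This is an illustration under stated assumptions, not the library's code: waitForReaders and shutdownDemo are hypothetical names, and the WaitGroup stands in for whatever bookkeeping tracks open readers.

```go
package main

import (
	"context"
	"sync"
	"time"
)

// waitForReaders blocks until every reader tracked by wg has finished,
// or until ctx is done, whichever happens first.
func waitForReaders(ctx context.Context, wg *sync.WaitGroup) error {
	done := make(chan struct{})
	go func() {
		// This goroutine keeps waiting even after ctx is done,
		// and only exits once the remaining readers close.
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// shutdownDemo shows both outcomes: a wait cut short by a deadline,
// then a clean wait after the reader has closed.
func shutdownDemo() (timedOut error, clean error) {
	var wg sync.WaitGroup
	wg.Add(1) // one reader is still open

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	timedOut = waitForReaders(ctx, &wg) // the reader does not close in time

	wg.Done() // the reader closes afterwards
	clean = waitForReaders(context.Background(), &wg)
	return timedOut, clean
}
```

In shutdownDemo, the first call returns the context's deadline error and the second returns nil. The background goroutine in waitForReaders also shows why a pattern like this needs outstanding readers to be closed after a timeout: until they are, the waiting goroutine cannot exit.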

Pluggable storage

streambuf supports multiple backing implementations:

  • Memory-backed ([]byte)
  • File-backed (using a shared file descriptor)
  • Read-only file-backed stream (existing file opened read-only)

Buffer and Stream share the same reader behavior. Buffer adds Write, while Stream is read-only.
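
To see how one file descriptor can serve several readers, the standard library offers a useful analogy: io.SectionReader keeps its own offset and reads through ReadAt, which does not move the file's shared seek position. The sketch below uses only the standard library; readTwoViews is a hypothetical helper, and whether streambuf reads its file this way internally is an assumption.

```go
package main

import (
	"io"
	"os"
)

// readTwoViews writes content to a temporary file, then reads it through two
// independent cursors that share one *os.File.
func readTwoViews(content string) (first, second string, err error) {
	f, err := os.CreateTemp("", "sharedfd-*")
	if err != nil {
		return "", "", err
	}
	defer os.Remove(f.Name())
	defer f.Close()

	if _, err = f.WriteString(content); err != nil {
		return "", "", err
	}

	// Each SectionReader tracks its own position over the same file.
	size := int64(len(content))
	r1 := io.NewSectionReader(f, 0, size)
	r2 := io.NewSectionReader(f, 0, size)

	// Advance r2 by six bytes; r1 is unaffected.
	if _, err = r2.Seek(6, io.SeekStart); err != nil {
		return "", "", err
	}

	b1, err := io.ReadAll(r1)
	if err != nil {
		return "", "", err
	}
	b2, err := io.ReadAll(r2)
	if err != nil {
		return "", "", err
	}
	return string(b1), string(b2), nil
}
```

Calling readTwoViews("hello world") returns "hello world" from the first cursor and "world" from the second, with only one open file handle involved.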

AI Usage and Authorship

This project is intentionally human-authored for all logic.

To be explicit:

  • AI does not write or modify non-test code in this repository.
  • AI does not make architectural or behavioral decisions.
  • AI may assist with documentation, comments, and test scaffolding only.
  • All implementation logic is written and reviewed by human maintainers.

These boundaries are enforced in AGENTS.md and are part of this repository's contribution discipline.

Contributors

  • Human maintainers: library design, implementation, and behavior decisions.
  • ChatGPT Codex: documentation, test coverage support, and comments.
  • Google Gemini: README artwork generation.

Documentation

Overview

Package streambuf provides append-only buffers and read-only streams with independent readers that can block until more data is available or the instance is closed.

Constants

This section is empty.

Variables

var (
	// ErrSeekEndNotSupported is returned when seeking relative to the end.
	// Reader-backed seeks currently support only SeekStart and SeekCurrent.
	ErrSeekEndNotSupported = errors.New("seek end is not currently supported")
	// ErrInvalidWhence is returned when Seek receives an unsupported whence value.
	ErrInvalidWhence = errors.New("invalid seek whence")
	// ErrNegativeIndex is returned when a seek would move before byte index 0.
	// The reader position is clamped to 0 in this case.
	ErrNegativeIndex = errors.New("invalid index, cannot be less than 0")
	// ErrCannotWriteToReadOnly is returned when a write is attempted on a read-only backend.
	ErrCannotWriteToReadOnly = errors.New("cannot write to read-only backend")
	// ErrIsClosed is returned when an action is attempted on a closed instance.
	ErrIsClosed = errors.New("cannot perform action on closed instance")
)
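
The seek rules described in these comments can be sketched as a small pure function. This is an interpretation of the comments above, not the package's code: resolveSeek is a hypothetical helper, the three error values are local stand-ins for the exported ones, and returning the unchanged position on an unsupported whence is an assumption.

```go
package main

import (
	"errors"
	"io"
)

// Stand-ins for this sketch only; the package exports its own error values.
var (
	errSeekEnd  = errors.New("seek end not supported")
	errWhence   = errors.New("invalid whence")
	errNegative = errors.New("negative index")
)

// resolveSeek computes a new read position from the current one.
// SeekEnd and unknown whence values are rejected and leave the position
// unchanged; a negative target is clamped to 0 and reported as an error.
func resolveSeek(cur, offset int64, whence int) (int64, error) {
	var next int64
	switch whence {
	case io.SeekStart:
		next = offset
	case io.SeekCurrent:
		next = cur + offset
	case io.SeekEnd:
		return cur, errSeekEnd
	default:
		return cur, errWhence
	}
	if next < 0 {
		return 0, errNegative
	}
	return next, nil
}
```

For example, resolveSeek(5, 3, io.SeekCurrent) yields position 8, while resolveSeek(5, -9, io.SeekCurrent) yields position 0 together with errNegative.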

Functions

This section is empty.

Types

type Buffer

type Buffer struct {
	// contains filtered or unexported fields
}

Buffer is a thread-safe byte buffer with reader support.

func New

func New(filepath string) (out *Buffer, err error)

New constructs a new file-backed Buffer.

Example
var err error
if exampleBuffer, err = New("path/to/file"); err != nil {
	log.Fatal(err)
}

func NewMemory added in v0.2.0

func NewMemory() (out *Buffer)

NewMemory constructs a new in-memory Buffer.

Example
exampleBuffer = NewMemory()

func (*Buffer) Close

func (b *Buffer) Close() (err error)

Close closes the writer side of the buffer and signals waiting readers. It does not wait for readers to call Close.

Example
// Close closes the backend immediately and does not wait for readers to finish.
if err := exampleBuffer.Close(); err != nil {
	log.Fatal(err)
}

func (*Buffer) CloseAndWait added in v0.2.0

func (b *Buffer) CloseAndWait(ctx context.Context) (err error)

CloseAndWait closes the writer side of the buffer and signals waiting readers. It waits for readers to close until ctx is canceled. Once called, future Reader and Write calls return ErrIsClosed. ctx must be non-nil. If ctx is canceled before readers close, this call still returns and the buffer remains closed; readers should still be closed to complete internal wait cleanup.

Example
// CloseAndWait blocks until the backend is closed and all readers are closed,
// or until the provided context is done.
if err := exampleBuffer.CloseAndWait(context.Background()); err != nil {
	log.Fatal(err)
}

func (Buffer) Reader

func (s Buffer) Reader() (r io.ReadSeekCloser, err error)

Reader returns a new io.ReadSeekCloser that streams data from the stream. Each reader tracks its own read offset and supports seeking relative to the start or current position. It returns ErrIsClosed if the stream is closed.

Example
var err error
if _, err = exampleBuffer.Write([]byte("hello world")); err != nil {
	log.Fatal(err)
}

var (
	r1 io.ReadSeekCloser
	r2 io.ReadSeekCloser
	r3 io.ReadSeekCloser
)

if r1, err = exampleBuffer.Reader(); err != nil {
	log.Fatal(err)
}
defer r1.Close()

if r2, err = exampleBuffer.Reader(); err != nil {
	log.Fatal(err)
}
defer r2.Close()

if r3, err = exampleBuffer.Reader(); err != nil {
	log.Fatal(err)
}
defer r3.Close()

// Each reader is independent and maintains its own read offset.
// Reads or seeks on r1 do not affect r2 or r3.

func (*Buffer) Write

func (b *Buffer) Write(bs []byte) (n int, err error)

Write appends bytes to the buffer and wakes waiting readers. It returns ErrIsClosed if the buffer has been closed.

Example
if _, err := exampleBuffer.Write([]byte("hello world")); err != nil {
	log.Fatal(err)
}

type Stream added in v0.6.0

type Stream struct {
	// contains filtered or unexported fields
}

Stream is a thread-safe read-only stream with reader support.

func NewMemoryStream added in v0.6.0

func NewMemoryStream(bs []byte) (out *Stream)

NewMemoryStream constructs a read-only memory-backed Stream over bs.

Example
bs := []byte("hello world")
exampleStream = NewMemoryStream(bs)

func NewStream added in v0.6.0

func NewStream(filepath string) (out *Stream, err error)

NewStream constructs a read-only file-backed Stream.

Example
var err error
// NewStream constructs a read-only file-backed stream.
if exampleStream, err = NewStream("path/to/file"); err != nil {
	log.Fatal(err)
}

func (Stream) Close added in v0.6.0

func (s Stream) Close() (err error)

Close closes the stream and signals waiting readers. It does not wait for readers to call Close.

Example
// Close closes the readable backend immediately and does not wait for readers.
if err := exampleStream.Close(); err != nil {
	log.Fatal(err)
}

func (Stream) CloseAndWait added in v0.6.0

func (s Stream) CloseAndWait(ctx context.Context) (err error)

CloseAndWait closes the stream and signals waiting readers. It waits for readers to close until ctx is canceled. Once called, future Reader calls return ErrIsClosed. ctx must be non-nil. If ctx is canceled before readers close, this call still returns and the stream remains closed; readers should still be closed to complete internal wait cleanup.

Example
// CloseAndWait blocks until the readable backend is closed and all readers are
// closed, or until the provided context is done.
if err := exampleStream.CloseAndWait(context.Background()); err != nil {
	log.Fatal(err)
}

func (Stream) Reader added in v0.6.0

func (s Stream) Reader() (r io.ReadSeekCloser, err error)

Reader returns a new io.ReadSeekCloser that streams data from the stream. Each reader tracks its own read offset and supports seeking relative to the start or current position. It returns ErrIsClosed if the stream is closed.

Example
var (
	r1  io.ReadSeekCloser
	r2  io.ReadSeekCloser
	r3  io.ReadSeekCloser
	err error
)

if r1, err = exampleStream.Reader(); err != nil {
	log.Fatal(err)
}
defer r1.Close()

if r2, err = exampleStream.Reader(); err != nil {
	log.Fatal(err)
}
defer r2.Close()

if r3, err = exampleStream.Reader(); err != nil {
	log.Fatal(err)
}
defer r3.Close()

// Each reader is independent and maintains its own read offset.
// Reads or seeks on r1 do not affect r2 or r3.

Directories

  • examples/basic (command)
  • examples/basic_with_wait (command)
