typedpipe

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 20, 2026 License: MIT Imports: 4 Imported by: 0

README

go-typedpipe

Go Reference CI

go-typedpipe provides a generic, in-memory, concurrency-safe pipe for streaming typed values between goroutines.

It is conceptually similar to io.Pipe, but operates on values of any type T instead of []byte. Unlike a plain chan T, it provides context-aware blocking, idempotent close with error propagation, and a drain guarantee — buffered values written before close remain readable after close.

It is a small synchronization primitive, not a queue or broker.


Installation

go get github.com/fikrimohammad/go-typedpipe

Requires Go 1.18 or later.


Example

package main

import (
	"context"
	"fmt"
	"log"

	typedpipe "github.com/fikrimohammad/go-typedpipe"
)

func main() {
	w, r, err := typedpipe.New[int]()
	if err != nil {
		log.Fatal(err)
	}

	ctx := context.Background()

	go func() {
		for i := 0; i < 3; i++ {
			if err := w.Write(ctx, i); err != nil {
				log.Println("write:", err)
				return
			}
		}
		w.Close()
	}()

	for {
		v, err := r.Read(ctx)
		if err != nil {
			break
		}
		fmt.Println(v)
	}
	// Output:
	// 0
	// 1
	// 2
}
Propagating errors with CloseWithError
w, r, err := typedpipe.New[int]()
if err != nil {
	log.Fatal(err)
}

ctx := context.Background()

go func() {
	if err := fetchData(ctx, w); err != nil {
		// Signal the reader why the pipe stopped.
		w.CloseWithError(fmt.Errorf("fetch failed: %w", err))
		return
	}
	w.Close()
}()

for {
	v, err := r.Read(ctx)
	if err != nil {
		log.Println("reader stopped:", err) // fetch failed: ...
		break
	}
	process(v)
}

Semantics

Write

Write(ctx, v) blocks until:

  • The value is delivered
  • ctx is canceled
  • The pipe is closed

Returns the stored close error if the pipe is closed, or ctx.Err() if the context is canceled.

Read

Read(ctx) blocks until:

  • A value is available
  • ctx is canceled
  • The pipe is closed and fully drained

After all buffered values are consumed, returns the stored close error.

Close
  • Close() closes the pipe with ErrPipeClosed.
  • CloseWithError(err) closes the pipe with a custom error. If err is nil, ErrPipeClosed is used.
  • Both are idempotent — subsequent calls are no-ops.
  • The first non-nil error wins and is returned to all future operations.

Buffering

Value
Default buffer size 64
Maximum buffer size 2048
w, r, err := typedpipe.New[int](
	typedpipe.WithBufferSize(128),
)
if err != nil {
	log.Fatal(err)
}

A buffer size of 0 or less produces an unbuffered pipe, where each Write blocks until a corresponding Read occurs.


Guarantees

  • Safe for concurrent use — multiple goroutines may call Read, Write, and Close simultaneously.
  • No send-on-closed-channel panics — the internal channel is never closed; shutdown is signalled separately.
  • Idempotent shutdown — calling Close or CloseWithError multiple times is safe.
  • First error wins — the close error is set once and never overwritten.
  • Full drain on close — values written before close are fully readable after close, in order.
  • BackpressureWrite blocks when the buffer is full, preventing unbounded memory growth.

Use Cases

Appropriate for:

  • Producer–consumer pipelines
  • Worker coordination
  • Structured streaming between goroutines
  • Replacing chan T when context-aware operations and close error propagation are needed

Not intended for:

  • Broadcast or fan-out (single reader only)
  • Durable messaging
  • Cross-process communication

License

MIT

Documentation

Overview

Package typedpipe implements a generic, in-memory, concurrency-safe pipe.

It provides a typed alternative to io.Pipe: values of type T can be passed between goroutines with backpressure and coordinated shutdown semantics.

The pipe guarantees:

  • Safe concurrent use.
  • Blocking reads and writes.
  • Idempotent close.
  • Propagation of the first close error to all subsequent operations.
  • Full drain of buffered values before returning a close error to readers.

Shutdown semantics:

Either side (Reader or Writer) may call Close/CloseWithError. When closed:

  1. All in-progress and future Write calls return the close error immediately.
  2. Read calls drain any buffered values, then return the close error.

Note: Because both sides share a Closer, a reader may close the writer's side and vice versa. This matches io.Pipe semantics and is intentional.

Index

Constants

View Source
const (
	// DefaultBufferSize is the default channel capacity.
	DefaultBufferSize = 64

	// MaxBufferSize is the maximum allowed buffer capacity.
	// New returns an error if the requested size exceeds this value.
	MaxBufferSize = 2048
)

Variables

View Source
var ErrPipeClosed = errors.New("pipe is closed")

ErrPipeClosed is returned by operations on a closed pipe when no custom error was provided to CloseWithError.

Functions

func New

func New[T any](opts ...Option) (Writer[T], Reader[T], error)

New constructs a pipe and returns its Writer and Reader ends.

Types

type Closer

type Closer interface {
	Close()
	CloseWithError(err error)
}

Closer is the shared shutdown interface. The first non-nil error passed to CloseWithError is retained; subsequent calls are no-ops.

type Option

type Option func(*options)

Option configures a pipe at construction time.

func WithBufferSize

func WithBufferSize(n int) Option

WithBufferSize sets the pipe's internal channel capacity. A value <= 0 produces an unbuffered (synchronous) pipe. Values > MaxBufferSize cause New() to return an error.

type Reader

type Reader[T any] interface {
	Read(ctx context.Context) (T, error)
	Closer
}

Reader is the read side of the pipe.

type Writer

type Writer[T any] interface {
	Write(ctx context.Context, v T) error
	Closer
}

Writer is the write side of the pipe.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL