pipe

package
v1.3.2 Latest
Published: Jan 25, 2024 License: MIT Imports: 4 Imported by: 0

Documentation

Overview

Package pipe provides implementations of unbounded and in-memory pipes. Each pipe provides a writer that the caller can use to collect data and a streamline.Stream instance that can be used to consume the data.

Constants

This section is empty.

Variables

var FileBuffersSize int64 = 124 * 1024 * 1024 // 124MB

FileBuffersSize configures the size of the files created to store buffer overflows once the in-memory capacity, MemoryBufferSize, is reached in the unbounded buffers created by NewStream.

var MemoryBufferSize int64 = 32 * 1024 * 1024 // 32MB

MemoryBufferSize configures the maximum in-memory size of the buffers created by this package.

When using the default unbounded buffer in NewStream, overflows are written to disk in increments of FileBuffersSize.
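As a rough illustration of how the two sizes interact, the following sketch estimates how many overflow files a given backlog of unread data would need. The helper overflowFiles is hypothetical and not part of this package. It assumes that the first MemoryBufferSize bytes stay in memory and that each further FileBuffersSize bytes, or part thereof, occupy one file.

```go
package main

import "fmt"

// overflowFiles is a hypothetical helper, not part of package pipe. It
// estimates how many overflow files a backlog of unread bytes would need,
// assuming the first memSize bytes stay in memory and each file holds up to
// fileSize bytes.
func overflowFiles(backlog, memSize, fileSize int64) int64 {
	if backlog <= memSize || fileSize <= 0 {
		return 0
	}
	overflow := backlog - memSize
	return (overflow + fileSize - 1) / fileSize // round up to whole files
}

func main() {
	const (
		memSize  int64 = 32 * 1024 * 1024  // default MemoryBufferSize
		fileSize int64 = 124 * 1024 * 1024 // default FileBuffersSize
	)
	for _, backlog := range []int64{16 << 20, 32 << 20, 33 << 20, 300 << 20} {
		fmt.Printf("%d MiB unread: %d overflow file(s)\n",
			backlog>>20, overflowFiles(backlog, memSize, fileSize))
	}
}
```

This is only a back-of-the-envelope estimate. The actual on-disk behavior depends on the underlying buffer implementation.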

Functions

This section is empty.

Types

type WriterErrorCloser

type WriterErrorCloser interface {
	io.Writer
	// CloseWithError will prevent further writes to this Stream and propagate the error
	// to Stream readers.
	CloseWithError(error) error
}

WriterErrorCloser is the write end of a pipe that can be closed with an error to propagate to the reading Stream.

func NewBoundedStream added in v0.9.0

func NewBoundedStream() (writer WriterErrorCloser, stream *streamline.Stream)

NewBoundedStream creates a Stream that consumes and emits data written to the returned writer, piped through a preconfigured bounded buffer (see MemoryBufferSize). It may be preferred over the default NewStream if you do not want the buffer to overflow onto temporary files. The writer and reader portions of the pipe can be written to and read from asynchronously.

The returned WriterErrorCloser must be closed by the caller when all data has been written or when an error occurs at the source to indicate to the stream that no further data will become available.
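One way to meet this requirement is to close the writer in a deferred call that forwards whatever error ended production. The sketch below shows that pattern using io.Pipe from the standard library as a stand-in for this package's pipes. The functions produce and consume are hypothetical, and treating CloseWithError(nil) as a clean end of data is documented for io.Pipe but is an assumption for this package's writer.

```go
package main

import (
	"bufio"
	"errors"
	"fmt"
	"io"
)

// WriterErrorCloser mirrors the interface documented in this package so that
// the sketch compiles on its own.
type WriterErrorCloser interface {
	io.Writer
	CloseWithError(error) error
}

// produce is a hypothetical producer. The deferred call closes w on every
// return path and forwards the error that ended production (nil on success).
func produce(w WriterErrorCloser, lines []string) (err error) {
	defer func() { w.CloseWithError(err) }()
	for _, l := range lines {
		if l == "" {
			return errors.New("empty line")
		}
		if _, err = io.WriteString(w, l+"\n"); err != nil {
			return err
		}
	}
	return nil
}

// consume runs produce in a goroutine and drains the read end of an io.Pipe,
// returning the lines read and the error seen by the reader.
func consume(lines []string) ([]string, error) {
	r, w := io.Pipe()
	go produce(w, lines)

	var got []string
	sc := bufio.NewScanner(r)
	for sc.Scan() {
		got = append(got, sc.Text())
	}
	return got, sc.Err()
}

func main() {
	fmt.Println(consume([]string{"1", "2"}))     // clean close
	fmt.Println(consume([]string{"1", "", "3"})) // source error
}
```

Without the deferred close, a reader draining the pipe would block indefinitely once the producer stopped writing.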

For a stream that safely overflows the buffer onto disk, use NewStream instead. For more advanced configurations, consider configuring a pipe directly using github.com/djherbis/nio/v3 or a pipe of your choice and configuring a Stream over the reader with streamline.New().

func NewStream

func NewStream() (writer WriterErrorCloser, stream *streamline.Stream)

NewStream creates a Stream that consumes and emits data written to the returned writer, piped through a preconfigured, unbounded buffer (see MemoryBufferSize and FileBuffersSize). The writer and reader portions of the pipe can be written to and read from asynchronously.

The returned WriterErrorCloser must be closed by the caller when all data has been written or when an error occurs at the source to indicate to the stream that no further data will become available.

For a purely in-memory buffer, NewBoundedStream can be used. For more advanced configurations, consider configuring a pipe directly using github.com/djherbis/nio/v3 or a pipe of your choice and configuring a Stream over the reader with streamline.New().

Example
package main

import (
	"errors"
	"fmt"

	"go.bobheadxi.dev/streamline/pipe"
)

func main() {
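	writer, stream := pipe.NewStream()
	// Produce data from a separate goroutine. Write errors are ignored here
	// for brevity.
	go func() {
		writer.Write([]byte("some goroutine emitting data\n"))
		for _, v := range []string{"1", "2", "3", "4"} {
			writer.Write([]byte(v + "\n"))
		}
		// Closing with an error propagates it to the reading Stream.
		writer.CloseWithError(errors.New("oh no!"))
	}()

	// Handle each line as it arrives, then report the propagated error.
	err := stream.Stream(func(line string) { fmt.Println(line) })
	fmt.Println("propagated error:", err.Error())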
}
Output:

some goroutine emitting data
1
2
3
4
propagated error: oh no!
