iostream

Version: v0.0.0-...-c70ae62
Published: Apr 3, 2024 License: MIT Imports: 5 Imported by: 0

README

iostream

iostream is a Go library for converting between streaming interfaces.

For now, the primary goal of iostream is to convert an io.WriterAt into an io.Writer, provided the code calling WriteAt follows a predictable pattern: multiple goroutines that each try to fill in the file starting from the beginning. It was built primarily so that the AWS S3 download manager can write to an io.Writer instead of an io.WriterAt, which usually requires either a complete file or a buffer large enough to hold the whole object.

Usage

OpenWriterAtStream
writer := new(bytes.Buffer)
stream := iostream.OpenWriterAtStream(writer, 2, 2)
defer stream.Close()
if _, err := stream.WriteAt([]byte("1234"), 0); err != nil {
	panic(err)
}
Using with the AWS S3 download manager
// Write the contents of an S3 object to a writer. A buffer is used here, but any streaming writer works.
writer := new(bytes.Buffer)
// Use at least as many buffers as there are concurrent downloader goroutines.
// Ideally, add a few more so that other goroutines can keep making progress
// if one of the first few downloads stalls.
// The total internal buffer size is numBuffers * bufferSize.
bufferSize := s3manager.DefaultDownloadPartSize
numBuffers := s3manager.DefaultDownloadConcurrency + 3
stream := iostream.OpenWriterAtStream(writer, bufferSize, numBuffers)
defer stream.Close()
// The byte count returned by Download is not used in this snippet, so it is discarded.
_, err := downloader.Download(stream, &s3.GetObjectInput{
	Bucket: aws.String(myBucket),
	Key:    aws.String(myString),
})
if err != nil {
	return fmt.Errorf("failed to download file: %w", err)
}

See the tests for more usage examples.

Documentation

Index

Constants

This section is empty.

Variables

var ErrAfterBounds = errors.New("WriteAt attempt is after end of buffer")

ErrAfterBounds represents an attempt to WriteAt beyond the current bounds of the buffer.

var ErrBeforeBounds = errors.New("WriteAt attempt is before start of buffer")

ErrBeforeBounds represents an attempt to WriteAt before the start of the buffer.

Functions

This section is empty.

Types

type BufPool

type BufPool interface {
	Get() *bytes.Buffer
	Put(*bytes.Buffer)
}

BufPool is a sync.Pool-like pool whose Get and Put are typed to *bytes.Buffer.

func NewBufPool

func NewBufPool(bytesPerBuffer int) BufPool

NewBufPool creates a new BufPool
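One way such a typed pool could be built is sketched below. This is not the package's implementation: the bufPool type, the choice to pre-size each buffer to bytesPerBuffer, and the Reset on Put are all assumptions made for illustration.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// bufPool is a hypothetical wrapper that hides sync.Pool's untyped values
// behind *bytes.Buffer-typed Get and Put methods.
type bufPool struct {
	pool sync.Pool
}

// newBufPool returns a pool whose fresh buffers start empty with
// bytesPerBuffer bytes of capacity (an assumption about sizing).
func newBufPool(bytesPerBuffer int) *bufPool {
	return &bufPool{
		pool: sync.Pool{
			New: func() any {
				return bytes.NewBuffer(make([]byte, 0, bytesPerBuffer))
			},
		},
	}
}

// Get returns a buffer from the pool, allocating one if the pool is empty.
func (p *bufPool) Get() *bytes.Buffer {
	return p.pool.Get().(*bytes.Buffer)
}

// Put empties the buffer and hands it back for reuse.
func (p *bufPool) Put(b *bytes.Buffer) {
	b.Reset()
	p.pool.Put(b)
}

func main() {
	p := newBufPool(64)
	b := p.Get()
	b.WriteString("hello")
	fmt.Println(b.Len(), b.Cap() >= 64) // 5 true
	p.Put(b)
	fmt.Println(p.Get().Len()) // 0
}
```

Resetting on Put means a caller never sees stale bytes, whether Get hands back a recycled buffer or a freshly allocated one.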

type StreamBuffer

type StreamBuffer struct {
	sync.Mutex
	// contains filtered or unexported fields
}

StreamBuffer represents a buffer for streaming data that consolidates random accesses into sequential accesses using a moving buffer. It contains a series of pre-allocated buffers. Each time the caller writes a chunk of data via WriteAt, the StreamBuffer works out where in the pre-allocated buffers that chunk belongs. When the caller calls Flush, it returns the largest contiguous chunk from the start of the buffer and advances to the next non-flushed buffer.

func NewStreamBuffer

func NewStreamBuffer(bufferCount int, bufferSize int) *StreamBuffer

NewStreamBuffer creates a new StreamBuffer that has bufferCount internal rotating buffers of size bufferSize.

func (*StreamBuffer) Flush

func (b *StreamBuffer) Flush() []byte

Flush returns all bytes that have been fully written, starting from the start of the internal buffer. Flushed bytes are not returned again.

func (*StreamBuffer) WriteAt

func (b *StreamBuffer) WriteAt(p []byte, off int64) (n int, err error)

WriteAt implements the io.WriterAt interface.

type WriterAtStream

type WriterAtStream struct {
	// contains filtered or unexported fields
}

WriterAtStream converts an io.WriterAt into an io.Writer using an in-memory buffer. It uses multiple buffers but avoids holding the entire file in memory. Use it only when there is a guarantee that the next chunk of bytes the writer needs will eventually be passed in, even while WriteAt calls for later parts of the stream are blocked. The code driving the io.WriterAt should also write each chunk of bytes only once: once a chunk has been written, the buffer's offset is assumed free to advance.

func OpenWriterAtStream

func OpenWriterAtStream(writer io.Writer, bufferSize, numBuffers int) *WriterAtStream

OpenWriterAtStream opens a WriterAtStream. The caller should close the stream to clean up resources. If an error occurs, all subsequent WriteAt calls return an error.

func (*WriterAtStream) Close

func (s *WriterAtStream) Close()

Close must be called to clean up resources and finish flushing data to the writer.

func (*WriterAtStream) WriteAt

func (s *WriterAtStream) WriteAt(p []byte, off int64) (n int, err error)

WriteAt implements the io.WriterAt interface.

Directories

Path Synopsis
This file was copied exactly from https://github.com/aws/aws-sdk-go-v2/blob/b7d8e15425d2f86a0596e8d7db2e33bf382a21dd/feature/s3/manager/arn.go#L8
